diff --git a/litellm/constants.py b/litellm/constants.py index c5da7770e8..c8248f548a 100644 --- a/litellm/constants.py +++ b/litellm/constants.py @@ -26,6 +26,7 @@ REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer" REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer" MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100 MAX_SIZE_IN_MEMORY_QUEUE = 10000 +MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = 1000 ############################################################################################### MINIMUM_PROMPT_CACHE_TOKEN_COUNT = ( 1024 # minimum number of tokens to cache a prompt by Anthropic diff --git a/litellm/proxy/db/db_transaction_queue/base_update_queue.py b/litellm/proxy/db/db_transaction_queue/base_update_queue.py index 11a20c4804..234b8eff8a 100644 --- a/litellm/proxy/db/db_transaction_queue/base_update_queue.py +++ b/litellm/proxy/db/db_transaction_queue/base_update_queue.py @@ -4,7 +4,7 @@ Base class for in memory buffer for database transactions import asyncio from litellm._logging import verbose_proxy_logger -from litellm.constants import MAX_SIZE_IN_MEMORY_QUEUE +from litellm.constants import MAX_IN_MEMORY_QUEUE_FLUSH_COUNT, MAX_SIZE_IN_MEMORY_QUEUE class BaseUpdateQueue: @@ -23,5 +23,11 @@ class BaseUpdateQueue: """Get all updates from the queue.""" updates = [] while not self.update_queue.empty(): + if len(updates) >= MAX_IN_MEMORY_QUEUE_FLUSH_COUNT: + # circuit breaker to ensure we're not stuck dequeuing updates + verbose_proxy_logger.warning( + "Max in memory queue flush count reached, stopping flush" + ) + break updates.append(await self.update_queue.get()) return updates