diff --git a/litellm/litellm_core_utils/streaming_handler.py b/litellm/litellm_core_utils/streaming_handler.py index 4d8e109d88..a7f460fab5 100644 --- a/litellm/litellm_core_utils/streaming_handler.py +++ b/litellm/litellm_core_utils/streaming_handler.py @@ -96,9 +96,9 @@ class CustomStreamWrapper: self.system_fingerprint: Optional[str] = None self.received_finish_reason: Optional[str] = None - self.intermittent_finish_reason: Optional[str] = ( - None # finish reasons that show up mid-stream - ) + self.intermittent_finish_reason: Optional[ + str + ] = None # finish reasons that show up mid-stream self.special_tokens = [ "<|assistant|>", "<|system|>", @@ -735,7 +735,7 @@ class CustomStreamWrapper: and completion_obj["function_call"] is not None ) or ( - "tool_calls" in model_response.choices[0].delta + "tool_calls" in model_response.choices[0].delta and model_response.choices[0].delta["tool_calls"] is not None ) or ( @@ -889,7 +889,6 @@ class CustomStreamWrapper: ## check if openai/azure chunk original_chunk = response_obj.get("original_chunk", None) if original_chunk: - if len(original_chunk.choices) > 0: choices = [] for choice in original_chunk.choices: @@ -906,7 +905,6 @@ class CustomStreamWrapper: print_verbose(f"choices in streaming: {choices}") setattr(model_response, "choices", choices) else: - return model_response.system_fingerprint = ( original_chunk.system_fingerprint @@ -1435,9 +1433,9 @@ class CustomStreamWrapper: _json_delta = delta.model_dump() print_verbose(f"_json_delta: {_json_delta}") if "role" not in _json_delta or _json_delta["role"] is None: - _json_delta["role"] = ( - "assistant" # mistral's api returns role as None - ) + _json_delta[ + "role" + ] = "assistant" # mistral's api returns role as None if "tool_calls" in _json_delta and isinstance( _json_delta["tool_calls"], list ): @@ -1533,7 +1531,7 @@ class CustomStreamWrapper: async def _call_post_streaming_deployment_hook(self, chunk): """ Call the post-call streaming deployment hook for callbacks. - + This allows callbacks to modify streaming chunks before they're returned. """ try: @@ -1544,15 +1542,17 @@ class CustomStreamWrapper: # Get request kwargs from logging object request_data = self.logging_obj.model_call_details call_type_str = self.logging_obj.call_type - + try: typed_call_type = CallTypes(call_type_str) except ValueError: typed_call_type = None - + # Call hooks for all callbacks for callback in litellm.callbacks: - if isinstance(callback, CustomLogger) and hasattr(callback, "async_post_call_streaming_deployment_hook"): + if isinstance(callback, CustomLogger) and hasattr( + callback, "async_post_call_streaming_deployment_hook" + ): result = await callback.async_post_call_streaming_deployment_hook( request_data=request_data, response_chunk=chunk, @@ -1560,11 +1560,14 @@ class CustomStreamWrapper: ) if result is not None: chunk = result - + return chunk except Exception as e: from litellm._logging import verbose_logger - verbose_logger.exception(f"Error in post-call streaming deployment hook: {str(e)}") + + verbose_logger.exception( + f"Error in post-call streaming deployment hook: {str(e)}" + ) return chunk def cache_streaming_response(self, processed_chunk, cache_hit: bool): @@ -1687,7 +1690,7 @@ class CustomStreamWrapper: response, "usage" ): # remove usage from chunk, only send on final chunk # Convert the object to a dictionary - obj_dict = response.dict() + obj_dict = response.model_dump() # Remove an attribute (e.g., 'attr2') if "usage" in obj_dict: @@ -1852,7 +1855,7 @@ class CustomStreamWrapper: processed_chunk, "usage" ): # remove usage from chunk, only send on final chunk # Convert the object to a dictionary - obj_dict = processed_chunk.dict() + obj_dict = processed_chunk.model_dump() # Remove an attribute (e.g., 'attr2') if "usage" in obj_dict: @@ -1872,11 +1875,15 @@ class CustomStreamWrapper: if self.sent_last_chunk is True and self.stream_options is None: usage = calculate_total_usage(chunks=self.chunks) processed_chunk._hidden_params["usage"] = usage - + # Call post-call streaming deployment hook for final chunk if self.sent_last_chunk is True: - processed_chunk = await self._call_post_streaming_deployment_hook(processed_chunk) - + processed_chunk = ( + await self._call_post_streaming_deployment_hook( + processed_chunk + ) + ) + return processed_chunk raise StopAsyncIteration else: # temporary patch for non-aiohttp async calls @@ -1890,9 +1897,9 @@ class CustomStreamWrapper: chunk = next(self.completion_stream) if chunk is not None and chunk != b"": print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}") - processed_chunk: Optional[ModelResponseStream] = ( - self.chunk_creator(chunk=chunk) - ) + processed_chunk: Optional[ + ModelResponseStream + ] = self.chunk_creator(chunk=chunk) print_verbose( f"PROCESSED CHUNK POST CHUNK CREATOR: {processed_chunk}" )