fix: replace deprecated .dict() with .model_dump() in streaming_handler (#17359)

Replace Pydantic v1 `.dict()` method with v2 `.model_dump()` to fix
PydanticDeprecatedSince20 warnings. The `.dict()` method is deprecated
in Pydantic v2 and will be removed in v3.

Fixes #5987
This commit is contained in:
Jonathan Yang
2025-12-03 07:12:55 +01:00
committed by GitHub
parent 17faea96bb
commit 43dd9e4a90
+30 -23
View File
@@ -96,9 +96,9 @@ class CustomStreamWrapper:
self.system_fingerprint: Optional[str] = None
self.received_finish_reason: Optional[str] = None
self.intermittent_finish_reason: Optional[str] = (
None # finish reasons that show up mid-stream
)
self.intermittent_finish_reason: Optional[
str
] = None # finish reasons that show up mid-stream
self.special_tokens = [
"<|assistant|>",
"<|system|>",
@@ -735,7 +735,7 @@ class CustomStreamWrapper:
and completion_obj["function_call"] is not None
)
or (
"tool_calls" in model_response.choices[0].delta
"tool_calls" in model_response.choices[0].delta
and model_response.choices[0].delta["tool_calls"] is not None
)
or (
@@ -889,7 +889,6 @@ class CustomStreamWrapper:
## check if openai/azure chunk
original_chunk = response_obj.get("original_chunk", None)
if original_chunk:
if len(original_chunk.choices) > 0:
choices = []
for choice in original_chunk.choices:
@@ -906,7 +905,6 @@ class CustomStreamWrapper:
print_verbose(f"choices in streaming: {choices}")
setattr(model_response, "choices", choices)
else:
return
model_response.system_fingerprint = (
original_chunk.system_fingerprint
@@ -1435,9 +1433,9 @@ class CustomStreamWrapper:
_json_delta = delta.model_dump()
print_verbose(f"_json_delta: {_json_delta}")
if "role" not in _json_delta or _json_delta["role"] is None:
_json_delta["role"] = (
"assistant" # mistral's api returns role as None
)
_json_delta[
"role"
] = "assistant" # mistral's api returns role as None
if "tool_calls" in _json_delta and isinstance(
_json_delta["tool_calls"], list
):
@@ -1533,7 +1531,7 @@ class CustomStreamWrapper:
async def _call_post_streaming_deployment_hook(self, chunk):
"""
Call the post-call streaming deployment hook for callbacks.
This allows callbacks to modify streaming chunks before they're returned.
"""
try:
@@ -1544,15 +1542,17 @@ class CustomStreamWrapper:
# Get request kwargs from logging object
request_data = self.logging_obj.model_call_details
call_type_str = self.logging_obj.call_type
try:
typed_call_type = CallTypes(call_type_str)
except ValueError:
typed_call_type = None
# Call hooks for all callbacks
for callback in litellm.callbacks:
if isinstance(callback, CustomLogger) and hasattr(callback, "async_post_call_streaming_deployment_hook"):
if isinstance(callback, CustomLogger) and hasattr(
callback, "async_post_call_streaming_deployment_hook"
):
result = await callback.async_post_call_streaming_deployment_hook(
request_data=request_data,
response_chunk=chunk,
@@ -1560,11 +1560,14 @@ class CustomStreamWrapper:
)
if result is not None:
chunk = result
return chunk
except Exception as e:
from litellm._logging import verbose_logger
verbose_logger.exception(f"Error in post-call streaming deployment hook: {str(e)}")
verbose_logger.exception(
f"Error in post-call streaming deployment hook: {str(e)}"
)
return chunk
def cache_streaming_response(self, processed_chunk, cache_hit: bool):
@@ -1687,7 +1690,7 @@ class CustomStreamWrapper:
response, "usage"
): # remove usage from chunk, only send on final chunk
# Convert the object to a dictionary
obj_dict = response.dict()
obj_dict = response.model_dump()
# Remove an attribute (e.g., 'attr2')
if "usage" in obj_dict:
@@ -1852,7 +1855,7 @@ class CustomStreamWrapper:
processed_chunk, "usage"
): # remove usage from chunk, only send on final chunk
# Convert the object to a dictionary
obj_dict = processed_chunk.dict()
obj_dict = processed_chunk.model_dump()
# Remove an attribute (e.g., 'attr2')
if "usage" in obj_dict:
@@ -1872,11 +1875,15 @@ class CustomStreamWrapper:
if self.sent_last_chunk is True and self.stream_options is None:
usage = calculate_total_usage(chunks=self.chunks)
processed_chunk._hidden_params["usage"] = usage
# Call post-call streaming deployment hook for final chunk
if self.sent_last_chunk is True:
processed_chunk = await self._call_post_streaming_deployment_hook(processed_chunk)
processed_chunk = (
await self._call_post_streaming_deployment_hook(
processed_chunk
)
)
return processed_chunk
raise StopAsyncIteration
else: # temporary patch for non-aiohttp async calls
@@ -1890,9 +1897,9 @@ class CustomStreamWrapper:
chunk = next(self.completion_stream)
if chunk is not None and chunk != b"":
print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}")
processed_chunk: Optional[ModelResponseStream] = (
self.chunk_creator(chunk=chunk)
)
processed_chunk: Optional[
ModelResponseStream
] = self.chunk_creator(chunk=chunk)
print_verbose(
f"PROCESSED CHUNK POST CHUNK CREATOR: {processed_chunk}"
)