Files
litellm/tests/logging_callback_tests/test_langfuse_e2e_test.py
T
Krish Dholakia 010a4d44af Fix passing standard optional params (#11124)
* fix(main.py): use processed non-default-params as standard input params for langfuse

Fixes https://github.com/BerriAI/litellm/issues/11072

 Fixes https://github.com/BerriAI/litellm/issues/11096

* fix(main.py): rename variable to be more accurate

* test(test_langfuse_e2e_test.py): add router unit test for langfuse e2e testing

Prevent https://github.com/BerriAI/litellm/issues/11072 from happening again

* build: update lock

* fix(utils.py): refactor optional params function

make it easier to get the standardized non default params

* fix(utils.py): improve process non default params function

* fix(main.py): include provider specific params in processed non default params used in logging

ensures user can see any provider specific params on langfuse

 ensures user can see any provider specific params on langfuse
2025-05-24 12:12:31 -07:00

422 lines
16 KiB
Python

import asyncio
import copy
import json
import logging
import os
import sys
from typing import Any, Optional
from unittest.mock import MagicMock, patch
import threading
# Configure the root logger at DEBUG level for the whole test module.
logging.basicConfig(level=logging.DEBUG)
# Put the directory two levels above the current working directory first on the
# import path; this runs before `import litellm` below, so order matters.
sys.path.insert(0, os.path.abspath("../.."))
import litellm
from litellm import completion
from litellm.caching import InMemoryCache
# Module-wide litellm settings applied at import time: retry count and the
# "langfuse" success callback that the tests in this file exercise.
litellm.num_retries = 3
litellm.success_callback = ["langfuse"]
# NOTE(review): presumably read by the Langfuse SDK to enable debug output - confirm.
os.environ["LANGFUSE_DEBUG"] = "True"
import time
import pytest
def assert_langfuse_request_matches_expected(
    actual_request_body: dict,
    expected_file_name: str,
    trace_id: Optional[str] = None,
):
    """
    Helper function to compare actual Langfuse request body with expected JSON file.

    Dynamic fields of the actual body (event ids, timestamps, start/end times,
    SDK metadata, the generation's ``metadata``) are overwritten with the values
    from the expected file before the two bodies are compared. The work is done
    on a deep copy, so the dict passed in by the caller is left untouched.

    Args:
        actual_request_body (dict): The actual request body received from the API call
        expected_file_name (str): Name of the JSON file containing expected request body (e.g., "transcription.json")
        trace_id (Optional[str]): When given, only the "trace-create" event whose
            body id equals it and the "generation-create" events whose traceId
            equals it are kept; the expected generation's traceId is set to it.
            When None, no filtering happens and "traceId" is dropped from the
            actual generation events.

    Raises:
        AssertionError: If fewer than two events remain after filtering, or if
            the normalized actual body differs from the expected body.
    """
    # Get the current directory and read the expected request body
    pwd = os.path.dirname(os.path.realpath(__file__))
    expected_body_path = os.path.join(
        pwd, "langfuse_expected_request_body", expected_file_name
    )
    with open(expected_body_path, "r", encoding="utf-8") as f:
        expected_request_body = json.load(f)

    # Normalize a private copy so the caller's captured request is not rewritten.
    actual = copy.deepcopy(actual_request_body)

    # Filter out events that don't match the trace_id
    if trace_id:
        actual["batch"] = [
            item
            for item in actual["batch"]
            if (item["type"] == "trace-create" and item["body"].get("id") == trace_id)
            or (
                item["type"] == "generation-create"
                and item["body"].get("traceId") == trace_id
            )
        ]

    print("actual_request_body after filtering", json.dumps(actual, indent=4))

    # The normalization below indexes batch[0] and batch[1]; fail with a
    # readable message instead of a bare IndexError when events are missing.
    assert len(actual["batch"]) >= 2, (
        "Expected at least 2 Langfuse events (trace-create, generation-create) "
        f"for trace_id={trace_id!r}, got {len(actual['batch'])}"
    )

    expected_batch = expected_request_body["batch"]

    # Replace dynamic values in actual request body
    for item in actual["batch"]:
        # Replace IDs with expected IDs
        if item["type"] == "trace-create":
            expected_trace = expected_batch[0]
            item["id"] = expected_trace["id"]
            item["body"]["id"] = expected_trace["body"]["id"]
            item["timestamp"] = expected_trace["timestamp"]
            item["body"]["timestamp"] = expected_trace["body"]["timestamp"]
        elif item["type"] == "generation-create":
            expected_generation = expected_batch[1]
            item["id"] = expected_generation["id"]
            item["body"]["id"] = expected_generation["body"]["id"]
            item["timestamp"] = expected_generation["timestamp"]
            for time_key in ("startTime", "endTime", "completionStartTime"):
                item["body"][time_key] = expected_generation["body"][time_key]
            if trace_id is None:
                print("popping traceId")
                # Default avoids a KeyError when the event carries no traceId.
                item["body"].pop("traceId", None)
            else:
                item["body"]["traceId"] = trace_id
                expected_generation["body"]["traceId"] = trace_id

    # "release" is dropped from the first event before comparing
    actual["batch"][0]["body"].pop("release", None)

    # Replace SDK version, public key, integration name and batch size with expected values
    for metadata_key in ("sdk_version", "public_key", "sdk_integration", "batch_size"):
        actual["metadata"][metadata_key] = expected_request_body["metadata"][
            metadata_key
        ]

    # The generation's metadata is not compared; take it from the expected body
    actual["batch"][1]["body"]["metadata"] = expected_batch[1]["body"]["metadata"]

    # Assert the entire request body matches
    assert (
        actual == expected_request_body
    ), f"Difference in request bodies: {json.dumps(actual, indent=2)} != {json.dumps(expected_request_body, indent=2)}"
class TestLangfuseLogging:
    """Checks of the request body that litellm's "langfuse" success callback posts.

    Every test patches ``httpx.Client.post`` with a mock, runs a mocked
    completion tagged with a unique trace id, then compares the captured
    request body with a JSON file through
    ``assert_langfuse_request_matches_expected``.
    """

    @pytest.fixture
    def mock_setup(self):
        """Common setup for Langfuse logging tests.

        This is a plain synchronous fixture: nothing in it awaits, so tests
        read the returned dict directly and no asyncio plugin has to resolve
        the fixture.

        Returns:
            dict: ``{"trace_id": <unique "litellm-test-..." id>, "mock_post": <AsyncMock>}``
        """
        import uuid
        from unittest.mock import AsyncMock

        import httpx

        # Create a mock Response object
        mock_response = AsyncMock(spec=httpx.Response)
        mock_response.status_code = 200
        mock_response.json.return_value = {"status": "success"}

        # Create mock for httpx.Client.post
        mock_post = AsyncMock()
        mock_post.return_value = mock_response

        litellm.set_verbose = True
        litellm.success_callback = ["langfuse"]

        return {"trace_id": f"litellm-test-{str(uuid.uuid4())}", "mock_post": mock_post}

    async def _verify_langfuse_call(
        self,
        mock_post,
        expected_file_name: str,
        trace_id: str,
    ):
        """Helper method to verify Langfuse API calls.

        Args:
            mock_post: The mock that replaced ``httpx.Client.post``.
            expected_file_name (str): JSON file holding the expected request body.
            trace_id (str): Trace id used to pick this test's events out of the batch.

        Raises:
            AssertionError: If no call was recorded, the URL is not the Langfuse
                ingestion endpoint, or the request body differs from the expected one.
        """
        # NOTE(review): fixed wait, presumably to give the callback time to
        # send the request before the mock is inspected - confirm.
        await asyncio.sleep(3)

        # Verify the call
        assert mock_post.call_count >= 1
        call_args, call_kwargs = mock_post.call_args
        url = call_args[0]
        request_body = call_kwargs.get("content")

        # Parse the JSON string into a dict for assertions
        actual_request_body = json.loads(request_body)

        print("\nMocked Request Details:")
        print(f"URL: {url}")
        print(f"Request Body: {json.dumps(actual_request_body, indent=4)}")

        assert url == "https://us.cloud.langfuse.com/api/public/ingestion"
        assert_langfuse_request_matches_expected(
            actual_request_body,
            expected_file_name,
            trace_id,
        )

    @pytest.mark.asyncio
    async def test_langfuse_logging_completion(self, mock_setup):
        """Test Langfuse logging for chat completion"""
        setup = mock_setup
        with patch("httpx.Client.post", setup["mock_post"]):
            await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hello!"}],
                mock_response="Hello! How can I assist you today?",
                metadata={"trace_id": setup["trace_id"]},
            )
            await self._verify_langfuse_call(
                setup["mock_post"], "completion.json", setup["trace_id"]
            )

    @pytest.mark.asyncio
    async def test_langfuse_logging_completion_with_tags(self, mock_setup):
        """Test Langfuse logging for chat completion with tags"""
        setup = mock_setup
        with patch("httpx.Client.post", setup["mock_post"]):
            await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hello!"}],
                mock_response="Hello! How can I assist you today?",
                metadata={
                    "trace_id": setup["trace_id"],
                    "tags": ["test_tag", "test_tag_2"],
                },
            )
            await self._verify_langfuse_call(
                setup["mock_post"], "completion_with_tags.json", setup["trace_id"]
            )

    @pytest.mark.asyncio
    async def test_langfuse_logging_completion_with_tags_stream(self, mock_setup):
        """Test Langfuse logging for chat completion with the "*_stream" tags.

        NOTE(review): despite the name, no ``stream=True`` is passed; the call
        differs from the tags test only in tag values and expected file.
        """
        setup = mock_setup
        with patch("httpx.Client.post", setup["mock_post"]):
            await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hello!"}],
                mock_response="Hello! How can I assist you today?",
                metadata={
                    "trace_id": setup["trace_id"],
                    "tags": ["test_tag_stream", "test_tag_2_stream"],
                },
            )
            await self._verify_langfuse_call(
                setup["mock_post"],
                "completion_with_tags_stream.json",
                setup["trace_id"],
            )

    @pytest.mark.asyncio
    async def test_langfuse_logging_completion_with_langfuse_metadata(self, mock_setup):
        """Test Langfuse logging for chat completion with metadata for langfuse"""
        setup = mock_setup
        with patch("httpx.Client.post", setup["mock_post"]):
            await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hello!"}],
                mock_response="Hello! How can I assist you today?",
                metadata={
                    "trace_id": setup["trace_id"],
                    "tags": ["test_tag", "test_tag_2"],
                    "generation_name": "test_generation_name",
                    "parent_observation_id": "test_parent_observation_id",
                    "version": "test_version",
                    "trace_user_id": "test_user_id",
                    "session_id": "test_session_id",
                    "trace_name": "test_trace_name",
                    "trace_metadata": {"test_key": "test_value"},
                    "trace_version": "test_trace_version",
                    "trace_release": "test_trace_release",
                },
            )
            await self._verify_langfuse_call(
                setup["mock_post"],
                "completion_with_langfuse_metadata.json",
                setup["trace_id"],
            )

    @pytest.mark.asyncio
    async def test_langfuse_logging_with_non_serializable_metadata(self, mock_setup):
        """Test Langfuse logging with metadata that requires preparation (Pydantic models, sets, etc)"""
        from pydantic import BaseModel
        from typing import Set
        import datetime

        class UserPreferences(BaseModel):
            favorite_colors: Set[str]
            last_login: datetime.datetime
            settings: dict

        setup = mock_setup
        test_metadata = {
            "user_prefs": UserPreferences(
                favorite_colors={"red", "blue"},
                last_login=datetime.datetime.now(),
                settings={"theme": "dark", "notifications": True},
            ),
            "nested_set": {
                "inner_set": {1, 2, 3},
                "inner_pydantic": UserPreferences(
                    favorite_colors={"green", "yellow"},
                    last_login=datetime.datetime.now(),
                    settings={"theme": "light"},
                ),
            },
            "trace_id": setup["trace_id"],
        }
        with patch("httpx.Client.post", setup["mock_post"]):
            await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hello!"}],
                mock_response="Hello! How can I assist you today?",
                metadata=test_metadata,
            )
            await self._verify_langfuse_call(
                setup["mock_post"],
                "completion_with_complex_metadata.json",
                setup["trace_id"],
            )

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "test_metadata, response_json_file",
        [
            ({"a": 1, "b": 2, "c": 3}, "simple_metadata.json"),
            (
                {"a": {"nested_a": 1}, "b": {"nested_b": 2}},
                "nested_metadata.json",
            ),
            ({"a": [1, 2, 3], "b": {4, 5, 6}}, "simple_metadata2.json"),
            (
                {"a": (1, 2), "b": frozenset([3, 4]), "c": {"d": [5, 6]}},
                "simple_metadata3.json",
            ),
            ({"lock": threading.Lock()}, "metadata_with_lock.json"),
            ({"func": lambda x: x + 1}, "metadata_with_function.json"),
            (
                {
                    "int": 42,
                    "str": "hello",
                    "list": [1, 2, 3],
                    "set": {4, 5},
                    "dict": {"nested": "value"},
                    "non_copyable": threading.Lock(),
                    "function": print,
                },
                "complex_metadata.json",
            ),
            (
                {"list": ["list", "not", "a", "dict"]},
                "complex_metadata_2.json",
            ),
            ({}, "empty_metadata.json"),
        ],
    )
    @pytest.mark.flaky(retries=6, delay=1)
    async def test_langfuse_logging_with_various_metadata_types(
        self, mock_setup, test_metadata, response_json_file
    ):
        """Test Langfuse logging with various metadata types including non-serializable objects"""
        setup = mock_setup
        # The parametrize dicts are module-level objects reused across retries;
        # add the trace id to a shallow copy instead of writing into them.
        metadata = test_metadata
        if test_metadata is not None:
            metadata = {**test_metadata, "trace_id": setup["trace_id"]}
        with patch("httpx.Client.post", setup["mock_post"]):
            await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hello!"}],
                mock_response="Hello! How can I assist you today?",
                metadata=metadata,
            )
            await self._verify_langfuse_call(
                setup["mock_post"],
                response_json_file,
                setup["trace_id"],
            )

    @pytest.mark.asyncio
    async def test_langfuse_logging_completion_with_malformed_llm_response(
        self, mock_setup
    ):
        """Test Langfuse logging for chat completion with malformed LLM response"""
        setup = mock_setup
        litellm._turn_on_debug()
        with patch("httpx.Client.post", setup["mock_post"]):
            # A response dict with an empty ``choices`` list is the "malformed" case.
            mock_response = litellm.ModelResponse(
                choices=[],
                usage=litellm.Usage(
                    prompt_tokens=10,
                    completion_tokens=10,
                    total_tokens=20,
                ),
                model="gpt-3.5-turbo",
                object="chat.completion",
                created=1723081200,
            ).model_dump()
            await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hello!"}],
                mock_response=mock_response,
                metadata={"trace_id": setup["trace_id"]},
            )
            await self._verify_langfuse_call(
                setup["mock_post"], "completion_with_no_choices.json", setup["trace_id"]
            )

    @pytest.mark.asyncio
    async def test_langfuse_logging_with_router(self, mock_setup):
        """Test Langfuse logging with router"""
        setup = mock_setup
        litellm._turn_on_debug()
        router = litellm.Router(
            model_list=[
                {
                    "model_name": "gpt-3.5-turbo",
                    "litellm_params": {
                        "model": "gpt-3.5-turbo",
                        "mock_response": "Hello! How can I assist you today?",
                        "api_key": "test_api_key",
                    },
                }
            ]
        )
        with patch("httpx.Client.post", setup["mock_post"]):
            # A per-call mock_response (empty ``choices``) is passed in addition
            # to the one configured on the deployment above.
            mock_response = litellm.ModelResponse(
                choices=[],
                usage=litellm.Usage(
                    prompt_tokens=10,
                    completion_tokens=10,
                    total_tokens=20,
                ),
                model="gpt-3.5-turbo",
                object="chat.completion",
                created=1723081200,
            ).model_dump()
            await router.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hello!"}],
                mock_response=mock_response,
                metadata={"trace_id": setup["trace_id"]},
            )
            await self._verify_langfuse_call(
                setup["mock_post"], "completion_with_router.json", setup["trace_id"]
            )