import sys, os import traceback from dotenv import load_dotenv load_dotenv() import os, io, asyncio # this file is to test litellm/proxy sys.path.insert( 0, os.path.abspath("../..") ) # Adds the parent directory to the system path import pytest, time import litellm from litellm import embedding, completion, completion_cost, Timeout from litellm import RateLimitError import importlib, inspect # test /chat/completion request to the proxy from fastapi.testclient import TestClient from fastapi import FastAPI from litellm.proxy.proxy_server import ( router, save_worker_config, initialize, ) # Replace with the actual module where your FastAPI router is defined filepath = os.path.dirname(os.path.abspath(__file__)) python_file_path = f"{filepath}/test_configs/custom_callbacks.py" @pytest.fixture def client(): filepath = os.path.dirname(os.path.abspath(__file__)) config_fp = f"{filepath}/test_configs/test_custom_logger.yaml" app = FastAPI() asyncio.run(initialize(config=config_fp)) app.include_router(router) # Include your router in the test app return TestClient(app) # Your bearer token token = os.getenv("PROXY_MASTER_KEY") headers = {"Authorization": f"Bearer {token}"} print("Testing proxy custom logger") @pytest.mark.skipif( os.environ.get("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY not set - skipping integration test", ) def test_embedding(client): try: litellm.set_verbose = False from litellm.proxy.types_utils.utils import get_instance_fn my_custom_logger = get_instance_fn( value="custom_callbacks.my_custom_logger", config_file_path=python_file_path ) print("id of initialized custom logger", id(my_custom_logger)) litellm.callbacks = [my_custom_logger] # Your test data print("initialized proxy") # import the initialized custom logger print(litellm.callbacks) # assert len(litellm.callbacks) == 1 # assert litellm is initialized with 1 callback print("my_custom_logger", my_custom_logger) assert my_custom_logger.async_success_embedding is False test_data = {"model": "azure-embedding-model", "input": ["hello"]} response = client.post("/embeddings", json=test_data, headers=headers) print("made request", response.status_code, response.text) print( "vars my custom logger /embeddings", vars(my_custom_logger), "id", id(my_custom_logger), ) assert ( my_custom_logger.async_success_embedding is True ) # checks if the status of async_success is True, only the async_log_success_event can set this to true assert ( my_custom_logger.async_embedding_kwargs["model"] == "text-embedding-ada-002" ) # checks if kwargs passed to async_log_success_event are correct kwargs = my_custom_logger.async_embedding_kwargs litellm_params = kwargs.get("litellm_params") # Test 1: Verify metadata is populated correctly metadata = litellm_params.get("metadata", None) print("\n\n Metadata in custom logger kwargs", litellm_params.get("metadata")) assert metadata is not None, "metadata should be present in litellm_params" assert "user_api_key" in metadata, "user_api_key should be in metadata" assert "headers" in metadata, "headers should be in metadata" # Test 2: Verify proxy_server_request contains the original request details proxy_server_request = litellm_params.get("proxy_server_request") assert proxy_server_request is not None, "proxy_server_request should exist" assert ( proxy_server_request.get("url") == "http://testserver/embeddings" ), "url should match" assert proxy_server_request.get("method") == "POST", "method should be POST" assert "headers" in proxy_server_request, "headers should be present" assert "body" in proxy_server_request, "body should be present" # Test 3: Verify request body contains the original input data body = proxy_server_request["body"] assert ( body.get("model") == "azure-embedding-model" ), "model should match original request" assert body.get("input") == ["hello"], "input should match original request" # Test 4: Verify model_info is populated model_info = litellm_params.get("model_info") assert model_info is not None, "model_info should exist" assert model_info.get("mode") == "embedding", "mode should be embedding" assert model_info.get("id") == "hello", "id should match" assert ( model_info.get("input_cost_per_token") == 0.002 ), "input cost should match" result = response.json() print(f"Received response: {result}") print("Passed Embedding custom logger on proxy!") except Exception as e: pytest.fail(f"LiteLLM Proxy test failed. Exception {str(e)}") @pytest.mark.skipif( os.environ.get("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY not set - skipping integration test", ) def test_chat_completion(client): try: # Your test data litellm.set_verbose = False from litellm.proxy.types_utils.utils import get_instance_fn my_custom_logger = get_instance_fn( value="custom_callbacks.my_custom_logger", config_file_path=python_file_path ) print("id of initialized custom logger", id(my_custom_logger)) litellm.callbacks = [my_custom_logger] # import the initialized custom logger print(litellm.callbacks) # assert len(litellm.callbacks) == 1 # assert litellm is initialized with 1 callback print("LiteLLM Callbacks", litellm.callbacks) print("my_custom_logger", my_custom_logger) assert my_custom_logger.async_success == False test_data = { "model": "Azure OpenAI GPT-4 Canada", "messages": [ {"role": "user", "content": "write a litellm poem"}, ], "max_tokens": 10, } response = client.post("/chat/completions", json=test_data, headers=headers) print("made request", response.status_code, response.text) print("LiteLLM Callbacks", litellm.callbacks) time.sleep(1) # sleep while waiting for callback to run print( "my_custom_logger in /chat/completions", my_custom_logger, "id", id(my_custom_logger), ) print("vars my custom logger, ", vars(my_custom_logger)) assert ( my_custom_logger.async_success == True ) # checks if the status of async_success is True, only the async_log_success_event can set this to true assert ( my_custom_logger.async_completion_kwargs["model"] == "gpt-4.1-nano" ) # checks if kwargs passed to async_log_success_event are correct print( "\n\n Custom Logger Async Completion args", my_custom_logger.async_completion_kwargs, ) litellm_params = my_custom_logger.async_completion_kwargs.get("litellm_params") # Test 1: Verify metadata is populated correctly metadata = litellm_params.get("metadata", None) print("\n\n Metadata in custom logger kwargs", litellm_params.get("metadata")) assert metadata is not None, "metadata should be present" assert "user_api_key" in metadata, "user_api_key should be in metadata" assert ( "user_api_key_metadata" in metadata ), "user_api_key_metadata should be in metadata" assert "headers" in metadata, "headers should be in metadata" # Test 2: Verify model_info is populated config_model_info = litellm_params.get("model_info") assert config_model_info is not None, "model_info should exist" assert config_model_info.get("id") == "gm", "model id should match" assert config_model_info.get("mode") == "chat", "mode should be chat" assert ( config_model_info.get("input_cost_per_token") == 0.0002 ), "input cost should match" # Test 3: Verify proxy_server_request contains request details proxy_server_request_object = litellm_params.get("proxy_server_request") assert ( proxy_server_request_object is not None ), "proxy_server_request should exist" assert ( proxy_server_request_object.get("url") == "http://testserver/chat/completions" ), "url should match" assert ( proxy_server_request_object.get("method") == "POST" ), "method should be POST" # Test 4: Verify authorization is not leaked in logged headers assert ( "authorization" not in proxy_server_request_object["headers"] ), "authorization should not be in headers" # Test 5: Verify request body contains original input data body = proxy_server_request_object.get("body", {}) assert ( body.get("model") == "Azure OpenAI GPT-4 Canada" ), "model should match original request" assert body.get("messages") == [ {"role": "user", "content": "write a litellm poem"} ], "messages should match" assert body.get("max_tokens") == 10, "max_tokens should match" result = response.json() print(f"Received response: {result}") print("\nPassed /chat/completions with Custom Logger!") except Exception as e: pytest.fail(f"LiteLLM Proxy test failed. Exception {str(e)}") @pytest.mark.skipif( os.environ.get("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY not set - skipping integration test", ) def test_chat_completion_stream(client): try: # Your test data litellm.set_verbose = False from litellm.proxy.types_utils.utils import get_instance_fn my_custom_logger = get_instance_fn( value="custom_callbacks.my_custom_logger", config_file_path=python_file_path ) print("id of initialized custom logger", id(my_custom_logger)) litellm.callbacks = [my_custom_logger] import json print("initialized proxy") # import the initialized custom logger print(litellm.callbacks) print("LiteLLM Callbacks", litellm.callbacks) print("my_custom_logger", my_custom_logger) assert ( my_custom_logger.streaming_response_obj == None ) # no streaming response obj is set pre call test_data = { "model": "Azure OpenAI GPT-4 Canada", "messages": [ {"role": "user", "content": "write 1 line poem about LiteLLM"}, ], "max_tokens": 40, "stream": True, # streaming call } response = client.post("/chat/completions", json=test_data, headers=headers) print("made request", response.status_code, response.text) complete_response = "" for line in response.iter_lines(): if line: # Process the streaming data line here print("\n\n Line", line) print(line) line = str(line) json_data = line.replace("data: ", "") if "[DONE]" in json_data: break # Parse the JSON string data = json.loads(json_data) print("\n\n decode_data", data) # Access the content of choices[0]['message']['content'] content = data["choices"][0]["delta"].get("content", None) or "" # Process the content as needed print("Content:", content) complete_response += content print("\n\nHERE is the complete streaming response string", complete_response) print("\n\nHERE IS the streaming Response from callback\n\n") print(my_custom_logger.streaming_response_obj) import time time.sleep(0.5) streamed_response = my_custom_logger.streaming_response_obj assert ( complete_response == streamed_response["choices"][0]["message"]["content"] ) except Exception as e: pytest.fail(f"LiteLLM Proxy test failed. Exception {str(e)}")