mirror of
https://github.com/tiennm99/litellm.git
synced 2026-06-17 18:48:36 +00:00
278c9babc6
* fix(test): add missing mocks for test_streamable_http_mcp_handler_mock
The test was missing mocks for extract_mcp_auth_context and set_auth_context,
causing the handler to fail silently in the except block instead of reaching
session_manager.handle_request. This mirrors the fix already applied to the
sibling test_sse_mcp_handler_mock.
Co-authored-by: Ishaan Jaff <ishaan-jaff@users.noreply.github.com>
* fix(ci): route OpenAI models through chat completions in pass-through tests
The test_anthropic_messages_openai_model_streaming_cost_injection test fails
because the OpenAI Responses API returns 400 for requests routed through the
Anthropic Messages endpoint. Setting LITELLM_USE_CHAT_COMPLETIONS_URL_FOR_ANTHROPIC_MESSAGES=true
routes OpenAI models through the stable chat completions path instead.
Cost injection still works since it happens at the proxy level.
Co-authored-by: Ishaan Jaff <ishaan-jaff@users.noreply.github.com>
* fix(ci): fix assemblyai custom auth and router wildcard test flakiness
1. custom_auth_basic.py: Add user_role='proxy_admin' so the custom auth
user can access management endpoints like /key/generate. The test
test_assemblyai_transcribe_with_non_admin_key was hidden behind an
earlier -x failure and was never reached before.
2. test_router_utils.py: Add flaky(retries=3) and increase sleep from 1s
to 2s for test_router_get_model_group_usage_wildcard_routes. The async
callback needs time to write usage to cache, and 1s is insufficient on
slower CI hardware.
Co-authored-by: Ishaan Jaff <ishaan-jaff@users.noreply.github.com>
* ci: retrigger CI pipeline
Co-authored-by: Ishaan Jaff <ishaan-jaff@users.noreply.github.com>
* fix(mypy): use LitellmUserRoles enum instead of raw string in custom_auth_basic
Fixes mypy error: Argument 'user_role' has incompatible type 'str'; expected 'LitellmUserRoles | None'
Co-authored-by: Ishaan Jaff <ishaan-jaff@users.noreply.github.com>
* fix: don't close HTTP/SDK clients on LLMClientCache eviction (#22926)
* fix: don't close HTTP/SDK clients on LLMClientCache eviction
Removing the _remove_key override that eagerly called aclose()/close()
on evicted clients. Evicted clients may still be held by in-flight
streaming requests; closing them causes:
RuntimeError: Cannot send a request, as the client has been closed.
This is a regression from commit fb72979432. Clients that are no longer
referenced will be garbage-collected naturally. Explicit shutdown cleanup
happens via close_litellm_async_clients().
Fixes production crashes after the 1-hour cache TTL expires.
* test: update LLMClientCache unit tests for no-close-on-eviction behavior
Flip the assertions: evicted clients must NOT be closed. Replace
test_remove_key_closes_async_client → test_remove_key_does_not_close_async_client
and equivalents for sync/eviction paths.
Add test_remove_key_removes_plain_values for non-client cache entries.
Remove test_background_tasks_cleaned_up_after_completion (no more _background_tasks).
Remove test_remove_key_no_event_loop variant that depended on old behavior.
* test: add e2e tests for OpenAI SDK client surviving cache eviction
Add two new e2e tests using real AsyncOpenAI clients:
- test_evicted_openai_sdk_client_stays_usable: verifies size-based eviction
doesn't close the client
- test_ttl_expired_openai_sdk_client_stays_usable: verifies TTL expiry
eviction doesn't close the client
Both tests sleep after eviction so any create_task()-based close would
have time to run, making the regression detectable.
Also expand the module docstring to explain why the sleep is required.
* docs(AGENTS.md): add rule — never close HTTP/SDK clients on cache eviction
* docs(CLAUDE.md): add HTTP client cache safety guideline
* [Fix] Install bsdmainutils for column command in security scans
The security_scans.sh script uses `column` to format vulnerability
output, but the package wasn't installed in the CI environment.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: handle string callback values in prometheus multiproc setup
When callbacks are configured as a plain string (e.g., `callbacks: "my_callback"`)
instead of a list, the proxy crashes on startup with:
TypeError: can only concatenate str (not "list") to str
Normalize each callback setting to a list before concatenating.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* bump: version 1.82.2 → 1.82.3
* fix(test): update test_startup_fails_when_db_setup_fails for opt-in enforcement
The --enforce_prisma_migration_check flag is now required to trigger
sys.exit(1) on DB migration failure, after #23675 flipped the default
behavior to warn-and-continue.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix(cost_calculator): use model name for per-request custom pricing when router_model_id has no pricing
When custom pricing is passed as per-request kwargs (input_cost_per_token/output_cost_per_token),
completion() registers pricing under the model name, but _select_model_name_for_cost_calc was
selecting the router deployment hash (which has no pricing data), causing response_cost to be 0.0.
Now checks whether the router_model_id entry actually has pricing before preferring it.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
---------
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Ishaan Jaff <ishaan-jaff@users.noreply.github.com>
Co-authored-by: Ishaan Jaff <ishaanjaffer0324@gmail.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
567 lines
18 KiB
Python
567 lines
18 KiB
Python
#### What this tests ####
|
|
# This tests utils used by llm router -> like llmrouter.get_settings()
|
|
|
|
import sys, os, time
|
|
import traceback, asyncio
|
|
import pytest
|
|
|
|
sys.path.insert(
|
|
0, os.path.abspath("../..")
|
|
) # Adds the parent directory to the system path
|
|
import litellm
|
|
from litellm import Router
|
|
from litellm.router import Deployment, LiteLLM_Params
|
|
from litellm.types.router import ModelInfo
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from collections import defaultdict
|
|
from dotenv import load_dotenv
|
|
from unittest.mock import patch, MagicMock, AsyncMock
|
|
|
|
load_dotenv()
|
|
|
|
|
|
def test_returned_settings():
|
|
# this tests if the router raises an exception when invalid params are set
|
|
# in this test both deployments have bad keys - Keep this test. It validates if the router raises the most recent exception
|
|
litellm.set_verbose = True
|
|
import openai
|
|
|
|
try:
|
|
print("testing if router raises an exception")
|
|
model_list = [
|
|
{
|
|
"model_name": "gpt-3.5-turbo", # openai model name
|
|
"litellm_params": { # params for litellm completion/embedding call
|
|
"model": "azure/gpt-4.1-mini",
|
|
"api_key": "bad-key",
|
|
"api_version": os.getenv("AZURE_API_VERSION"),
|
|
"api_base": os.getenv("AZURE_API_BASE"),
|
|
},
|
|
"tpm": 240000,
|
|
"rpm": 1800,
|
|
},
|
|
{
|
|
"model_name": "gpt-3.5-turbo", # openai model name
|
|
"litellm_params": { #
|
|
"model": "gpt-3.5-turbo",
|
|
"api_key": "bad-key",
|
|
},
|
|
"tpm": 240000,
|
|
"rpm": 1800,
|
|
},
|
|
]
|
|
router = Router(
|
|
model_list=model_list,
|
|
redis_host=os.getenv("REDIS_HOST"),
|
|
redis_password=os.getenv("REDIS_PASSWORD"),
|
|
redis_port=int(os.getenv("REDIS_PORT")),
|
|
routing_strategy="latency-based-routing",
|
|
routing_strategy_args={"ttl": 10},
|
|
set_verbose=False,
|
|
num_retries=3,
|
|
retry_after=5,
|
|
allowed_fails=1,
|
|
cooldown_time=30,
|
|
) # type: ignore
|
|
|
|
settings = router.get_settings()
|
|
print(settings)
|
|
|
|
"""
|
|
routing_strategy: "simple-shuffle"
|
|
routing_strategy_args: {"ttl": 10} # Average the last 10 calls to compute avg latency per model
|
|
allowed_fails: 1
|
|
num_retries: 3
|
|
retry_after: 5 # seconds to wait before retrying a failed request
|
|
cooldown_time: 30 # seconds to cooldown a deployment after failure
|
|
"""
|
|
assert settings["routing_strategy"] == "latency-based-routing"
|
|
assert settings["routing_strategy_args"]["ttl"] == 10
|
|
assert settings["allowed_fails"] == 1
|
|
assert settings["num_retries"] == 3
|
|
assert settings["retry_after"] == 5
|
|
assert settings["cooldown_time"] == 30
|
|
|
|
except Exception:
|
|
print(traceback.format_exc())
|
|
pytest.fail("An error occurred - " + traceback.format_exc())
|
|
|
|
|
|
from litellm.types.utils import CallTypes
|
|
|
|
|
|
def test_update_kwargs_before_fallbacks_unit_test():
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gpt-3.5-turbo",
|
|
"litellm_params": {
|
|
"model": "azure/gpt-4.1-mini",
|
|
"api_key": "bad-key",
|
|
"api_version": os.getenv("AZURE_API_VERSION"),
|
|
"api_base": os.getenv("AZURE_API_BASE"),
|
|
},
|
|
}
|
|
],
|
|
)
|
|
|
|
kwargs = {"messages": [{"role": "user", "content": "write 1 sentence poem"}]}
|
|
|
|
router._update_kwargs_before_fallbacks(
|
|
model="gpt-3.5-turbo",
|
|
kwargs=kwargs,
|
|
)
|
|
|
|
assert kwargs["litellm_trace_id"] is not None
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"call_type",
|
|
[
|
|
CallTypes.acompletion,
|
|
CallTypes.atext_completion,
|
|
CallTypes.aembedding,
|
|
CallTypes.arerank,
|
|
CallTypes.atranscription,
|
|
],
|
|
)
|
|
@pytest.mark.asyncio
|
|
async def test_update_kwargs_before_fallbacks(call_type):
|
|
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gpt-3.5-turbo",
|
|
"litellm_params": {
|
|
"model": "azure/gpt-4.1-mini",
|
|
"api_key": "bad-key",
|
|
"api_version": os.getenv("AZURE_API_VERSION"),
|
|
"api_base": os.getenv("AZURE_API_BASE"),
|
|
},
|
|
}
|
|
],
|
|
)
|
|
|
|
if call_type.value.startswith("a"):
|
|
with patch.object(router, "async_function_with_fallbacks") as mock_client:
|
|
if call_type.value == "acompletion":
|
|
input_kwarg = {
|
|
"messages": [{"role": "user", "content": "Hello, how are you?"}],
|
|
}
|
|
elif (
|
|
call_type.value == "atext_completion"
|
|
or call_type.value == "aimage_generation"
|
|
):
|
|
input_kwarg = {
|
|
"prompt": "Hello, how are you?",
|
|
}
|
|
elif call_type.value == "aembedding" or call_type.value == "arerank":
|
|
input_kwarg = {
|
|
"input": "Hello, how are you?",
|
|
}
|
|
elif call_type.value == "atranscription":
|
|
input_kwarg = {
|
|
"file": "path/to/file",
|
|
}
|
|
else:
|
|
input_kwarg = {}
|
|
|
|
await getattr(router, call_type.value)(
|
|
model="gpt-3.5-turbo",
|
|
**input_kwarg,
|
|
)
|
|
|
|
mock_client.assert_called_once()
|
|
|
|
print(mock_client.call_args.kwargs)
|
|
assert mock_client.call_args.kwargs["litellm_trace_id"] is not None
|
|
|
|
|
|
def test_router_get_model_info_wildcard_routes():
|
|
os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True"
|
|
litellm.model_cost = litellm.get_model_cost_map(url="")
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*"},
|
|
"model_info": {"id": 1},
|
|
},
|
|
]
|
|
)
|
|
model_info = router.get_router_model_info(
|
|
deployment=None, received_model_name="gemini/gemini-1.5-flash", id="1"
|
|
)
|
|
print(model_info)
|
|
assert model_info is not None
|
|
assert model_info["tpm"] is not None
|
|
assert model_info["rpm"] is not None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.flaky(retries=3, delay=1)
|
|
async def test_router_get_model_group_usage_wildcard_routes():
|
|
os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True"
|
|
litellm.model_cost = litellm.get_model_cost_map(url="")
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*"},
|
|
"model_info": {"id": 1},
|
|
},
|
|
]
|
|
)
|
|
|
|
resp = await router.acompletion(
|
|
model="gemini/gemini-1.5-flash",
|
|
messages=[{"role": "user", "content": "Hello, how are you?"}],
|
|
mock_response="Hello, I'm good.",
|
|
)
|
|
print(resp)
|
|
|
|
await asyncio.sleep(2)
|
|
|
|
tpm, rpm = await router.get_model_group_usage(model_group="gemini/gemini-1.5-flash")
|
|
|
|
assert tpm is not None, "tpm is None"
|
|
assert rpm is not None, "rpm is None"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_call_router_callbacks_on_success():
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*"},
|
|
"model_info": {"id": 1},
|
|
},
|
|
]
|
|
)
|
|
|
|
with patch.object(
|
|
router.cache, "async_increment_cache_pipeline", new=AsyncMock()
|
|
) as mock_callback:
|
|
await router.acompletion(
|
|
model="gemini/gemini-1.5-flash",
|
|
messages=[{"role": "user", "content": "Hello, how are you?"}],
|
|
mock_response="Hello, I'm good.",
|
|
)
|
|
await asyncio.sleep(1)
|
|
assert mock_callback.call_count == 1
|
|
|
|
increment_list = mock_callback.call_args_list[0].kwargs["increment_list"]
|
|
assert len(increment_list) == 2
|
|
|
|
for increment in increment_list:
|
|
if "tpm" in increment["key"]:
|
|
assert increment["key"].startswith(
|
|
"global_router:1:gemini/gemini-1.5-flash:tpm"
|
|
)
|
|
assert increment["increment_value"] == 30
|
|
elif "rpm" in increment["key"]:
|
|
assert increment["key"].startswith(
|
|
"global_router:1:gemini/gemini-1.5-flash:rpm"
|
|
)
|
|
assert increment["increment_value"] == 1
|
|
|
|
@pytest.mark.serial
|
|
@pytest.mark.asyncio
|
|
async def test_call_router_callbacks_on_failure():
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*"},
|
|
"model_info": {"id": 1},
|
|
},
|
|
]
|
|
)
|
|
|
|
with patch.object(
|
|
router.cache, "async_increment_cache", new=AsyncMock()
|
|
) as mock_callback:
|
|
with pytest.raises(litellm.RateLimitError):
|
|
await router.acompletion(
|
|
model="gemini/gemini-1.5-flash",
|
|
messages=[{"role": "user", "content": "Hello, how are you?"}],
|
|
mock_response="litellm.RateLimitError",
|
|
num_retries=0,
|
|
)
|
|
await asyncio.sleep(3)
|
|
print(mock_callback.call_args_list)
|
|
assert mock_callback.call_count == 1
|
|
|
|
assert (
|
|
mock_callback.call_args_list[0]
|
|
.kwargs["key"]
|
|
.startswith("global_router:1:gemini/gemini-1.5-flash:rpm")
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_router_model_group_headers():
|
|
os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True"
|
|
litellm.model_cost = litellm.get_model_cost_map(url="")
|
|
from litellm.types.utils import OPENAI_RESPONSE_HEADERS
|
|
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*"},
|
|
"model_info": {"id": 1},
|
|
}
|
|
]
|
|
)
|
|
|
|
for _ in range(2):
|
|
resp = await router.acompletion(
|
|
model="gemini/gemini-1.5-flash",
|
|
messages=[{"role": "user", "content": "Hello, how are you?"}],
|
|
mock_response="Hello, I'm good.",
|
|
)
|
|
await asyncio.sleep(1)
|
|
|
|
assert (
|
|
resp._hidden_params["additional_headers"]["x-litellm-model-group"]
|
|
== "gemini/gemini-1.5-flash"
|
|
)
|
|
|
|
assert "x-ratelimit-remaining-requests" in resp._hidden_params["additional_headers"]
|
|
assert "x-ratelimit-remaining-tokens" in resp._hidden_params["additional_headers"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_remaining_model_group_usage():
|
|
os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True"
|
|
litellm.model_cost = litellm.get_model_cost_map(url="")
|
|
from litellm.types.utils import OPENAI_RESPONSE_HEADERS
|
|
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*"},
|
|
"model_info": {"id": 1},
|
|
}
|
|
]
|
|
)
|
|
for _ in range(2):
|
|
resp = await router.acompletion(
|
|
model="gemini/gemini-1.5-flash",
|
|
messages=[{"role": "user", "content": "Hello, how are you?"}],
|
|
mock_response="Hello, I'm good.",
|
|
)
|
|
assert (
|
|
"x-ratelimit-remaining-tokens" in resp._hidden_params["additional_headers"]
|
|
)
|
|
assert (
|
|
"x-ratelimit-remaining-requests"
|
|
in resp._hidden_params["additional_headers"]
|
|
)
|
|
await asyncio.sleep(1)
|
|
|
|
remaining_usage = await router.get_remaining_model_group_usage(
|
|
model_group="gemini/gemini-1.5-flash"
|
|
)
|
|
assert remaining_usage is not None
|
|
assert "x-ratelimit-remaining-requests" in remaining_usage
|
|
assert "x-ratelimit-remaining-tokens" in remaining_usage
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"potential_access_group, expected_result",
|
|
[("gemini-models", True), ("gemini-models-2", False), ("gemini/*", False)],
|
|
)
|
|
def test_router_get_model_access_groups(potential_access_group, expected_result):
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*"},
|
|
"model_info": {"id": 1, "access_groups": ["gemini-models"]},
|
|
},
|
|
]
|
|
)
|
|
access_groups = router._is_model_access_group_for_wildcard_route(
|
|
model_access_group=potential_access_group
|
|
)
|
|
assert access_groups == expected_result
|
|
|
|
|
|
def test_router_redis_cache():
|
|
router = Router(
|
|
model_list=[{"model_name": "gemini/*", "litellm_params": {"model": "gemini/*"}}]
|
|
)
|
|
|
|
redis_cache = MagicMock()
|
|
|
|
router._update_redis_cache(cache=redis_cache)
|
|
|
|
assert router.cache.redis_cache == redis_cache
|
|
|
|
|
|
def test_router_handle_clientside_credential():
|
|
deployment = {
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*"},
|
|
"model_info": {
|
|
"id": "1",
|
|
},
|
|
}
|
|
router = Router(model_list=[deployment])
|
|
|
|
new_deployment = router._handle_clientside_credential(
|
|
deployment=deployment,
|
|
kwargs={
|
|
"api_key": "123",
|
|
"metadata": {"model_group": "gemini/gemini-1.5-flash"},
|
|
},
|
|
function_name="acompletion",
|
|
)
|
|
|
|
assert new_deployment.litellm_params.api_key == "123"
|
|
assert len(router.get_model_list()) == 2
|
|
|
|
|
|
def test_router_get_async_openai_model_client():
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {
|
|
"model": "gemini/*",
|
|
"api_base": "https://api.gemini.com",
|
|
},
|
|
}
|
|
]
|
|
)
|
|
model_client = router._get_async_openai_model_client(
|
|
deployment=MagicMock(), kwargs={}
|
|
)
|
|
assert model_client is None
|
|
|
|
|
|
def test_router_get_deployment_credentials():
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*", "api_key": "123"},
|
|
"model_info": {"id": "1"},
|
|
}
|
|
]
|
|
)
|
|
credentials = router.get_deployment_credentials(model_id="1")
|
|
assert credentials is not None
|
|
assert credentials["api_key"] == "123"
|
|
|
|
|
|
def test_router_get_deployment_credentials_with_provider():
|
|
"""
|
|
Test that get_deployment_credentials_with_provider returns credentials with provider info.
|
|
"""
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gpt-4o",
|
|
"litellm_params": {
|
|
"model": "gpt-4o",
|
|
"api_key": "sk-test-123",
|
|
"api_base": "https://api.openai.com/v1",
|
|
},
|
|
"model_info": {"id": "openai-deployment-1"},
|
|
},
|
|
{
|
|
"model_name": "claude-3",
|
|
"litellm_params": {
|
|
"model": "anthropic/claude-3-sonnet",
|
|
"api_key": "sk-ant-123",
|
|
},
|
|
"model_info": {"id": "anthropic-deployment-1"},
|
|
},
|
|
]
|
|
)
|
|
|
|
# Test getting credentials by model_id
|
|
credentials = router.get_deployment_credentials_with_provider(model_id="openai-deployment-1")
|
|
assert credentials is not None
|
|
assert credentials["api_key"] == "sk-test-123"
|
|
assert credentials["custom_llm_provider"] == "openai"
|
|
assert credentials["api_base"] == "https://api.openai.com/v1"
|
|
|
|
# Test getting credentials by model_group_name (model_name)
|
|
credentials2 = router.get_deployment_credentials_with_provider(model_id="claude-3")
|
|
assert credentials2 is not None
|
|
assert credentials2["api_key"] == "sk-ant-123"
|
|
assert credentials2["custom_llm_provider"] == "anthropic"
|
|
|
|
# Test with non-existent model
|
|
credentials3 = router.get_deployment_credentials_with_provider(model_id="non-existent")
|
|
assert credentials3 is None
|
|
|
|
|
|
def test_router_get_deployment_credentials_with_provider_wildcard():
|
|
"""
|
|
Test that get_deployment_credentials_with_provider handles wildcard patterns.
|
|
|
|
When a model like openai/gpt-4o is requested and the config has openai/*,
|
|
the method should resolve the wildcard pattern and return credentials.
|
|
"""
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "openai/*",
|
|
"litellm_params": {
|
|
"model": "openai/*",
|
|
"api_key": "sk-wildcard-123",
|
|
"api_base": "https://api.openai.com/v1",
|
|
},
|
|
"model_info": {"id": "openai-wildcard-deployment"},
|
|
},
|
|
{
|
|
"model_name": "anthropic/*",
|
|
"litellm_params": {
|
|
"model": "anthropic/*",
|
|
"api_key": "sk-ant-wildcard-456",
|
|
},
|
|
"model_info": {"id": "anthropic-wildcard-deployment"},
|
|
},
|
|
]
|
|
)
|
|
|
|
# Test wildcard pattern matching for OpenAI
|
|
credentials = router.get_deployment_credentials_with_provider(model_id="openai/gpt-4o")
|
|
assert credentials is not None
|
|
assert credentials["api_key"] == "sk-wildcard-123"
|
|
assert credentials["custom_llm_provider"] == "openai"
|
|
assert credentials["api_base"] == "https://api.openai.com/v1"
|
|
|
|
# Test wildcard pattern matching for Anthropic
|
|
credentials2 = router.get_deployment_credentials_with_provider(model_id="anthropic/claude-3-opus")
|
|
assert credentials2 is not None
|
|
assert credentials2["api_key"] == "sk-ant-wildcard-456"
|
|
assert credentials2["custom_llm_provider"] == "anthropic"
|
|
|
|
# Test with non-matching model
|
|
credentials3 = router.get_deployment_credentials_with_provider(model_id="vertex_ai/gemini-pro")
|
|
assert credentials3 is None
|
|
|
|
|
|
def test_router_get_deployment_model_info():
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gemini/*",
|
|
"litellm_params": {"model": "gemini/*"},
|
|
"model_info": {"id": "1"},
|
|
}
|
|
]
|
|
)
|
|
model_info = router.get_deployment_model_info(
|
|
model_id="1", model_name="gemini/gemini-1.5-flash"
|
|
)
|
|
assert model_info is not None
|