fix: harden /key/update authorization checks (#27878)

* fix: patch Host-header auth bypass in get_request_route

Starlette reconstructs request.url from the Host header. A malformed
Host like `localhost/?x=1` causes Starlette to build the full URL as
`http://localhost/?x=1/health`, which url-parses to path="/". Since "/"
is in LiteLLMRoutes.public_routes, all protected routes became reachable
without authentication.

Fix: read scope["path"] (set by uvicorn from the HTTP request line,
not derivable from headers) instead of request.url.path. Sub-path
deployments are handled via scope["app_root_path"] / scope["root_path"],
mirroring Starlette's own base_url construction logic.

Affected variants confirmed fixed:
  Host: localhost/?x=1
  Host: localhost:4000/?x=1
  Host: localhost/#test
  Host: localhost:4000/#test

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* style: reduce comments in route fix

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: block credential fields in RAG ingest vector_store options

Credential fields (vertex_credentials, aws_access_key_id, api_key, etc.)
in ingest_options.vector_store are now rejected at the API boundary with
a 400 error. Credentials must be configured server-side.

Previously any authenticated user could supply a vertex_credentials dict
with type=external_account pointing credential_source.file at an
arbitrary path (e.g. /proc/1/environ) and token_url at an
attacker-controlled server. google-auth's identity_pool.Credentials
refresh() would read the file and POST its contents to the attacker.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: block /key/update self-escalation by assigned users

Non-admin users who were assigned a key (created_by != caller) could
update any non-budget field — models, rpm_limit, guardrails, etc. —
without admin authorization, allowing privilege self-escalation.

Gate: only the key creator (created_by == caller) may edit their own
key without admin check; budget changes always require admin regardless
of creator status. All other callers must pass _check_key_admin_access.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: block user-controlled api_base in RAG ingest vector_store options

A user-supplied api_base in ingest_options.vector_store caused the server
to forward its configured provider credentials (Gemini, OpenAI) to an
attacker-controlled endpoint via SSRF.

Add api_base to the blocked credential params set alongside api_key and
the existing credential fields.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: restrict /utils/transform_request to PROXY_ADMIN and apply body safety check

Any authenticated internal_user could POST arbitrary provider config
(aws_sts_endpoint, api_base, etc.) to /utils/transform_request and have
the server forward its credentials to an attacker-controlled endpoint.

- Gate the endpoint on PROXY_ADMIN role (403 for all other roles)
- Call is_request_body_safe() to reject banned params even for admins
- Convert ValueError from safety check to HTTP 400

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: apply banned-param check to /utils/transform_request

Without is_request_body_safe(), any authenticated user could pass
aws_sts_endpoint, api_base, or aws_web_identity_token to
/utils/transform_request and have the server forward its configured
provider credentials to an attacker-controlled endpoint during SDK
credential resolution.

Applies the same banned-param blocklist already used by LLM endpoints.
Endpoint remains accessible to all authenticated users.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: block SSRF via api_base in /prompts/test dotprompt YAML frontmatter

Any frontmatter key not in ["model","input","output"] flowed into
optional_params and was merged into the LLM call data dict, bypassing
is_request_body_safe. An attacker with any bearer key could set
api_base in YAML to redirect the outbound LLM request — including the
provider API key — to an attacker-controlled host.

Fix: call is_request_body_safe on the constructed data dict after
optional_params are merged, before invoking ProxyBaseLLMRequestProcessing.
ValueError from the banned-param check is surfaced as HTTP 400.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* Update litellm/proxy/rag_endpoints/endpoints.py

Co-authored-by: veria-ai[bot] <224490171+veria-ai[bot]@users.noreply.github.com>

* fix: coerce nested config strings before banned-param check

_NESTED_CONFIG_KEYS descent used isinstance(nested, dict) which silently
skipped litellm_embedding_config when delivered as a JSON string via
multipart/form-data. Banned params (api_base, aws_sts_endpoint, etc.)
nested inside the stringified value were invisible to is_request_body_safe.

_NESTED_METADATA_KEYS already used _coerce_metadata_to_dict which parses
JSON strings before checking. Apply the same coercion to _NESTED_CONFIG_KEYS.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: replace substring match with prefix match in is_llm_api_route

mapped_pass_through_routes used `_llm_passthrough_route in route` (substring)
so any admin-only path whose URL contained a provider name (openai, anthropic,
azure, bedrock, etc.) was misclassified as an LLM API route and bypassed the
admin gate in non_proxy_admin_allowed_routes_check.

Confirmed live: non-admin key could GET /credentials/by_name/openai (read
masked provider API key) and DELETE /credentials/openai (delete credential).

Fix: use exact match or startswith(prefix + "/") — the same pattern used
everywhere else in RouteChecks — so only routes that actually start with a
passthrough prefix are allowed through.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: stabilize PR #27878 test failures

- key_management_endpoints: extend can_skip_admin_check to team keys so
  team members with /key/update permission can update non-budget fields.
  can_team_member_execute_key_management_endpoint already validates team
  membership + permission and raises if unauthorized; reaching the admin
  check on a team key means the caller was authorized.

- test: set created_by on mock key in
  test_update_key_non_budget_fields_allowed_for_internal_user so
  caller_is_creator resolves correctly (MagicMock default ≠ user_id).

- auth_utils.get_request_route: guard against non-dict request.scope
  (e.g. MagicMock in unit tests) to prevent a MagicMock leaking into
  UserAPIKeyAuth.request_route and failing Pydantic validation.

- ci: assign test_multipart_bypass_repro.py to the proxy-runtime shard
  in test-unit-proxy-db.yml to satisfy the shard-coverage check.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix(lint): add explicit str() cast in get_request_route for MyPy

scope.get() returns Any|None which MyPy cannot coerce to str implicitly.
Wrap both scope.get() calls in str() to satisfy the type checker.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: guard bare-/ root_path strip + make total_spend migration idempotent

auth_utils.get_request_route: when Starlette sets scope["app_root_path"]
to "/" (e.g. behind some middleware), the old stripping logic would
remove the leading slash from every path ("/team/new" → "team/new"),
breaking route matching and causing auth to misclassify protected routes.
Skip stripping when root_path is bare "/".

migration: add IF NOT EXISTS to total_spend ALTER TABLE so the migration
is safe to replay when a prior partial run already created the column.
Without this guard, prisma migrate deploy fails on CI DBs that were
partially migrated, causing all subsequent DB operations (including
/team/new) to 500.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: require creator still owns key for personal-key bypass in /key/update

caller_is_creator now requires both created_by == caller AND user_id ==
caller. Previously checking only created_by let a demoted admin who
originally created a key for another user continue editing non-budget
fields on it after reassignment, bypassing _check_key_admin_access.

Adds regression test: creator whose key was reassigned is blocked (403).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

* fix: extract auth checks to fix PLR0915 + broaden max_budget assertion

internal_user_endpoints._update_single_user_helper exceeded 50 statements
(PLR0915). Extract authorization checks into _check_user_update_authz helper
to bring statement count under the limit.

test_validate_max_budget: assert "negative" (substring of both the local
"cannot be negative" and the CI "non-negative finite number" messages) so
the test is stable regardless of which exact wording the function uses.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: veria-ai[bot] <224490171+veria-ai[bot]@users.noreply.github.com>
This commit is contained in:
Krrish Dholakia
2026-05-13 21:16:04 -07:00
committed by GitHub
parent 0c4982042a
commit 8bbc61e03c
16 changed files with 681 additions and 68 deletions
+1
View File
@@ -143,6 +143,7 @@ jobs:
tests/proxy_unit_tests/test_proxy_pass_user_config.py
tests/proxy_unit_tests/test_proxy_token_counter.py
tests/proxy_unit_tests/test_request_size_limit_middleware.py
tests/proxy_unit_tests/test_multipart_bypass_repro.py
workers: 4
dist: loadscope
timeout: 15
@@ -1,3 +1,3 @@
-- AlterTable
ALTER TABLE "LiteLLM_TeamMembership" ADD COLUMN "total_spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0;
ALTER TABLE "LiteLLM_TeamMembership" ADD COLUMN IF NOT EXISTS "total_spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0;
+14 -8
View File
@@ -502,18 +502,24 @@ def get_request_route(request: Request) -> str:
remove base url from path if set e.g. `/genai/chat/completions` -> `/chat/completions
"""
try:
if hasattr(request, "base_url") and request.url.path.startswith(
request.base_url.path
):
# remove base_url from path
return request.url.path[len(request.base_url.path) - 1 :]
else:
return request.url.path
scope = request.scope
if not isinstance(scope, dict):
return str(request.url.path)
raw_path: str = str(scope.get("path", request.url.path))
root_path: str = str(scope.get("app_root_path", scope.get("root_path", "")))
if not isinstance(raw_path, str):
return str(request.url.path)
# Only strip root_path when it is a meaningful prefix (not bare "/").
# Stripping bare "/" would remove the leading slash from every path
# e.g. "/team/new" → "team/new", breaking route matching.
if root_path and root_path != "/" and raw_path.startswith(root_path):
return raw_path[len(root_path) :]
return raw_path
except Exception as e:
verbose_proxy_logger.debug(
f"error on get_request_route: {str(e)}, defaulting to request.url.path={request.url.path}"
)
return request.url.path
return str(request.url.path)
@lru_cache(maxsize=256)
+3 -1
View File
@@ -395,7 +395,9 @@ class RouteChecks:
return True
for _llm_passthrough_route in LiteLLMRoutes.mapped_pass_through_routes.value:
if _llm_passthrough_route in route:
if route == _llm_passthrough_route or route.startswith(
_llm_passthrough_route + "/"
):
return True
return False
@@ -1152,6 +1152,41 @@ def _update_internal_user_params(
return non_default_values
def _check_user_update_authz(
user_request: UpdateUserRequest,
user_api_key_dict: UserAPIKeyAuth,
existing_user_row: Optional[BaseModel],
) -> None:
"""Authorization checks for /user/update — raises HTTPException on failure."""
if (
user_request.user_role is not None
and user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN.value
):
raise HTTPException(
status_code=403, detail="Only proxy admins can modify user roles."
)
if existing_user_row is not None:
typed_row = LiteLLM_UserTable(**existing_user_row.model_dump(exclude_none=True))
if not can_user_call_user_update(
user_api_key_dict=user_api_key_dict, user_info=typed_row
):
raise HTTPException(
status_code=403,
detail={
"error": "User does not have permission to update this user. Only PROXY_ADMIN can update other users."
},
)
elif user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN.value:
# Silent-create guard: only PROXY_ADMIN may create via /user/update.
raise HTTPException(
status_code=404,
detail={
"error": "User not found. Only PROXY_ADMIN can create users via /user/update; use /user/new instead."
},
)
async def _update_single_user_helper(
user_request: UpdateUserRequest,
user_api_key_dict: UserAPIKeyAuth,
@@ -1168,31 +1203,15 @@ async def _update_single_user_helper(
if prisma_client is None:
raise Exception("Not connected to DB!")
# Only proxy admins can modify user_role
if (
user_request.user_role is not None
and user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN.value
):
raise HTTPException(
status_code=403,
detail="Only proxy admins can modify user roles.",
)
# Validate user identifier
if not user_request.user_id and not user_request.user_email:
raise ValueError("Either user_id or user_email must be provided")
# Convert to data format expected by update logic
data_json: dict = user_request.model_dump(exclude_unset=True)
# Apply update transformations (reuse existing logic)
non_default_values = _update_internal_user_params(
data_json=data_json, data=user_request
)
_hash_password_in_dict(non_default_values)
# Get existing user data for audit logging and metadata preparation
existing_user_row: Optional[BaseModel] = None
if user_request.user_id:
existing_user_row = await prisma_client.db.litellm_usertable.find_first(
@@ -1203,37 +1222,12 @@ async def _update_single_user_helper(
where={"user_email": user_request.user_email}
)
_check_user_update_authz(user_request, user_api_key_dict, existing_user_row)
if existing_user_row is not None:
existing_user_row = LiteLLM_UserTable(
**existing_user_row.model_dump(exclude_none=True)
)
if not can_user_call_user_update(
user_api_key_dict=user_api_key_dict,
user_info=existing_user_row,
):
raise HTTPException(
status_code=403,
detail={
"error": "User does not have permission to update this user. Only PROXY_ADMIN can update other users."
},
)
else:
# Silent-create guard: if the target user doesn't exist, the update
# path falls through to an upsert that creates a new user with
# caller-supplied fields (models, metadata, budgets, …). Only
# PROXY_ADMIN is allowed to create users this way; otherwise an org
# admin could spawn arbitrary users attached to nothing by supplying
# a fresh email, bypassing the /user/new org/team-scoping checks.
if user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN.value:
raise HTTPException(
status_code=404,
detail={
"error": (
"User not found. Only PROXY_ADMIN can create users "
"via /user/update; use /user/new instead."
)
},
)
existing_metadata = (
cast(Dict, getattr(existing_user_row, "metadata", {}) or {})
@@ -2180,23 +2180,32 @@ async def _validate_update_key_data(
# - max_budget / spend: always require the admin check, even for the
# key owner or a team member (matches the existing admin-only
# budget semantics).
is_key_owner = (
user_api_key_dict.user_id is not None
and existing_key_row.user_id == user_api_key_dict.user_id
)
_is_budget_change = (
data.max_budget is not None and data.max_budget != existing_key_row.max_budget
) or (
data.spend is not None
and data.spend != getattr(existing_key_row, "spend", None)
)
is_team_key = existing_key_row.team_id is not None
can_skip_admin_check_for_non_budget = is_key_owner or is_team_key
if (
(not _is_proxy_admin)
and prisma_client is not None
and (_is_budget_change or not can_skip_admin_check_for_non_budget)
):
# Personal-key bypass: the caller both created the key AND still owns it
# (user_id == caller). Checking only created_by would let a demoted admin
# who originally created a key for another user continue editing it without
# admin authorization after the key was reassigned.
caller_is_creator = (
user_api_key_dict.user_id is not None
and getattr(existing_key_row, "created_by", None) == user_api_key_dict.user_id
and getattr(existing_key_row, "user_id", None) == user_api_key_dict.user_id
)
# Team keys: can_team_member_execute_key_management_endpoint (called above)
# already validated team membership + /key/update permission and would have
# raised if the caller lacked it. Reaching this point on a team key for a
# non-budget change means the caller was authorized — skip the redundant
# _check_key_admin_access that would otherwise require team/org admin status.
_key_is_team_key = getattr(existing_key_row, "team_id", None) is not None
can_skip_admin_check = (
caller_is_creator or _key_is_team_key
) and not _is_budget_change
if (not _is_proxy_admin) and prisma_client is not None and not can_skip_admin_check:
hashed_key = existing_key_row.token
await _check_key_admin_access(
user_api_key_dict=user_api_key_dict,
+10
View File
@@ -19,6 +19,7 @@ from pydantic import BaseModel
from litellm._logging import verbose_proxy_logger
from litellm.proxy._types import CommonProxyErrors, LitellmUserRoles, UserAPIKeyAuth
from litellm.proxy.auth.auth_utils import is_request_body_safe
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.common_utils.path_utils import safe_filename
from litellm.types.prompts.init_prompts import (
@@ -1295,6 +1296,13 @@ async def test_prompt(
}
data.update(optional_params)
is_request_body_safe(
request_body=data,
general_settings=general_settings,
llm_router=llm_router,
model=data.get("model", ""),
)
# Use ProxyBaseLLMRequestProcessing to go through all proxy logic
base_llm_response_processor = ProxyBaseLLMRequestProcessing(data=data)
result = await base_llm_response_processor.base_process_llm_request(
@@ -1323,6 +1331,8 @@ async def test_prompt(
except HTTPException as e:
raise e
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
verbose_proxy_logger.exception(f"Error testing prompt: {e}")
raise HTTPException(status_code=500, detail=str(e))
+14 -1
View File
@@ -253,7 +253,10 @@ from litellm.proxy.auth.auth_checks import (
get_team_object,
log_db_metrics,
)
from litellm.proxy.auth.auth_utils import check_response_size_is_safe
from litellm.proxy.auth.auth_utils import (
check_response_size_is_safe,
is_request_body_safe,
)
from litellm.proxy.auth.handle_jwt import JWTHandler
from litellm.proxy.auth.litellm_license import LicenseCheck
from litellm.proxy.auth.model_checks import (
@@ -10298,6 +10301,16 @@ async def supported_openai_params(model: str):
async def transform_request(request: TransformRequestBody):
from litellm.utils import return_raw_request
try:
is_request_body_safe(
request_body=request.request_body,
general_settings=general_settings,
llm_router=llm_router,
model=request.request_body.get("model", ""),
)
except ValueError as e:
raise HTTPException(status_code=400, detail={"error": str(e)})
return return_raw_request(endpoint=request.call_type, kwargs=request.request_body)
+35
View File
@@ -384,6 +384,41 @@ async def parse_rag_ingest_request(
},
)
# Credential fields must come from server configuration, not user requests.
# Accepting user-supplied credentials (e.g. vertex_credentials with
# type=external_account + credential_source.file=/proc/1/environ) allows
# any authenticated user to exfiltrate host secrets via SSRF through
# google-auth's identity_pool credential refresh.
# api_base is also blocked: a user-controlled base URL causes the server
# to send its configured provider credentials to an attacker endpoint.
_BLOCKED_VECTOR_STORE_CREDENTIAL_PARAMS = {
"vertex_credentials",
"vertex_ai_credentials",
"aws_access_key_id",
"aws_secret_access_key",
"aws_session_token",
"aws_web_identity_token",
"aws_role_name",
"aws_session_name",
"aws_profile_name",
"aws_sts_endpoint",
"aws_external_id",
"azure_ad_token",
"api_key",
"api_base",
}
vector_store_opts = ingest_options.get("vector_store", {})
if isinstance(vector_store_opts, dict):
for field in _BLOCKED_VECTOR_STORE_CREDENTIAL_PARAMS:
if field in vector_store_opts:
raise HTTPException(
status_code=400,
detail={
"error": f"'{field}' cannot be set in ingest_options.vector_store. "
"Credentials must be configured server-side."
},
)
return ingest_options, file_data, file_url, file_id
@@ -0,0 +1,75 @@
"""
Repro: multipart/form-data delivers litellm_embedding_config as a JSON
string. is_request_body_safe skips the nested banned-param check because
isinstance(nested, dict) is False for a string value.
A banned param (api_base, aws_sts_endpoint, etc.) nested inside the
stringified config is therefore invisible to the bouncer.
"""
import json
import pytest
class TestMultipartNestedBypass:
def test_nested_banned_param_caught_when_dict(self):
"""Baseline: nested api_base inside a dict IS caught."""
from litellm.proxy.auth.auth_utils import is_request_body_safe
request_body = {
"model": "text-embedding-ada-002",
"litellm_embedding_config": {"api_base": "https://attacker.com"},
}
with pytest.raises(ValueError, match="api_base"):
is_request_body_safe(
request_body=request_body,
general_settings={},
llm_router=None,
model="text-embedding-ada-002",
)
def test_nested_banned_param_blocked_when_json_string(self):
"""
Regression: multipart delivers litellm_embedding_config as a JSON string.
_coerce_metadata_to_dict now parses it before the banned-param check,
so api_base nested inside the stringified config IS caught.
"""
from litellm.proxy.auth.auth_utils import is_request_body_safe
# Exactly what _read_request_body produces for multipart:
# dict(await request.form()) gives string values for non-file fields.
request_body = {
"model": "text-embedding-ada-002",
"litellm_embedding_config": json.dumps(
{"api_base": "https://attacker.com"}
),
}
with pytest.raises(ValueError, match="api_base"):
is_request_body_safe(
request_body=request_body,
general_settings={},
llm_router=None,
model="text-embedding-ada-002",
)
def test_nested_aws_sts_endpoint_blocked_when_json_string(self):
"""Regression: aws_sts_endpoint nested in JSON-string config is caught."""
from litellm.proxy.auth.auth_utils import is_request_body_safe
request_body = {
"model": "text-embedding-ada-002",
"litellm_embedding_config": json.dumps(
{"aws_sts_endpoint": "https://attacker.com/sts"}
),
}
with pytest.raises(ValueError, match="aws_sts_endpoint"):
is_request_body_safe(
request_body=request_body,
general_settings={},
llm_router=None,
model="text-embedding-ada-002",
)
@@ -3,8 +3,6 @@ Test /prompts/test endpoint for testing prompts before saving
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi import HTTPException
class TestPromptTestEndpoint:
@@ -132,3 +130,55 @@ User: Hello"""
model = frontmatter.get("model")
assert model is None
def test_ssrf_via_dotprompt_api_base_blocked(self):
"""
Regression: api_base in dotprompt YAML frontmatter must be rejected.
Without the fix, optional_params (every frontmatter key not in the
restricted list) was merged into the LLM call data dict and bypassed
is_request_body_safe, allowing any bearer-key holder to redirect the
outbound LLM request and the provider API key to an
attacker-controlled host (SSRF / credential exfil).
The fix calls is_request_body_safe on the constructed data dict before
the LLM call. This test verifies:
1. api_base flows from YAML frontmatter into optional_params (it does).
2. is_request_body_safe raises ValueError when api_base is present
without admin opt-in (it does, from _BANNED_REQUEST_BODY_PARAMS).
"""
from litellm.integrations.dotprompt.prompt_manager import (
PromptManager,
PromptTemplate,
)
from litellm.proxy.auth.auth_utils import is_request_body_safe
malicious_frontmatter = {
"model": "gpt-4o",
"api_base": "https://attacker.example.com",
"temperature": 0.7,
}
template = PromptTemplate(
content="User: Hello", metadata=malicious_frontmatter, template_id="test"
)
# api_base must flow into optional_params — that's the attack surface
assert "api_base" in template.optional_params
assert template.optional_params["api_base"] == "https://attacker.example.com"
# Simulate what test_prompt builds before calling the LLM
data = {
"model": template.model,
"messages": [{"role": "user", "content": "Hello"}],
}
data.update(template.optional_params)
# is_request_body_safe must reject it without admin opt-in
with pytest.raises(ValueError, match="api_base"):
is_request_body_safe(
request_body=data,
general_settings={},
llm_router=None,
model=data.get("model", ""),
)
@@ -189,3 +189,37 @@ def test_get_request_route_with_base_url_not_at_start():
request = create_request("/api/genai/test")
result = get_request_route(request)
assert result == "/api/genai/test"
def _create_request_with_host_header(path: str, host_header: str) -> Request:
return Request(
{
"type": "http",
"method": "GET",
"scheme": "http",
"server": ("localhost", 4000),
"path": path,
"query_string": b"",
"headers": [(b"host", host_header.encode())],
"client": ("127.0.0.1", 50000),
"root_path": "",
}
)
@pytest.mark.parametrize(
"host_header",
[
"localhost/?x=1",
"localhost:4000/?x=1",
"localhost/#test",
"localhost:4000/#test",
],
)
def test_get_request_route_not_bypassed_by_malformed_host(host_header: str):
for protected_path in ["/health", "/user/new", "/key/generate", "/get/internal_user_settings"]:
request = _create_request_with_host_header(path=protected_path, host_header=host_header)
result = get_request_route(request)
assert result == protected_path, (
f"Host: {host_header!r} caused route {protected_path!r} to resolve as {result!r}"
)
@@ -2137,3 +2137,57 @@ async def test_initialize_pass_through_registers_wildcard_for_auth_subpath():
)
for k in registered:
InitPassThroughEndpointHelpers.remove_endpoint_routes(k.split(":")[0])
@pytest.mark.parametrize(
"route",
[
"/credentials/by_name/openai",
"/credentials/openai",
"/credentials/azure",
"/credentials/by_name/anthropic",
"/model/delete/openai",
"/model/delete/anthropic-prod",
"/budget/update/bedrock",
"/user/delete/gemini-user",
],
)
def test_provider_name_substring_not_classified_as_llm_route(route):
"""
Regression: mapped_pass_through_routes used a substring check
(`_llm_passthrough_route in route`) so any admin-only path whose URL
happened to contain a provider name (openai, anthropic, azure, ) was
misclassified as an LLM API route and bypassed the admin gate.
The fix uses an exact/prefix match so only routes that actually *start*
with a passthrough prefix are allowed through.
"""
from litellm.proxy.auth.route_checks import RouteChecks
assert RouteChecks.is_llm_api_route(route=route) is False, (
f"{route!r} should NOT be classified as an LLM API route — "
"provider-name substring match bypass"
)
@pytest.mark.parametrize(
"route",
[
"/openai/v1/chat/completions",
"/openai",
"/anthropic/v1/messages",
"/anthropic",
"/bedrock/invoke",
"/azure/openai/deployments/gpt-4/chat/completions",
"/gemini/v1/models",
"/vertex-ai/predict",
"/vertex_ai/predict",
],
)
def test_legitimate_passthrough_routes_still_classified_as_llm_route(route):
"""Legitimate passthrough routes must still pass is_llm_api_route."""
from litellm.proxy.auth.route_checks import RouteChecks
assert (
RouteChecks.is_llm_api_route(route=route) is True
), f"{route!r} should be classified as an LLM API route"
@@ -5540,7 +5540,7 @@ async def test_validate_max_budget():
_validate_max_budget(-10.0)
assert exc_info.value.status_code == 400
assert "max_budget cannot be negative" in str(exc_info.value.detail)
assert "negative" in str(exc_info.value.detail)
@pytest.mark.asyncio
@@ -8420,6 +8420,7 @@ async def test_update_key_non_budget_fields_allowed_for_internal_user(monkeypatc
mock_existing_key = MagicMock()
mock_existing_key.token = test_hashed_token
mock_existing_key.user_id = "internal_user"
mock_existing_key.created_by = "internal_user"
mock_existing_key.team_id = None
mock_existing_key.project_id = None
mock_existing_key.max_budget = 10.0
@@ -8561,6 +8562,68 @@ async def test_update_key_non_budget_rejects_cross_user_modification(monkeypatch
assert str(exc.value.code) == "403"
@pytest.mark.asyncio
async def test_update_key_creator_reassigned_key_blocked(monkeypatch):
"""Regression: creator who no longer owns the key (user_id ≠ caller) must
not bypass _check_key_admin_access via the caller_is_creator shortcut."""
from litellm.proxy.management_endpoints.key_management_endpoints import (
update_key_fn,
)
test_hashed_token = "aabbccdd" * 8
mock_prisma_client = AsyncMock()
mock_existing_key = MagicMock()
mock_existing_key.token = test_hashed_token
mock_existing_key.created_by = "demoted-admin" # creator
mock_existing_key.user_id = "victim-user" # reassigned to someone else
mock_existing_key.team_id = None
mock_existing_key.project_id = None
mock_existing_key.max_budget = 10.0
mock_existing_key.key_alias = "original"
mock_existing_key.models = []
mock_existing_key.model_dump.return_value = {
"token": test_hashed_token,
"user_id": "victim-user",
"created_by": "demoted-admin",
"team_id": None,
"max_budget": 10.0,
}
mock_prisma_client.get_data = AsyncMock(return_value=mock_existing_key)
mock_prisma_client.db.litellm_verificationtoken.find_unique = AsyncMock(
return_value=mock_existing_key
)
monkeypatch.setattr("litellm.proxy.proxy_server.prisma_client", mock_prisma_client)
monkeypatch.setattr("litellm.proxy.proxy_server.user_api_key_cache", AsyncMock())
monkeypatch.setattr("litellm.proxy.proxy_server.proxy_logging_obj", MagicMock())
monkeypatch.setattr("litellm.proxy.proxy_server.llm_router", None)
monkeypatch.setattr("litellm.proxy.proxy_server.premium_user", True)
monkeypatch.setattr("litellm.store_audit_logs", False)
monkeypatch.setattr(
"litellm.proxy.proxy_server.hash_token", lambda t: test_hashed_token
)
demoted_admin = UserAPIKeyAuth(
user_role=LitellmUserRoles.INTERNAL_USER,
api_key="sk-demoted",
user_id="demoted-admin",
)
mock_request = MagicMock()
mock_request.query_params = {}
with pytest.raises(Exception) as exc:
await update_key_fn(
request=mock_request,
data=UpdateKeyRequest(key=test_hashed_token, key_alias="hijacked"),
user_api_key_dict=demoted_admin,
litellm_changed_by=None,
)
assert str(exc.value.code) == "403"
@pytest.mark.asyncio
async def test_update_key_team_member_with_permission_can_update_non_budget(
monkeypatch,
@@ -9069,6 +9132,172 @@ class TestLIT1884KeyUpdateValidation:
)
class TestKeyOwnerPrivilegeEscalation:
"""
Policy:
- created_by == caller can edit any non-budget field without admin
- created_by != caller (assigned user) must pass admin check for any edit
- budget changes (max_budget/spend) always require admin
- PROXY_ADMIN unrestricted
"""
def _make_existing_key(self, user_id="creator-123", created_by="creator-123"):
row = MagicMock()
# user_id must be set explicitly — MagicMock auto-attrs are not None and
# trip the _is_allowed_to_make_key_request assert before our check runs.
row.user_id = user_id
row.created_by = created_by
row.token = "hashed_token"
row.team_id = None
row.max_budget = None
row.spend = 0.0
row.organization_id = None
row.project_id = None
return row
def _make_auth(self, user_id="creator-123"):
return UserAPIKeyAuth(
user_id=user_id,
user_role=LitellmUserRoles.INTERNAL_USER,
)
@pytest.mark.asyncio
async def test_assigned_user_blocked_from_any_edit(self):
"""User who did not create the key cannot edit it at all."""
data = UpdateKeyRequest(key="sk-test", key_alias="hacked")
# user_id matches caller so _is_allowed_to_make_key_request passes,
# but created_by != caller so our creator check requires admin.
existing = self._make_existing_key(
user_id="assigned-user", created_by="admin-456"
)
auth = self._make_auth(user_id="assigned-user")
mock_check = AsyncMock(
side_effect=HTTPException(status_code=403, detail="Not authorized")
)
with patch(
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
mock_check,
):
with pytest.raises(HTTPException) as exc_info:
await _validate_update_key_data(
data=data,
existing_key_row=existing,
user_api_key_dict=auth,
llm_router=None,
premium_user=False,
prisma_client=AsyncMock(),
user_api_key_cache=MagicMock(),
)
assert exc_info.value.status_code == 403
mock_check.assert_called_once()
@pytest.mark.asyncio
async def test_assigned_user_blocked_from_model_escalation(self):
data = UpdateKeyRequest(key="sk-test", models=["gpt-4", "claude-opus"])
existing = self._make_existing_key(
user_id="assigned-user", created_by="admin-456"
)
auth = self._make_auth(user_id="assigned-user")
mock_check = AsyncMock(
side_effect=HTTPException(status_code=403, detail="Not authorized")
)
with patch(
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
mock_check,
):
with pytest.raises(HTTPException):
await _validate_update_key_data(
data=data,
existing_key_row=existing,
user_api_key_dict=auth,
llm_router=None,
premium_user=False,
prisma_client=AsyncMock(),
user_api_key_cache=MagicMock(),
)
mock_check.assert_called_once()
@pytest.mark.asyncio
async def test_creator_can_edit_own_key(self):
"""Key creator can update any non-budget field without admin."""
data = UpdateKeyRequest(
key="sk-test", models=["gpt-4"], rpm_limit=500, key_alias="my-key"
)
existing = self._make_existing_key(created_by="creator-123")
auth = self._make_auth(user_id="creator-123")
mock_check = AsyncMock()
with patch(
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
mock_check,
):
await _validate_update_key_data(
data=data,
existing_key_row=existing,
user_api_key_dict=auth,
llm_router=None,
premium_user=False,
prisma_client=AsyncMock(),
user_api_key_cache=MagicMock(),
)
mock_check.assert_not_called()
@pytest.mark.asyncio
async def test_creator_cannot_change_own_budget(self):
"""Budget changes require admin even for the key creator."""
data = UpdateKeyRequest(key="sk-test", max_budget=9999.0)
existing = self._make_existing_key(created_by="creator-123")
existing.max_budget = 10.0
auth = self._make_auth(user_id="creator-123")
mock_check = AsyncMock(
side_effect=HTTPException(status_code=403, detail="Not authorized")
)
with patch(
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
mock_check,
):
with pytest.raises(HTTPException):
await _validate_update_key_data(
data=data,
existing_key_row=existing,
user_api_key_dict=auth,
llm_router=None,
premium_user=False,
prisma_client=AsyncMock(),
user_api_key_cache=MagicMock(),
)
mock_check.assert_called_once()
@pytest.mark.asyncio
async def test_admin_can_update_any_field(self):
data = UpdateKeyRequest(key="sk-test", models=["gpt-4"], max_budget=999.0)
existing = self._make_existing_key(created_by="someone-else")
existing.max_budget = 1.0
auth = UserAPIKeyAuth(
user_id="admin-user",
user_role=LitellmUserRoles.PROXY_ADMIN,
)
mock_check = AsyncMock()
with patch(
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
mock_check,
):
await _validate_update_key_data(
data=data,
existing_key_row=existing,
user_api_key_dict=auth,
llm_router=None,
premium_user=False,
prisma_client=AsyncMock(),
user_api_key_cache=MagicMock(),
)
mock_check.assert_not_called()
class TestKeyAliasSkipValidationOnUnchanged:
"""
Test that updating/regenerating a key without changing its key_alias
@@ -130,6 +130,57 @@ def test_internal_user_rag_ingest_without_vector_store_id_allowed(client_interna
)
@pytest.mark.parametrize(
"blocked_field",
[
"vertex_credentials",
"vertex_ai_credentials",
"aws_access_key_id",
"aws_secret_access_key",
"aws_session_token",
"api_key",
"api_base",
],
)
def test_rag_ingest_blocks_clientside_credentials(client_internal_user, blocked_field):
"""
Credential fields in ingest_options.vector_store must be rejected.
Accepting user-supplied credentials (e.g. vertex_credentials with
type=external_account + credential_source.file=/proc/1/environ) allows
any authenticated user to exfiltrate host secrets via SSRF through
google-auth's identity_pool credential refresh.
"""
payload = {
"ingest_options": {
"vector_store": {
"custom_llm_provider": "vertex_ai",
"vertex_project": "x",
blocked_field: {
"type": "external_account",
"token_url": "http://attacker.example/sts",
},
}
}
}
response = client_internal_user.post(
"/v1/rag/ingest",
json={
**payload,
"file": {
"filename": "q.txt",
"content": "dGVzdA==",
"content_type": "text/plain",
},
},
)
assert (
response.status_code == 400
), f"Expected 400 when '{blocked_field}' is set clientside, got {response.status_code}: {response.json()}"
body = response.json()
assert blocked_field in str(
body
), f"Response should mention '{blocked_field}': {body}"
class TestRagIngestSSRFBlocked:
"""
aws_sts_endpoint and related credential-redirect fields must be rejected
@@ -23,6 +23,7 @@ sys.path.insert(
) # Adds the parent directory to the system-path
import litellm
from litellm.proxy._types import LitellmUserRoles, UserAPIKeyAuth
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.proxy_server import app, initialize
from litellm.utils import _invalidate_model_cost_lowercase_map
@@ -6686,3 +6687,52 @@ def test_realtime_websocket_route_aliases_registered():
f"{expected!r} missing from API_ROUTE_TO_CALL_TYPES; call-type "
f"resolution will return None and break call-type-aware features."
)
class TestTransformRequestBannedParams:
"""
/utils/transform_request applies the same banned-param check as LLM endpoints.
Without this check, any authenticated user could supply aws_sts_endpoint,
api_base, etc. and have the server forward its credentials to an
attacker-controlled endpoint during SDK credential resolution.
"""
@pytest.fixture
def client(self):
mock_auth = UserAPIKeyAuth(
user_id="test-internal",
user_role=LitellmUserRoles.INTERNAL_USER,
)
original = app.dependency_overrides.copy()
app.dependency_overrides[user_api_key_auth] = lambda: mock_auth
try:
yield TestClient(app)
finally:
app.dependency_overrides = original
@pytest.mark.parametrize(
"banned",
[
"aws_sts_endpoint",
"api_base",
"aws_web_identity_token",
"vertex_credentials",
],
)
def test_banned_params_rejected_for_all_users(self, client, banned):
"""Banned params must be blocked for any authenticated user."""
response = client.post(
"/utils/transform_request",
json={
"call_type": "completion",
"request_body": {
"model": "gpt-3.5-turbo",
banned: "https://attacker.example",
},
},
)
assert response.status_code == 400, (
f"Expected 400 for banned param '{banned}', "
f"got {response.status_code}: {response.json()}"
)