mirror of
https://github.com/tiennm99/litellm.git
synced 2026-06-17 16:48:54 +00:00
fix: harden /key/update authorization checks (#27878)
* fix: patch Host-header auth bypass in get_request_route Starlette reconstructs request.url from the Host header. A malformed Host like `localhost/?x=1` causes Starlette to build the full URL as `http://localhost/?x=1/health`, which url-parses to path="/". Since "/" is in LiteLLMRoutes.public_routes, all protected routes became reachable without authentication. Fix: read scope["path"] (set by uvicorn from the HTTP request line, not derivable from headers) instead of request.url.path. Sub-path deployments are handled via scope["app_root_path"] / scope["root_path"], mirroring Starlette's own base_url construction logic. Affected variants confirmed fixed: Host: localhost/?x=1 Host: localhost:4000/?x=1 Host: localhost/#test Host: localhost:4000/#test Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * style: reduce comments in route fix Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: block credential fields in RAG ingest vector_store options Credential fields (vertex_credentials, aws_access_key_id, api_key, etc.) in ingest_options.vector_store are now rejected at the API boundary with a 400 error. Credentials must be configured server-side. Previously any authenticated user could supply a vertex_credentials dict with type=external_account pointing credential_source.file at an arbitrary path (e.g. /proc/1/environ) and token_url at an attacker-controlled server. google-auth's identity_pool.Credentials refresh() would read the file and POST its contents to the attacker. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: block /key/update self-escalation by assigned users Non-admin users who were assigned a key (created_by != caller) could update any non-budget field — models, rpm_limit, guardrails, etc. — without admin authorization, allowing privilege self-escalation. Gate: only the key creator (created_by == caller) may edit their own key without admin check; budget changes always require admin regardless of creator status. All other callers must pass _check_key_admin_access. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: block user-controlled api_base in RAG ingest vector_store options A user-supplied api_base in ingest_options.vector_store caused the server to forward its configured provider credentials (Gemini, OpenAI) to an attacker-controlled endpoint via SSRF. Add api_base to the blocked credential params set alongside api_key and the existing credential fields. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: restrict /utils/transform_request to PROXY_ADMIN and apply body safety check Any authenticated internal_user could POST arbitrary provider config (aws_sts_endpoint, api_base, etc.) to /utils/transform_request and have the server forward its credentials to an attacker-controlled endpoint. - Gate the endpoint on PROXY_ADMIN role (403 for all other roles) - Call is_request_body_safe() to reject banned params even for admins - Convert ValueError from safety check to HTTP 400 Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: apply banned-param check to /utils/transform_request Without is_request_body_safe(), any authenticated user could pass aws_sts_endpoint, api_base, or aws_web_identity_token to /utils/transform_request and have the server forward its configured provider credentials to an attacker-controlled endpoint during SDK credential resolution. Applies the same banned-param blocklist already used by LLM endpoints. Endpoint remains accessible to all authenticated users. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: block SSRF via api_base in /prompts/test dotprompt YAML frontmatter Any frontmatter key not in ["model","input","output"] flowed into optional_params and was merged into the LLM call data dict, bypassing is_request_body_safe. An attacker with any bearer key could set api_base in YAML to redirect the outbound LLM request — including the provider API key — to an attacker-controlled host. Fix: call is_request_body_safe on the constructed data dict after optional_params are merged, before invoking ProxyBaseLLMRequestProcessing. ValueError from the banned-param check is surfaced as HTTP 400. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * Update litellm/proxy/rag_endpoints/endpoints.py Co-authored-by: veria-ai[bot] <224490171+veria-ai[bot]@users.noreply.github.com> * fix: coerce nested config strings before banned-param check _NESTED_CONFIG_KEYS descent used isinstance(nested, dict) which silently skipped litellm_embedding_config when delivered as a JSON string via multipart/form-data. Banned params (api_base, aws_sts_endpoint, etc.) nested inside the stringified value were invisible to is_request_body_safe. _NESTED_METADATA_KEYS already used _coerce_metadata_to_dict which parses JSON strings before checking. Apply the same coercion to _NESTED_CONFIG_KEYS. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: replace substring match with prefix match in is_llm_api_route mapped_pass_through_routes used `_llm_passthrough_route in route` (substring) so any admin-only path whose URL contained a provider name (openai, anthropic, azure, bedrock, etc.) was misclassified as an LLM API route and bypassed the admin gate in non_proxy_admin_allowed_routes_check. Confirmed live: non-admin key could GET /credentials/by_name/openai (read masked provider API key) and DELETE /credentials/openai (delete credential). Fix: use exact match or startswith(prefix + "/") — the same pattern used everywhere else in RouteChecks — so only routes that actually start with a passthrough prefix are allowed through. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: stabilize PR #27878 test failures - key_management_endpoints: extend can_skip_admin_check to team keys so team members with /key/update permission can update non-budget fields. can_team_member_execute_key_management_endpoint already validates team membership + permission and raises if unauthorized; reaching the admin check on a team key means the caller was authorized. - test: set created_by on mock key in test_update_key_non_budget_fields_allowed_for_internal_user so caller_is_creator resolves correctly (MagicMock default ≠ user_id). - auth_utils.get_request_route: guard against non-dict request.scope (e.g. MagicMock in unit tests) to prevent a MagicMock leaking into UserAPIKeyAuth.request_route and failing Pydantic validation. - ci: assign test_multipart_bypass_repro.py to the proxy-runtime shard in test-unit-proxy-db.yml to satisfy the shard-coverage check. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix(lint): add explicit str() cast in get_request_route for MyPy scope.get() returns Any|None which MyPy cannot coerce to str implicitly. Wrap both scope.get() calls in str() to satisfy the type checker. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: guard bare-/ root_path strip + make total_spend migration idempotent auth_utils.get_request_route: when Starlette sets scope["app_root_path"] to "/" (e.g. behind some middleware), the old stripping logic would remove the leading slash from every path ("/team/new" → "team/new"), breaking route matching and causing auth to misclassify protected routes. Skip stripping when root_path is bare "/". migration: add IF NOT EXISTS to total_spend ALTER TABLE so the migration is safe to replay when a prior partial run already created the column. Without this guard, prisma migrate deploy fails on CI DBs that were partially migrated, causing all subsequent DB operations (including /team/new) to 500. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: require creator still owns key for personal-key bypass in /key/update caller_is_creator now requires both created_by == caller AND user_id == caller. Previously checking only created_by let a demoted admin who originally created a key for another user continue editing non-budget fields on it after reassignment, bypassing _check_key_admin_access. Adds regression test: creator whose key was reassigned is blocked (403). Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> * fix: extract auth checks to fix PLR0915 + broaden max_budget assertion internal_user_endpoints._update_single_user_helper exceeded 50 statements (PLR0915). Extract authorization checks into _check_user_update_authz helper to bring statement count under the limit. test_validate_max_budget: assert "negative" (substring of both the local "cannot be negative" and the CI "non-negative finite number" messages) so the test is stable regardless of which exact wording the function uses. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: veria-ai[bot] <224490171+veria-ai[bot]@users.noreply.github.com>
This commit is contained in:
@@ -143,6 +143,7 @@ jobs:
|
||||
tests/proxy_unit_tests/test_proxy_pass_user_config.py
|
||||
tests/proxy_unit_tests/test_proxy_token_counter.py
|
||||
tests/proxy_unit_tests/test_request_size_limit_middleware.py
|
||||
tests/proxy_unit_tests/test_multipart_bypass_repro.py
|
||||
workers: 4
|
||||
dist: loadscope
|
||||
timeout: 15
|
||||
|
||||
+1
-1
@@ -1,3 +1,3 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE "LiteLLM_TeamMembership" ADD COLUMN "total_spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0;
|
||||
ALTER TABLE "LiteLLM_TeamMembership" ADD COLUMN IF NOT EXISTS "total_spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0;
|
||||
|
||||
|
||||
@@ -502,18 +502,24 @@ def get_request_route(request: Request) -> str:
|
||||
remove base url from path if set e.g. `/genai/chat/completions` -> `/chat/completions
|
||||
"""
|
||||
try:
|
||||
if hasattr(request, "base_url") and request.url.path.startswith(
|
||||
request.base_url.path
|
||||
):
|
||||
# remove base_url from path
|
||||
return request.url.path[len(request.base_url.path) - 1 :]
|
||||
else:
|
||||
return request.url.path
|
||||
scope = request.scope
|
||||
if not isinstance(scope, dict):
|
||||
return str(request.url.path)
|
||||
raw_path: str = str(scope.get("path", request.url.path))
|
||||
root_path: str = str(scope.get("app_root_path", scope.get("root_path", "")))
|
||||
if not isinstance(raw_path, str):
|
||||
return str(request.url.path)
|
||||
# Only strip root_path when it is a meaningful prefix (not bare "/").
|
||||
# Stripping bare "/" would remove the leading slash from every path
|
||||
# e.g. "/team/new" → "team/new", breaking route matching.
|
||||
if root_path and root_path != "/" and raw_path.startswith(root_path):
|
||||
return raw_path[len(root_path) :]
|
||||
return raw_path
|
||||
except Exception as e:
|
||||
verbose_proxy_logger.debug(
|
||||
f"error on get_request_route: {str(e)}, defaulting to request.url.path={request.url.path}"
|
||||
)
|
||||
return request.url.path
|
||||
return str(request.url.path)
|
||||
|
||||
|
||||
@lru_cache(maxsize=256)
|
||||
|
||||
@@ -395,7 +395,9 @@ class RouteChecks:
|
||||
return True
|
||||
|
||||
for _llm_passthrough_route in LiteLLMRoutes.mapped_pass_through_routes.value:
|
||||
if _llm_passthrough_route in route:
|
||||
if route == _llm_passthrough_route or route.startswith(
|
||||
_llm_passthrough_route + "/"
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@@ -1152,6 +1152,41 @@ def _update_internal_user_params(
|
||||
return non_default_values
|
||||
|
||||
|
||||
def _check_user_update_authz(
|
||||
user_request: UpdateUserRequest,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
existing_user_row: Optional[BaseModel],
|
||||
) -> None:
|
||||
"""Authorization checks for /user/update — raises HTTPException on failure."""
|
||||
if (
|
||||
user_request.user_role is not None
|
||||
and user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN.value
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=403, detail="Only proxy admins can modify user roles."
|
||||
)
|
||||
|
||||
if existing_user_row is not None:
|
||||
typed_row = LiteLLM_UserTable(**existing_user_row.model_dump(exclude_none=True))
|
||||
if not can_user_call_user_update(
|
||||
user_api_key_dict=user_api_key_dict, user_info=typed_row
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail={
|
||||
"error": "User does not have permission to update this user. Only PROXY_ADMIN can update other users."
|
||||
},
|
||||
)
|
||||
elif user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN.value:
|
||||
# Silent-create guard: only PROXY_ADMIN may create via /user/update.
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail={
|
||||
"error": "User not found. Only PROXY_ADMIN can create users via /user/update; use /user/new instead."
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
async def _update_single_user_helper(
|
||||
user_request: UpdateUserRequest,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
@@ -1168,31 +1203,15 @@ async def _update_single_user_helper(
|
||||
if prisma_client is None:
|
||||
raise Exception("Not connected to DB!")
|
||||
|
||||
# Only proxy admins can modify user_role
|
||||
if (
|
||||
user_request.user_role is not None
|
||||
and user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN.value
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="Only proxy admins can modify user roles.",
|
||||
)
|
||||
|
||||
# Validate user identifier
|
||||
if not user_request.user_id and not user_request.user_email:
|
||||
raise ValueError("Either user_id or user_email must be provided")
|
||||
|
||||
# Convert to data format expected by update logic
|
||||
data_json: dict = user_request.model_dump(exclude_unset=True)
|
||||
|
||||
# Apply update transformations (reuse existing logic)
|
||||
non_default_values = _update_internal_user_params(
|
||||
data_json=data_json, data=user_request
|
||||
)
|
||||
|
||||
_hash_password_in_dict(non_default_values)
|
||||
|
||||
# Get existing user data for audit logging and metadata preparation
|
||||
existing_user_row: Optional[BaseModel] = None
|
||||
if user_request.user_id:
|
||||
existing_user_row = await prisma_client.db.litellm_usertable.find_first(
|
||||
@@ -1203,37 +1222,12 @@ async def _update_single_user_helper(
|
||||
where={"user_email": user_request.user_email}
|
||||
)
|
||||
|
||||
_check_user_update_authz(user_request, user_api_key_dict, existing_user_row)
|
||||
|
||||
if existing_user_row is not None:
|
||||
existing_user_row = LiteLLM_UserTable(
|
||||
**existing_user_row.model_dump(exclude_none=True)
|
||||
)
|
||||
if not can_user_call_user_update(
|
||||
user_api_key_dict=user_api_key_dict,
|
||||
user_info=existing_user_row,
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail={
|
||||
"error": "User does not have permission to update this user. Only PROXY_ADMIN can update other users."
|
||||
},
|
||||
)
|
||||
else:
|
||||
# Silent-create guard: if the target user doesn't exist, the update
|
||||
# path falls through to an upsert that creates a new user with
|
||||
# caller-supplied fields (models, metadata, budgets, …). Only
|
||||
# PROXY_ADMIN is allowed to create users this way; otherwise an org
|
||||
# admin could spawn arbitrary users attached to nothing by supplying
|
||||
# a fresh email, bypassing the /user/new org/team-scoping checks.
|
||||
if user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN.value:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail={
|
||||
"error": (
|
||||
"User not found. Only PROXY_ADMIN can create users "
|
||||
"via /user/update; use /user/new instead."
|
||||
)
|
||||
},
|
||||
)
|
||||
|
||||
existing_metadata = (
|
||||
cast(Dict, getattr(existing_user_row, "metadata", {}) or {})
|
||||
|
||||
@@ -2180,23 +2180,32 @@ async def _validate_update_key_data(
|
||||
# - max_budget / spend: always require the admin check, even for the
|
||||
# key owner or a team member (matches the existing admin-only
|
||||
# budget semantics).
|
||||
is_key_owner = (
|
||||
user_api_key_dict.user_id is not None
|
||||
and existing_key_row.user_id == user_api_key_dict.user_id
|
||||
)
|
||||
_is_budget_change = (
|
||||
data.max_budget is not None and data.max_budget != existing_key_row.max_budget
|
||||
) or (
|
||||
data.spend is not None
|
||||
and data.spend != getattr(existing_key_row, "spend", None)
|
||||
)
|
||||
is_team_key = existing_key_row.team_id is not None
|
||||
can_skip_admin_check_for_non_budget = is_key_owner or is_team_key
|
||||
if (
|
||||
(not _is_proxy_admin)
|
||||
and prisma_client is not None
|
||||
and (_is_budget_change or not can_skip_admin_check_for_non_budget)
|
||||
):
|
||||
|
||||
# Personal-key bypass: the caller both created the key AND still owns it
|
||||
# (user_id == caller). Checking only created_by would let a demoted admin
|
||||
# who originally created a key for another user continue editing it without
|
||||
# admin authorization after the key was reassigned.
|
||||
caller_is_creator = (
|
||||
user_api_key_dict.user_id is not None
|
||||
and getattr(existing_key_row, "created_by", None) == user_api_key_dict.user_id
|
||||
and getattr(existing_key_row, "user_id", None) == user_api_key_dict.user_id
|
||||
)
|
||||
# Team keys: can_team_member_execute_key_management_endpoint (called above)
|
||||
# already validated team membership + /key/update permission and would have
|
||||
# raised if the caller lacked it. Reaching this point on a team key for a
|
||||
# non-budget change means the caller was authorized — skip the redundant
|
||||
# _check_key_admin_access that would otherwise require team/org admin status.
|
||||
_key_is_team_key = getattr(existing_key_row, "team_id", None) is not None
|
||||
can_skip_admin_check = (
|
||||
caller_is_creator or _key_is_team_key
|
||||
) and not _is_budget_change
|
||||
if (not _is_proxy_admin) and prisma_client is not None and not can_skip_admin_check:
|
||||
hashed_key = existing_key_row.token
|
||||
await _check_key_admin_access(
|
||||
user_api_key_dict=user_api_key_dict,
|
||||
|
||||
@@ -19,6 +19,7 @@ from pydantic import BaseModel
|
||||
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.proxy._types import CommonProxyErrors, LitellmUserRoles, UserAPIKeyAuth
|
||||
from litellm.proxy.auth.auth_utils import is_request_body_safe
|
||||
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
|
||||
from litellm.proxy.common_utils.path_utils import safe_filename
|
||||
from litellm.types.prompts.init_prompts import (
|
||||
@@ -1295,6 +1296,13 @@ async def test_prompt(
|
||||
}
|
||||
data.update(optional_params)
|
||||
|
||||
is_request_body_safe(
|
||||
request_body=data,
|
||||
general_settings=general_settings,
|
||||
llm_router=llm_router,
|
||||
model=data.get("model", ""),
|
||||
)
|
||||
|
||||
# Use ProxyBaseLLMRequestProcessing to go through all proxy logic
|
||||
base_llm_response_processor = ProxyBaseLLMRequestProcessing(data=data)
|
||||
result = await base_llm_response_processor.base_process_llm_request(
|
||||
@@ -1323,6 +1331,8 @@ async def test_prompt(
|
||||
|
||||
except HTTPException as e:
|
||||
raise e
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
except Exception as e:
|
||||
verbose_proxy_logger.exception(f"Error testing prompt: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@@ -253,7 +253,10 @@ from litellm.proxy.auth.auth_checks import (
|
||||
get_team_object,
|
||||
log_db_metrics,
|
||||
)
|
||||
from litellm.proxy.auth.auth_utils import check_response_size_is_safe
|
||||
from litellm.proxy.auth.auth_utils import (
|
||||
check_response_size_is_safe,
|
||||
is_request_body_safe,
|
||||
)
|
||||
from litellm.proxy.auth.handle_jwt import JWTHandler
|
||||
from litellm.proxy.auth.litellm_license import LicenseCheck
|
||||
from litellm.proxy.auth.model_checks import (
|
||||
@@ -10298,6 +10301,16 @@ async def supported_openai_params(model: str):
|
||||
async def transform_request(request: TransformRequestBody):
|
||||
from litellm.utils import return_raw_request
|
||||
|
||||
try:
|
||||
is_request_body_safe(
|
||||
request_body=request.request_body,
|
||||
general_settings=general_settings,
|
||||
llm_router=llm_router,
|
||||
model=request.request_body.get("model", ""),
|
||||
)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail={"error": str(e)})
|
||||
|
||||
return return_raw_request(endpoint=request.call_type, kwargs=request.request_body)
|
||||
|
||||
|
||||
|
||||
@@ -384,6 +384,41 @@ async def parse_rag_ingest_request(
|
||||
},
|
||||
)
|
||||
|
||||
# Credential fields must come from server configuration, not user requests.
|
||||
# Accepting user-supplied credentials (e.g. vertex_credentials with
|
||||
# type=external_account + credential_source.file=/proc/1/environ) allows
|
||||
# any authenticated user to exfiltrate host secrets via SSRF through
|
||||
# google-auth's identity_pool credential refresh.
|
||||
# api_base is also blocked: a user-controlled base URL causes the server
|
||||
# to send its configured provider credentials to an attacker endpoint.
|
||||
_BLOCKED_VECTOR_STORE_CREDENTIAL_PARAMS = {
|
||||
"vertex_credentials",
|
||||
"vertex_ai_credentials",
|
||||
"aws_access_key_id",
|
||||
"aws_secret_access_key",
|
||||
"aws_session_token",
|
||||
"aws_web_identity_token",
|
||||
"aws_role_name",
|
||||
"aws_session_name",
|
||||
"aws_profile_name",
|
||||
"aws_sts_endpoint",
|
||||
"aws_external_id",
|
||||
"azure_ad_token",
|
||||
"api_key",
|
||||
"api_base",
|
||||
}
|
||||
vector_store_opts = ingest_options.get("vector_store", {})
|
||||
if isinstance(vector_store_opts, dict):
|
||||
for field in _BLOCKED_VECTOR_STORE_CREDENTIAL_PARAMS:
|
||||
if field in vector_store_opts:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={
|
||||
"error": f"'{field}' cannot be set in ingest_options.vector_store. "
|
||||
"Credentials must be configured server-side."
|
||||
},
|
||||
)
|
||||
|
||||
return ingest_options, file_data, file_url, file_id
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
"""
|
||||
Repro: multipart/form-data delivers litellm_embedding_config as a JSON
|
||||
string. is_request_body_safe skips the nested banned-param check because
|
||||
isinstance(nested, dict) is False for a string value.
|
||||
|
||||
A banned param (api_base, aws_sts_endpoint, etc.) nested inside the
|
||||
stringified config is therefore invisible to the bouncer.
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
|
||||
|
||||
class TestMultipartNestedBypass:
|
||||
|
||||
def test_nested_banned_param_caught_when_dict(self):
|
||||
"""Baseline: nested api_base inside a dict IS caught."""
|
||||
from litellm.proxy.auth.auth_utils import is_request_body_safe
|
||||
|
||||
request_body = {
|
||||
"model": "text-embedding-ada-002",
|
||||
"litellm_embedding_config": {"api_base": "https://attacker.com"},
|
||||
}
|
||||
|
||||
with pytest.raises(ValueError, match="api_base"):
|
||||
is_request_body_safe(
|
||||
request_body=request_body,
|
||||
general_settings={},
|
||||
llm_router=None,
|
||||
model="text-embedding-ada-002",
|
||||
)
|
||||
|
||||
def test_nested_banned_param_blocked_when_json_string(self):
|
||||
"""
|
||||
Regression: multipart delivers litellm_embedding_config as a JSON string.
|
||||
_coerce_metadata_to_dict now parses it before the banned-param check,
|
||||
so api_base nested inside the stringified config IS caught.
|
||||
"""
|
||||
from litellm.proxy.auth.auth_utils import is_request_body_safe
|
||||
|
||||
# Exactly what _read_request_body produces for multipart:
|
||||
# dict(await request.form()) gives string values for non-file fields.
|
||||
request_body = {
|
||||
"model": "text-embedding-ada-002",
|
||||
"litellm_embedding_config": json.dumps(
|
||||
{"api_base": "https://attacker.com"}
|
||||
),
|
||||
}
|
||||
|
||||
with pytest.raises(ValueError, match="api_base"):
|
||||
is_request_body_safe(
|
||||
request_body=request_body,
|
||||
general_settings={},
|
||||
llm_router=None,
|
||||
model="text-embedding-ada-002",
|
||||
)
|
||||
|
||||
def test_nested_aws_sts_endpoint_blocked_when_json_string(self):
|
||||
"""Regression: aws_sts_endpoint nested in JSON-string config is caught."""
|
||||
from litellm.proxy.auth.auth_utils import is_request_body_safe
|
||||
|
||||
request_body = {
|
||||
"model": "text-embedding-ada-002",
|
||||
"litellm_embedding_config": json.dumps(
|
||||
{"aws_sts_endpoint": "https://attacker.com/sts"}
|
||||
),
|
||||
}
|
||||
|
||||
with pytest.raises(ValueError, match="aws_sts_endpoint"):
|
||||
is_request_body_safe(
|
||||
request_body=request_body,
|
||||
general_settings={},
|
||||
llm_router=None,
|
||||
model="text-embedding-ada-002",
|
||||
)
|
||||
@@ -3,8 +3,6 @@ Test /prompts/test endpoint for testing prompts before saving
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from fastapi import HTTPException
|
||||
|
||||
|
||||
class TestPromptTestEndpoint:
|
||||
@@ -132,3 +130,55 @@ User: Hello"""
|
||||
|
||||
model = frontmatter.get("model")
|
||||
assert model is None
|
||||
|
||||
def test_ssrf_via_dotprompt_api_base_blocked(self):
|
||||
"""
|
||||
Regression: api_base in dotprompt YAML frontmatter must be rejected.
|
||||
|
||||
Without the fix, optional_params (every frontmatter key not in the
|
||||
restricted list) was merged into the LLM call data dict and bypassed
|
||||
is_request_body_safe, allowing any bearer-key holder to redirect the
|
||||
outbound LLM request — and the provider API key — to an
|
||||
attacker-controlled host (SSRF / credential exfil).
|
||||
|
||||
The fix calls is_request_body_safe on the constructed data dict before
|
||||
the LLM call. This test verifies:
|
||||
1. api_base flows from YAML frontmatter into optional_params (it does).
|
||||
2. is_request_body_safe raises ValueError when api_base is present
|
||||
without admin opt-in (it does, from _BANNED_REQUEST_BODY_PARAMS).
|
||||
"""
|
||||
from litellm.integrations.dotprompt.prompt_manager import (
|
||||
PromptManager,
|
||||
PromptTemplate,
|
||||
)
|
||||
from litellm.proxy.auth.auth_utils import is_request_body_safe
|
||||
|
||||
malicious_frontmatter = {
|
||||
"model": "gpt-4o",
|
||||
"api_base": "https://attacker.example.com",
|
||||
"temperature": 0.7,
|
||||
}
|
||||
|
||||
template = PromptTemplate(
|
||||
content="User: Hello", metadata=malicious_frontmatter, template_id="test"
|
||||
)
|
||||
|
||||
# api_base must flow into optional_params — that's the attack surface
|
||||
assert "api_base" in template.optional_params
|
||||
assert template.optional_params["api_base"] == "https://attacker.example.com"
|
||||
|
||||
# Simulate what test_prompt builds before calling the LLM
|
||||
data = {
|
||||
"model": template.model,
|
||||
"messages": [{"role": "user", "content": "Hello"}],
|
||||
}
|
||||
data.update(template.optional_params)
|
||||
|
||||
# is_request_body_safe must reject it without admin opt-in
|
||||
with pytest.raises(ValueError, match="api_base"):
|
||||
is_request_body_safe(
|
||||
request_body=data,
|
||||
general_settings={},
|
||||
llm_router=None,
|
||||
model=data.get("model", ""),
|
||||
)
|
||||
|
||||
@@ -189,3 +189,37 @@ def test_get_request_route_with_base_url_not_at_start():
|
||||
request = create_request("/api/genai/test")
|
||||
result = get_request_route(request)
|
||||
assert result == "/api/genai/test"
|
||||
|
||||
|
||||
def _create_request_with_host_header(path: str, host_header: str) -> Request:
|
||||
return Request(
|
||||
{
|
||||
"type": "http",
|
||||
"method": "GET",
|
||||
"scheme": "http",
|
||||
"server": ("localhost", 4000),
|
||||
"path": path,
|
||||
"query_string": b"",
|
||||
"headers": [(b"host", host_header.encode())],
|
||||
"client": ("127.0.0.1", 50000),
|
||||
"root_path": "",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"host_header",
|
||||
[
|
||||
"localhost/?x=1",
|
||||
"localhost:4000/?x=1",
|
||||
"localhost/#test",
|
||||
"localhost:4000/#test",
|
||||
],
|
||||
)
|
||||
def test_get_request_route_not_bypassed_by_malformed_host(host_header: str):
|
||||
for protected_path in ["/health", "/user/new", "/key/generate", "/get/internal_user_settings"]:
|
||||
request = _create_request_with_host_header(path=protected_path, host_header=host_header)
|
||||
result = get_request_route(request)
|
||||
assert result == protected_path, (
|
||||
f"Host: {host_header!r} caused route {protected_path!r} to resolve as {result!r}"
|
||||
)
|
||||
|
||||
@@ -2137,3 +2137,57 @@ async def test_initialize_pass_through_registers_wildcard_for_auth_subpath():
|
||||
)
|
||||
for k in registered:
|
||||
InitPassThroughEndpointHelpers.remove_endpoint_routes(k.split(":")[0])
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"route",
|
||||
[
|
||||
"/credentials/by_name/openai",
|
||||
"/credentials/openai",
|
||||
"/credentials/azure",
|
||||
"/credentials/by_name/anthropic",
|
||||
"/model/delete/openai",
|
||||
"/model/delete/anthropic-prod",
|
||||
"/budget/update/bedrock",
|
||||
"/user/delete/gemini-user",
|
||||
],
|
||||
)
|
||||
def test_provider_name_substring_not_classified_as_llm_route(route):
|
||||
"""
|
||||
Regression: mapped_pass_through_routes used a substring check
|
||||
(`_llm_passthrough_route in route`) so any admin-only path whose URL
|
||||
happened to contain a provider name (openai, anthropic, azure, …) was
|
||||
misclassified as an LLM API route and bypassed the admin gate.
|
||||
|
||||
The fix uses an exact/prefix match so only routes that actually *start*
|
||||
with a passthrough prefix are allowed through.
|
||||
"""
|
||||
from litellm.proxy.auth.route_checks import RouteChecks
|
||||
|
||||
assert RouteChecks.is_llm_api_route(route=route) is False, (
|
||||
f"{route!r} should NOT be classified as an LLM API route — "
|
||||
"provider-name substring match bypass"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"route",
|
||||
[
|
||||
"/openai/v1/chat/completions",
|
||||
"/openai",
|
||||
"/anthropic/v1/messages",
|
||||
"/anthropic",
|
||||
"/bedrock/invoke",
|
||||
"/azure/openai/deployments/gpt-4/chat/completions",
|
||||
"/gemini/v1/models",
|
||||
"/vertex-ai/predict",
|
||||
"/vertex_ai/predict",
|
||||
],
|
||||
)
|
||||
def test_legitimate_passthrough_routes_still_classified_as_llm_route(route):
|
||||
"""Legitimate passthrough routes must still pass is_llm_api_route."""
|
||||
from litellm.proxy.auth.route_checks import RouteChecks
|
||||
|
||||
assert (
|
||||
RouteChecks.is_llm_api_route(route=route) is True
|
||||
), f"{route!r} should be classified as an LLM API route"
|
||||
|
||||
@@ -5540,7 +5540,7 @@ async def test_validate_max_budget():
|
||||
_validate_max_budget(-10.0)
|
||||
|
||||
assert exc_info.value.status_code == 400
|
||||
assert "max_budget cannot be negative" in str(exc_info.value.detail)
|
||||
assert "negative" in str(exc_info.value.detail)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -8420,6 +8420,7 @@ async def test_update_key_non_budget_fields_allowed_for_internal_user(monkeypatc
|
||||
mock_existing_key = MagicMock()
|
||||
mock_existing_key.token = test_hashed_token
|
||||
mock_existing_key.user_id = "internal_user"
|
||||
mock_existing_key.created_by = "internal_user"
|
||||
mock_existing_key.team_id = None
|
||||
mock_existing_key.project_id = None
|
||||
mock_existing_key.max_budget = 10.0
|
||||
@@ -8561,6 +8562,68 @@ async def test_update_key_non_budget_rejects_cross_user_modification(monkeypatch
|
||||
assert str(exc.value.code) == "403"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_key_creator_reassigned_key_blocked(monkeypatch):
|
||||
"""Regression: creator who no longer owns the key (user_id ≠ caller) must
|
||||
not bypass _check_key_admin_access via the caller_is_creator shortcut."""
|
||||
from litellm.proxy.management_endpoints.key_management_endpoints import (
|
||||
update_key_fn,
|
||||
)
|
||||
|
||||
test_hashed_token = "aabbccdd" * 8
|
||||
mock_prisma_client = AsyncMock()
|
||||
|
||||
mock_existing_key = MagicMock()
|
||||
mock_existing_key.token = test_hashed_token
|
||||
mock_existing_key.created_by = "demoted-admin" # creator
|
||||
mock_existing_key.user_id = "victim-user" # reassigned to someone else
|
||||
mock_existing_key.team_id = None
|
||||
mock_existing_key.project_id = None
|
||||
mock_existing_key.max_budget = 10.0
|
||||
mock_existing_key.key_alias = "original"
|
||||
mock_existing_key.models = []
|
||||
mock_existing_key.model_dump.return_value = {
|
||||
"token": test_hashed_token,
|
||||
"user_id": "victim-user",
|
||||
"created_by": "demoted-admin",
|
||||
"team_id": None,
|
||||
"max_budget": 10.0,
|
||||
}
|
||||
|
||||
mock_prisma_client.get_data = AsyncMock(return_value=mock_existing_key)
|
||||
mock_prisma_client.db.litellm_verificationtoken.find_unique = AsyncMock(
|
||||
return_value=mock_existing_key
|
||||
)
|
||||
|
||||
monkeypatch.setattr("litellm.proxy.proxy_server.prisma_client", mock_prisma_client)
|
||||
monkeypatch.setattr("litellm.proxy.proxy_server.user_api_key_cache", AsyncMock())
|
||||
monkeypatch.setattr("litellm.proxy.proxy_server.proxy_logging_obj", MagicMock())
|
||||
monkeypatch.setattr("litellm.proxy.proxy_server.llm_router", None)
|
||||
monkeypatch.setattr("litellm.proxy.proxy_server.premium_user", True)
|
||||
monkeypatch.setattr("litellm.store_audit_logs", False)
|
||||
monkeypatch.setattr(
|
||||
"litellm.proxy.proxy_server.hash_token", lambda t: test_hashed_token
|
||||
)
|
||||
|
||||
demoted_admin = UserAPIKeyAuth(
|
||||
user_role=LitellmUserRoles.INTERNAL_USER,
|
||||
api_key="sk-demoted",
|
||||
user_id="demoted-admin",
|
||||
)
|
||||
|
||||
mock_request = MagicMock()
|
||||
mock_request.query_params = {}
|
||||
|
||||
with pytest.raises(Exception) as exc:
|
||||
await update_key_fn(
|
||||
request=mock_request,
|
||||
data=UpdateKeyRequest(key=test_hashed_token, key_alias="hijacked"),
|
||||
user_api_key_dict=demoted_admin,
|
||||
litellm_changed_by=None,
|
||||
)
|
||||
assert str(exc.value.code) == "403"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_key_team_member_with_permission_can_update_non_budget(
|
||||
monkeypatch,
|
||||
@@ -9069,6 +9132,172 @@ class TestLIT1884KeyUpdateValidation:
|
||||
)
|
||||
|
||||
|
||||
class TestKeyOwnerPrivilegeEscalation:
|
||||
"""
|
||||
Policy:
|
||||
- created_by == caller → can edit any non-budget field without admin
|
||||
- created_by != caller (assigned user) → must pass admin check for any edit
|
||||
- budget changes (max_budget/spend) → always require admin
|
||||
- PROXY_ADMIN → unrestricted
|
||||
"""
|
||||
|
||||
def _make_existing_key(self, user_id="creator-123", created_by="creator-123"):
|
||||
row = MagicMock()
|
||||
# user_id must be set explicitly — MagicMock auto-attrs are not None and
|
||||
# trip the _is_allowed_to_make_key_request assert before our check runs.
|
||||
row.user_id = user_id
|
||||
row.created_by = created_by
|
||||
row.token = "hashed_token"
|
||||
row.team_id = None
|
||||
row.max_budget = None
|
||||
row.spend = 0.0
|
||||
row.organization_id = None
|
||||
row.project_id = None
|
||||
return row
|
||||
|
||||
def _make_auth(self, user_id="creator-123"):
|
||||
return UserAPIKeyAuth(
|
||||
user_id=user_id,
|
||||
user_role=LitellmUserRoles.INTERNAL_USER,
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_assigned_user_blocked_from_any_edit(self):
|
||||
"""User who did not create the key cannot edit it at all."""
|
||||
data = UpdateKeyRequest(key="sk-test", key_alias="hacked")
|
||||
# user_id matches caller so _is_allowed_to_make_key_request passes,
|
||||
# but created_by != caller so our creator check requires admin.
|
||||
existing = self._make_existing_key(
|
||||
user_id="assigned-user", created_by="admin-456"
|
||||
)
|
||||
auth = self._make_auth(user_id="assigned-user")
|
||||
|
||||
mock_check = AsyncMock(
|
||||
side_effect=HTTPException(status_code=403, detail="Not authorized")
|
||||
)
|
||||
with patch(
|
||||
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
|
||||
mock_check,
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await _validate_update_key_data(
|
||||
data=data,
|
||||
existing_key_row=existing,
|
||||
user_api_key_dict=auth,
|
||||
llm_router=None,
|
||||
premium_user=False,
|
||||
prisma_client=AsyncMock(),
|
||||
user_api_key_cache=MagicMock(),
|
||||
)
|
||||
assert exc_info.value.status_code == 403
|
||||
mock_check.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_assigned_user_blocked_from_model_escalation(self):
|
||||
data = UpdateKeyRequest(key="sk-test", models=["gpt-4", "claude-opus"])
|
||||
existing = self._make_existing_key(
|
||||
user_id="assigned-user", created_by="admin-456"
|
||||
)
|
||||
auth = self._make_auth(user_id="assigned-user")
|
||||
|
||||
mock_check = AsyncMock(
|
||||
side_effect=HTTPException(status_code=403, detail="Not authorized")
|
||||
)
|
||||
with patch(
|
||||
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
|
||||
mock_check,
|
||||
):
|
||||
with pytest.raises(HTTPException):
|
||||
await _validate_update_key_data(
|
||||
data=data,
|
||||
existing_key_row=existing,
|
||||
user_api_key_dict=auth,
|
||||
llm_router=None,
|
||||
premium_user=False,
|
||||
prisma_client=AsyncMock(),
|
||||
user_api_key_cache=MagicMock(),
|
||||
)
|
||||
mock_check.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_creator_can_edit_own_key(self):
|
||||
"""Key creator can update any non-budget field without admin."""
|
||||
data = UpdateKeyRequest(
|
||||
key="sk-test", models=["gpt-4"], rpm_limit=500, key_alias="my-key"
|
||||
)
|
||||
existing = self._make_existing_key(created_by="creator-123")
|
||||
auth = self._make_auth(user_id="creator-123")
|
||||
|
||||
mock_check = AsyncMock()
|
||||
with patch(
|
||||
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
|
||||
mock_check,
|
||||
):
|
||||
await _validate_update_key_data(
|
||||
data=data,
|
||||
existing_key_row=existing,
|
||||
user_api_key_dict=auth,
|
||||
llm_router=None,
|
||||
premium_user=False,
|
||||
prisma_client=AsyncMock(),
|
||||
user_api_key_cache=MagicMock(),
|
||||
)
|
||||
mock_check.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_creator_cannot_change_own_budget(self):
|
||||
"""Budget changes require admin even for the key creator."""
|
||||
data = UpdateKeyRequest(key="sk-test", max_budget=9999.0)
|
||||
existing = self._make_existing_key(created_by="creator-123")
|
||||
existing.max_budget = 10.0
|
||||
auth = self._make_auth(user_id="creator-123")
|
||||
|
||||
mock_check = AsyncMock(
|
||||
side_effect=HTTPException(status_code=403, detail="Not authorized")
|
||||
)
|
||||
with patch(
|
||||
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
|
||||
mock_check,
|
||||
):
|
||||
with pytest.raises(HTTPException):
|
||||
await _validate_update_key_data(
|
||||
data=data,
|
||||
existing_key_row=existing,
|
||||
user_api_key_dict=auth,
|
||||
llm_router=None,
|
||||
premium_user=False,
|
||||
prisma_client=AsyncMock(),
|
||||
user_api_key_cache=MagicMock(),
|
||||
)
|
||||
mock_check.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_can_update_any_field(self):
|
||||
data = UpdateKeyRequest(key="sk-test", models=["gpt-4"], max_budget=999.0)
|
||||
existing = self._make_existing_key(created_by="someone-else")
|
||||
existing.max_budget = 1.0
|
||||
auth = UserAPIKeyAuth(
|
||||
user_id="admin-user",
|
||||
user_role=LitellmUserRoles.PROXY_ADMIN,
|
||||
)
|
||||
|
||||
mock_check = AsyncMock()
|
||||
with patch(
|
||||
"litellm.proxy.management_endpoints.key_management_endpoints._check_key_admin_access",
|
||||
mock_check,
|
||||
):
|
||||
await _validate_update_key_data(
|
||||
data=data,
|
||||
existing_key_row=existing,
|
||||
user_api_key_dict=auth,
|
||||
llm_router=None,
|
||||
premium_user=False,
|
||||
prisma_client=AsyncMock(),
|
||||
user_api_key_cache=MagicMock(),
|
||||
)
|
||||
mock_check.assert_not_called()
|
||||
|
||||
|
||||
class TestKeyAliasSkipValidationOnUnchanged:
|
||||
"""
|
||||
Test that updating/regenerating a key without changing its key_alias
|
||||
|
||||
@@ -130,6 +130,57 @@ def test_internal_user_rag_ingest_without_vector_store_id_allowed(client_interna
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"blocked_field",
|
||||
[
|
||||
"vertex_credentials",
|
||||
"vertex_ai_credentials",
|
||||
"aws_access_key_id",
|
||||
"aws_secret_access_key",
|
||||
"aws_session_token",
|
||||
"api_key",
|
||||
"api_base",
|
||||
],
|
||||
)
|
||||
def test_rag_ingest_blocks_clientside_credentials(client_internal_user, blocked_field):
|
||||
"""
|
||||
Credential fields in ingest_options.vector_store must be rejected.
|
||||
|
||||
Accepting user-supplied credentials (e.g. vertex_credentials with
|
||||
type=external_account + credential_source.file=/proc/1/environ) allows
|
||||
any authenticated user to exfiltrate host secrets via SSRF through
|
||||
google-auth's identity_pool credential refresh.
|
||||
"""
|
||||
payload = {
|
||||
"ingest_options": {
|
||||
"vector_store": {
|
||||
"custom_llm_provider": "vertex_ai",
|
||||
"vertex_project": "x",
|
||||
blocked_field: {
|
||||
"type": "external_account",
|
||||
"token_url": "http://attacker.example/sts",
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
response = client_internal_user.post(
|
||||
"/v1/rag/ingest",
|
||||
json={
|
||||
**payload,
|
||||
"file": {
|
||||
"filename": "q.txt",
|
||||
"content": "dGVzdA==",
|
||||
"content_type": "text/plain",
|
||||
},
|
||||
},
|
||||
)
|
||||
assert (
|
||||
response.status_code == 400
|
||||
), f"Expected 400 when '{blocked_field}' is set clientside, got {response.status_code}: {response.json()}"
|
||||
body = response.json()
|
||||
assert blocked_field in str(
|
||||
body
|
||||
), f"Response should mention '{blocked_field}': {body}"
|
||||
class TestRagIngestSSRFBlocked:
|
||||
"""
|
||||
aws_sts_endpoint and related credential-redirect fields must be rejected
|
||||
|
||||
@@ -23,6 +23,7 @@ sys.path.insert(
|
||||
) # Adds the parent directory to the system-path
|
||||
|
||||
import litellm
|
||||
from litellm.proxy._types import LitellmUserRoles, UserAPIKeyAuth
|
||||
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
|
||||
from litellm.proxy.proxy_server import app, initialize
|
||||
from litellm.utils import _invalidate_model_cost_lowercase_map
|
||||
@@ -6686,3 +6687,52 @@ def test_realtime_websocket_route_aliases_registered():
|
||||
f"{expected!r} missing from API_ROUTE_TO_CALL_TYPES; call-type "
|
||||
f"resolution will return None and break call-type-aware features."
|
||||
)
|
||||
|
||||
|
||||
class TestTransformRequestBannedParams:
|
||||
"""
|
||||
/utils/transform_request applies the same banned-param check as LLM endpoints.
|
||||
|
||||
Without this check, any authenticated user could supply aws_sts_endpoint,
|
||||
api_base, etc. and have the server forward its credentials to an
|
||||
attacker-controlled endpoint during SDK credential resolution.
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def client(self):
|
||||
mock_auth = UserAPIKeyAuth(
|
||||
user_id="test-internal",
|
||||
user_role=LitellmUserRoles.INTERNAL_USER,
|
||||
)
|
||||
original = app.dependency_overrides.copy()
|
||||
app.dependency_overrides[user_api_key_auth] = lambda: mock_auth
|
||||
try:
|
||||
yield TestClient(app)
|
||||
finally:
|
||||
app.dependency_overrides = original
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"banned",
|
||||
[
|
||||
"aws_sts_endpoint",
|
||||
"api_base",
|
||||
"aws_web_identity_token",
|
||||
"vertex_credentials",
|
||||
],
|
||||
)
|
||||
def test_banned_params_rejected_for_all_users(self, client, banned):
|
||||
"""Banned params must be blocked for any authenticated user."""
|
||||
response = client.post(
|
||||
"/utils/transform_request",
|
||||
json={
|
||||
"call_type": "completion",
|
||||
"request_body": {
|
||||
"model": "gpt-3.5-turbo",
|
||||
banned: "https://attacker.example",
|
||||
},
|
||||
},
|
||||
)
|
||||
assert response.status_code == 400, (
|
||||
f"Expected 400 for banned param '{banned}', "
|
||||
f"got {response.status_code}: {response.json()}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user