Merge pull request #21627 from BerriAI/litellm_service_key_vis

[Fix] Service Account Visibility for Team Members
This commit is contained in:
yuneng-jiang
2026-02-19 17:09:33 -08:00
committed by GitHub
2 changed files with 511 additions and 20 deletions
@@ -3869,17 +3869,14 @@ async def validate_key_list_check(
return complete_user_info
async def get_admin_team_ids(
async def _fetch_user_team_objects(
complete_user_info: Optional[LiteLLM_UserTable],
user_api_key_dict: UserAPIKeyAuth,
prisma_client: PrismaClient,
) -> List[str]:
"""
Get all team IDs where the user is an admin.
"""
if complete_user_info is None:
) -> List[LiteLLM_TeamTable]:
"""Fetch team objects for all teams a user belongs to (single DB query)."""
if complete_user_info is None or not complete_user_info.teams:
return []
# Get all teams that user is an admin of
teams: Optional[
List[BaseModel]
] = await prisma_client.db.litellm_teamtable.find_many(
@@ -3888,14 +3885,60 @@ async def get_admin_team_ids(
if teams is None:
return []
teams_pydantic_obj = [LiteLLM_TeamTable(**team.model_dump()) for team in teams]
return [LiteLLM_TeamTable(**team.model_dump()) for team in teams]
admin_team_ids = [
def _get_admin_team_ids_from_objects(
user_api_key_dict: UserAPIKeyAuth,
team_objects: List[LiteLLM_TeamTable],
) -> List[str]:
"""Filter team objects to those where the user is an admin."""
return [
team.team_id
for team in teams_pydantic_obj
for team in team_objects
if _is_user_team_admin(user_api_key_dict=user_api_key_dict, team_obj=team)
]
return admin_team_ids
def _get_member_team_ids_from_objects(
user_api_key_dict: UserAPIKeyAuth,
team_objects: List[LiteLLM_TeamTable],
) -> List[str]:
"""Filter team objects to those where the user is a member (any role)."""
return [
team.team_id
for team in team_objects
if any(
member.user_id is not None
and member.user_id == user_api_key_dict.user_id
for member in team.members_with_roles
)
]
async def get_admin_team_ids(
complete_user_info: Optional[LiteLLM_UserTable],
user_api_key_dict: UserAPIKeyAuth,
prisma_client: PrismaClient,
) -> List[str]:
"""Get all team IDs where the user is an admin."""
team_objects = await _fetch_user_team_objects(complete_user_info, prisma_client)
return _get_admin_team_ids_from_objects(user_api_key_dict, team_objects)
async def get_member_team_ids(
complete_user_info: Optional[LiteLLM_UserTable],
user_api_key_dict: UserAPIKeyAuth,
prisma_client: PrismaClient,
) -> List[str]:
"""
Get all team IDs where the user is a member (any role, including admin).
Used to determine which teams' service accounts (keys with user_id=NULL)
a regular team member can see.
"""
team_objects = await _fetch_user_team_objects(complete_user_info, prisma_client)
return _get_member_team_ids_from_objects(user_api_key_dict, team_objects)
@router.get(
@@ -3981,12 +4024,26 @@ async def list_keys(
prisma_client=prisma_client,
)
if include_team_keys:
admin_team_ids = await get_admin_team_ids(
# Fetch team objects once when needed for either admin or member filtering.
# This avoids duplicate DB queries for the same team data.
if include_team_keys or include_created_by_keys:
team_objects = await _fetch_user_team_objects(
complete_user_info=complete_user_info,
user_api_key_dict=user_api_key_dict,
prisma_client=prisma_client,
)
member_team_ids = _get_member_team_ids_from_objects(
user_api_key_dict=user_api_key_dict,
team_objects=team_objects,
)
else:
team_objects = []
member_team_ids = None
if include_team_keys:
admin_team_ids = _get_admin_team_ids_from_objects(
user_api_key_dict=user_api_key_dict,
team_objects=team_objects,
)
else:
admin_team_ids = None
@@ -4007,6 +4064,7 @@ async def list_keys(
return_full_object=return_full_object,
organization_id=organization_id,
admin_team_ids=admin_team_ids,
member_team_ids=member_team_ids,
include_created_by_keys=include_created_by_keys,
sort_by=sort_by,
sort_order=sort_order,
@@ -4157,9 +4215,19 @@ def _build_key_filter_conditions(
key_hash: Optional[str],
exclude_team_id: Optional[str],
admin_team_ids: Optional[List[str]],
include_created_by_keys: bool,
member_team_ids: Optional[List[str]] = None,
include_created_by_keys: bool = False,
) -> Dict[str, Union[str, Dict[str, Any], List[Dict[str, Any]]]]:
"""Build filter conditions for key listing."""
"""Build filter conditions for key listing.
Visibility rules:
- Users always see their own keys (user_id match)
- Team admins see ALL keys for their admin teams (via admin_team_ids)
- Regular team members see only service accounts (user_id=NULL) for their
teams (via member_team_ids). This prevents leaking other members' spend data.
- created_by visibility is scoped to teams the user currently belongs to,
so former members cannot see service accounts they created after leaving.
"""
# Prepare filter conditions
where: Dict[str, Union[str, Dict[str, Any], List[Dict[str, Any]]]] = {}
where.update(_get_condition_to_filter_out_ui_session_tokens())
@@ -4185,14 +4253,55 @@ def _build_key_filter_conditions(
if user_condition:
or_conditions.append(user_condition)
# Add condition for created by keys if provided
# Add condition for created_by keys, scoped to user's current teams
if include_created_by_keys and user_id:
or_conditions.append({"created_by": user_id})
if member_team_ids is not None:
if member_team_ids:
# Scope created_by keys to teams user is still a member of,
# or keys that have no team (personal keys)
or_conditions.append(
{
"AND": [
{"created_by": user_id},
{
"OR": [
{"team_id": {"in": member_team_ids}},
{"team_id": None},
]
},
]
}
)
else:
# User is not a member of any team, only show non-team created_by keys
or_conditions.append(
{"AND": [{"created_by": user_id}, {"team_id": None}]}
)
else:
# No team membership info provided (backward compatibility for
# direct _list_key_helper callers like Prometheus)
or_conditions.append({"created_by": user_id})
# Add condition for admin team keys if provided
# Add condition for admin team keys (admins see ALL team keys)
if admin_team_ids:
or_conditions.append({"team_id": {"in": admin_team_ids}})
# Add condition for member team service accounts (members only see keys with user_id=NULL)
if member_team_ids:
# Exclude teams where user is already admin (those are covered above with full visibility)
member_only_team_ids = [
tid for tid in member_team_ids if tid not in (admin_team_ids or [])
]
if member_only_team_ids:
or_conditions.append(
{
"AND": [
{"team_id": {"in": member_only_team_ids}},
{"user_id": None},
]
}
)
# Combine conditions with OR if we have multiple conditions
if len(or_conditions) > 1:
where = {"AND": [where, {"OR": or_conditions}]}
@@ -4217,6 +4326,9 @@ async def _list_key_helper(
admin_team_ids: Optional[
List[str]
] = None, # New parameter for teams where user is admin
member_team_ids: Optional[
List[str]
] = None, # Team IDs where user is a member (any role) - for service account visibility
include_created_by_keys: bool = False,
sort_by: Optional[str] = None,
sort_order: str = "desc",
@@ -4234,6 +4346,7 @@ async def _list_key_helper(
exclude_team_id: Optional[str] # exclude a specific team_id
return_full_object: bool # when true, will return UserAPIKeyAuth objects instead of just the token
admin_team_ids: Optional[List[str]] # list of team IDs where the user is an admin
member_team_ids: Optional[List[str]] # list of team IDs where user is a member (for service account visibility)
Returns:
KeyListResponseObject
@@ -4252,6 +4365,7 @@ async def _list_key_helper(
key_hash=key_hash,
exclude_team_id=exclude_team_id,
admin_team_ids=admin_team_ids,
member_team_ids=member_team_ids,
include_created_by_keys=include_created_by_keys,
)
@@ -5780,3 +5780,380 @@ async def test_default_key_generate_params_duration(monkeypatch):
assert request.duration == "180d"
finally:
litellm.default_key_generate_params = original_value
@pytest.mark.asyncio
async def test_build_key_filter_member_team_service_accounts():
"""
Test that regular team members can see service accounts (user_id=NULL)
for their teams, but NOT other members' personal keys.
"""
from litellm.proxy.management_endpoints.key_management_endpoints import (
_build_key_filter_conditions,
)
user_id = "regular-member-123"
member_team_ids = ["team-A", "team-B"]
where = _build_key_filter_conditions(
user_id=user_id,
team_id=None,
organization_id=None,
key_alias=None,
key_hash=None,
exclude_team_id=None,
admin_team_ids=None,
member_team_ids=member_team_ids,
include_created_by_keys=False,
)
# Should have AND with OR conditions
assert "AND" in where
or_conditions = where["AND"][1]["OR"]
# Should have 2 conditions: user's own keys + member team service accounts
assert len(or_conditions) == 2
# First: user's own keys
user_cond = or_conditions[0]
assert user_cond["user_id"] == user_id
# Second: service accounts for member teams (user_id=None AND team_id in member teams)
service_account_cond = or_conditions[1]
assert "AND" in service_account_cond
and_parts = service_account_cond["AND"]
assert {"team_id": {"in": member_team_ids}} in and_parts
assert {"user_id": None} in and_parts
@pytest.mark.asyncio
async def test_build_key_filter_admin_sees_all_team_keys():
"""
Test that team admins see ALL keys for their teams (not just service accounts),
and that member_team_ids doesn't duplicate admin teams.
"""
from litellm.proxy.management_endpoints.key_management_endpoints import (
_build_key_filter_conditions,
)
user_id = "admin-user-123"
admin_team_ids = ["team-A"]
member_team_ids = ["team-A", "team-B"]
where = _build_key_filter_conditions(
user_id=user_id,
team_id=None,
organization_id=None,
key_alias=None,
key_hash=None,
exclude_team_id=None,
admin_team_ids=admin_team_ids,
member_team_ids=member_team_ids,
include_created_by_keys=False,
)
assert "AND" in where
or_conditions = where["AND"][1]["OR"]
# Should have 3 conditions:
# 1. user's own keys
# 2. admin team keys (all keys for team-A)
# 3. member-only service accounts (only service accounts for team-B, since team-A is already covered by admin)
assert len(or_conditions) == 3
# Find admin condition
admin_cond = None
service_account_cond = None
for cond in or_conditions:
if isinstance(cond.get("team_id"), dict) and "in" in cond.get("team_id", {}):
admin_cond = cond
elif "AND" in cond:
service_account_cond = cond
assert admin_cond is not None, "Admin team condition should be present"
assert admin_cond["team_id"]["in"] == admin_team_ids
# member-only condition should only include team-B (team-A is covered by admin)
assert service_account_cond is not None, "Service account condition should be present"
and_parts = service_account_cond["AND"]
assert {"team_id": {"in": ["team-B"]}} in and_parts
assert {"user_id": None} in and_parts
@pytest.mark.asyncio
async def test_build_key_filter_created_by_scoped_to_current_teams():
"""
Test that created_by filter is scoped to teams user currently belongs to.
A former team member should NOT see service accounts they created for
a team they've left.
"""
from litellm.proxy.management_endpoints.key_management_endpoints import (
_build_key_filter_conditions,
)
user_id = "user-456"
# User is currently only a member of team-A (left team-B)
member_team_ids = ["team-A"]
where = _build_key_filter_conditions(
user_id=user_id,
team_id=None,
organization_id=None,
key_alias=None,
key_hash=None,
exclude_team_id=None,
admin_team_ids=None,
member_team_ids=member_team_ids,
include_created_by_keys=True,
)
assert "AND" in where
or_conditions = where["AND"][1]["OR"]
# Find the created_by condition
created_by_cond = None
for cond in or_conditions:
if "AND" in cond:
and_parts = cond["AND"]
for part in and_parts:
if isinstance(part, dict) and "created_by" in part:
created_by_cond = cond
break
assert created_by_cond is not None, "Created by condition should be present"
# created_by should be scoped: created_by=user AND (team_id in [team-A] OR team_id=None)
and_parts = created_by_cond["AND"]
assert {"created_by": user_id} in and_parts
# Find the OR part that scopes to current teams
team_scope = None
for part in and_parts:
if isinstance(part, dict) and "OR" in part:
team_scope = part["OR"]
assert team_scope is not None, "Team scope OR condition should be present"
assert {"team_id": {"in": member_team_ids}} in team_scope
assert {"team_id": None} in team_scope
@pytest.mark.asyncio
async def test_build_key_filter_created_by_no_teams():
"""
Test that when user has no team memberships (empty list), created_by
only returns non-team keys (personal keys).
"""
from litellm.proxy.management_endpoints.key_management_endpoints import (
_build_key_filter_conditions,
)
user_id = "user-no-teams"
member_team_ids = [] # User has no team memberships
where = _build_key_filter_conditions(
user_id=user_id,
team_id=None,
organization_id=None,
key_alias=None,
key_hash=None,
exclude_team_id=None,
admin_team_ids=None,
member_team_ids=member_team_ids,
include_created_by_keys=True,
)
assert "AND" in where
or_conditions = where["AND"][1]["OR"]
# Find the created_by condition
created_by_cond = None
for cond in or_conditions:
if "AND" in cond:
and_parts = cond["AND"]
for part in and_parts:
if isinstance(part, dict) and "created_by" in part:
created_by_cond = cond
break
assert created_by_cond is not None
and_parts = created_by_cond["AND"]
assert {"created_by": user_id} in and_parts
assert {"team_id": None} in and_parts
# Should NOT have an OR with team_id in [] - just a simple team_id=None
for part in and_parts:
if isinstance(part, dict) and "OR" in part:
pytest.fail("Should not have OR condition when member_team_ids is empty")
@pytest.mark.asyncio
async def test_build_key_filter_backward_compat_no_member_team_ids():
"""
Test backward compatibility: when member_team_ids is None (not provided),
created_by filter should use the old unrestricted behavior.
This ensures direct callers of _list_key_helper (like Prometheus) still work.
"""
from litellm.proxy.management_endpoints.key_management_endpoints import (
_build_key_filter_conditions,
)
user_id = "user-789"
where = _build_key_filter_conditions(
user_id=user_id,
team_id=None,
organization_id=None,
key_alias=None,
key_hash=None,
exclude_team_id=None,
admin_team_ids=None,
member_team_ids=None, # Not provided
include_created_by_keys=True,
)
assert "AND" in where
or_conditions = where["AND"][1]["OR"]
# Find the created_by condition - should be simple {"created_by": user_id}
created_by_cond = None
for cond in or_conditions:
if "created_by" in cond:
created_by_cond = cond
assert created_by_cond is not None
assert created_by_cond == {"created_by": user_id}
assert len(created_by_cond) == 1, "Should be simple created_by without team scoping"
@pytest.mark.asyncio
async def test_build_key_filter_admin_all_member_overlap():
"""
Test that when user is admin of ALL teams they belong to,
no member-only service account condition is added (would be redundant).
"""
from litellm.proxy.management_endpoints.key_management_endpoints import (
_build_key_filter_conditions,
)
user_id = "admin-all"
admin_team_ids = ["team-A", "team-B"]
member_team_ids = ["team-A", "team-B"]
where = _build_key_filter_conditions(
user_id=user_id,
team_id=None,
organization_id=None,
key_alias=None,
key_hash=None,
exclude_team_id=None,
admin_team_ids=admin_team_ids,
member_team_ids=member_team_ids,
include_created_by_keys=False,
)
assert "AND" in where
or_conditions = where["AND"][1]["OR"]
# Should only have 2 conditions: user's own keys + admin team keys
# No member-only service account condition since all teams are admin
assert len(or_conditions) == 2
# Verify no AND condition with user_id=None exists (that's the member-only pattern)
for cond in or_conditions:
if "AND" in cond:
and_parts = cond["AND"]
if {"user_id": None} in and_parts:
pytest.fail(
"Should not have member-only service account condition "
"when user is admin of all teams"
)
@pytest.mark.asyncio
async def test_get_member_team_ids():
"""
Test that get_member_team_ids returns all teams where user is a member
(any role), not just admin teams.
"""
from litellm.proxy.management_endpoints.key_management_endpoints import (
get_member_team_ids,
)
user_id = "member-user-123"
# Create mock user info with teams
user_info = LiteLLM_UserTable(
user_id=user_id,
teams=["team-A", "team-B", "team-C"],
)
user_api_key_dict = UserAPIKeyAuth(
user_role=LitellmUserRoles.INTERNAL_USER,
api_key="sk-test",
user_id=user_id,
)
# Mock prisma client
mock_prisma_client = AsyncMock()
# Create mock team objects - user is admin of team-A, member of team-B, not in team-C's members list
mock_team_a = MagicMock()
mock_team_a.model_dump.return_value = {
"team_id": "team-A",
"team_alias": "Team A",
"members_with_roles": [
{"user_id": user_id, "role": "admin", "user_email": None}
],
"max_budget": None,
"budget_duration": None,
"budget_reset_at": None,
"tpm_limit": None,
"rpm_limit": None,
"models": [],
"blocked": False,
}
mock_team_b = MagicMock()
mock_team_b.model_dump.return_value = {
"team_id": "team-B",
"team_alias": "Team B",
"members_with_roles": [
{"user_id": user_id, "role": "user", "user_email": None}
],
"max_budget": None,
"budget_duration": None,
"budget_reset_at": None,
"tpm_limit": None,
"rpm_limit": None,
"models": [],
"blocked": False,
}
mock_team_c = MagicMock()
mock_team_c.model_dump.return_value = {
"team_id": "team-C",
"team_alias": "Team C",
"members_with_roles": [
{"user_id": "other-user", "role": "admin", "user_email": None}
],
"max_budget": None,
"budget_duration": None,
"budget_reset_at": None,
"tpm_limit": None,
"rpm_limit": None,
"models": [],
"blocked": False,
}
mock_prisma_client.db.litellm_teamtable.find_many = AsyncMock(
return_value=[mock_team_a, mock_team_b, mock_team_c]
)
result = await get_member_team_ids(
complete_user_info=user_info,
user_api_key_dict=user_api_key_dict,
prisma_client=mock_prisma_client,
)
# Should return team-A and team-B (user is a member of both)
# Should NOT return team-C (user is not in members list)
assert sorted(result) == ["team-A", "team-B"]