From c7fe400d4deb54918f2d42bfaa431a9ec89730c6 Mon Sep 17 00:00:00 2001 From: yuneng-jiang Date: Thu, 19 Feb 2026 16:29:43 -0800 Subject: [PATCH 1/2] [Fix] Service account visibility for team members Regular team members could not see service accounts belonging to their team. Additionally, former team members could still see service accounts they created via the created_by filter after leaving the team. - Add get_member_team_ids() to retrieve all teams a user belongs to - Regular members now see team service accounts (user_id=NULL) but not other members' personal keys - Scope created_by filter to the user's current team memberships - Add 7 unit tests covering the new visibility rules --- .../key_management_endpoints.py | 120 +++++- .../test_key_management_endpoints.py | 377 ++++++++++++++++++ 2 files changed, 492 insertions(+), 5 deletions(-) diff --git a/litellm/proxy/management_endpoints/key_management_endpoints.py b/litellm/proxy/management_endpoints/key_management_endpoints.py index 1a02120cb6..301cfd1d53 100644 --- a/litellm/proxy/management_endpoints/key_management_endpoints.py +++ b/litellm/proxy/management_endpoints/key_management_endpoints.py @@ -3898,6 +3898,47 @@ async def get_admin_team_ids( return admin_team_ids +async def get_member_team_ids( + complete_user_info: Optional[LiteLLM_UserTable], + user_api_key_dict: UserAPIKeyAuth, + prisma_client: PrismaClient, +) -> List[str]: + """ + Get all team IDs where the user is a member (any role, including admin). + + Used to determine which teams' service accounts (keys with user_id=NULL) + a regular team member can see. + """ + if complete_user_info is None: + return [] + + if not complete_user_info.teams: + return [] + + # Get all teams that user belongs to + teams: Optional[ + List[BaseModel] + ] = await prisma_client.db.litellm_teamtable.find_many( + where={"team_id": {"in": complete_user_info.teams}} + ) + if teams is None: + return [] + + teams_pydantic_obj = [LiteLLM_TeamTable(**team.model_dump()) for team in teams] + + # Return team IDs where the user is a member (any role) + member_team_ids = [ + team.team_id + for team in teams_pydantic_obj + if any( + member.user_id is not None + and member.user_id == user_api_key_dict.user_id + for member in team.members_with_roles + ) + ] + return member_team_ids + + @router.get( "/key/list", tags=["key management"], @@ -3990,6 +4031,18 @@ async def list_keys( else: admin_team_ids = None + # Compute member_team_ids when either include_team_keys or + # include_created_by_keys is True. This ensures created_by visibility + # is scoped to teams the user currently belongs to. + if include_team_keys or include_created_by_keys: + member_team_ids = await get_member_team_ids( + complete_user_info=complete_user_info, + user_api_key_dict=user_api_key_dict, + prisma_client=prisma_client, + ) + else: + member_team_ids = None + if not user_id and user_api_key_dict.user_role not in [ LitellmUserRoles.PROXY_ADMIN.value, LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value, @@ -4007,6 +4060,7 @@ async def list_keys( return_full_object=return_full_object, organization_id=organization_id, admin_team_ids=admin_team_ids, + member_team_ids=member_team_ids, include_created_by_keys=include_created_by_keys, sort_by=sort_by, sort_order=sort_order, @@ -4157,9 +4211,19 @@ def _build_key_filter_conditions( key_hash: Optional[str], exclude_team_id: Optional[str], admin_team_ids: Optional[List[str]], - include_created_by_keys: bool, + member_team_ids: Optional[List[str]] = None, + include_created_by_keys: bool = False, ) -> Dict[str, Union[str, Dict[str, Any], List[Dict[str, Any]]]]: - """Build filter conditions for key listing.""" + """Build filter conditions for key listing. + + Visibility rules: + - Users always see their own keys (user_id match) + - Team admins see ALL keys for their admin teams (via admin_team_ids) + - Regular team members see only service accounts (user_id=NULL) for their + teams (via member_team_ids). This prevents leaking other members' spend data. + - created_by visibility is scoped to teams the user currently belongs to, + so former members cannot see service accounts they created after leaving. + """ # Prepare filter conditions where: Dict[str, Union[str, Dict[str, Any], List[Dict[str, Any]]]] = {} where.update(_get_condition_to_filter_out_ui_session_tokens()) @@ -4185,14 +4249,55 @@ def _build_key_filter_conditions( if user_condition: or_conditions.append(user_condition) - # Add condition for created by keys if provided + # Add condition for created_by keys, scoped to user's current teams if include_created_by_keys and user_id: - or_conditions.append({"created_by": user_id}) + if member_team_ids is not None: + if member_team_ids: + # Scope created_by keys to teams user is still a member of, + # or keys that have no team (personal keys) + or_conditions.append( + { + "AND": [ + {"created_by": user_id}, + { + "OR": [ + {"team_id": {"in": member_team_ids}}, + {"team_id": None}, + ] + }, + ] + } + ) + else: + # User is not a member of any team, only show non-team created_by keys + or_conditions.append( + {"AND": [{"created_by": user_id}, {"team_id": None}]} + ) + else: + # No team membership info provided (backward compatibility for + # direct _list_key_helper callers like Prometheus) + or_conditions.append({"created_by": user_id}) - # Add condition for admin team keys if provided + # Add condition for admin team keys (admins see ALL team keys) if admin_team_ids: or_conditions.append({"team_id": {"in": admin_team_ids}}) + # Add condition for member team service accounts (members only see keys with user_id=NULL) + if member_team_ids: + # Exclude teams where user is already admin (those are covered above with full visibility) + member_only_team_ids = [ + tid for tid in member_team_ids if tid not in (admin_team_ids or []) + ] + if member_only_team_ids: + or_conditions.append( + { + "AND": [ + {"team_id": {"in": member_only_team_ids}}, + {"user_id": None}, + ] + } + ) + # Combine conditions with OR if we have multiple conditions if len(or_conditions) > 1: where = {"AND": [where, {"OR": or_conditions}]} @@ -4217,6 +4322,9 @@ async def _list_key_helper( admin_team_ids: Optional[ List[str] ] = None, # New parameter for teams where user is admin + member_team_ids: Optional[ + List[str] + ] = None, # Team IDs where user is a member (any role) - for service account visibility include_created_by_keys: bool = False, sort_by: Optional[str] = None, sort_order: str = "desc", @@ -4234,6 +4342,7 @@ async def _list_key_helper( exclude_team_id: Optional[str] # exclude a specific team_id return_full_object: bool # when true, will return UserAPIKeyAuth objects instead of just the token admin_team_ids: Optional[List[str]] # list of team IDs where the user is an admin + member_team_ids: Optional[List[str]] # list of team IDs where user is a member (for service account visibility) Returns: KeyListResponseObject @@ -4252,6 +4361,7 @@ async def _list_key_helper( key_hash=key_hash, exclude_team_id=exclude_team_id, admin_team_ids=admin_team_ids, + member_team_ids=member_team_ids, include_created_by_keys=include_created_by_keys, ) diff --git a/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py b/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py index efa7d27ec4..ffb4e95542 100644 --- a/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py +++ b/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py @@ -5780,3 +5780,380 @@ async def test_default_key_generate_params_duration(monkeypatch): assert request.duration == "180d" finally: litellm.default_key_generate_params = original_value + + +@pytest.mark.asyncio +async def test_build_key_filter_member_team_service_accounts(): + """ + Test that regular team members can see service accounts (user_id=NULL) + for their teams, but NOT other members' personal keys. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "regular-member-123" + member_team_ids = ["team-A", "team-B"] + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=None, + member_team_ids=member_team_ids, + include_created_by_keys=False, + ) + + # Should have AND with OR conditions + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Should have 2 conditions: user's own keys + member team service accounts + assert len(or_conditions) == 2 + + # First: user's own keys + user_cond = or_conditions[0] + assert user_cond["user_id"] == user_id + + # Second: service accounts for member teams (user_id=None AND team_id in member teams) + service_account_cond = or_conditions[1] + assert "AND" in service_account_cond + and_parts = service_account_cond["AND"] + assert {"team_id": {"in": member_team_ids}} in and_parts + assert {"user_id": None} in and_parts + + +@pytest.mark.asyncio +async def test_build_key_filter_admin_sees_all_team_keys(): + """ + Test that team admins see ALL keys for their teams (not just service accounts), + and that member_team_ids doesn't duplicate admin teams. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "admin-user-123" + admin_team_ids = ["team-A"] + member_team_ids = ["team-A", "team-B"] + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=admin_team_ids, + member_team_ids=member_team_ids, + include_created_by_keys=False, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Should have 3 conditions: + # 1. user's own keys + # 2. admin team keys (all keys for team-A) + # 3. member-only service accounts (only service accounts for team-B, since team-A is already covered by admin) + assert len(or_conditions) == 3 + + # Find admin condition + admin_cond = None + service_account_cond = None + for cond in or_conditions: + if isinstance(cond.get("team_id"), dict) and "in" in cond.get("team_id", {}): + admin_cond = cond + elif "AND" in cond: + service_account_cond = cond + + assert admin_cond is not None, "Admin team condition should be present" + assert admin_cond["team_id"]["in"] == admin_team_ids + + # member-only condition should only include team-B (team-A is covered by admin) + assert service_account_cond is not None, "Service account condition should be present" + and_parts = service_account_cond["AND"] + assert {"team_id": {"in": ["team-B"]}} in and_parts + assert {"user_id": None} in and_parts + + +@pytest.mark.asyncio +async def test_build_key_filter_created_by_scoped_to_current_teams(): + """ + Test that created_by filter is scoped to teams user currently belongs to. + A former team member should NOT see service accounts they created for + a team they've left. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "user-456" + # User is currently only a member of team-A (left team-B) + member_team_ids = ["team-A"] + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=None, + member_team_ids=member_team_ids, + include_created_by_keys=True, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Find the created_by condition + created_by_cond = None + for cond in or_conditions: + if "AND" in cond: + and_parts = cond["AND"] + for part in and_parts: + if isinstance(part, dict) and "created_by" in part: + created_by_cond = cond + break + + assert created_by_cond is not None, "Created by condition should be present" + + # created_by should be scoped: created_by=user AND (team_id in [team-A] OR team_id=None) + and_parts = created_by_cond["AND"] + assert {"created_by": user_id} in and_parts + + # Find the OR part that scopes to current teams + team_scope = None + for part in and_parts: + if isinstance(part, dict) and "OR" in part: + team_scope = part["OR"] + + assert team_scope is not None, "Team scope OR condition should be present" + assert {"team_id": {"in": member_team_ids}} in team_scope + assert {"team_id": None} in team_scope + + +@pytest.mark.asyncio +async def test_build_key_filter_created_by_no_teams(): + """ + Test that when user has no team memberships (empty list), created_by + only returns non-team keys (personal keys). + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "user-no-teams" + member_team_ids = [] # User has no team memberships + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=None, + member_team_ids=member_team_ids, + include_created_by_keys=True, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Find the created_by condition + created_by_cond = None + for cond in or_conditions: + if "AND" in cond: + and_parts = cond["AND"] + for part in and_parts: + if isinstance(part, dict) and "created_by" in part: + created_by_cond = cond + break + + assert created_by_cond is not None + and_parts = created_by_cond["AND"] + assert {"created_by": user_id} in and_parts + assert {"team_id": None} in and_parts + # Should NOT have an OR with team_id in [] - just a simple team_id=None + for part in and_parts: + if isinstance(part, dict) and "OR" in part: + pytest.fail("Should not have OR condition when member_team_ids is empty") + + +@pytest.mark.asyncio +async def test_build_key_filter_backward_compat_no_member_team_ids(): + """ + Test backward compatibility: when member_team_ids is None (not provided), + created_by filter should use the old unrestricted behavior. + This ensures direct callers of _list_key_helper (like Prometheus) still work. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "user-789" + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=None, + member_team_ids=None, # Not provided + include_created_by_keys=True, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Find the created_by condition - should be simple {"created_by": user_id} + created_by_cond = None + for cond in or_conditions: + if "created_by" in cond: + created_by_cond = cond + + assert created_by_cond is not None + assert created_by_cond == {"created_by": user_id} + assert len(created_by_cond) == 1, "Should be simple created_by without team scoping" + + +@pytest.mark.asyncio +async def test_build_key_filter_admin_all_member_overlap(): + """ + Test that when user is admin of ALL teams they belong to, + no member-only service account condition is added (would be redundant). + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "admin-all" + admin_team_ids = ["team-A", "team-B"] + member_team_ids = ["team-A", "team-B"] + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=admin_team_ids, + member_team_ids=member_team_ids, + include_created_by_keys=False, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Should only have 2 conditions: user's own keys + admin team keys + # No member-only service account condition since all teams are admin + assert len(or_conditions) == 2 + + # Verify no AND condition with user_id=None exists (that's the member-only pattern) + for cond in or_conditions: + if "AND" in cond: + and_parts = cond["AND"] + if {"user_id": None} in and_parts: + pytest.fail( + "Should not have member-only service account condition " + "when user is admin of all teams" + ) + + +@pytest.mark.asyncio +async def test_get_member_team_ids(): + """ + Test that get_member_team_ids returns all teams where user is a member + (any role), not just admin teams. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + get_member_team_ids, + ) + + user_id = "member-user-123" + + # Create mock user info with teams + user_info = LiteLLM_UserTable( + user_id=user_id, + teams=["team-A", "team-B", "team-C"], + ) + + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.INTERNAL_USER, + api_key="sk-test", + user_id=user_id, + ) + + # Mock prisma client + mock_prisma_client = AsyncMock() + + # Create mock team objects - user is admin of team-A, member of team-B, not in team-C's members list + mock_team_a = MagicMock() + mock_team_a.model_dump.return_value = { + "team_id": "team-A", + "team_alias": "Team A", + "members_with_roles": [ + {"user_id": user_id, "role": "admin", "user_email": None} + ], + "max_budget": None, + "budget_duration": None, + "budget_reset_at": None, + "tpm_limit": None, + "rpm_limit": None, + "models": [], + "blocked": False, + } + + mock_team_b = MagicMock() + mock_team_b.model_dump.return_value = { + "team_id": "team-B", + "team_alias": "Team B", + "members_with_roles": [ + {"user_id": user_id, "role": "user", "user_email": None} + ], + "max_budget": None, + "budget_duration": None, + "budget_reset_at": None, + "tpm_limit": None, + "rpm_limit": None, + "models": [], + "blocked": False, + } + + mock_team_c = MagicMock() + mock_team_c.model_dump.return_value = { + "team_id": "team-C", + "team_alias": "Team C", + "members_with_roles": [ + {"user_id": "other-user", "role": "admin", "user_email": None} + ], + "max_budget": None, + "budget_duration": None, + "budget_reset_at": None, + "tpm_limit": None, + "rpm_limit": None, + "models": [], + "blocked": False, + } + + mock_prisma_client.db.litellm_teamtable.find_many = AsyncMock( + return_value=[mock_team_a, mock_team_b, mock_team_c] + ) + + result = await get_member_team_ids( + complete_user_info=user_info, + user_api_key_dict=user_api_key_dict, + prisma_client=mock_prisma_client, + ) + + # Should return team-A and team-B (user is a member of both) + # Should NOT return team-C (user is not in members list) + assert sorted(result) == ["team-A", "team-B"] From 4475258426d0e32efb954afdc8f1c0aff5480bc3 Mon Sep 17 00:00:00 2001 From: yuneng-jiang Date: Thu, 19 Feb 2026 16:53:48 -0800 Subject: [PATCH 2/2] address greptile review feedback (greploop iteration 1) Refactor to fetch team objects once via _fetch_user_team_objects(), then derive admin and member team IDs from the shared result. Eliminates duplicate DB query between get_admin_team_ids and get_member_team_ids. --- .../key_management_endpoints.py | 114 +++++++++--------- 1 file changed, 59 insertions(+), 55 deletions(-) diff --git a/litellm/proxy/management_endpoints/key_management_endpoints.py b/litellm/proxy/management_endpoints/key_management_endpoints.py index 301cfd1d53..7e1776714f 100644 --- a/litellm/proxy/management_endpoints/key_management_endpoints.py +++ b/litellm/proxy/management_endpoints/key_management_endpoints.py @@ -3869,17 +3869,14 @@ async def validate_key_list_check( return complete_user_info -async def get_admin_team_ids( +async def _fetch_user_team_objects( complete_user_info: Optional[LiteLLM_UserTable], - user_api_key_dict: UserAPIKeyAuth, prisma_client: PrismaClient, -) -> List[str]: - """ - Get all team IDs where the user is an admin. - """ - if complete_user_info is None: +) -> List[LiteLLM_TeamTable]: + """Fetch team objects for all teams a user belongs to (single DB query).""" + if complete_user_info is None or not complete_user_info.teams: return [] - # Get all teams that user is an admin of + teams: Optional[ List[BaseModel] ] = await prisma_client.db.litellm_teamtable.find_many( @@ -3888,14 +3885,45 @@ async def get_admin_team_ids( if teams is None: return [] - teams_pydantic_obj = [LiteLLM_TeamTable(**team.model_dump()) for team in teams] + return [LiteLLM_TeamTable(**team.model_dump()) for team in teams] - admin_team_ids = [ + +def _get_admin_team_ids_from_objects( + user_api_key_dict: UserAPIKeyAuth, + team_objects: List[LiteLLM_TeamTable], +) -> List[str]: + """Filter team objects to those where the user is an admin.""" + return [ team.team_id - for team in teams_pydantic_obj + for team in team_objects if _is_user_team_admin(user_api_key_dict=user_api_key_dict, team_obj=team) ] - return admin_team_ids + + +def _get_member_team_ids_from_objects( + user_api_key_dict: UserAPIKeyAuth, + team_objects: List[LiteLLM_TeamTable], +) -> List[str]: + """Filter team objects to those where the user is a member (any role).""" + return [ + team.team_id + for team in team_objects + if any( + member.user_id is not None + and member.user_id == user_api_key_dict.user_id + for member in team.members_with_roles + ) + ] + + +async def get_admin_team_ids( + complete_user_info: Optional[LiteLLM_UserTable], + user_api_key_dict: UserAPIKeyAuth, + prisma_client: PrismaClient, +) -> List[str]: + """Get all team IDs where the user is an admin.""" + team_objects = await _fetch_user_team_objects(complete_user_info, prisma_client) + return _get_admin_team_ids_from_objects(user_api_key_dict, team_objects) async def get_member_team_ids( @@ -3909,34 +3937,8 @@ async def get_member_team_ids( Used to determine which teams' service accounts (keys with user_id=NULL) a regular team member can see. """ - if complete_user_info is None: - return [] - - if not complete_user_info.teams: - return [] - - # Get all teams that user belongs to - teams: Optional[ - List[BaseModel] - ] = await prisma_client.db.litellm_teamtable.find_many( - where={"team_id": {"in": complete_user_info.teams}} - ) - if teams is None: - return [] - - teams_pydantic_obj = [LiteLLM_TeamTable(**team.model_dump()) for team in teams] - - # Return team IDs where the user is a member (any role) - member_team_ids = [ - team.team_id - for team in teams_pydantic_obj - if any( - member.user_id is not None - and member.user_id == user_api_key_dict.user_id - for member in team.members_with_roles - ) - ] - return member_team_ids + team_objects = await _fetch_user_team_objects(complete_user_info, prisma_client) + return _get_member_team_ids_from_objects(user_api_key_dict, team_objects) @router.get( @@ -4022,27 +4024,29 @@ async def list_keys( prisma_client=prisma_client, ) - if include_team_keys: - admin_team_ids = await get_admin_team_ids( + # Fetch team objects once when needed for either admin or member filtering. + # This avoids duplicate DB queries for the same team data. + if include_team_keys or include_created_by_keys: + team_objects = await _fetch_user_team_objects( complete_user_info=complete_user_info, - user_api_key_dict=user_api_key_dict, prisma_client=prisma_client, ) + member_team_ids = _get_member_team_ids_from_objects( + user_api_key_dict=user_api_key_dict, + team_objects=team_objects, + ) + else: + team_objects = [] + member_team_ids = None + + if include_team_keys: + admin_team_ids = _get_admin_team_ids_from_objects( + user_api_key_dict=user_api_key_dict, + team_objects=team_objects, + ) else: admin_team_ids = None - # Compute member_team_ids when either include_team_keys or - # include_created_by_keys is True. This ensures created_by visibility - # is scoped to teams the user currently belongs to. - if include_team_keys or include_created_by_keys: - member_team_ids = await get_member_team_ids( - complete_user_info=complete_user_info, - user_api_key_dict=user_api_key_dict, - prisma_client=prisma_client, - ) - else: - member_team_ids = None - if not user_id and user_api_key_dict.user_role not in [ LitellmUserRoles.PROXY_ADMIN.value, LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY.value,