diff --git a/litellm/proxy/management_endpoints/key_management_endpoints.py b/litellm/proxy/management_endpoints/key_management_endpoints.py index 1a02120cb6..7e1776714f 100644 --- a/litellm/proxy/management_endpoints/key_management_endpoints.py +++ b/litellm/proxy/management_endpoints/key_management_endpoints.py @@ -3869,17 +3869,14 @@ async def validate_key_list_check( return complete_user_info -async def get_admin_team_ids( +async def _fetch_user_team_objects( complete_user_info: Optional[LiteLLM_UserTable], - user_api_key_dict: UserAPIKeyAuth, prisma_client: PrismaClient, -) -> List[str]: - """ - Get all team IDs where the user is an admin. - """ - if complete_user_info is None: +) -> List[LiteLLM_TeamTable]: + """Fetch team objects for all teams a user belongs to (single DB query).""" + if complete_user_info is None or not complete_user_info.teams: return [] - # Get all teams that user is an admin of + teams: Optional[ List[BaseModel] ] = await prisma_client.db.litellm_teamtable.find_many( @@ -3888,14 +3885,60 @@ async def get_admin_team_ids( if teams is None: return [] - teams_pydantic_obj = [LiteLLM_TeamTable(**team.model_dump()) for team in teams] + return [LiteLLM_TeamTable(**team.model_dump()) for team in teams] - admin_team_ids = [ + +def _get_admin_team_ids_from_objects( + user_api_key_dict: UserAPIKeyAuth, + team_objects: List[LiteLLM_TeamTable], +) -> List[str]: + """Filter team objects to those where the user is an admin.""" + return [ team.team_id - for team in teams_pydantic_obj + for team in team_objects if _is_user_team_admin(user_api_key_dict=user_api_key_dict, team_obj=team) ] - return admin_team_ids + + +def _get_member_team_ids_from_objects( + user_api_key_dict: UserAPIKeyAuth, + team_objects: List[LiteLLM_TeamTable], +) -> List[str]: + """Filter team objects to those where the user is a member (any role).""" + return [ + team.team_id + for team in team_objects + if any( + member.user_id is not None + and member.user_id == user_api_key_dict.user_id + for member in team.members_with_roles + ) + ] + + +async def get_admin_team_ids( + complete_user_info: Optional[LiteLLM_UserTable], + user_api_key_dict: UserAPIKeyAuth, + prisma_client: PrismaClient, +) -> List[str]: + """Get all team IDs where the user is an admin.""" + team_objects = await _fetch_user_team_objects(complete_user_info, prisma_client) + return _get_admin_team_ids_from_objects(user_api_key_dict, team_objects) + + +async def get_member_team_ids( + complete_user_info: Optional[LiteLLM_UserTable], + user_api_key_dict: UserAPIKeyAuth, + prisma_client: PrismaClient, +) -> List[str]: + """ + Get all team IDs where the user is a member (any role, including admin). + + Used to determine which teams' service accounts (keys with user_id=NULL) + a regular team member can see. + """ + team_objects = await _fetch_user_team_objects(complete_user_info, prisma_client) + return _get_member_team_ids_from_objects(user_api_key_dict, team_objects) @router.get( @@ -3981,12 +4024,26 @@ async def list_keys( prisma_client=prisma_client, ) - if include_team_keys: - admin_team_ids = await get_admin_team_ids( + # Fetch team objects once when needed for either admin or member filtering. + # This avoids duplicate DB queries for the same team data. + if include_team_keys or include_created_by_keys: + team_objects = await _fetch_user_team_objects( complete_user_info=complete_user_info, - user_api_key_dict=user_api_key_dict, prisma_client=prisma_client, ) + member_team_ids = _get_member_team_ids_from_objects( + user_api_key_dict=user_api_key_dict, + team_objects=team_objects, + ) + else: + team_objects = [] + member_team_ids = None + + if include_team_keys: + admin_team_ids = _get_admin_team_ids_from_objects( + user_api_key_dict=user_api_key_dict, + team_objects=team_objects, + ) else: admin_team_ids = None @@ -4007,6 +4064,7 @@ async def list_keys( return_full_object=return_full_object, organization_id=organization_id, admin_team_ids=admin_team_ids, + member_team_ids=member_team_ids, include_created_by_keys=include_created_by_keys, sort_by=sort_by, sort_order=sort_order, @@ -4157,9 +4215,19 @@ def _build_key_filter_conditions( key_hash: Optional[str], exclude_team_id: Optional[str], admin_team_ids: Optional[List[str]], - include_created_by_keys: bool, + member_team_ids: Optional[List[str]] = None, + include_created_by_keys: bool = False, ) -> Dict[str, Union[str, Dict[str, Any], List[Dict[str, Any]]]]: - """Build filter conditions for key listing.""" + """Build filter conditions for key listing. + + Visibility rules: + - Users always see their own keys (user_id match) + - Team admins see ALL keys for their admin teams (via admin_team_ids) + - Regular team members see only service accounts (user_id=NULL) for their + teams (via member_team_ids). This prevents leaking other members' spend data. + - created_by visibility is scoped to teams the user currently belongs to, + so former members cannot see service accounts they created after leaving. + """ # Prepare filter conditions where: Dict[str, Union[str, Dict[str, Any], List[Dict[str, Any]]]] = {} where.update(_get_condition_to_filter_out_ui_session_tokens()) @@ -4185,14 +4253,55 @@ def _build_key_filter_conditions( if user_condition: or_conditions.append(user_condition) - # Add condition for created by keys if provided + # Add condition for created_by keys, scoped to user's current teams if include_created_by_keys and user_id: - or_conditions.append({"created_by": user_id}) + if member_team_ids is not None: + if member_team_ids: + # Scope created_by keys to teams user is still a member of, + # or keys that have no team (personal keys) + or_conditions.append( + { + "AND": [ + {"created_by": user_id}, + { + "OR": [ + {"team_id": {"in": member_team_ids}}, + {"team_id": None}, + ] + }, + ] + } + ) + else: + # User is not a member of any team, only show non-team created_by keys + or_conditions.append( + {"AND": [{"created_by": user_id}, {"team_id": None}]} + ) + else: + # No team membership info provided (backward compatibility for + # direct _list_key_helper callers like Prometheus) + or_conditions.append({"created_by": user_id}) - # Add condition for admin team keys if provided + # Add condition for admin team keys (admins see ALL team keys) if admin_team_ids: or_conditions.append({"team_id": {"in": admin_team_ids}}) + # Add condition for member team service accounts (members only see keys with user_id=NULL) + if member_team_ids: + # Exclude teams where user is already admin (those are covered above with full visibility) + member_only_team_ids = [ + tid for tid in member_team_ids if tid not in (admin_team_ids or []) + ] + if member_only_team_ids: + or_conditions.append( + { + "AND": [ + {"team_id": {"in": member_only_team_ids}}, + {"user_id": None}, + ] + } + ) + # Combine conditions with OR if we have multiple conditions if len(or_conditions) > 1: where = {"AND": [where, {"OR": or_conditions}]} @@ -4217,6 +4326,9 @@ async def _list_key_helper( admin_team_ids: Optional[ List[str] ] = None, # New parameter for teams where user is admin + member_team_ids: Optional[ + List[str] + ] = None, # Team IDs where user is a member (any role) - for service account visibility include_created_by_keys: bool = False, sort_by: Optional[str] = None, sort_order: str = "desc", @@ -4234,6 +4346,7 @@ async def _list_key_helper( exclude_team_id: Optional[str] # exclude a specific team_id return_full_object: bool # when true, will return UserAPIKeyAuth objects instead of just the token admin_team_ids: Optional[List[str]] # list of team IDs where the user is an admin + member_team_ids: Optional[List[str]] # list of team IDs where user is a member (for service account visibility) Returns: KeyListResponseObject @@ -4252,6 +4365,7 @@ async def _list_key_helper( key_hash=key_hash, exclude_team_id=exclude_team_id, admin_team_ids=admin_team_ids, + member_team_ids=member_team_ids, include_created_by_keys=include_created_by_keys, ) diff --git a/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py b/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py index efa7d27ec4..ffb4e95542 100644 --- a/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py +++ b/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py @@ -5780,3 +5780,380 @@ async def test_default_key_generate_params_duration(monkeypatch): assert request.duration == "180d" finally: litellm.default_key_generate_params = original_value + + +@pytest.mark.asyncio +async def test_build_key_filter_member_team_service_accounts(): + """ + Test that regular team members can see service accounts (user_id=NULL) + for their teams, but NOT other members' personal keys. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "regular-member-123" + member_team_ids = ["team-A", "team-B"] + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=None, + member_team_ids=member_team_ids, + include_created_by_keys=False, + ) + + # Should have AND with OR conditions + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Should have 2 conditions: user's own keys + member team service accounts + assert len(or_conditions) == 2 + + # First: user's own keys + user_cond = or_conditions[0] + assert user_cond["user_id"] == user_id + + # Second: service accounts for member teams (user_id=None AND team_id in member teams) + service_account_cond = or_conditions[1] + assert "AND" in service_account_cond + and_parts = service_account_cond["AND"] + assert {"team_id": {"in": member_team_ids}} in and_parts + assert {"user_id": None} in and_parts + + +@pytest.mark.asyncio +async def test_build_key_filter_admin_sees_all_team_keys(): + """ + Test that team admins see ALL keys for their teams (not just service accounts), + and that member_team_ids doesn't duplicate admin teams. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "admin-user-123" + admin_team_ids = ["team-A"] + member_team_ids = ["team-A", "team-B"] + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=admin_team_ids, + member_team_ids=member_team_ids, + include_created_by_keys=False, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Should have 3 conditions: + # 1. user's own keys + # 2. admin team keys (all keys for team-A) + # 3. member-only service accounts (only service accounts for team-B, since team-A is already covered by admin) + assert len(or_conditions) == 3 + + # Find admin condition + admin_cond = None + service_account_cond = None + for cond in or_conditions: + if isinstance(cond.get("team_id"), dict) and "in" in cond.get("team_id", {}): + admin_cond = cond + elif "AND" in cond: + service_account_cond = cond + + assert admin_cond is not None, "Admin team condition should be present" + assert admin_cond["team_id"]["in"] == admin_team_ids + + # member-only condition should only include team-B (team-A is covered by admin) + assert service_account_cond is not None, "Service account condition should be present" + and_parts = service_account_cond["AND"] + assert {"team_id": {"in": ["team-B"]}} in and_parts + assert {"user_id": None} in and_parts + + +@pytest.mark.asyncio +async def test_build_key_filter_created_by_scoped_to_current_teams(): + """ + Test that created_by filter is scoped to teams user currently belongs to. + A former team member should NOT see service accounts they created for + a team they've left. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "user-456" + # User is currently only a member of team-A (left team-B) + member_team_ids = ["team-A"] + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=None, + member_team_ids=member_team_ids, + include_created_by_keys=True, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Find the created_by condition + created_by_cond = None + for cond in or_conditions: + if "AND" in cond: + and_parts = cond["AND"] + for part in and_parts: + if isinstance(part, dict) and "created_by" in part: + created_by_cond = cond + break + + assert created_by_cond is not None, "Created by condition should be present" + + # created_by should be scoped: created_by=user AND (team_id in [team-A] OR team_id=None) + and_parts = created_by_cond["AND"] + assert {"created_by": user_id} in and_parts + + # Find the OR part that scopes to current teams + team_scope = None + for part in and_parts: + if isinstance(part, dict) and "OR" in part: + team_scope = part["OR"] + + assert team_scope is not None, "Team scope OR condition should be present" + assert {"team_id": {"in": member_team_ids}} in team_scope + assert {"team_id": None} in team_scope + + +@pytest.mark.asyncio +async def test_build_key_filter_created_by_no_teams(): + """ + Test that when user has no team memberships (empty list), created_by + only returns non-team keys (personal keys). + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "user-no-teams" + member_team_ids = [] # User has no team memberships + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=None, + member_team_ids=member_team_ids, + include_created_by_keys=True, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Find the created_by condition + created_by_cond = None + for cond in or_conditions: + if "AND" in cond: + and_parts = cond["AND"] + for part in and_parts: + if isinstance(part, dict) and "created_by" in part: + created_by_cond = cond + break + + assert created_by_cond is not None + and_parts = created_by_cond["AND"] + assert {"created_by": user_id} in and_parts + assert {"team_id": None} in and_parts + # Should NOT have an OR with team_id in [] - just a simple team_id=None + for part in and_parts: + if isinstance(part, dict) and "OR" in part: + pytest.fail("Should not have OR condition when member_team_ids is empty") + + +@pytest.mark.asyncio +async def test_build_key_filter_backward_compat_no_member_team_ids(): + """ + Test backward compatibility: when member_team_ids is None (not provided), + created_by filter should use the old unrestricted behavior. + This ensures direct callers of _list_key_helper (like Prometheus) still work. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "user-789" + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=None, + member_team_ids=None, # Not provided + include_created_by_keys=True, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Find the created_by condition - should be simple {"created_by": user_id} + created_by_cond = None + for cond in or_conditions: + if "created_by" in cond: + created_by_cond = cond + + assert created_by_cond is not None + assert created_by_cond == {"created_by": user_id} + assert len(created_by_cond) == 1, "Should be simple created_by without team scoping" + + +@pytest.mark.asyncio +async def test_build_key_filter_admin_all_member_overlap(): + """ + Test that when user is admin of ALL teams they belong to, + no member-only service account condition is added (would be redundant). + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + _build_key_filter_conditions, + ) + + user_id = "admin-all" + admin_team_ids = ["team-A", "team-B"] + member_team_ids = ["team-A", "team-B"] + + where = _build_key_filter_conditions( + user_id=user_id, + team_id=None, + organization_id=None, + key_alias=None, + key_hash=None, + exclude_team_id=None, + admin_team_ids=admin_team_ids, + member_team_ids=member_team_ids, + include_created_by_keys=False, + ) + + assert "AND" in where + or_conditions = where["AND"][1]["OR"] + + # Should only have 2 conditions: user's own keys + admin team keys + # No member-only service account condition since all teams are admin + assert len(or_conditions) == 2 + + # Verify no AND condition with user_id=None exists (that's the member-only pattern) + for cond in or_conditions: + if "AND" in cond: + and_parts = cond["AND"] + if {"user_id": None} in and_parts: + pytest.fail( + "Should not have member-only service account condition " + "when user is admin of all teams" + ) + + +@pytest.mark.asyncio +async def test_get_member_team_ids(): + """ + Test that get_member_team_ids returns all teams where user is a member + (any role), not just admin teams. + """ + from litellm.proxy.management_endpoints.key_management_endpoints import ( + get_member_team_ids, + ) + + user_id = "member-user-123" + + # Create mock user info with teams + user_info = LiteLLM_UserTable( + user_id=user_id, + teams=["team-A", "team-B", "team-C"], + ) + + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.INTERNAL_USER, + api_key="sk-test", + user_id=user_id, + ) + + # Mock prisma client + mock_prisma_client = AsyncMock() + + # Create mock team objects - user is admin of team-A, member of team-B, not in team-C's members list + mock_team_a = MagicMock() + mock_team_a.model_dump.return_value = { + "team_id": "team-A", + "team_alias": "Team A", + "members_with_roles": [ + {"user_id": user_id, "role": "admin", "user_email": None} + ], + "max_budget": None, + "budget_duration": None, + "budget_reset_at": None, + "tpm_limit": None, + "rpm_limit": None, + "models": [], + "blocked": False, + } + + mock_team_b = MagicMock() + mock_team_b.model_dump.return_value = { + "team_id": "team-B", + "team_alias": "Team B", + "members_with_roles": [ + {"user_id": user_id, "role": "user", "user_email": None} + ], + "max_budget": None, + "budget_duration": None, + "budget_reset_at": None, + "tpm_limit": None, + "rpm_limit": None, + "models": [], + "blocked": False, + } + + mock_team_c = MagicMock() + mock_team_c.model_dump.return_value = { + "team_id": "team-C", + "team_alias": "Team C", + "members_with_roles": [ + {"user_id": "other-user", "role": "admin", "user_email": None} + ], + "max_budget": None, + "budget_duration": None, + "budget_reset_at": None, + "tpm_limit": None, + "rpm_limit": None, + "models": [], + "blocked": False, + } + + mock_prisma_client.db.litellm_teamtable.find_many = AsyncMock( + return_value=[mock_team_a, mock_team_b, mock_team_c] + ) + + result = await get_member_team_ids( + complete_user_info=user_info, + user_api_key_dict=user_api_key_dict, + prisma_client=mock_prisma_client, + ) + + # Should return team-A and team-B (user is a member of both) + # Should NOT return team-C (user is not in members list) + assert sorted(result) == ["team-A", "team-B"]