diff --git a/litellm/proxy/auth/login_utils.py b/litellm/proxy/auth/login_utils.py new file mode 100644 index 0000000000..6ef983b221 --- /dev/null +++ b/litellm/proxy/auth/login_utils.py @@ -0,0 +1,337 @@ +""" +Login utilities for handling user authentication in the proxy server. + +This module contains the core login logic that can be reused across different +login endpoints (e.g., /login and /v2/login). +""" + +import os +import secrets +from typing import Literal, Optional, cast + +import litellm +from fastapi import HTTPException + +from litellm.constants import LITELLM_PROXY_ADMIN_NAME +from litellm.proxy._types import ( + LiteLLM_UserTable, + LitellmUserRoles, + ProxyErrorTypes, + ProxyException, + UpdateUserRequest, + UserAPIKeyAuth, + hash_token, +) +from litellm.proxy.management_endpoints.internal_user_endpoints import user_update +from litellm.proxy.management_endpoints.key_management_endpoints import ( + generate_key_helper_fn, +) +from litellm.proxy.management_endpoints.ui_sso import ( + get_disabled_non_admin_personal_key_creation, +) +from litellm.proxy.utils import PrismaClient, get_server_root_path +from litellm.secret_managers.main import get_secret_bool +from litellm.types.proxy.ui_sso import ReturnedUITokenObject + + +def get_ui_credentials(master_key: Optional[str]) -> tuple[str, str]: + """ + Get UI username and password from environment variables or master key. + + Args: + master_key: Master key for the proxy (used as fallback for password) + + Returns: + tuple[str, str]: A tuple containing (ui_username, ui_password) + + Raises: + ProxyException: If neither UI_PASSWORD nor master_key is available + """ + ui_username = os.getenv("UI_USERNAME", "admin") + ui_password = os.getenv("UI_PASSWORD", None) + if ui_password is None: + ui_password = str(master_key) if master_key is not None else None + if ui_password is None: + raise ProxyException( + message="set Proxy master key to use UI. https://docs.litellm.ai/docs/proxy/virtual_keys. If set, use `--detailed_debug` to debug issue.", + type=ProxyErrorTypes.auth_error, + param="UI_PASSWORD", + code=500, + ) + return ui_username, ui_password + + +class LoginResult: + """Result object containing authentication data from login.""" + + def __init__( + self, + user_id: str, + key: str, + user_email: Optional[str], + user_role: str, + login_method: str = "username_password", + ): + self.user_id = user_id + self.key = key + self.user_email = user_email + self.user_role = user_role + self.login_method = login_method + + +async def authenticate_user( + username: str, + password: str, + master_key: Optional[str], + prisma_client: Optional[PrismaClient], +) -> LoginResult: + """ + Authenticate a user and generate an API key for UI access. + + This function handles two login scenarios: + 1. Admin login using UI_USERNAME and UI_PASSWORD + 2. User login using email and password from database + + Args: + username: Username or email from the login form + password: Password from the login form + master_key: Master key for the proxy (required) + prisma_client: Prisma database client (optional) + + Returns: + LoginResult: Object containing authentication data + + Raises: + ProxyException: If authentication fails or required configuration is missing + """ + if master_key is None: + raise ProxyException( + message="Master Key not set for Proxy. Please set Master Key to use Admin UI. Set `LITELLM_MASTER_KEY` in .env or set general_settings:master_key in config.yaml. https://docs.litellm.ai/docs/proxy/virtual_keys. If set, use `--detailed_debug` to debug issue.", + type=ProxyErrorTypes.auth_error, + param="master_key", + code=500, + ) + + ui_username, ui_password = get_ui_credentials(master_key) + + # Check if we can find the `username` in the db. On the UI, users can enter username=their email + _user_row: Optional[LiteLLM_UserTable] = None + user_role: Optional[ + Literal[ + LitellmUserRoles.PROXY_ADMIN, + LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, + LitellmUserRoles.INTERNAL_USER, + LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, + ] + ] = None + + if prisma_client is not None: + _user_row = cast( + Optional[LiteLLM_UserTable], + await prisma_client.db.litellm_usertable.find_first( + where={"user_email": {"equals": username}} + ), + ) + + """ + To login to Admin UI, we support the following + - Login with UI_USERNAME and UI_PASSWORD + - Login with Invite Link `user_email` and `password` combination + """ + if secrets.compare_digest(username, ui_username) and secrets.compare_digest( + password, ui_password + ): + # Non SSO -> If user is using UI_USERNAME and UI_PASSWORD they are Proxy admin + user_role = LitellmUserRoles.PROXY_ADMIN + user_id = LITELLM_PROXY_ADMIN_NAME + + # we want the key created to have PROXY_ADMIN_PERMISSIONS + key_user_id = LITELLM_PROXY_ADMIN_NAME + if ( + os.getenv("PROXY_ADMIN_ID", None) is not None + and os.environ["PROXY_ADMIN_ID"] == user_id + ) or user_id == LITELLM_PROXY_ADMIN_NAME: + # checks if user is admin + key_user_id = os.getenv("PROXY_ADMIN_ID", LITELLM_PROXY_ADMIN_NAME) + + # Admin is Authe'd in - generate key for the UI to access Proxy + + # ensure this user is set as the proxy admin, in this route there is no sso, we can assume this user is only the admin + await user_update( + data=UpdateUserRequest( + user_id=key_user_id, + user_role=user_role, + ), + user_api_key_dict=UserAPIKeyAuth( + user_role=LitellmUserRoles.PROXY_ADMIN, + ), + ) + + if os.getenv("DATABASE_URL") is not None: + response = await generate_key_helper_fn( + request_type="key", + **{ + "user_role": LitellmUserRoles.PROXY_ADMIN, + "duration": "24hr", + "key_max_budget": litellm.max_ui_session_budget, + "models": [], + "aliases": {}, + "config": {}, + "spend": 0, + "user_id": key_user_id, + "team_id": "litellm-dashboard", + }, # type: ignore + ) + else: + raise ProxyException( + message="No Database connected. Set DATABASE_URL in .env. If set, use `--detailed_debug` to debug issue.", + type=ProxyErrorTypes.auth_error, + param="DATABASE_URL", + code=500, + ) + + key = response["token"] # type: ignore + + if get_secret_bool("EXPERIMENTAL_UI_LOGIN"): + user_info: Optional[LiteLLM_UserTable] = None + if _user_row is not None: + user_info = _user_row + elif ( + user_id is not None + ): # if user_id is not None, we are using the UI_USERNAME and UI_PASSWORD + from litellm.proxy.auth.auth_checks import ExperimentalUIJWTToken + + user_info = LiteLLM_UserTable( + user_id=user_id, + user_role=user_role, + models=[], + max_budget=litellm.max_ui_session_budget, + ) + if user_info is None: + raise HTTPException( + status_code=401, + detail={ + "error": "User Information is required for experimental UI login" + }, + ) + + key = ExperimentalUIJWTToken.get_experimental_ui_login_jwt_auth_token( + user_info + ) + + return LoginResult( + user_id=user_id, + key=key, + user_email=None, + user_role=user_role, + login_method="username_password", + ) + + elif _user_row is not None: + """ + When sharing invite links + + -> if the user has no role in the DB assume they are only a viewer + """ + user_id = getattr(_user_row, "user_id", "unknown") + user_role = getattr( + _user_row, "user_role", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY + ) + user_email = getattr(_user_row, "user_email", "unknown") + _password = getattr(_user_row, "password", "unknown") + + if _password is None: + raise ProxyException( + message="User has no password set. Please set a password for the user via `/user/update`.", + type=ProxyErrorTypes.auth_error, + param="password", + code=401, + ) + + # check if password == _user_row.password + hash_password = hash_token(token=password) + if secrets.compare_digest(password, _password) or secrets.compare_digest( + hash_password, _password + ): + if os.getenv("DATABASE_URL") is not None: + response = await generate_key_helper_fn( + request_type="key", + **{ # type: ignore + "user_role": user_role, + "duration": "24hr", + "key_max_budget": litellm.max_ui_session_budget, + "models": [], + "aliases": {}, + "config": {}, + "spend": 0, + "user_id": user_id, + "team_id": "litellm-dashboard", + }, + ) + else: + raise ProxyException( + message="No Database connected. Set DATABASE_URL in .env. If set, use `--detailed_debug` to debug issue.", + type=ProxyErrorTypes.auth_error, + param="DATABASE_URL", + code=500, + ) + + key = response["token"] # type: ignore + + return LoginResult( + user_id=user_id, + key=key, + user_email=user_email, + user_role=cast(str, user_role), + login_method="username_password", + ) + else: + raise ProxyException( + message=f"Invalid credentials used to access UI.\nNot valid credentials for {username}", + type=ProxyErrorTypes.auth_error, + param="invalid_credentials", + code=401, + ) + else: + raise ProxyException( + message="Invalid credentials used to access UI.\nCheck 'UI_USERNAME', 'UI_PASSWORD' in .env file", + type=ProxyErrorTypes.auth_error, + param="invalid_credentials", + code=401, + ) + + +def create_ui_token_object( + login_result: LoginResult, + general_settings: dict, + premium_user: bool, +) -> ReturnedUITokenObject: + """ + Create a ReturnedUITokenObject from a LoginResult. + + Args: + login_result: The result from authenticate_user + general_settings: General proxy settings dictionary + premium_user: Whether premium features are enabled + + Returns: + ReturnedUITokenObject: Token object ready for JWT encoding + """ + disabled_non_admin_personal_key_creation = ( + get_disabled_non_admin_personal_key_creation() + ) + + return ReturnedUITokenObject( + user_id=login_result.user_id, + key=login_result.key, + user_email=login_result.user_email, + user_role=login_result.user_role, + login_method=login_result.login_method, + premium_user=premium_user, + auth_header_name=general_settings.get( + "litellm_key_header_name", "Authorization" + ), + disabled_non_admin_personal_key_creation=disabled_non_admin_personal_key_creation, + server_root_path=get_server_root_path(), + ) + diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index f2c7c361c4..9789f49550 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -8273,256 +8273,49 @@ async def fallback_login(request: Request): ) # hidden since this is a helper for UI sso login async def login(request: Request): # noqa: PLR0915 global premium_user, general_settings, master_key - from litellm.types.proxy.ui_sso import ReturnedUITokenObject + from litellm.proxy.auth.login_utils import authenticate_user, create_ui_token_object + from litellm.proxy.utils import get_custom_url - if master_key is None: - raise ProxyException( - message="Master Key not set for Proxy. Please set Master Key to use Admin UI. Set `LITELLM_MASTER_KEY` in .env or set general_settings:master_key in config.yaml. https://docs.litellm.ai/docs/proxy/virtual_keys. If set, use `--detailed_debug` to debug issue.", - type=ProxyErrorTypes.auth_error, - param="master_key", - code=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) form = await request.form() username = str(form.get("username")) password = str(form.get("password")) - ui_username = os.getenv("UI_USERNAME", "admin") - ui_password = os.getenv("UI_PASSWORD", None) - if ui_password is None: - ui_password = str(master_key) if master_key is not None else None - if ui_password is None: - raise ProxyException( - message="set Proxy master key to use UI. https://docs.litellm.ai/docs/proxy/virtual_keys. If set, use `--detailed_debug` to debug issue.", - type=ProxyErrorTypes.auth_error, - param="UI_PASSWORD", - code=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) - # check if we can find the `username` in the db. on the ui, users can enter username=their email - _user_row: Optional[LiteLLM_UserTable] = None - user_role: Optional[ - Literal[ - LitellmUserRoles.PROXY_ADMIN, - LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, - LitellmUserRoles.INTERNAL_USER, - LitellmUserRoles.INTERNAL_USER_VIEW_ONLY, - ] - ] = None - if prisma_client is not None: - _user_row = cast( - Optional[LiteLLM_UserTable], - await prisma_client.db.litellm_usertable.find_first( - where={"user_email": {"equals": username}} - ), - ) - disabled_non_admin_personal_key_creation = ( - get_disabled_non_admin_personal_key_creation() + # Authenticate user and get login result + login_result = await authenticate_user( + username=username, + password=password, + master_key=master_key, + prisma_client=prisma_client, ) - """ - To login to Admin UI, we support the following - - Login with UI_USERNAME and UI_PASSWORD - - Login with Invite Link `user_email` and `password` combination - """ - if secrets.compare_digest(username, ui_username) and secrets.compare_digest( - password, ui_password - ): - # Non SSO -> If user is using UI_USERNAME and UI_PASSWORD they are Proxy admin - user_role = LitellmUserRoles.PROXY_ADMIN - user_id = litellm_proxy_admin_name - # we want the key created to have PROXY_ADMIN_PERMISSIONS - key_user_id = litellm_proxy_admin_name - if ( - os.getenv("PROXY_ADMIN_ID", None) is not None - and os.environ["PROXY_ADMIN_ID"] == user_id - ) or user_id == litellm_proxy_admin_name: - # checks if user is admin - key_user_id = os.getenv("PROXY_ADMIN_ID", litellm_proxy_admin_name) + # Create UI token object + returned_ui_token_object = create_ui_token_object( + login_result=login_result, + general_settings=general_settings, + premium_user=premium_user, + ) - # Admin is Authe'd in - generate key for the UI to access Proxy + # Generate JWT token + import jwt - # ensure this user is set as the proxy admin, in this route there is no sso, we can assume this user is only the admin - await user_update( - data=UpdateUserRequest( - user_id=key_user_id, - user_role=user_role, - ), - user_api_key_dict=UserAPIKeyAuth( - user_role=LitellmUserRoles.PROXY_ADMIN, - ), - ) - if os.getenv("DATABASE_URL") is not None: - response = await generate_key_helper_fn( - request_type="key", - **{ - "user_role": LitellmUserRoles.PROXY_ADMIN, - "duration": "24hr", - "key_max_budget": litellm.max_ui_session_budget, - "models": [], - "aliases": {}, - "config": {}, - "spend": 0, - "user_id": key_user_id, - "team_id": "litellm-dashboard", - }, # type: ignore - ) - else: - raise ProxyException( - message="No Database connected. Set DATABASE_URL in .env. If set, use `--detailed_debug` to debug issue.", - type=ProxyErrorTypes.auth_error, - param="DATABASE_URL", - code=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) - key = response["token"] # type: ignore - litellm_dashboard_ui = get_custom_url(str(request.base_url)) - if litellm_dashboard_ui.endswith("/"): - litellm_dashboard_ui += "ui/" - else: - litellm_dashboard_ui += "/ui/" - import jwt + jwt_token = jwt.encode( # type: ignore + cast(dict, returned_ui_token_object), + master_key, + algorithm="HS256", + ) - if get_secret_bool("EXPERIMENTAL_UI_LOGIN"): - user_info: Optional[LiteLLM_UserTable] = None - if _user_row is not None: - user_info = _user_row - elif ( - user_id is not None - ): # if user_id is not None, we are using the UI_USERNAME and UI_PASSWORD - user_info = LiteLLM_UserTable( - user_id=user_id, - user_role=user_role, - models=[], - max_budget=litellm.max_ui_session_budget, - ) - if user_info is None: - raise HTTPException( - status_code=401, - detail={ - "error": "User Information is required for experimental UI login" - }, - ) - - key = ExperimentalUIJWTToken.get_experimental_ui_login_jwt_auth_token( - user_info - ) - - returned_ui_token_object = ReturnedUITokenObject( - user_id=user_id, - key=key, - user_email=None, - user_role=user_role, - login_method="username_password", - premium_user=premium_user, - auth_header_name=general_settings.get( - "litellm_key_header_name", "Authorization" - ), - disabled_non_admin_personal_key_creation=disabled_non_admin_personal_key_creation, - server_root_path=get_server_root_path(), - ) - - jwt_token = jwt.encode( # type: ignore - cast(dict, returned_ui_token_object), - master_key, - algorithm="HS256", - ) - litellm_dashboard_ui += "?login=success" - redirect_response = RedirectResponse(url=litellm_dashboard_ui, status_code=303) - redirect_response.set_cookie(key="token", value=jwt_token) - return redirect_response - elif _user_row is not None: - """ - When sharing invite links - - -> if the user has no role in the DB assume they are only a viewer - """ - user_id = getattr(_user_row, "user_id", "unknown") - user_role = getattr( - _user_row, "user_role", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY - ) - user_email = getattr(_user_row, "user_email", "unknown") - _password = getattr(_user_row, "password", "unknown") - - if _password is None: - raise ProxyException( - message="User has no password set. Please set a password for the user via `/user/update`.", - type=ProxyErrorTypes.auth_error, - param="password", - code=status.HTTP_401_UNAUTHORIZED, - ) - - # check if password == _user_row.password - hash_password = hash_token(token=password) - if secrets.compare_digest(password, _password) or secrets.compare_digest( - hash_password, _password - ): - if os.getenv("DATABASE_URL") is not None: - response = await generate_key_helper_fn( - request_type="key", - **{ # type: ignore - "user_role": user_role, - "duration": "24hr", - "key_max_budget": litellm.max_ui_session_budget, - "models": [], - "aliases": {}, - "config": {}, - "spend": 0, - "user_id": user_id, - "team_id": "litellm-dashboard", - }, - ) - else: - raise ProxyException( - message="No Database connected. Set DATABASE_URL in .env. If set, use `--detailed_debug` to debug issue.", - type=ProxyErrorTypes.auth_error, - param="DATABASE_URL", - code=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) - key = response["token"] # type: ignore - litellm_dashboard_ui = get_custom_url(str(request.base_url)) - if litellm_dashboard_ui.endswith("/"): - litellm_dashboard_ui += "ui/" - else: - litellm_dashboard_ui += "/ui/" - import jwt - - returned_ui_token_object = ReturnedUITokenObject( - user_id=user_id, - key=key, - user_email=user_email, - user_role=cast(str, user_role), - login_method="username_password", - premium_user=premium_user, - auth_header_name=general_settings.get( - "litellm_key_header_name", "Authorization" - ), - disabled_non_admin_personal_key_creation=disabled_non_admin_personal_key_creation, - server_root_path=get_server_root_path(), - ) - - jwt_token = jwt.encode( # type: ignore - cast(dict, returned_ui_token_object), - master_key, - algorithm="HS256", - ) - litellm_dashboard_ui += "?login=success" - redirect_response = RedirectResponse( - url=litellm_dashboard_ui, status_code=303 - ) - redirect_response.set_cookie(key="token", value=jwt_token) - return redirect_response - else: - raise ProxyException( - message=f"Invalid credentials used to access UI.\nNot valid credentials for {username}", - type=ProxyErrorTypes.auth_error, - param="invalid_credentials", - code=status.HTTP_401_UNAUTHORIZED, - ) + # Build redirect URL + litellm_dashboard_ui = get_custom_url(str(request.base_url)) + if litellm_dashboard_ui.endswith("/"): + litellm_dashboard_ui += "ui/" else: - raise ProxyException( - message="Invalid credentials used to access UI.\nCheck 'UI_USERNAME', 'UI_PASSWORD' in .env file", - type=ProxyErrorTypes.auth_error, - param="invalid_credentials", - code=status.HTTP_401_UNAUTHORIZED, - ) + litellm_dashboard_ui += "/ui/" + litellm_dashboard_ui += "?login=success" + + # Create redirect response with cookie + redirect_response = RedirectResponse(url=litellm_dashboard_ui, status_code=303) + redirect_response.set_cookie(key="token", value=jwt_token) + return redirect_response @app.get("/onboarding/get_token", include_in_schema=False) diff --git a/tests/test_litellm/proxy/auth/test_login_utils.py b/tests/test_litellm/proxy/auth/test_login_utils.py new file mode 100644 index 0000000000..201461dc8b --- /dev/null +++ b/tests/test_litellm/proxy/auth/test_login_utils.py @@ -0,0 +1,284 @@ +""" +Tests for login_utils module. + +This module tests the refactored login logic that was moved from proxy_server.py +to login_utils.py for better reusability. +""" + +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from litellm.constants import LITELLM_PROXY_ADMIN_NAME +from litellm.proxy._types import ( + LiteLLM_UserTable, + LitellmUserRoles, + ProxyErrorTypes, + ProxyException, + hash_token, +) +from litellm.proxy.auth.login_utils import ( + LoginResult, + authenticate_user, + get_ui_credentials, +) + + +def test_get_ui_credentials_prefers_explicit_password(): + """The configured UI password should be returned when available.""" + with patch.dict( + os.environ, + {"UI_USERNAME": "test-admin", "UI_PASSWORD": "secure-pass"}, + clear=True, + ): + username, password = get_ui_credentials(master_key="sk-123") + + assert username == "test-admin" + assert password == "secure-pass" + + +def test_get_ui_credentials_can_use_master_key(): + """Master key should be used as password when UI_PASSWORD is missing.""" + with patch.dict(os.environ, {"UI_USERNAME": "fallback-admin"}, clear=True): + username, password = get_ui_credentials(master_key="fallback-key") + + assert username == "fallback-admin" + assert password == "fallback-key" + + +def test_get_ui_credentials_requires_password(): + """Missing UI password and master key results in error.""" + with patch.dict(os.environ, {}, clear=True): + with pytest.raises(ProxyException) as exc_info: + get_ui_credentials(master_key=None) + + assert exc_info.value.type == ProxyErrorTypes.auth_error + assert exc_info.value.code == "500" + + +@pytest.mark.asyncio +async def test_authenticate_user_admin_login_with_ui_credentials(): + """Test admin login using UI_USERNAME and UI_PASSWORD""" + master_key = "sk-1234" + ui_username = "admin" + ui_password = "sk-1234" + + mock_prisma_client = MagicMock() + mock_prisma_client.db.litellm_usertable.find_first = AsyncMock(return_value=None) + + with patch.dict( + os.environ, + { + "UI_USERNAME": ui_username, + "UI_PASSWORD": ui_password, + "DATABASE_URL": "postgresql://test:test@localhost/test", + }, + ): + with patch( + "litellm.proxy.auth.login_utils.generate_key_helper_fn", + new_callable=AsyncMock, + ) as mock_generate_key: + mock_generate_key.return_value = { + "token": "test-token-123", + "user_id": LITELLM_PROXY_ADMIN_NAME, + } + + with patch( + "litellm.proxy.auth.login_utils.user_update", + new_callable=AsyncMock, + return_value=None, + ) as mock_user_update: + with patch( + "litellm.proxy.auth.login_utils.get_secret_bool", + return_value=False, + ): + result = await authenticate_user( + username=ui_username, + password=ui_password, + master_key=master_key, + prisma_client=mock_prisma_client, + ) + + assert isinstance(result, LoginResult) + assert result.user_id == LITELLM_PROXY_ADMIN_NAME + assert result.key == "test-token-123" + assert result.user_email is None + assert result.user_role == LitellmUserRoles.PROXY_ADMIN + assert result.login_method == "username_password" + + +@pytest.mark.asyncio +async def test_authenticate_user_admin_login_with_master_key_as_password(): + """Test admin login when UI_PASSWORD is not set, should use master_key""" + master_key = "sk-1234" + ui_username = "admin" + + mock_prisma_client = MagicMock() + mock_prisma_client.db.litellm_usertable.find_first = AsyncMock(return_value=None) + + env_vars = {"UI_USERNAME": ui_username, "DATABASE_URL": "postgresql://test:test@localhost/test"} + # Remove UI_PASSWORD to test fallback to master_key + if "UI_PASSWORD" in os.environ: + # Keep other env vars but don't set UI_PASSWORD + pass + else: + # Ensure UI_PASSWORD is not in the patched env + pass + + with patch.dict(os.environ, env_vars, clear=False): + # Explicitly remove UI_PASSWORD if it exists + original_ui_password = os.environ.pop("UI_PASSWORD", None) + try: + with patch( + "litellm.proxy.auth.login_utils.generate_key_helper_fn", + new_callable=AsyncMock, + ) as mock_generate_key: + mock_generate_key.return_value = { + "token": "test-token-123", + "user_id": LITELLM_PROXY_ADMIN_NAME, + } + + with patch( + "litellm.proxy.auth.login_utils.user_update", + new_callable=AsyncMock, + return_value=None, + ) as mock_user_update: + with patch( + "litellm.proxy.auth.login_utils.get_secret_bool", + return_value=False, + ): + result = await authenticate_user( + username=ui_username, + password=master_key, + master_key=master_key, + prisma_client=mock_prisma_client, + ) + + assert isinstance(result, LoginResult) + assert result.user_id == LITELLM_PROXY_ADMIN_NAME + assert result.user_role == LitellmUserRoles.PROXY_ADMIN + finally: + if original_ui_password: + os.environ["UI_PASSWORD"] = original_ui_password + +@pytest.mark.asyncio +async def test_authenticate_user_invalid_credentials(): + """Test authentication failure with invalid credentials""" + master_key = "sk-1234" + ui_username = "admin" + wrong_password = "wrong-password" + + mock_prisma_client = MagicMock() + mock_prisma_client.db.litellm_usertable.find_first = AsyncMock(return_value=None) + + with patch.dict(os.environ, {"UI_USERNAME": ui_username, "UI_PASSWORD": "correct-password"}): + with pytest.raises(ProxyException) as exc_info: + await authenticate_user( + username=ui_username, + password=wrong_password, + master_key=master_key, + prisma_client=mock_prisma_client, + ) + + assert exc_info.value.type == ProxyErrorTypes.auth_error + assert exc_info.value.code == "401" + assert "Invalid credentials" in exc_info.value.message + + +@pytest.mark.asyncio +async def test_authenticate_user_missing_master_key(): + """Test authentication failure when master_key is None""" + mock_prisma_client = MagicMock() + + with pytest.raises(ProxyException) as exc_info: + await authenticate_user( + username="admin", + password="password", + master_key=None, + prisma_client=mock_prisma_client, + ) + + assert exc_info.value.type == ProxyErrorTypes.auth_error + assert exc_info.value.code == "500" + assert "Master Key not set" in exc_info.value.message + + +@pytest.mark.asyncio +async def test_authenticate_user_wrong_password(): + """Test authentication failure with wrong password for database user""" + master_key = "sk-1234" + user_email = "test@example.com" + correct_password = "correct-password" + wrong_password = "wrong-password" + hashed_password = hash_token(token=correct_password) + + mock_user = LiteLLM_UserTable( + user_id="test-user-123", + user_email=user_email, + password=hashed_password, + user_role=LitellmUserRoles.INTERNAL_USER, + ) + + mock_prisma_client = MagicMock() + mock_prisma_client.db.litellm_usertable.find_first = AsyncMock( + return_value=mock_user + ) + + with patch.dict( + os.environ, + { + "DATABASE_URL": "postgresql://test:test@localhost/test", + "UI_USERNAME": "admin", + "UI_PASSWORD": "admin-password", + }, + ): + with pytest.raises(ProxyException) as exc_info: + await authenticate_user( + username=user_email, + password=wrong_password, + master_key=master_key, + prisma_client=mock_prisma_client, + ) + + assert exc_info.value.type == ProxyErrorTypes.auth_error + assert exc_info.value.code == "401" + assert "Invalid credentials" in exc_info.value.message + + +@pytest.mark.asyncio +async def test_authenticate_user_database_required_for_admin(): + """Test that database is required for admin login""" + master_key = "sk-1234" + ui_username = "admin" + ui_password = "sk-1234" + + mock_prisma_client = MagicMock() + mock_prisma_client.db.litellm_usertable.find_first = AsyncMock(return_value=None) + + with patch.dict(os.environ, {"UI_USERNAME": ui_username, "UI_PASSWORD": ui_password}): + with patch( + "litellm.proxy.auth.login_utils.user_update", + new_callable=AsyncMock, + return_value=None, + ): + # Remove DATABASE_URL to simulate no database + original_db_url = os.environ.get("DATABASE_URL") + if "DATABASE_URL" in os.environ: + del os.environ["DATABASE_URL"] + + try: + with pytest.raises(ProxyException) as exc_info: + await authenticate_user( + username=ui_username, + password=ui_password, + master_key=master_key, + prisma_client=mock_prisma_client, + ) + + assert exc_info.value.type == ProxyErrorTypes.auth_error + assert exc_info.value.code == "500" + assert "No Database connected" in exc_info.value.message + finally: + if original_db_url: + os.environ["DATABASE_URL"] = original_db_url