mirror of
https://github.com/tiennm99/litellm.git
synced 2026-06-18 07:33:58 +00:00
Merge pull request #17379 from BerriAI/litellm_login_route_refactor
[Refactor] /login route
This commit is contained in:
@@ -0,0 +1,337 @@
|
||||
"""
|
||||
Login utilities for handling user authentication in the proxy server.
|
||||
|
||||
This module contains the core login logic that can be reused across different
|
||||
login endpoints (e.g., /login and /v2/login).
|
||||
"""
|
||||
|
||||
import os
|
||||
import secrets
|
||||
from typing import Literal, Optional, cast
|
||||
|
||||
import litellm
|
||||
from fastapi import HTTPException
|
||||
|
||||
from litellm.constants import LITELLM_PROXY_ADMIN_NAME
|
||||
from litellm.proxy._types import (
|
||||
LiteLLM_UserTable,
|
||||
LitellmUserRoles,
|
||||
ProxyErrorTypes,
|
||||
ProxyException,
|
||||
UpdateUserRequest,
|
||||
UserAPIKeyAuth,
|
||||
hash_token,
|
||||
)
|
||||
from litellm.proxy.management_endpoints.internal_user_endpoints import user_update
|
||||
from litellm.proxy.management_endpoints.key_management_endpoints import (
|
||||
generate_key_helper_fn,
|
||||
)
|
||||
from litellm.proxy.management_endpoints.ui_sso import (
|
||||
get_disabled_non_admin_personal_key_creation,
|
||||
)
|
||||
from litellm.proxy.utils import PrismaClient, get_server_root_path
|
||||
from litellm.secret_managers.main import get_secret_bool
|
||||
from litellm.types.proxy.ui_sso import ReturnedUITokenObject
|
||||
|
||||
|
||||
def get_ui_credentials(master_key: Optional[str]) -> tuple[str, str]:
|
||||
"""
|
||||
Get UI username and password from environment variables or master key.
|
||||
|
||||
Args:
|
||||
master_key: Master key for the proxy (used as fallback for password)
|
||||
|
||||
Returns:
|
||||
tuple[str, str]: A tuple containing (ui_username, ui_password)
|
||||
|
||||
Raises:
|
||||
ProxyException: If neither UI_PASSWORD nor master_key is available
|
||||
"""
|
||||
ui_username = os.getenv("UI_USERNAME", "admin")
|
||||
ui_password = os.getenv("UI_PASSWORD", None)
|
||||
if ui_password is None:
|
||||
ui_password = str(master_key) if master_key is not None else None
|
||||
if ui_password is None:
|
||||
raise ProxyException(
|
||||
message="set Proxy master key to use UI. https://docs.litellm.ai/docs/proxy/virtual_keys. If set, use `--detailed_debug` to debug issue.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="UI_PASSWORD",
|
||||
code=500,
|
||||
)
|
||||
return ui_username, ui_password
|
||||
|
||||
|
||||
class LoginResult:
|
||||
"""Result object containing authentication data from login."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
user_id: str,
|
||||
key: str,
|
||||
user_email: Optional[str],
|
||||
user_role: str,
|
||||
login_method: str = "username_password",
|
||||
):
|
||||
self.user_id = user_id
|
||||
self.key = key
|
||||
self.user_email = user_email
|
||||
self.user_role = user_role
|
||||
self.login_method = login_method
|
||||
|
||||
|
||||
async def authenticate_user(
|
||||
username: str,
|
||||
password: str,
|
||||
master_key: Optional[str],
|
||||
prisma_client: Optional[PrismaClient],
|
||||
) -> LoginResult:
|
||||
"""
|
||||
Authenticate a user and generate an API key for UI access.
|
||||
|
||||
This function handles two login scenarios:
|
||||
1. Admin login using UI_USERNAME and UI_PASSWORD
|
||||
2. User login using email and password from database
|
||||
|
||||
Args:
|
||||
username: Username or email from the login form
|
||||
password: Password from the login form
|
||||
master_key: Master key for the proxy (required)
|
||||
prisma_client: Prisma database client (optional)
|
||||
|
||||
Returns:
|
||||
LoginResult: Object containing authentication data
|
||||
|
||||
Raises:
|
||||
ProxyException: If authentication fails or required configuration is missing
|
||||
"""
|
||||
if master_key is None:
|
||||
raise ProxyException(
|
||||
message="Master Key not set for Proxy. Please set Master Key to use Admin UI. Set `LITELLM_MASTER_KEY` in .env or set general_settings:master_key in config.yaml. https://docs.litellm.ai/docs/proxy/virtual_keys. If set, use `--detailed_debug` to debug issue.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="master_key",
|
||||
code=500,
|
||||
)
|
||||
|
||||
ui_username, ui_password = get_ui_credentials(master_key)
|
||||
|
||||
# Check if we can find the `username` in the db. On the UI, users can enter username=their email
|
||||
_user_row: Optional[LiteLLM_UserTable] = None
|
||||
user_role: Optional[
|
||||
Literal[
|
||||
LitellmUserRoles.PROXY_ADMIN,
|
||||
LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY,
|
||||
LitellmUserRoles.INTERNAL_USER,
|
||||
LitellmUserRoles.INTERNAL_USER_VIEW_ONLY,
|
||||
]
|
||||
] = None
|
||||
|
||||
if prisma_client is not None:
|
||||
_user_row = cast(
|
||||
Optional[LiteLLM_UserTable],
|
||||
await prisma_client.db.litellm_usertable.find_first(
|
||||
where={"user_email": {"equals": username}}
|
||||
),
|
||||
)
|
||||
|
||||
"""
|
||||
To login to Admin UI, we support the following
|
||||
- Login with UI_USERNAME and UI_PASSWORD
|
||||
- Login with Invite Link `user_email` and `password` combination
|
||||
"""
|
||||
if secrets.compare_digest(username, ui_username) and secrets.compare_digest(
|
||||
password, ui_password
|
||||
):
|
||||
# Non SSO -> If user is using UI_USERNAME and UI_PASSWORD they are Proxy admin
|
||||
user_role = LitellmUserRoles.PROXY_ADMIN
|
||||
user_id = LITELLM_PROXY_ADMIN_NAME
|
||||
|
||||
# we want the key created to have PROXY_ADMIN_PERMISSIONS
|
||||
key_user_id = LITELLM_PROXY_ADMIN_NAME
|
||||
if (
|
||||
os.getenv("PROXY_ADMIN_ID", None) is not None
|
||||
and os.environ["PROXY_ADMIN_ID"] == user_id
|
||||
) or user_id == LITELLM_PROXY_ADMIN_NAME:
|
||||
# checks if user is admin
|
||||
key_user_id = os.getenv("PROXY_ADMIN_ID", LITELLM_PROXY_ADMIN_NAME)
|
||||
|
||||
# Admin is Authe'd in - generate key for the UI to access Proxy
|
||||
|
||||
# ensure this user is set as the proxy admin, in this route there is no sso, we can assume this user is only the admin
|
||||
await user_update(
|
||||
data=UpdateUserRequest(
|
||||
user_id=key_user_id,
|
||||
user_role=user_role,
|
||||
),
|
||||
user_api_key_dict=UserAPIKeyAuth(
|
||||
user_role=LitellmUserRoles.PROXY_ADMIN,
|
||||
),
|
||||
)
|
||||
|
||||
if os.getenv("DATABASE_URL") is not None:
|
||||
response = await generate_key_helper_fn(
|
||||
request_type="key",
|
||||
**{
|
||||
"user_role": LitellmUserRoles.PROXY_ADMIN,
|
||||
"duration": "24hr",
|
||||
"key_max_budget": litellm.max_ui_session_budget,
|
||||
"models": [],
|
||||
"aliases": {},
|
||||
"config": {},
|
||||
"spend": 0,
|
||||
"user_id": key_user_id,
|
||||
"team_id": "litellm-dashboard",
|
||||
}, # type: ignore
|
||||
)
|
||||
else:
|
||||
raise ProxyException(
|
||||
message="No Database connected. Set DATABASE_URL in .env. If set, use `--detailed_debug` to debug issue.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="DATABASE_URL",
|
||||
code=500,
|
||||
)
|
||||
|
||||
key = response["token"] # type: ignore
|
||||
|
||||
if get_secret_bool("EXPERIMENTAL_UI_LOGIN"):
|
||||
user_info: Optional[LiteLLM_UserTable] = None
|
||||
if _user_row is not None:
|
||||
user_info = _user_row
|
||||
elif (
|
||||
user_id is not None
|
||||
): # if user_id is not None, we are using the UI_USERNAME and UI_PASSWORD
|
||||
from litellm.proxy.auth.auth_checks import ExperimentalUIJWTToken
|
||||
|
||||
user_info = LiteLLM_UserTable(
|
||||
user_id=user_id,
|
||||
user_role=user_role,
|
||||
models=[],
|
||||
max_budget=litellm.max_ui_session_budget,
|
||||
)
|
||||
if user_info is None:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail={
|
||||
"error": "User Information is required for experimental UI login"
|
||||
},
|
||||
)
|
||||
|
||||
key = ExperimentalUIJWTToken.get_experimental_ui_login_jwt_auth_token(
|
||||
user_info
|
||||
)
|
||||
|
||||
return LoginResult(
|
||||
user_id=user_id,
|
||||
key=key,
|
||||
user_email=None,
|
||||
user_role=user_role,
|
||||
login_method="username_password",
|
||||
)
|
||||
|
||||
elif _user_row is not None:
|
||||
"""
|
||||
When sharing invite links
|
||||
|
||||
-> if the user has no role in the DB assume they are only a viewer
|
||||
"""
|
||||
user_id = getattr(_user_row, "user_id", "unknown")
|
||||
user_role = getattr(
|
||||
_user_row, "user_role", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY
|
||||
)
|
||||
user_email = getattr(_user_row, "user_email", "unknown")
|
||||
_password = getattr(_user_row, "password", "unknown")
|
||||
|
||||
if _password is None:
|
||||
raise ProxyException(
|
||||
message="User has no password set. Please set a password for the user via `/user/update`.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="password",
|
||||
code=401,
|
||||
)
|
||||
|
||||
# check if password == _user_row.password
|
||||
hash_password = hash_token(token=password)
|
||||
if secrets.compare_digest(password, _password) or secrets.compare_digest(
|
||||
hash_password, _password
|
||||
):
|
||||
if os.getenv("DATABASE_URL") is not None:
|
||||
response = await generate_key_helper_fn(
|
||||
request_type="key",
|
||||
**{ # type: ignore
|
||||
"user_role": user_role,
|
||||
"duration": "24hr",
|
||||
"key_max_budget": litellm.max_ui_session_budget,
|
||||
"models": [],
|
||||
"aliases": {},
|
||||
"config": {},
|
||||
"spend": 0,
|
||||
"user_id": user_id,
|
||||
"team_id": "litellm-dashboard",
|
||||
},
|
||||
)
|
||||
else:
|
||||
raise ProxyException(
|
||||
message="No Database connected. Set DATABASE_URL in .env. If set, use `--detailed_debug` to debug issue.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="DATABASE_URL",
|
||||
code=500,
|
||||
)
|
||||
|
||||
key = response["token"] # type: ignore
|
||||
|
||||
return LoginResult(
|
||||
user_id=user_id,
|
||||
key=key,
|
||||
user_email=user_email,
|
||||
user_role=cast(str, user_role),
|
||||
login_method="username_password",
|
||||
)
|
||||
else:
|
||||
raise ProxyException(
|
||||
message=f"Invalid credentials used to access UI.\nNot valid credentials for {username}",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="invalid_credentials",
|
||||
code=401,
|
||||
)
|
||||
else:
|
||||
raise ProxyException(
|
||||
message="Invalid credentials used to access UI.\nCheck 'UI_USERNAME', 'UI_PASSWORD' in .env file",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="invalid_credentials",
|
||||
code=401,
|
||||
)
|
||||
|
||||
|
||||
def create_ui_token_object(
|
||||
login_result: LoginResult,
|
||||
general_settings: dict,
|
||||
premium_user: bool,
|
||||
) -> ReturnedUITokenObject:
|
||||
"""
|
||||
Create a ReturnedUITokenObject from a LoginResult.
|
||||
|
||||
Args:
|
||||
login_result: The result from authenticate_user
|
||||
general_settings: General proxy settings dictionary
|
||||
premium_user: Whether premium features are enabled
|
||||
|
||||
Returns:
|
||||
ReturnedUITokenObject: Token object ready for JWT encoding
|
||||
"""
|
||||
disabled_non_admin_personal_key_creation = (
|
||||
get_disabled_non_admin_personal_key_creation()
|
||||
)
|
||||
|
||||
return ReturnedUITokenObject(
|
||||
user_id=login_result.user_id,
|
||||
key=login_result.key,
|
||||
user_email=login_result.user_email,
|
||||
user_role=login_result.user_role,
|
||||
login_method=login_result.login_method,
|
||||
premium_user=premium_user,
|
||||
auth_header_name=general_settings.get(
|
||||
"litellm_key_header_name", "Authorization"
|
||||
),
|
||||
disabled_non_admin_personal_key_creation=disabled_non_admin_personal_key_creation,
|
||||
server_root_path=get_server_root_path(),
|
||||
)
|
||||
|
||||
+32
-239
@@ -8273,256 +8273,49 @@ async def fallback_login(request: Request):
|
||||
) # hidden since this is a helper for UI sso login
|
||||
async def login(request: Request): # noqa: PLR0915
|
||||
global premium_user, general_settings, master_key
|
||||
from litellm.types.proxy.ui_sso import ReturnedUITokenObject
|
||||
from litellm.proxy.auth.login_utils import authenticate_user, create_ui_token_object
|
||||
from litellm.proxy.utils import get_custom_url
|
||||
|
||||
if master_key is None:
|
||||
raise ProxyException(
|
||||
message="Master Key not set for Proxy. Please set Master Key to use Admin UI. Set `LITELLM_MASTER_KEY` in .env or set general_settings:master_key in config.yaml. https://docs.litellm.ai/docs/proxy/virtual_keys. If set, use `--detailed_debug` to debug issue.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="master_key",
|
||||
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
)
|
||||
form = await request.form()
|
||||
username = str(form.get("username"))
|
||||
password = str(form.get("password"))
|
||||
ui_username = os.getenv("UI_USERNAME", "admin")
|
||||
ui_password = os.getenv("UI_PASSWORD", None)
|
||||
if ui_password is None:
|
||||
ui_password = str(master_key) if master_key is not None else None
|
||||
if ui_password is None:
|
||||
raise ProxyException(
|
||||
message="set Proxy master key to use UI. https://docs.litellm.ai/docs/proxy/virtual_keys. If set, use `--detailed_debug` to debug issue.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="UI_PASSWORD",
|
||||
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
)
|
||||
|
||||
# check if we can find the `username` in the db. on the ui, users can enter username=their email
|
||||
_user_row: Optional[LiteLLM_UserTable] = None
|
||||
user_role: Optional[
|
||||
Literal[
|
||||
LitellmUserRoles.PROXY_ADMIN,
|
||||
LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY,
|
||||
LitellmUserRoles.INTERNAL_USER,
|
||||
LitellmUserRoles.INTERNAL_USER_VIEW_ONLY,
|
||||
]
|
||||
] = None
|
||||
if prisma_client is not None:
|
||||
_user_row = cast(
|
||||
Optional[LiteLLM_UserTable],
|
||||
await prisma_client.db.litellm_usertable.find_first(
|
||||
where={"user_email": {"equals": username}}
|
||||
),
|
||||
)
|
||||
disabled_non_admin_personal_key_creation = (
|
||||
get_disabled_non_admin_personal_key_creation()
|
||||
# Authenticate user and get login result
|
||||
login_result = await authenticate_user(
|
||||
username=username,
|
||||
password=password,
|
||||
master_key=master_key,
|
||||
prisma_client=prisma_client,
|
||||
)
|
||||
"""
|
||||
To login to Admin UI, we support the following
|
||||
- Login with UI_USERNAME and UI_PASSWORD
|
||||
- Login with Invite Link `user_email` and `password` combination
|
||||
"""
|
||||
if secrets.compare_digest(username, ui_username) and secrets.compare_digest(
|
||||
password, ui_password
|
||||
):
|
||||
# Non SSO -> If user is using UI_USERNAME and UI_PASSWORD they are Proxy admin
|
||||
user_role = LitellmUserRoles.PROXY_ADMIN
|
||||
user_id = litellm_proxy_admin_name
|
||||
|
||||
# we want the key created to have PROXY_ADMIN_PERMISSIONS
|
||||
key_user_id = litellm_proxy_admin_name
|
||||
if (
|
||||
os.getenv("PROXY_ADMIN_ID", None) is not None
|
||||
and os.environ["PROXY_ADMIN_ID"] == user_id
|
||||
) or user_id == litellm_proxy_admin_name:
|
||||
# checks if user is admin
|
||||
key_user_id = os.getenv("PROXY_ADMIN_ID", litellm_proxy_admin_name)
|
||||
# Create UI token object
|
||||
returned_ui_token_object = create_ui_token_object(
|
||||
login_result=login_result,
|
||||
general_settings=general_settings,
|
||||
premium_user=premium_user,
|
||||
)
|
||||
|
||||
# Admin is Authe'd in - generate key for the UI to access Proxy
|
||||
# Generate JWT token
|
||||
import jwt
|
||||
|
||||
# ensure this user is set as the proxy admin, in this route there is no sso, we can assume this user is only the admin
|
||||
await user_update(
|
||||
data=UpdateUserRequest(
|
||||
user_id=key_user_id,
|
||||
user_role=user_role,
|
||||
),
|
||||
user_api_key_dict=UserAPIKeyAuth(
|
||||
user_role=LitellmUserRoles.PROXY_ADMIN,
|
||||
),
|
||||
)
|
||||
if os.getenv("DATABASE_URL") is not None:
|
||||
response = await generate_key_helper_fn(
|
||||
request_type="key",
|
||||
**{
|
||||
"user_role": LitellmUserRoles.PROXY_ADMIN,
|
||||
"duration": "24hr",
|
||||
"key_max_budget": litellm.max_ui_session_budget,
|
||||
"models": [],
|
||||
"aliases": {},
|
||||
"config": {},
|
||||
"spend": 0,
|
||||
"user_id": key_user_id,
|
||||
"team_id": "litellm-dashboard",
|
||||
}, # type: ignore
|
||||
)
|
||||
else:
|
||||
raise ProxyException(
|
||||
message="No Database connected. Set DATABASE_URL in .env. If set, use `--detailed_debug` to debug issue.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="DATABASE_URL",
|
||||
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
)
|
||||
key = response["token"] # type: ignore
|
||||
litellm_dashboard_ui = get_custom_url(str(request.base_url))
|
||||
if litellm_dashboard_ui.endswith("/"):
|
||||
litellm_dashboard_ui += "ui/"
|
||||
else:
|
||||
litellm_dashboard_ui += "/ui/"
|
||||
import jwt
|
||||
jwt_token = jwt.encode( # type: ignore
|
||||
cast(dict, returned_ui_token_object),
|
||||
master_key,
|
||||
algorithm="HS256",
|
||||
)
|
||||
|
||||
if get_secret_bool("EXPERIMENTAL_UI_LOGIN"):
|
||||
user_info: Optional[LiteLLM_UserTable] = None
|
||||
if _user_row is not None:
|
||||
user_info = _user_row
|
||||
elif (
|
||||
user_id is not None
|
||||
): # if user_id is not None, we are using the UI_USERNAME and UI_PASSWORD
|
||||
user_info = LiteLLM_UserTable(
|
||||
user_id=user_id,
|
||||
user_role=user_role,
|
||||
models=[],
|
||||
max_budget=litellm.max_ui_session_budget,
|
||||
)
|
||||
if user_info is None:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail={
|
||||
"error": "User Information is required for experimental UI login"
|
||||
},
|
||||
)
|
||||
|
||||
key = ExperimentalUIJWTToken.get_experimental_ui_login_jwt_auth_token(
|
||||
user_info
|
||||
)
|
||||
|
||||
returned_ui_token_object = ReturnedUITokenObject(
|
||||
user_id=user_id,
|
||||
key=key,
|
||||
user_email=None,
|
||||
user_role=user_role,
|
||||
login_method="username_password",
|
||||
premium_user=premium_user,
|
||||
auth_header_name=general_settings.get(
|
||||
"litellm_key_header_name", "Authorization"
|
||||
),
|
||||
disabled_non_admin_personal_key_creation=disabled_non_admin_personal_key_creation,
|
||||
server_root_path=get_server_root_path(),
|
||||
)
|
||||
|
||||
jwt_token = jwt.encode( # type: ignore
|
||||
cast(dict, returned_ui_token_object),
|
||||
master_key,
|
||||
algorithm="HS256",
|
||||
)
|
||||
litellm_dashboard_ui += "?login=success"
|
||||
redirect_response = RedirectResponse(url=litellm_dashboard_ui, status_code=303)
|
||||
redirect_response.set_cookie(key="token", value=jwt_token)
|
||||
return redirect_response
|
||||
elif _user_row is not None:
|
||||
"""
|
||||
When sharing invite links
|
||||
|
||||
-> if the user has no role in the DB assume they are only a viewer
|
||||
"""
|
||||
user_id = getattr(_user_row, "user_id", "unknown")
|
||||
user_role = getattr(
|
||||
_user_row, "user_role", LitellmUserRoles.INTERNAL_USER_VIEW_ONLY
|
||||
)
|
||||
user_email = getattr(_user_row, "user_email", "unknown")
|
||||
_password = getattr(_user_row, "password", "unknown")
|
||||
|
||||
if _password is None:
|
||||
raise ProxyException(
|
||||
message="User has no password set. Please set a password for the user via `/user/update`.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="password",
|
||||
code=status.HTTP_401_UNAUTHORIZED,
|
||||
)
|
||||
|
||||
# check if password == _user_row.password
|
||||
hash_password = hash_token(token=password)
|
||||
if secrets.compare_digest(password, _password) or secrets.compare_digest(
|
||||
hash_password, _password
|
||||
):
|
||||
if os.getenv("DATABASE_URL") is not None:
|
||||
response = await generate_key_helper_fn(
|
||||
request_type="key",
|
||||
**{ # type: ignore
|
||||
"user_role": user_role,
|
||||
"duration": "24hr",
|
||||
"key_max_budget": litellm.max_ui_session_budget,
|
||||
"models": [],
|
||||
"aliases": {},
|
||||
"config": {},
|
||||
"spend": 0,
|
||||
"user_id": user_id,
|
||||
"team_id": "litellm-dashboard",
|
||||
},
|
||||
)
|
||||
else:
|
||||
raise ProxyException(
|
||||
message="No Database connected. Set DATABASE_URL in .env. If set, use `--detailed_debug` to debug issue.",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="DATABASE_URL",
|
||||
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
)
|
||||
key = response["token"] # type: ignore
|
||||
litellm_dashboard_ui = get_custom_url(str(request.base_url))
|
||||
if litellm_dashboard_ui.endswith("/"):
|
||||
litellm_dashboard_ui += "ui/"
|
||||
else:
|
||||
litellm_dashboard_ui += "/ui/"
|
||||
import jwt
|
||||
|
||||
returned_ui_token_object = ReturnedUITokenObject(
|
||||
user_id=user_id,
|
||||
key=key,
|
||||
user_email=user_email,
|
||||
user_role=cast(str, user_role),
|
||||
login_method="username_password",
|
||||
premium_user=premium_user,
|
||||
auth_header_name=general_settings.get(
|
||||
"litellm_key_header_name", "Authorization"
|
||||
),
|
||||
disabled_non_admin_personal_key_creation=disabled_non_admin_personal_key_creation,
|
||||
server_root_path=get_server_root_path(),
|
||||
)
|
||||
|
||||
jwt_token = jwt.encode( # type: ignore
|
||||
cast(dict, returned_ui_token_object),
|
||||
master_key,
|
||||
algorithm="HS256",
|
||||
)
|
||||
litellm_dashboard_ui += "?login=success"
|
||||
redirect_response = RedirectResponse(
|
||||
url=litellm_dashboard_ui, status_code=303
|
||||
)
|
||||
redirect_response.set_cookie(key="token", value=jwt_token)
|
||||
return redirect_response
|
||||
else:
|
||||
raise ProxyException(
|
||||
message=f"Invalid credentials used to access UI.\nNot valid credentials for {username}",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="invalid_credentials",
|
||||
code=status.HTTP_401_UNAUTHORIZED,
|
||||
)
|
||||
# Build redirect URL
|
||||
litellm_dashboard_ui = get_custom_url(str(request.base_url))
|
||||
if litellm_dashboard_ui.endswith("/"):
|
||||
litellm_dashboard_ui += "ui/"
|
||||
else:
|
||||
raise ProxyException(
|
||||
message="Invalid credentials used to access UI.\nCheck 'UI_USERNAME', 'UI_PASSWORD' in .env file",
|
||||
type=ProxyErrorTypes.auth_error,
|
||||
param="invalid_credentials",
|
||||
code=status.HTTP_401_UNAUTHORIZED,
|
||||
)
|
||||
litellm_dashboard_ui += "/ui/"
|
||||
litellm_dashboard_ui += "?login=success"
|
||||
|
||||
# Create redirect response with cookie
|
||||
redirect_response = RedirectResponse(url=litellm_dashboard_ui, status_code=303)
|
||||
redirect_response.set_cookie(key="token", value=jwt_token)
|
||||
return redirect_response
|
||||
|
||||
|
||||
@app.get("/onboarding/get_token", include_in_schema=False)
|
||||
|
||||
@@ -0,0 +1,284 @@
|
||||
"""
|
||||
Tests for login_utils module.
|
||||
|
||||
This module tests the refactored login logic that was moved from proxy_server.py
|
||||
to login_utils.py for better reusability.
|
||||
"""
|
||||
|
||||
import os
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from litellm.constants import LITELLM_PROXY_ADMIN_NAME
|
||||
from litellm.proxy._types import (
|
||||
LiteLLM_UserTable,
|
||||
LitellmUserRoles,
|
||||
ProxyErrorTypes,
|
||||
ProxyException,
|
||||
hash_token,
|
||||
)
|
||||
from litellm.proxy.auth.login_utils import (
|
||||
LoginResult,
|
||||
authenticate_user,
|
||||
get_ui_credentials,
|
||||
)
|
||||
|
||||
|
||||
def test_get_ui_credentials_prefers_explicit_password():
|
||||
"""The configured UI password should be returned when available."""
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"UI_USERNAME": "test-admin", "UI_PASSWORD": "secure-pass"},
|
||||
clear=True,
|
||||
):
|
||||
username, password = get_ui_credentials(master_key="sk-123")
|
||||
|
||||
assert username == "test-admin"
|
||||
assert password == "secure-pass"
|
||||
|
||||
|
||||
def test_get_ui_credentials_can_use_master_key():
|
||||
"""Master key should be used as password when UI_PASSWORD is missing."""
|
||||
with patch.dict(os.environ, {"UI_USERNAME": "fallback-admin"}, clear=True):
|
||||
username, password = get_ui_credentials(master_key="fallback-key")
|
||||
|
||||
assert username == "fallback-admin"
|
||||
assert password == "fallback-key"
|
||||
|
||||
|
||||
def test_get_ui_credentials_requires_password():
|
||||
"""Missing UI password and master key results in error."""
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
with pytest.raises(ProxyException) as exc_info:
|
||||
get_ui_credentials(master_key=None)
|
||||
|
||||
assert exc_info.value.type == ProxyErrorTypes.auth_error
|
||||
assert exc_info.value.code == "500"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_user_admin_login_with_ui_credentials():
|
||||
"""Test admin login using UI_USERNAME and UI_PASSWORD"""
|
||||
master_key = "sk-1234"
|
||||
ui_username = "admin"
|
||||
ui_password = "sk-1234"
|
||||
|
||||
mock_prisma_client = MagicMock()
|
||||
mock_prisma_client.db.litellm_usertable.find_first = AsyncMock(return_value=None)
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"UI_USERNAME": ui_username,
|
||||
"UI_PASSWORD": ui_password,
|
||||
"DATABASE_URL": "postgresql://test:test@localhost/test",
|
||||
},
|
||||
):
|
||||
with patch(
|
||||
"litellm.proxy.auth.login_utils.generate_key_helper_fn",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_generate_key:
|
||||
mock_generate_key.return_value = {
|
||||
"token": "test-token-123",
|
||||
"user_id": LITELLM_PROXY_ADMIN_NAME,
|
||||
}
|
||||
|
||||
with patch(
|
||||
"litellm.proxy.auth.login_utils.user_update",
|
||||
new_callable=AsyncMock,
|
||||
return_value=None,
|
||||
) as mock_user_update:
|
||||
with patch(
|
||||
"litellm.proxy.auth.login_utils.get_secret_bool",
|
||||
return_value=False,
|
||||
):
|
||||
result = await authenticate_user(
|
||||
username=ui_username,
|
||||
password=ui_password,
|
||||
master_key=master_key,
|
||||
prisma_client=mock_prisma_client,
|
||||
)
|
||||
|
||||
assert isinstance(result, LoginResult)
|
||||
assert result.user_id == LITELLM_PROXY_ADMIN_NAME
|
||||
assert result.key == "test-token-123"
|
||||
assert result.user_email is None
|
||||
assert result.user_role == LitellmUserRoles.PROXY_ADMIN
|
||||
assert result.login_method == "username_password"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_user_admin_login_with_master_key_as_password():
|
||||
"""Test admin login when UI_PASSWORD is not set, should use master_key"""
|
||||
master_key = "sk-1234"
|
||||
ui_username = "admin"
|
||||
|
||||
mock_prisma_client = MagicMock()
|
||||
mock_prisma_client.db.litellm_usertable.find_first = AsyncMock(return_value=None)
|
||||
|
||||
env_vars = {"UI_USERNAME": ui_username, "DATABASE_URL": "postgresql://test:test@localhost/test"}
|
||||
# Remove UI_PASSWORD to test fallback to master_key
|
||||
if "UI_PASSWORD" in os.environ:
|
||||
# Keep other env vars but don't set UI_PASSWORD
|
||||
pass
|
||||
else:
|
||||
# Ensure UI_PASSWORD is not in the patched env
|
||||
pass
|
||||
|
||||
with patch.dict(os.environ, env_vars, clear=False):
|
||||
# Explicitly remove UI_PASSWORD if it exists
|
||||
original_ui_password = os.environ.pop("UI_PASSWORD", None)
|
||||
try:
|
||||
with patch(
|
||||
"litellm.proxy.auth.login_utils.generate_key_helper_fn",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_generate_key:
|
||||
mock_generate_key.return_value = {
|
||||
"token": "test-token-123",
|
||||
"user_id": LITELLM_PROXY_ADMIN_NAME,
|
||||
}
|
||||
|
||||
with patch(
|
||||
"litellm.proxy.auth.login_utils.user_update",
|
||||
new_callable=AsyncMock,
|
||||
return_value=None,
|
||||
) as mock_user_update:
|
||||
with patch(
|
||||
"litellm.proxy.auth.login_utils.get_secret_bool",
|
||||
return_value=False,
|
||||
):
|
||||
result = await authenticate_user(
|
||||
username=ui_username,
|
||||
password=master_key,
|
||||
master_key=master_key,
|
||||
prisma_client=mock_prisma_client,
|
||||
)
|
||||
|
||||
assert isinstance(result, LoginResult)
|
||||
assert result.user_id == LITELLM_PROXY_ADMIN_NAME
|
||||
assert result.user_role == LitellmUserRoles.PROXY_ADMIN
|
||||
finally:
|
||||
if original_ui_password:
|
||||
os.environ["UI_PASSWORD"] = original_ui_password
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_user_invalid_credentials():
|
||||
"""Test authentication failure with invalid credentials"""
|
||||
master_key = "sk-1234"
|
||||
ui_username = "admin"
|
||||
wrong_password = "wrong-password"
|
||||
|
||||
mock_prisma_client = MagicMock()
|
||||
mock_prisma_client.db.litellm_usertable.find_first = AsyncMock(return_value=None)
|
||||
|
||||
with patch.dict(os.environ, {"UI_USERNAME": ui_username, "UI_PASSWORD": "correct-password"}):
|
||||
with pytest.raises(ProxyException) as exc_info:
|
||||
await authenticate_user(
|
||||
username=ui_username,
|
||||
password=wrong_password,
|
||||
master_key=master_key,
|
||||
prisma_client=mock_prisma_client,
|
||||
)
|
||||
|
||||
assert exc_info.value.type == ProxyErrorTypes.auth_error
|
||||
assert exc_info.value.code == "401"
|
||||
assert "Invalid credentials" in exc_info.value.message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_user_missing_master_key():
|
||||
"""Test authentication failure when master_key is None"""
|
||||
mock_prisma_client = MagicMock()
|
||||
|
||||
with pytest.raises(ProxyException) as exc_info:
|
||||
await authenticate_user(
|
||||
username="admin",
|
||||
password="password",
|
||||
master_key=None,
|
||||
prisma_client=mock_prisma_client,
|
||||
)
|
||||
|
||||
assert exc_info.value.type == ProxyErrorTypes.auth_error
|
||||
assert exc_info.value.code == "500"
|
||||
assert "Master Key not set" in exc_info.value.message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_user_wrong_password():
|
||||
"""Test authentication failure with wrong password for database user"""
|
||||
master_key = "sk-1234"
|
||||
user_email = "test@example.com"
|
||||
correct_password = "correct-password"
|
||||
wrong_password = "wrong-password"
|
||||
hashed_password = hash_token(token=correct_password)
|
||||
|
||||
mock_user = LiteLLM_UserTable(
|
||||
user_id="test-user-123",
|
||||
user_email=user_email,
|
||||
password=hashed_password,
|
||||
user_role=LitellmUserRoles.INTERNAL_USER,
|
||||
)
|
||||
|
||||
mock_prisma_client = MagicMock()
|
||||
mock_prisma_client.db.litellm_usertable.find_first = AsyncMock(
|
||||
return_value=mock_user
|
||||
)
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"DATABASE_URL": "postgresql://test:test@localhost/test",
|
||||
"UI_USERNAME": "admin",
|
||||
"UI_PASSWORD": "admin-password",
|
||||
},
|
||||
):
|
||||
with pytest.raises(ProxyException) as exc_info:
|
||||
await authenticate_user(
|
||||
username=user_email,
|
||||
password=wrong_password,
|
||||
master_key=master_key,
|
||||
prisma_client=mock_prisma_client,
|
||||
)
|
||||
|
||||
assert exc_info.value.type == ProxyErrorTypes.auth_error
|
||||
assert exc_info.value.code == "401"
|
||||
assert "Invalid credentials" in exc_info.value.message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_authenticate_user_database_required_for_admin():
|
||||
"""Test that database is required for admin login"""
|
||||
master_key = "sk-1234"
|
||||
ui_username = "admin"
|
||||
ui_password = "sk-1234"
|
||||
|
||||
mock_prisma_client = MagicMock()
|
||||
mock_prisma_client.db.litellm_usertable.find_first = AsyncMock(return_value=None)
|
||||
|
||||
with patch.dict(os.environ, {"UI_USERNAME": ui_username, "UI_PASSWORD": ui_password}):
|
||||
with patch(
|
||||
"litellm.proxy.auth.login_utils.user_update",
|
||||
new_callable=AsyncMock,
|
||||
return_value=None,
|
||||
):
|
||||
# Remove DATABASE_URL to simulate no database
|
||||
original_db_url = os.environ.get("DATABASE_URL")
|
||||
if "DATABASE_URL" in os.environ:
|
||||
del os.environ["DATABASE_URL"]
|
||||
|
||||
try:
|
||||
with pytest.raises(ProxyException) as exc_info:
|
||||
await authenticate_user(
|
||||
username=ui_username,
|
||||
password=ui_password,
|
||||
master_key=master_key,
|
||||
prisma_client=mock_prisma_client,
|
||||
)
|
||||
|
||||
assert exc_info.value.type == ProxyErrorTypes.auth_error
|
||||
assert exc_info.value.code == "500"
|
||||
assert "No Database connected" in exc_info.value.message
|
||||
finally:
|
||||
if original_db_url:
|
||||
os.environ["DATABASE_URL"] = original_db_url
|
||||
Reference in New Issue
Block a user