feat: hashicorp vault rotate support

This commit is contained in:
Yuta Saito
2026-01-23 17:32:55 +09:00
parent 4381e7f98f
commit 695fbf4ec5
4 changed files with 768 additions and 4 deletions
@@ -150,12 +150,14 @@ class KeyManagementEventHooks:
existing_key_row.key_alias
or f"virtual-key-{existing_key_row.token}"
)
team_id = getattr(existing_key_row, "team_id", None)
await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager(
current_secret_name=initial_secret_name,
new_secret_name=response.key_alias
or data.key_alias
or f"virtual-key-{response.token_id}",
new_secret_value=response.key,
team_id=team_id,
)
except Exception as e:
verbose_proxy_logger.warning(
@@ -286,14 +288,19 @@ class KeyManagementEventHooks:
@staticmethod
async def _rotate_virtual_key_in_secret_manager(
current_secret_name: str, new_secret_name: str, new_secret_value: str
current_secret_name: str,
new_secret_name: str,
new_secret_value: str,
team_id: Optional[str] = None,
):
"""
Update a virtual key in the secret manager
Args:
secret_name: Name of the virtual key
secret_token: Value of the virtual key (example: sk-1234)
current_secret_name: Current name of the virtual key
new_secret_name: New name of the virtual key
new_secret_value: New value of the virtual key (example: sk-1234)
team_id: Optional team ID to get team-specific secret manager settings
"""
if litellm._key_management_settings is not None:
if litellm._key_management_settings.store_virtual_keys is True:
@@ -303,6 +310,9 @@ class KeyManagementEventHooks:
# store the key in the secret manager
if isinstance(litellm.secret_manager_client, BaseSecretManager):
optional_params = await KeyManagementEventHooks._get_secret_manager_optional_params(
team_id
)
await litellm.secret_manager_client.async_rotate_secret(
current_secret_name=KeyManagementEventHooks._get_secret_name(
current_secret_name
@@ -311,6 +321,7 @@ class KeyManagementEventHooks:
new_secret_name
),
new_secret_value=new_secret_value,
optional_params=optional_params,
)
@staticmethod
@@ -444,7 +444,131 @@ class HashicorpSecretManager(BaseSecretManager):
optional_params: Dict | None = None,
timeout: float | httpx.Timeout | None = None,
) -> Dict:
raise NotImplementedError("Hashicorp does not support secret rotation")
"""
Rotates a secret by creating a new one and deleting the old one.
Uses _build_secret_target to handle optional_params for namespace, mount, path_prefix customization.
Args:
current_secret_name: Current name of the secret
new_secret_name: New name for the secret
new_secret_value: New value for the secret
optional_params: Additional parameters (namespace, mount, path_prefix, data)
timeout: Request timeout
Returns:
dict: Response containing status and details of the operation.
On success, returns the response from async_write_secret.
On error, returns {"status": "error", "message": "error message"}
"""
async_client = get_async_httpx_client(
llm_provider=httpxSpecialProvider.SecretManager,
params={"timeout": timeout},
)
try:
# First verify the old secret exists using _build_secret_target
current_target = self._build_secret_target(current_secret_name, optional_params)
try:
response = await async_client.get(
url=current_target["url"],
headers=self._get_request_headers(),
)
response.raise_for_status()
# Secret exists, we can proceed
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
verbose_logger.exception(f"Current secret {current_secret_name} not found")
return {"status": "error", "message": f"Current secret {current_secret_name} not found"}
verbose_logger.exception(
f"Error checking current secret existence: {e.response.text if hasattr(e, 'response') else str(e)}"
)
return {
"status": "error",
"message": f"HTTP error occurred while checking current secret: {e.response.text if hasattr(e, 'response') else str(e)}",
}
except Exception as e:
verbose_logger.exception(f"Error checking current secret existence: {e}")
return {"status": "error", "message": f"Error checking current secret: {e}"}
# Create new secret with new name and value
# Use _build_secret_target to handle optional_params
create_response = await self.async_write_secret(
secret_name=new_secret_name,
secret_value=new_secret_value,
description=f"Rotated from {current_secret_name}",
optional_params=optional_params,
timeout=timeout,
)
# Check if async_write_secret returned an error
if isinstance(create_response, dict) and create_response.get("status") == "error":
return create_response
# Verify new secret was created successfully using _build_secret_target
new_target = self._build_secret_target(new_secret_name, optional_params)
try:
response = await async_client.get(
url=new_target["url"],
headers=self._get_request_headers(),
)
response.raise_for_status()
json_resp = response.json()
# Use data_key from target to get the correct value
data_key = new_target["data_key"]
new_secret_value_from_vault = json_resp.get("data", {}).get("data", {}).get(data_key, None)
if new_secret_value_from_vault != new_secret_value:
verbose_logger.exception(
f"New secret value mismatch. Expected: {new_secret_value}, Got: {new_secret_value_from_vault}"
)
return {
"status": "error",
"message": f"New secret value mismatch. Expected: {new_secret_value}, Got: {new_secret_value_from_vault}",
}
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
verbose_logger.exception(f"Failed to verify new secret {new_secret_name}")
return {"status": "error", "message": f"Failed to verify new secret {new_secret_name}"}
verbose_logger.exception(
f"Error verifying new secret: {e.response.text if hasattr(e, 'response') else str(e)}"
)
return {
"status": "error",
"message": f"HTTP error occurred while verifying new secret: {e.response.text if hasattr(e, 'response') else str(e)}",
}
except Exception as e:
verbose_logger.exception(f"Error verifying new secret: {e}")
return {"status": "error", "message": f"Error verifying new secret: {e}"}
# If everything is successful, delete the old secret
# Only delete if the names are different (same name means we're just updating the value)
if current_secret_name != new_secret_name:
delete_response = await self.async_delete_secret(
secret_name=current_secret_name,
recovery_window_in_days=7, # Keep for recovery if needed
optional_params=optional_params,
timeout=timeout,
)
# Check if async_delete_secret returned an error
if isinstance(delete_response, dict) and delete_response.get("status") == "error":
# Log the error but don't fail the rotation since new secret was created successfully
verbose_logger.warning(
f"Failed to delete old secret {current_secret_name} after rotation: {delete_response.get('message')}"
)
else:
# Clear cache for the old secret only if deletion was successful
self.cache.delete_cache(current_secret_name)
# Clear cache for the new secret (or updated secret if names are the same)
self.cache.delete_cache(new_secret_name)
return create_response
except httpx.TimeoutException as e:
verbose_logger.exception("Timeout error occurred during secret rotation")
return {"status": "error", "message": "Timeout error occurred"}
except Exception as e:
verbose_logger.exception(f"Error rotating secret in Hashicorp Vault: {e}")
return {"status": "error", "message": str(e)}
async def async_delete_secret(
self,
+380
View File
@@ -380,3 +380,383 @@ def test_hashicorp_custom_mount_and_prefix(hashicorp_secret_manager):
hashicorp_secret_manager.vault_mount_name = original_mount
hashicorp_secret_manager.vault_path_prefix = original_prefix
hashicorp_secret_manager.vault_namespace = original_namespace
mock_old_vault_response = {
"request_id": "80fafb6a-e96a-4c5b-29fa-ff505ac72201",
"lease_id": "",
"renewable": False,
"lease_duration": 0,
"data": {
"data": {"key": "old-secret-value"},
"metadata": {
"created_time": "2025-01-01T22:13:50.93942388Z",
"custom_metadata": None,
"deletion_time": "",
"destroyed": False,
"version": 1,
},
},
"wrap_info": None,
"warnings": None,
"auth": None,
"mount_type": "kv",
}
mock_new_vault_response = {
"request_id": "80fafb6a-e96a-4c5b-29fa-ff505ac72201",
"lease_id": "",
"renewable": False,
"lease_duration": 0,
"data": {
"data": {"key": "new-secret-value"},
"metadata": {
"created_time": "2025-01-02T22:13:50.93942388Z",
"custom_metadata": None,
"deletion_time": "",
"destroyed": False,
"version": 1,
},
},
"wrap_info": None,
"warnings": None,
"auth": None,
"mount_type": "kv",
}
@pytest.mark.asyncio
async def test_hashicorp_secret_manager_rotate_secret_different_names(hashicorp_secret_manager):
"""Test rotating a secret with different names (create new, delete old)."""
with patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get"
) as mock_get, patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post"
) as mock_post, patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.delete"
) as mock_delete:
# Mock GET for current secret check
mock_get_response_current = MagicMock()
mock_get_response_current.json.return_value = mock_old_vault_response
mock_get_response_current.raise_for_status.return_value = None
# Mock POST for creating new secret
mock_post_response = MagicMock()
mock_post_response.json.return_value = mock_write_response
mock_post_response.raise_for_status.return_value = None
# Mock GET for verifying new secret
mock_get_response_new = MagicMock()
mock_get_response_new.json.return_value = mock_new_vault_response
mock_get_response_new.raise_for_status.return_value = None
# Mock DELETE for deleting old secret
mock_delete_response = MagicMock()
mock_delete_response.raise_for_status.return_value = None
# Configure mock return values
mock_get.side_effect = [mock_get_response_current, mock_get_response_new]
mock_post.return_value = mock_post_response
mock_delete.return_value = mock_delete_response
current_secret_name = f"old-secret-{uuid.uuid4()}"
new_secret_name = f"new-secret-{uuid.uuid4()}"
new_secret_value = "new-secret-value"
response = await hashicorp_secret_manager.async_rotate_secret(
current_secret_name=current_secret_name,
new_secret_name=new_secret_name,
new_secret_value=new_secret_value,
)
# Verify response
assert response == mock_write_response
# Verify GET was called twice (check current, verify new)
assert mock_get.call_count == 2
# Verify POST was called once (create new secret)
mock_post.assert_called_once()
# Verify DELETE was called once (delete old secret)
mock_delete.assert_called_once()
# Verify URLs
get_calls = mock_get.call_args_list
assert current_secret_name in get_calls[0][1]["url"]
assert new_secret_name in get_calls[1][1]["url"]
delete_url = mock_delete.call_args[1]["url"]
assert current_secret_name in delete_url
@pytest.mark.asyncio
async def test_hashicorp_secret_manager_rotate_secret_same_name(hashicorp_secret_manager):
"""Test rotating a secret with the same name (update value only, no delete)."""
with patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get"
) as mock_get, patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post"
) as mock_post, patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.delete"
) as mock_delete:
# Mock GET for current secret check
mock_get_response_current = MagicMock()
mock_get_response_current.json.return_value = mock_old_vault_response
mock_get_response_current.raise_for_status.return_value = None
# Mock POST for updating secret
mock_post_response = MagicMock()
mock_post_response.json.return_value = mock_write_response
mock_post_response.raise_for_status.return_value = None
# Mock GET for verifying updated secret - use updated value
mock_get_response_new = MagicMock()
mock_updated_vault_response = {
"request_id": "80fafb6a-e96a-4c5b-29fa-ff505ac72201",
"lease_id": "",
"renewable": False,
"lease_duration": 0,
"data": {
"data": {"key": "updated-secret-value"},
"metadata": {
"created_time": "2025-01-02T22:13:50.93942388Z",
"custom_metadata": None,
"deletion_time": "",
"destroyed": False,
"version": 1,
},
},
"wrap_info": None,
"warnings": None,
"auth": None,
"mount_type": "kv",
}
mock_get_response_new.json.return_value = mock_updated_vault_response
mock_get_response_new.raise_for_status.return_value = None
# Configure mock return values
mock_get.side_effect = [mock_get_response_current, mock_get_response_new]
mock_post.return_value = mock_post_response
secret_name = f"same-secret-{uuid.uuid4()}"
new_secret_value = "updated-secret-value"
response = await hashicorp_secret_manager.async_rotate_secret(
current_secret_name=secret_name,
new_secret_name=secret_name, # Same name
new_secret_value=new_secret_value,
)
# Verify response
assert response == mock_write_response
# Verify GET was called twice (check current, verify new)
assert mock_get.call_count == 2
# Verify POST was called once (update secret)
mock_post.assert_called_once()
# Verify DELETE was NOT called (same name means no delete)
mock_delete.assert_not_called()
@pytest.mark.asyncio
async def test_hashicorp_secret_manager_rotate_secret_current_not_found(hashicorp_secret_manager):
"""Test rotating a secret when current secret doesn't exist."""
with patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get"
) as mock_get:
# Mock GET to return 404
mock_404_response = MagicMock()
mock_404_response.status_code = 404
mock_404_response.text = "Not Found"
http_error = httpx.HTTPStatusError(
"Not Found",
request=MagicMock(),
response=mock_404_response,
)
mock_get.side_effect = http_error
current_secret_name = f"non-existent-{uuid.uuid4()}"
new_secret_name = f"new-secret-{uuid.uuid4()}"
response = await hashicorp_secret_manager.async_rotate_secret(
current_secret_name=current_secret_name,
new_secret_name=new_secret_name,
new_secret_value="new-value",
)
# Verify error response
assert response["status"] == "error"
assert current_secret_name in response["message"]
assert "not found" in response["message"].lower()
@pytest.mark.asyncio
async def test_hashicorp_secret_manager_rotate_secret_write_fails(hashicorp_secret_manager):
"""Test rotating a secret when write fails."""
with patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get"
) as mock_get, patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post"
) as mock_post:
# Mock GET for current secret check
mock_get_response_current = MagicMock()
mock_get_response_current.json.return_value = mock_old_vault_response
mock_get_response_current.raise_for_status.return_value = None
mock_get.return_value = mock_get_response_current
# Mock POST to return error
mock_post_response = MagicMock()
mock_post_response.json.return_value = {"status": "error", "message": "Write failed"}
mock_post.return_value = mock_post_response
current_secret_name = f"old-secret-{uuid.uuid4()}"
new_secret_name = f"new-secret-{uuid.uuid4()}"
response = await hashicorp_secret_manager.async_rotate_secret(
current_secret_name=current_secret_name,
new_secret_name=new_secret_name,
new_secret_value="new-value",
)
# Verify error response
assert response["status"] == "error"
assert "Write failed" in response["message"]
@pytest.mark.asyncio
async def test_hashicorp_secret_manager_rotate_secret_with_team_overrides(hashicorp_secret_manager):
"""Test rotating a secret with optional_params (team settings)."""
with patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get"
) as mock_get, patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post"
) as mock_post, patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.delete"
) as mock_delete:
# Mock GET for current secret check
mock_get_response_current = MagicMock()
mock_get_response_current.json.return_value = mock_old_vault_response
mock_get_response_current.raise_for_status.return_value = None
# Mock POST for creating new secret
mock_post_response = MagicMock()
mock_post_response.json.return_value = mock_write_response
mock_post_response.raise_for_status.return_value = None
# Mock GET for verifying new secret - use password key for team settings
mock_get_response_new = MagicMock()
mock_team_vault_response = {
"request_id": "80fafb6a-e96a-4c5b-29fa-ff505ac72201",
"lease_id": "",
"renewable": False,
"lease_duration": 0,
"data": {
"data": {"password": "new-team-secret-value"}, # Use password key
"metadata": {
"created_time": "2025-01-02T22:13:50.93942388Z",
"custom_metadata": None,
"deletion_time": "",
"destroyed": False,
"version": 1,
},
},
"wrap_info": None,
"warnings": None,
"auth": None,
"mount_type": "kv",
}
mock_get_response_new.json.return_value = mock_team_vault_response
mock_get_response_new.raise_for_status.return_value = None
# Mock DELETE for deleting old secret
mock_delete_response = MagicMock()
mock_delete_response.raise_for_status.return_value = None
# Configure mock return values
mock_get.side_effect = [mock_get_response_current, mock_get_response_new]
mock_post.return_value = mock_post_response
mock_delete.return_value = mock_delete_response
team_settings = {
"secret_manager_settings": {
"namespace": "team-namespace",
"mount": "kv-team",
"path_prefix": "teams/custom",
"data": "password",
}
}
current_secret_name = "team-old-secret"
new_secret_name = "team-new-secret"
new_secret_value = "new-team-secret-value"
response = await hashicorp_secret_manager.async_rotate_secret(
current_secret_name=current_secret_name,
new_secret_name=new_secret_name,
new_secret_value=new_secret_value,
optional_params=team_settings,
)
# Verify response
assert response == mock_write_response
# Verify URLs use team settings
get_calls = mock_get.call_args_list
assert "team-namespace" in get_calls[0][1]["url"]
assert "kv-team" in get_calls[0][1]["url"]
assert "teams/custom" in get_calls[0][1]["url"]
delete_url = mock_delete.call_args[1]["url"]
assert "team-namespace" in delete_url
assert "kv-team" in delete_url
@pytest.mark.asyncio
async def test_hashicorp_secret_manager_rotate_secret_value_mismatch(hashicorp_secret_manager):
"""Test rotating a secret when verification shows value mismatch."""
with patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get"
) as mock_get, patch(
"litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post"
) as mock_post:
# Mock GET for current secret check
mock_get_response_current = MagicMock()
mock_get_response_current.json.return_value = mock_old_vault_response
mock_get_response_current.raise_for_status.return_value = None
# Mock POST for creating new secret
mock_post_response = MagicMock()
mock_post_response.json.return_value = mock_write_response
mock_post_response.raise_for_status.return_value = None
# Mock GET for verifying new secret - return different value
mock_get_response_new = MagicMock()
mock_get_response_new.json.return_value = {
"data": {
"data": {"key": "different-value"}, # Different from expected
}
}
mock_get_response_new.raise_for_status.return_value = None
# Configure mock return values
mock_get.side_effect = [mock_get_response_current, mock_get_response_new]
mock_post.return_value = mock_post_response
current_secret_name = f"old-secret-{uuid.uuid4()}"
new_secret_name = f"new-secret-{uuid.uuid4()}"
new_secret_value = "expected-value"
response = await hashicorp_secret_manager.async_rotate_secret(
current_secret_name=current_secret_name,
new_secret_name=new_secret_name,
new_secret_value=new_secret_value,
)
# Verify error response
assert response["status"] == "error"
assert "mismatch" in response["message"].lower()
assert "expected-value" in response["message"]
@@ -139,3 +139,252 @@ class TestKeyManagementEventHooksIndependentOperations:
# Email should have been called despite secret manager failure
assert email_called["called"] is True
class TestRotateVirtualKeyInSecretManager:
"""Tests for _rotate_virtual_key_in_secret_manager with team_id support."""
@pytest.mark.asyncio
async def test_rotate_virtual_key_with_team_id(self):
"""Test that team_id is passed to async_rotate_secret."""
from litellm.types.secret_managers.main import KeyManagementSystem, KeyManagementSettings
from litellm.secret_managers.base_secret_manager import BaseSecretManager
import litellm
# Setup - Create a mock that inherits from BaseSecretManager
mock_secret_manager = MagicMock(spec=BaseSecretManager)
mock_secret_manager.async_rotate_secret = AsyncMock(return_value={"status": "success"})
litellm.secret_manager_client = mock_secret_manager
litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT
litellm._key_management_settings = KeyManagementSettings(
store_virtual_keys=True,
prefix_for_stored_virtual_keys="litellm/",
)
current_secret_name = "virtual-key-old"
new_secret_name = "virtual-key-new"
new_secret_value = "sk-new-key-value"
team_id = "team-123"
# Mock _get_secret_manager_optional_params to return team settings
team_settings = {
"namespace": "team-namespace",
"mount": "kv-team",
"path_prefix": "teams/custom",
}
# Patch isinstance in the key_management_event_hooks module to return True for BaseSecretManager check
import builtins
original_isinstance = builtins.isinstance
def mock_isinstance(obj, cls):
if cls == BaseSecretManager and obj == mock_secret_manager:
return True
return original_isinstance(obj, cls)
with patch.object(
KeyManagementEventHooks,
"_get_secret_manager_optional_params",
return_value=team_settings,
) as mock_get_params, patch(
"litellm.proxy.hooks.key_management_event_hooks.isinstance",
side_effect=mock_isinstance
):
await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager(
current_secret_name=current_secret_name,
new_secret_name=new_secret_name,
new_secret_value=new_secret_value,
team_id=team_id,
)
# Verify _get_secret_manager_optional_params was called with team_id
mock_get_params.assert_called_once_with(team_id)
# Verify async_rotate_secret was called with correct parameters
mock_secret_manager.async_rotate_secret.assert_called_once()
call_kwargs = mock_secret_manager.async_rotate_secret.call_args[1]
# Verify secret names have prefix
assert call_kwargs["current_secret_name"] == "litellm/virtual-key-old"
assert call_kwargs["new_secret_name"] == "litellm/virtual-key-new"
assert call_kwargs["new_secret_value"] == new_secret_value
assert call_kwargs["optional_params"] == team_settings
@pytest.mark.asyncio
async def test_rotate_virtual_key_without_team_id(self):
"""Test that None team_id is handled correctly."""
from litellm.types.secret_managers.main import KeyManagementSystem, KeyManagementSettings
from litellm.secret_managers.base_secret_manager import BaseSecretManager
import litellm
# Setup - Create a mock that inherits from BaseSecretManager
mock_secret_manager = MagicMock(spec=BaseSecretManager)
mock_secret_manager.async_rotate_secret = AsyncMock(return_value={"status": "success"})
litellm.secret_manager_client = mock_secret_manager
litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT
litellm._key_management_settings = KeyManagementSettings(
store_virtual_keys=True,
prefix_for_stored_virtual_keys="litellm/",
)
current_secret_name = "virtual-key-old"
new_secret_name = "virtual-key-new"
new_secret_value = "sk-new-key-value"
# Patch isinstance in the key_management_event_hooks module to return True for BaseSecretManager check
import builtins
original_isinstance = builtins.isinstance
def mock_isinstance(obj, cls):
if cls == BaseSecretManager and obj == mock_secret_manager:
return True
return original_isinstance(obj, cls)
# Mock _get_secret_manager_optional_params to return None (no team settings)
with patch.object(
KeyManagementEventHooks,
"_get_secret_manager_optional_params",
return_value=None,
) as mock_get_params, patch(
"litellm.proxy.hooks.key_management_event_hooks.isinstance",
side_effect=mock_isinstance
):
await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager(
current_secret_name=current_secret_name,
new_secret_name=new_secret_name,
new_secret_value=new_secret_value,
team_id=None,
)
# Verify _get_secret_manager_optional_params was called with None
mock_get_params.assert_called_once_with(None)
# Verify async_rotate_secret was called with None optional_params
mock_secret_manager.async_rotate_secret.assert_called_once()
call_kwargs = mock_secret_manager.async_rotate_secret.call_args[1]
assert call_kwargs["optional_params"] is None
@pytest.mark.asyncio
async def test_rotate_virtual_key_in_key_rotated_hook(self):
"""Test that async_key_rotated_hook passes team_id to _rotate_virtual_key_in_secret_manager."""
from litellm.proxy._types import LiteLLM_VerificationToken, GenerateKeyResponse, RegenerateKeyRequest
from litellm.types.secret_managers.main import KeyManagementSystem, KeyManagementSettings
import litellm
# Setup
mock_secret_manager = MagicMock()
mock_secret_manager.async_rotate_secret = AsyncMock(return_value={"status": "success"})
litellm.secret_manager_client = mock_secret_manager
litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT
litellm._key_management_settings = KeyManagementSettings(
store_virtual_keys=True,
prefix_for_stored_virtual_keys="litellm/",
)
# Create mock existing key row with team_id
existing_key_row = LiteLLM_VerificationToken(
token="sk-old-key",
key_alias="test-key-alias",
team_id="team-456",
)
# Create mock response
response = GenerateKeyResponse(
token_id="token-new-123",
key="sk-new-key",
key_alias="test-key-alias-new",
)
# Create mock request
data = RegenerateKeyRequest(
key="sk-old-key",
key_alias="test-key-alias-new",
)
mock_user_api_key_dict = MagicMock()
# Mock _rotate_virtual_key_in_secret_manager to track calls
with patch.object(
KeyManagementEventHooks,
"_rotate_virtual_key_in_secret_manager",
new_callable=AsyncMock,
) as mock_rotate, patch(
"litellm.store_audit_logs", False
), patch.object(
KeyManagementEventHooks,
"_send_key_rotated_email",
new_callable=AsyncMock,
):
await KeyManagementEventHooks.async_key_rotated_hook(
data=data,
existing_key_row=existing_key_row,
response=response,
user_api_key_dict=mock_user_api_key_dict,
)
# Verify _rotate_virtual_key_in_secret_manager was called
mock_rotate.assert_called_once()
call_kwargs = mock_rotate.call_args[1]
# Verify team_id was passed
assert call_kwargs["team_id"] == "team-456"
assert call_kwargs["current_secret_name"] == "test-key-alias"
assert call_kwargs["new_secret_name"] == "test-key-alias-new"
assert call_kwargs["new_secret_value"] == "sk-new-key"
@pytest.mark.asyncio
async def test_rotate_virtual_key_when_store_virtual_keys_disabled(self):
"""Test that rotation is skipped when store_virtual_keys is False."""
from litellm.types.secret_managers.main import KeyManagementSystem, KeyManagementSettings
import litellm
# Setup
mock_secret_manager = MagicMock()
mock_secret_manager.async_rotate_secret = AsyncMock()
litellm.secret_manager_client = mock_secret_manager
litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT
litellm._key_management_settings = KeyManagementSettings(
store_virtual_keys=False, # Disabled
prefix_for_stored_virtual_keys="litellm/",
)
await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager(
current_secret_name="old-key",
new_secret_name="new-key",
new_secret_value="sk-new-value",
team_id="team-123",
)
# Verify async_rotate_secret was NOT called
mock_secret_manager.async_rotate_secret.assert_not_called()
@pytest.mark.asyncio
async def test_rotate_virtual_key_when_secret_manager_not_set(self):
"""Test that rotation is skipped when secret_manager_client is None."""
from litellm.types.secret_managers.main import KeyManagementSettings
import litellm
# Setup
litellm.secret_manager_client = None
litellm._key_management_settings = KeyManagementSettings(
store_virtual_keys=True,
prefix_for_stored_virtual_keys="litellm/",
)
mock_secret_manager = MagicMock()
mock_secret_manager.async_rotate_secret = AsyncMock()
# Should not raise an error, just skip
await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager(
current_secret_name="old-key",
new_secret_name="new-key",
new_secret_value="sk-new-value",
team_id="team-123",
)
# Verify async_rotate_secret was NOT called
mock_secret_manager.async_rotate_secret.assert_not_called()