diff --git a/litellm/proxy/hooks/key_management_event_hooks.py b/litellm/proxy/hooks/key_management_event_hooks.py index 50f8b2a3de..a8325d3461 100644 --- a/litellm/proxy/hooks/key_management_event_hooks.py +++ b/litellm/proxy/hooks/key_management_event_hooks.py @@ -150,12 +150,14 @@ class KeyManagementEventHooks: existing_key_row.key_alias or f"virtual-key-{existing_key_row.token}" ) + team_id = getattr(existing_key_row, "team_id", None) await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager( current_secret_name=initial_secret_name, new_secret_name=response.key_alias or data.key_alias or f"virtual-key-{response.token_id}", new_secret_value=response.key, + team_id=team_id, ) except Exception as e: verbose_proxy_logger.warning( @@ -286,14 +288,19 @@ class KeyManagementEventHooks: @staticmethod async def _rotate_virtual_key_in_secret_manager( - current_secret_name: str, new_secret_name: str, new_secret_value: str + current_secret_name: str, + new_secret_name: str, + new_secret_value: str, + team_id: Optional[str] = None, ): """ Update a virtual key in the secret manager Args: - secret_name: Name of the virtual key - secret_token: Value of the virtual key (example: sk-1234) + current_secret_name: Current name of the virtual key + new_secret_name: New name of the virtual key + new_secret_value: New value of the virtual key (example: sk-1234) + team_id: Optional team ID to get team-specific secret manager settings """ if litellm._key_management_settings is not None: if litellm._key_management_settings.store_virtual_keys is True: @@ -303,6 +310,9 @@ class KeyManagementEventHooks: # store the key in the secret manager if isinstance(litellm.secret_manager_client, BaseSecretManager): + optional_params = await KeyManagementEventHooks._get_secret_manager_optional_params( + team_id + ) await litellm.secret_manager_client.async_rotate_secret( current_secret_name=KeyManagementEventHooks._get_secret_name( current_secret_name @@ -311,6 +321,7 @@ class KeyManagementEventHooks: new_secret_name ), new_secret_value=new_secret_value, + optional_params=optional_params, ) @staticmethod diff --git a/litellm/secret_managers/hashicorp_secret_manager.py b/litellm/secret_managers/hashicorp_secret_manager.py index cad9ccc7a9..dac0397dd9 100644 --- a/litellm/secret_managers/hashicorp_secret_manager.py +++ b/litellm/secret_managers/hashicorp_secret_manager.py @@ -444,7 +444,131 @@ class HashicorpSecretManager(BaseSecretManager): optional_params: Dict | None = None, timeout: float | httpx.Timeout | None = None, ) -> Dict: - raise NotImplementedError("Hashicorp does not support secret rotation") + """ + Rotates a secret by creating a new one and deleting the old one. + Uses _build_secret_target to handle optional_params for namespace, mount, path_prefix customization. + + Args: + current_secret_name: Current name of the secret + new_secret_name: New name for the secret + new_secret_value: New value for the secret + optional_params: Additional parameters (namespace, mount, path_prefix, data) + timeout: Request timeout + + Returns: + dict: Response containing status and details of the operation. + On success, returns the response from async_write_secret. + On error, returns {"status": "error", "message": "error message"} + """ + async_client = get_async_httpx_client( + llm_provider=httpxSpecialProvider.SecretManager, + params={"timeout": timeout}, + ) + + try: + # First verify the old secret exists using _build_secret_target + current_target = self._build_secret_target(current_secret_name, optional_params) + try: + response = await async_client.get( + url=current_target["url"], + headers=self._get_request_headers(), + ) + response.raise_for_status() + # Secret exists, we can proceed + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + verbose_logger.exception(f"Current secret {current_secret_name} not found") + return {"status": "error", "message": f"Current secret {current_secret_name} not found"} + verbose_logger.exception( + f"Error checking current secret existence: {e.response.text if hasattr(e, 'response') else str(e)}" + ) + return { + "status": "error", + "message": f"HTTP error occurred while checking current secret: {e.response.text if hasattr(e, 'response') else str(e)}", + } + except Exception as e: + verbose_logger.exception(f"Error checking current secret existence: {e}") + return {"status": "error", "message": f"Error checking current secret: {e}"} + + # Create new secret with new name and value + # Use _build_secret_target to handle optional_params + create_response = await self.async_write_secret( + secret_name=new_secret_name, + secret_value=new_secret_value, + description=f"Rotated from {current_secret_name}", + optional_params=optional_params, + timeout=timeout, + ) + + # Check if async_write_secret returned an error + if isinstance(create_response, dict) and create_response.get("status") == "error": + return create_response + + # Verify new secret was created successfully using _build_secret_target + new_target = self._build_secret_target(new_secret_name, optional_params) + try: + response = await async_client.get( + url=new_target["url"], + headers=self._get_request_headers(), + ) + response.raise_for_status() + json_resp = response.json() + # Use data_key from target to get the correct value + data_key = new_target["data_key"] + new_secret_value_from_vault = json_resp.get("data", {}).get("data", {}).get(data_key, None) + if new_secret_value_from_vault != new_secret_value: + verbose_logger.exception( + f"New secret value mismatch. Expected: {new_secret_value}, Got: {new_secret_value_from_vault}" + ) + return { + "status": "error", + "message": f"New secret value mismatch. Expected: {new_secret_value}, Got: {new_secret_value_from_vault}", + } + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + verbose_logger.exception(f"Failed to verify new secret {new_secret_name}") + return {"status": "error", "message": f"Failed to verify new secret {new_secret_name}"} + verbose_logger.exception( + f"Error verifying new secret: {e.response.text if hasattr(e, 'response') else str(e)}" + ) + return { + "status": "error", + "message": f"HTTP error occurred while verifying new secret: {e.response.text if hasattr(e, 'response') else str(e)}", + } + except Exception as e: + verbose_logger.exception(f"Error verifying new secret: {e}") + return {"status": "error", "message": f"Error verifying new secret: {e}"} + + # If everything is successful, delete the old secret + # Only delete if the names are different (same name means we're just updating the value) + if current_secret_name != new_secret_name: + delete_response = await self.async_delete_secret( + secret_name=current_secret_name, + recovery_window_in_days=7, # Keep for recovery if needed + optional_params=optional_params, + timeout=timeout, + ) + # Check if async_delete_secret returned an error + if isinstance(delete_response, dict) and delete_response.get("status") == "error": + # Log the error but don't fail the rotation since new secret was created successfully + verbose_logger.warning( + f"Failed to delete old secret {current_secret_name} after rotation: {delete_response.get('message')}" + ) + else: + # Clear cache for the old secret only if deletion was successful + self.cache.delete_cache(current_secret_name) + + # Clear cache for the new secret (or updated secret if names are the same) + self.cache.delete_cache(new_secret_name) + + return create_response + + except httpx.TimeoutException as e: + verbose_logger.exception("Timeout error occurred during secret rotation") + return {"status": "error", "message": "Timeout error occurred"} + except Exception as e: + verbose_logger.exception(f"Error rotating secret in Hashicorp Vault: {e}") + return {"status": "error", "message": str(e)} async def async_delete_secret( self, diff --git a/tests/litellm_utils_tests/test_hashicorp.py b/tests/litellm_utils_tests/test_hashicorp.py index 4f3536f9bf..e4d69da6ac 100644 --- a/tests/litellm_utils_tests/test_hashicorp.py +++ b/tests/litellm_utils_tests/test_hashicorp.py @@ -380,3 +380,383 @@ def test_hashicorp_custom_mount_and_prefix(hashicorp_secret_manager): hashicorp_secret_manager.vault_mount_name = original_mount hashicorp_secret_manager.vault_path_prefix = original_prefix hashicorp_secret_manager.vault_namespace = original_namespace + + +mock_old_vault_response = { + "request_id": "80fafb6a-e96a-4c5b-29fa-ff505ac72201", + "lease_id": "", + "renewable": False, + "lease_duration": 0, + "data": { + "data": {"key": "old-secret-value"}, + "metadata": { + "created_time": "2025-01-01T22:13:50.93942388Z", + "custom_metadata": None, + "deletion_time": "", + "destroyed": False, + "version": 1, + }, + }, + "wrap_info": None, + "warnings": None, + "auth": None, + "mount_type": "kv", +} + +mock_new_vault_response = { + "request_id": "80fafb6a-e96a-4c5b-29fa-ff505ac72201", + "lease_id": "", + "renewable": False, + "lease_duration": 0, + "data": { + "data": {"key": "new-secret-value"}, + "metadata": { + "created_time": "2025-01-02T22:13:50.93942388Z", + "custom_metadata": None, + "deletion_time": "", + "destroyed": False, + "version": 1, + }, + }, + "wrap_info": None, + "warnings": None, + "auth": None, + "mount_type": "kv", +} + + +@pytest.mark.asyncio +async def test_hashicorp_secret_manager_rotate_secret_different_names(hashicorp_secret_manager): + """Test rotating a secret with different names (create new, delete old).""" + with patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get" + ) as mock_get, patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post" + ) as mock_post, patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.delete" + ) as mock_delete: + # Mock GET for current secret check + mock_get_response_current = MagicMock() + mock_get_response_current.json.return_value = mock_old_vault_response + mock_get_response_current.raise_for_status.return_value = None + + # Mock POST for creating new secret + mock_post_response = MagicMock() + mock_post_response.json.return_value = mock_write_response + mock_post_response.raise_for_status.return_value = None + + # Mock GET for verifying new secret + mock_get_response_new = MagicMock() + mock_get_response_new.json.return_value = mock_new_vault_response + mock_get_response_new.raise_for_status.return_value = None + + # Mock DELETE for deleting old secret + mock_delete_response = MagicMock() + mock_delete_response.raise_for_status.return_value = None + + # Configure mock return values + mock_get.side_effect = [mock_get_response_current, mock_get_response_new] + mock_post.return_value = mock_post_response + mock_delete.return_value = mock_delete_response + + current_secret_name = f"old-secret-{uuid.uuid4()}" + new_secret_name = f"new-secret-{uuid.uuid4()}" + new_secret_value = "new-secret-value" + + response = await hashicorp_secret_manager.async_rotate_secret( + current_secret_name=current_secret_name, + new_secret_name=new_secret_name, + new_secret_value=new_secret_value, + ) + + # Verify response + assert response == mock_write_response + + # Verify GET was called twice (check current, verify new) + assert mock_get.call_count == 2 + + # Verify POST was called once (create new secret) + mock_post.assert_called_once() + + # Verify DELETE was called once (delete old secret) + mock_delete.assert_called_once() + + # Verify URLs + get_calls = mock_get.call_args_list + assert current_secret_name in get_calls[0][1]["url"] + assert new_secret_name in get_calls[1][1]["url"] + + delete_url = mock_delete.call_args[1]["url"] + assert current_secret_name in delete_url + + +@pytest.mark.asyncio +async def test_hashicorp_secret_manager_rotate_secret_same_name(hashicorp_secret_manager): + """Test rotating a secret with the same name (update value only, no delete).""" + with patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get" + ) as mock_get, patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post" + ) as mock_post, patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.delete" + ) as mock_delete: + # Mock GET for current secret check + mock_get_response_current = MagicMock() + mock_get_response_current.json.return_value = mock_old_vault_response + mock_get_response_current.raise_for_status.return_value = None + + # Mock POST for updating secret + mock_post_response = MagicMock() + mock_post_response.json.return_value = mock_write_response + mock_post_response.raise_for_status.return_value = None + + # Mock GET for verifying updated secret - use updated value + mock_get_response_new = MagicMock() + mock_updated_vault_response = { + "request_id": "80fafb6a-e96a-4c5b-29fa-ff505ac72201", + "lease_id": "", + "renewable": False, + "lease_duration": 0, + "data": { + "data": {"key": "updated-secret-value"}, + "metadata": { + "created_time": "2025-01-02T22:13:50.93942388Z", + "custom_metadata": None, + "deletion_time": "", + "destroyed": False, + "version": 1, + }, + }, + "wrap_info": None, + "warnings": None, + "auth": None, + "mount_type": "kv", + } + mock_get_response_new.json.return_value = mock_updated_vault_response + mock_get_response_new.raise_for_status.return_value = None + + # Configure mock return values + mock_get.side_effect = [mock_get_response_current, mock_get_response_new] + mock_post.return_value = mock_post_response + + secret_name = f"same-secret-{uuid.uuid4()}" + new_secret_value = "updated-secret-value" + + response = await hashicorp_secret_manager.async_rotate_secret( + current_secret_name=secret_name, + new_secret_name=secret_name, # Same name + new_secret_value=new_secret_value, + ) + + # Verify response + assert response == mock_write_response + + # Verify GET was called twice (check current, verify new) + assert mock_get.call_count == 2 + + # Verify POST was called once (update secret) + mock_post.assert_called_once() + + # Verify DELETE was NOT called (same name means no delete) + mock_delete.assert_not_called() + + +@pytest.mark.asyncio +async def test_hashicorp_secret_manager_rotate_secret_current_not_found(hashicorp_secret_manager): + """Test rotating a secret when current secret doesn't exist.""" + with patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get" + ) as mock_get: + # Mock GET to return 404 + mock_404_response = MagicMock() + mock_404_response.status_code = 404 + mock_404_response.text = "Not Found" + + http_error = httpx.HTTPStatusError( + "Not Found", + request=MagicMock(), + response=mock_404_response, + ) + mock_get.side_effect = http_error + + current_secret_name = f"non-existent-{uuid.uuid4()}" + new_secret_name = f"new-secret-{uuid.uuid4()}" + + response = await hashicorp_secret_manager.async_rotate_secret( + current_secret_name=current_secret_name, + new_secret_name=new_secret_name, + new_secret_value="new-value", + ) + + # Verify error response + assert response["status"] == "error" + assert current_secret_name in response["message"] + assert "not found" in response["message"].lower() + + +@pytest.mark.asyncio +async def test_hashicorp_secret_manager_rotate_secret_write_fails(hashicorp_secret_manager): + """Test rotating a secret when write fails.""" + with patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get" + ) as mock_get, patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post" + ) as mock_post: + # Mock GET for current secret check + mock_get_response_current = MagicMock() + mock_get_response_current.json.return_value = mock_old_vault_response + mock_get_response_current.raise_for_status.return_value = None + mock_get.return_value = mock_get_response_current + + # Mock POST to return error + mock_post_response = MagicMock() + mock_post_response.json.return_value = {"status": "error", "message": "Write failed"} + mock_post.return_value = mock_post_response + + current_secret_name = f"old-secret-{uuid.uuid4()}" + new_secret_name = f"new-secret-{uuid.uuid4()}" + + response = await hashicorp_secret_manager.async_rotate_secret( + current_secret_name=current_secret_name, + new_secret_name=new_secret_name, + new_secret_value="new-value", + ) + + # Verify error response + assert response["status"] == "error" + assert "Write failed" in response["message"] + + +@pytest.mark.asyncio +async def test_hashicorp_secret_manager_rotate_secret_with_team_overrides(hashicorp_secret_manager): + """Test rotating a secret with optional_params (team settings).""" + with patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get" + ) as mock_get, patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post" + ) as mock_post, patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.delete" + ) as mock_delete: + # Mock GET for current secret check + mock_get_response_current = MagicMock() + mock_get_response_current.json.return_value = mock_old_vault_response + mock_get_response_current.raise_for_status.return_value = None + + # Mock POST for creating new secret + mock_post_response = MagicMock() + mock_post_response.json.return_value = mock_write_response + mock_post_response.raise_for_status.return_value = None + + # Mock GET for verifying new secret - use password key for team settings + mock_get_response_new = MagicMock() + mock_team_vault_response = { + "request_id": "80fafb6a-e96a-4c5b-29fa-ff505ac72201", + "lease_id": "", + "renewable": False, + "lease_duration": 0, + "data": { + "data": {"password": "new-team-secret-value"}, # Use password key + "metadata": { + "created_time": "2025-01-02T22:13:50.93942388Z", + "custom_metadata": None, + "deletion_time": "", + "destroyed": False, + "version": 1, + }, + }, + "wrap_info": None, + "warnings": None, + "auth": None, + "mount_type": "kv", + } + mock_get_response_new.json.return_value = mock_team_vault_response + mock_get_response_new.raise_for_status.return_value = None + + # Mock DELETE for deleting old secret + mock_delete_response = MagicMock() + mock_delete_response.raise_for_status.return_value = None + + # Configure mock return values + mock_get.side_effect = [mock_get_response_current, mock_get_response_new] + mock_post.return_value = mock_post_response + mock_delete.return_value = mock_delete_response + + team_settings = { + "secret_manager_settings": { + "namespace": "team-namespace", + "mount": "kv-team", + "path_prefix": "teams/custom", + "data": "password", + } + } + + current_secret_name = "team-old-secret" + new_secret_name = "team-new-secret" + new_secret_value = "new-team-secret-value" + + response = await hashicorp_secret_manager.async_rotate_secret( + current_secret_name=current_secret_name, + new_secret_name=new_secret_name, + new_secret_value=new_secret_value, + optional_params=team_settings, + ) + + # Verify response + assert response == mock_write_response + + # Verify URLs use team settings + get_calls = mock_get.call_args_list + assert "team-namespace" in get_calls[0][1]["url"] + assert "kv-team" in get_calls[0][1]["url"] + assert "teams/custom" in get_calls[0][1]["url"] + + delete_url = mock_delete.call_args[1]["url"] + assert "team-namespace" in delete_url + assert "kv-team" in delete_url + + +@pytest.mark.asyncio +async def test_hashicorp_secret_manager_rotate_secret_value_mismatch(hashicorp_secret_manager): + """Test rotating a secret when verification shows value mismatch.""" + with patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.get" + ) as mock_get, patch( + "litellm.llms.custom_httpx.http_handler.AsyncHTTPHandler.post" + ) as mock_post: + # Mock GET for current secret check + mock_get_response_current = MagicMock() + mock_get_response_current.json.return_value = mock_old_vault_response + mock_get_response_current.raise_for_status.return_value = None + + # Mock POST for creating new secret + mock_post_response = MagicMock() + mock_post_response.json.return_value = mock_write_response + mock_post_response.raise_for_status.return_value = None + + # Mock GET for verifying new secret - return different value + mock_get_response_new = MagicMock() + mock_get_response_new.json.return_value = { + "data": { + "data": {"key": "different-value"}, # Different from expected + } + } + mock_get_response_new.raise_for_status.return_value = None + + # Configure mock return values + mock_get.side_effect = [mock_get_response_current, mock_get_response_new] + mock_post.return_value = mock_post_response + + current_secret_name = f"old-secret-{uuid.uuid4()}" + new_secret_name = f"new-secret-{uuid.uuid4()}" + new_secret_value = "expected-value" + + response = await hashicorp_secret_manager.async_rotate_secret( + current_secret_name=current_secret_name, + new_secret_name=new_secret_name, + new_secret_value=new_secret_value, + ) + + # Verify error response + assert response["status"] == "error" + assert "mismatch" in response["message"].lower() + assert "expected-value" in response["message"] diff --git a/tests/test_litellm/proxy/hooks/test_key_management_event_hooks.py b/tests/test_litellm/proxy/hooks/test_key_management_event_hooks.py index 97c1733a93..f66a65f08f 100644 --- a/tests/test_litellm/proxy/hooks/test_key_management_event_hooks.py +++ b/tests/test_litellm/proxy/hooks/test_key_management_event_hooks.py @@ -139,3 +139,252 @@ class TestKeyManagementEventHooksIndependentOperations: # Email should have been called despite secret manager failure assert email_called["called"] is True + + +class TestRotateVirtualKeyInSecretManager: + """Tests for _rotate_virtual_key_in_secret_manager with team_id support.""" + + @pytest.mark.asyncio + async def test_rotate_virtual_key_with_team_id(self): + """Test that team_id is passed to async_rotate_secret.""" + from litellm.types.secret_managers.main import KeyManagementSystem, KeyManagementSettings + from litellm.secret_managers.base_secret_manager import BaseSecretManager + import litellm + + # Setup - Create a mock that inherits from BaseSecretManager + mock_secret_manager = MagicMock(spec=BaseSecretManager) + mock_secret_manager.async_rotate_secret = AsyncMock(return_value={"status": "success"}) + + litellm.secret_manager_client = mock_secret_manager + litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT + litellm._key_management_settings = KeyManagementSettings( + store_virtual_keys=True, + prefix_for_stored_virtual_keys="litellm/", + ) + + current_secret_name = "virtual-key-old" + new_secret_name = "virtual-key-new" + new_secret_value = "sk-new-key-value" + team_id = "team-123" + + # Mock _get_secret_manager_optional_params to return team settings + team_settings = { + "namespace": "team-namespace", + "mount": "kv-team", + "path_prefix": "teams/custom", + } + + # Patch isinstance in the key_management_event_hooks module to return True for BaseSecretManager check + import builtins + original_isinstance = builtins.isinstance + + def mock_isinstance(obj, cls): + if cls == BaseSecretManager and obj == mock_secret_manager: + return True + return original_isinstance(obj, cls) + + with patch.object( + KeyManagementEventHooks, + "_get_secret_manager_optional_params", + return_value=team_settings, + ) as mock_get_params, patch( + "litellm.proxy.hooks.key_management_event_hooks.isinstance", + side_effect=mock_isinstance + ): + await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager( + current_secret_name=current_secret_name, + new_secret_name=new_secret_name, + new_secret_value=new_secret_value, + team_id=team_id, + ) + + # Verify _get_secret_manager_optional_params was called with team_id + mock_get_params.assert_called_once_with(team_id) + + # Verify async_rotate_secret was called with correct parameters + mock_secret_manager.async_rotate_secret.assert_called_once() + call_kwargs = mock_secret_manager.async_rotate_secret.call_args[1] + + # Verify secret names have prefix + assert call_kwargs["current_secret_name"] == "litellm/virtual-key-old" + assert call_kwargs["new_secret_name"] == "litellm/virtual-key-new" + assert call_kwargs["new_secret_value"] == new_secret_value + assert call_kwargs["optional_params"] == team_settings + + @pytest.mark.asyncio + async def test_rotate_virtual_key_without_team_id(self): + """Test that None team_id is handled correctly.""" + from litellm.types.secret_managers.main import KeyManagementSystem, KeyManagementSettings + from litellm.secret_managers.base_secret_manager import BaseSecretManager + import litellm + + # Setup - Create a mock that inherits from BaseSecretManager + mock_secret_manager = MagicMock(spec=BaseSecretManager) + mock_secret_manager.async_rotate_secret = AsyncMock(return_value={"status": "success"}) + + litellm.secret_manager_client = mock_secret_manager + litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT + litellm._key_management_settings = KeyManagementSettings( + store_virtual_keys=True, + prefix_for_stored_virtual_keys="litellm/", + ) + + current_secret_name = "virtual-key-old" + new_secret_name = "virtual-key-new" + new_secret_value = "sk-new-key-value" + + # Patch isinstance in the key_management_event_hooks module to return True for BaseSecretManager check + import builtins + original_isinstance = builtins.isinstance + + def mock_isinstance(obj, cls): + if cls == BaseSecretManager and obj == mock_secret_manager: + return True + return original_isinstance(obj, cls) + + # Mock _get_secret_manager_optional_params to return None (no team settings) + with patch.object( + KeyManagementEventHooks, + "_get_secret_manager_optional_params", + return_value=None, + ) as mock_get_params, patch( + "litellm.proxy.hooks.key_management_event_hooks.isinstance", + side_effect=mock_isinstance + ): + await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager( + current_secret_name=current_secret_name, + new_secret_name=new_secret_name, + new_secret_value=new_secret_value, + team_id=None, + ) + + # Verify _get_secret_manager_optional_params was called with None + mock_get_params.assert_called_once_with(None) + + # Verify async_rotate_secret was called with None optional_params + mock_secret_manager.async_rotate_secret.assert_called_once() + call_kwargs = mock_secret_manager.async_rotate_secret.call_args[1] + assert call_kwargs["optional_params"] is None + + @pytest.mark.asyncio + async def test_rotate_virtual_key_in_key_rotated_hook(self): + """Test that async_key_rotated_hook passes team_id to _rotate_virtual_key_in_secret_manager.""" + from litellm.proxy._types import LiteLLM_VerificationToken, GenerateKeyResponse, RegenerateKeyRequest + from litellm.types.secret_managers.main import KeyManagementSystem, KeyManagementSettings + import litellm + + # Setup + mock_secret_manager = MagicMock() + mock_secret_manager.async_rotate_secret = AsyncMock(return_value={"status": "success"}) + + litellm.secret_manager_client = mock_secret_manager + litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT + litellm._key_management_settings = KeyManagementSettings( + store_virtual_keys=True, + prefix_for_stored_virtual_keys="litellm/", + ) + + # Create mock existing key row with team_id + existing_key_row = LiteLLM_VerificationToken( + token="sk-old-key", + key_alias="test-key-alias", + team_id="team-456", + ) + + # Create mock response + response = GenerateKeyResponse( + token_id="token-new-123", + key="sk-new-key", + key_alias="test-key-alias-new", + ) + + # Create mock request + data = RegenerateKeyRequest( + key="sk-old-key", + key_alias="test-key-alias-new", + ) + + mock_user_api_key_dict = MagicMock() + + # Mock _rotate_virtual_key_in_secret_manager to track calls + with patch.object( + KeyManagementEventHooks, + "_rotate_virtual_key_in_secret_manager", + new_callable=AsyncMock, + ) as mock_rotate, patch( + "litellm.store_audit_logs", False + ), patch.object( + KeyManagementEventHooks, + "_send_key_rotated_email", + new_callable=AsyncMock, + ): + await KeyManagementEventHooks.async_key_rotated_hook( + data=data, + existing_key_row=existing_key_row, + response=response, + user_api_key_dict=mock_user_api_key_dict, + ) + + # Verify _rotate_virtual_key_in_secret_manager was called + mock_rotate.assert_called_once() + call_kwargs = mock_rotate.call_args[1] + + # Verify team_id was passed + assert call_kwargs["team_id"] == "team-456" + assert call_kwargs["current_secret_name"] == "test-key-alias" + assert call_kwargs["new_secret_name"] == "test-key-alias-new" + assert call_kwargs["new_secret_value"] == "sk-new-key" + + @pytest.mark.asyncio + async def test_rotate_virtual_key_when_store_virtual_keys_disabled(self): + """Test that rotation is skipped when store_virtual_keys is False.""" + from litellm.types.secret_managers.main import KeyManagementSystem, KeyManagementSettings + import litellm + + # Setup + mock_secret_manager = MagicMock() + mock_secret_manager.async_rotate_secret = AsyncMock() + + litellm.secret_manager_client = mock_secret_manager + litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT + litellm._key_management_settings = KeyManagementSettings( + store_virtual_keys=False, # Disabled + prefix_for_stored_virtual_keys="litellm/", + ) + + await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager( + current_secret_name="old-key", + new_secret_name="new-key", + new_secret_value="sk-new-value", + team_id="team-123", + ) + + # Verify async_rotate_secret was NOT called + mock_secret_manager.async_rotate_secret.assert_not_called() + + @pytest.mark.asyncio + async def test_rotate_virtual_key_when_secret_manager_not_set(self): + """Test that rotation is skipped when secret_manager_client is None.""" + from litellm.types.secret_managers.main import KeyManagementSettings + import litellm + + # Setup + litellm.secret_manager_client = None + litellm._key_management_settings = KeyManagementSettings( + store_virtual_keys=True, + prefix_for_stored_virtual_keys="litellm/", + ) + + mock_secret_manager = MagicMock() + mock_secret_manager.async_rotate_secret = AsyncMock() + + # Should not raise an error, just skip + await KeyManagementEventHooks._rotate_virtual_key_in_secret_manager( + current_secret_name="old-key", + new_secret_name="new-key", + new_secret_value="sk-new-value", + team_id="team-123", + ) + + # Verify async_rotate_secret was NOT called + mock_secret_manager.async_rotate_secret.assert_not_called()