diff --git a/litellm/a2a_protocol/providers/litellm_completion/README.md b/litellm/a2a_protocol/providers/litellm_completion/README.md deleted file mode 100644 index a809e9bf55..0000000000 --- a/litellm/a2a_protocol/providers/litellm_completion/README.md +++ /dev/null @@ -1,74 +0,0 @@ -# A2A to LiteLLM Completion Bridge - -Routes A2A protocol requests through `litellm.acompletion`, enabling any LiteLLM-supported provider to be invoked via A2A. - -## Flow - -``` -A2A Request → Transform → litellm.acompletion → Transform → A2A Response -``` - -## SDK Usage - -Use the existing `asend_message` and `asend_message_streaming` functions with `litellm_params`: - -```python -from litellm.a2a_protocol import asend_message, asend_message_streaming -from a2a.types import SendMessageRequest, SendStreamingMessageRequest, MessageSendParams -from uuid import uuid4 - -# Non-streaming -request = SendMessageRequest( - id=str(uuid4()), - params=MessageSendParams( - message={"role": "user", "parts": [{"kind": "text", "text": "Hello!"}], "messageId": uuid4().hex} - ) -) -response = await asend_message( - request=request, - api_base="http://localhost:2024", - litellm_params={"custom_llm_provider": "langgraph", "model": "agent"}, -) - -# Streaming -stream_request = SendStreamingMessageRequest( - id=str(uuid4()), - params=MessageSendParams( - message={"role": "user", "parts": [{"kind": "text", "text": "Hello!"}], "messageId": uuid4().hex} - ) -) -async for chunk in asend_message_streaming( - request=stream_request, - api_base="http://localhost:2024", - litellm_params={"custom_llm_provider": "langgraph", "model": "agent"}, -): - print(chunk) -``` - -## Proxy Usage - -Configure an agent with `custom_llm_provider` in `litellm_params`: - -```yaml -agents: - - agent_name: my-langgraph-agent - agent_card_params: - name: "LangGraph Agent" - url: "http://localhost:2024" # Used as api_base - litellm_params: - custom_llm_provider: langgraph - model: agent -``` - -When an A2A request hits `/a2a/{agent_id}/message/send`, the bridge: - -1. Detects `custom_llm_provider` in agent's `litellm_params` -2. Transforms A2A message → OpenAI messages -3. Calls `litellm.acompletion(model="langgraph/agent", api_base="http://localhost:2024")` -4. Transforms response → A2A format - -## Classes - -- `A2ACompletionBridgeTransformation` - Static methods for message format conversion -- `A2ACompletionBridgeHandler` - Static methods for handling requests (streaming/non-streaming) - diff --git a/litellm/a2a_protocol/providers/litellm_completion/__init__.py b/litellm/a2a_protocol/providers/litellm_completion/__init__.py deleted file mode 100644 index fc2fc17f54..0000000000 --- a/litellm/a2a_protocol/providers/litellm_completion/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -""" -LiteLLM Completion bridge provider for A2A protocol. - -Routes A2A requests through litellm.acompletion based on custom_llm_provider. -""" diff --git a/litellm/a2a_protocol/providers/litellm_completion/handler.py b/litellm/a2a_protocol/providers/litellm_completion/handler.py deleted file mode 100644 index f7e806f86e..0000000000 --- a/litellm/a2a_protocol/providers/litellm_completion/handler.py +++ /dev/null @@ -1,317 +0,0 @@ -""" -Handler for A2A to LiteLLM completion bridge. - -Routes A2A requests through litellm.acompletion based on custom_llm_provider. - -A2A Streaming Events (in order): -1. Task event (kind: "task") - Initial task creation with status "submitted" -2. Status update (kind: "status-update") - Status change to "working" -3. Artifact update (kind: "artifact-update") - Content/artifact delivery -4. Status update (kind: "status-update") - Final status "completed" with final=true -""" - -from typing import Any, AsyncIterator, Dict, Optional - -import litellm -from litellm._logging import verbose_logger -from litellm.a2a_protocol.litellm_completion_bridge.pydantic_ai_transformation import ( - PydanticAITransformation, -) -from litellm.a2a_protocol.litellm_completion_bridge.transformation import ( - A2ACompletionBridgeTransformation, - A2AStreamingContext, -) - - -class A2ACompletionBridgeHandler: - """ - Static methods for handling A2A requests via LiteLLM completion. - """ - - @staticmethod - async def handle_non_streaming( - request_id: str, - params: Dict[str, Any], - litellm_params: Dict[str, Any], - api_base: Optional[str] = None, - ) -> Dict[str, Any]: - """ - Handle non-streaming A2A request via litellm.acompletion. - - Args: - request_id: A2A JSON-RPC request ID - params: A2A MessageSendParams containing the message - litellm_params: Agent's litellm_params (custom_llm_provider, model, etc.) - api_base: API base URL from agent_card_params - - Returns: - A2A SendMessageResponse dict - """ - # Check if this is a Pydantic AI agent request - custom_llm_provider = litellm_params.get("custom_llm_provider") - if custom_llm_provider == "pydantic_ai_agents": - if api_base is None: - raise ValueError("api_base is required for Pydantic AI agents") - - verbose_logger.info( - f"Pydantic AI: Routing to Pydantic AI agent at {api_base}" - ) - - # Send request directly to Pydantic AI agent - response_data = await PydanticAITransformation.send_non_streaming_request( - api_base=api_base, - request_id=request_id, - params=params, - ) - - return response_data - - # Extract message from params - message = params.get("message", {}) - - # Transform A2A message to OpenAI format - openai_messages = ( - A2ACompletionBridgeTransformation.a2a_message_to_openai_messages(message) - ) - - # Get completion params - custom_llm_provider = litellm_params.get("custom_llm_provider") - model = litellm_params.get("model", "agent") - - # Build full model string if provider specified - # Skip prepending if model already starts with the provider prefix - if custom_llm_provider and not model.startswith(f"{custom_llm_provider}/"): - full_model = f"{custom_llm_provider}/{model}" - else: - full_model = model - - verbose_logger.info( - f"A2A completion bridge: model={full_model}, api_base={api_base}" - ) - - # Build completion params dict - completion_params = { - "model": full_model, - "messages": openai_messages, - "api_base": api_base, - "stream": False, - } - # Add litellm_params (contains api_key, client_id, client_secret, tenant_id, etc.) - litellm_params_to_add = { - k: v - for k, v in litellm_params.items() - if k not in ("model", "custom_llm_provider") - } - completion_params.update(litellm_params_to_add) - # Apply forward metadata AFTER the litellm_params merge so an - # agent-configured ``extra_body`` does not overwrite the forwarded - # A2A metadata; the helper merges into any existing ``extra_body``. - A2ACompletionBridgeTransformation.apply_forward_metadata_to_completion_params( - completion_params=completion_params, - a2a_message=message, - params=params, - ) - - # Call litellm.acompletion - response = await litellm.acompletion(**completion_params) - - # Transform response to A2A format - a2a_response = ( - A2ACompletionBridgeTransformation.openai_response_to_a2a_response( - response=response, - request_id=request_id, - ) - ) - - verbose_logger.info(f"A2A completion bridge completed: request_id={request_id}") - - return a2a_response - - @staticmethod - async def handle_streaming( - request_id: str, - params: Dict[str, Any], - litellm_params: Dict[str, Any], - api_base: Optional[str] = None, - ) -> AsyncIterator[Dict[str, Any]]: - """ - Handle streaming A2A request via litellm.acompletion with stream=True. - - Emits proper A2A streaming events: - 1. Task event (kind: "task") - Initial task with status "submitted" - 2. Status update (kind: "status-update") - Status "working" - 3. Artifact update (kind: "artifact-update") - Content delivery - 4. Status update (kind: "status-update") - Final "completed" status - - Args: - request_id: A2A JSON-RPC request ID - params: A2A MessageSendParams containing the message - litellm_params: Agent's litellm_params (custom_llm_provider, model, etc.) - api_base: API base URL from agent_card_params - - Yields: - A2A streaming response events - """ - # Check if this is a Pydantic AI agent request - custom_llm_provider = litellm_params.get("custom_llm_provider") - if custom_llm_provider == "pydantic_ai_agents": - if api_base is None: - raise ValueError("api_base is required for Pydantic AI agents") - - verbose_logger.info( - f"Pydantic AI: Faking streaming for Pydantic AI agent at {api_base}" - ) - - # Get non-streaming response first - response_data = await PydanticAITransformation.send_non_streaming_request( - api_base=api_base, - request_id=request_id, - params=params, - ) - - # Convert to fake streaming - async for chunk in PydanticAITransformation.fake_streaming_from_response( - response_data=response_data, - request_id=request_id, - ): - yield chunk - - return - - # Extract message from params - message = params.get("message", {}) - - # Create streaming context - ctx = A2AStreamingContext( - request_id=request_id, - input_message=message, - ) - - # Transform A2A message to OpenAI format - openai_messages = ( - A2ACompletionBridgeTransformation.a2a_message_to_openai_messages(message) - ) - - # Get completion params - custom_llm_provider = litellm_params.get("custom_llm_provider") - model = litellm_params.get("model", "agent") - - # Build full model string if provider specified - # Skip prepending if model already starts with the provider prefix - if custom_llm_provider and not model.startswith(f"{custom_llm_provider}/"): - full_model = f"{custom_llm_provider}/{model}" - else: - full_model = model - - verbose_logger.info( - f"A2A completion bridge streaming: model={full_model}, api_base={api_base}" - ) - - # Build completion params dict - completion_params = { - "model": full_model, - "messages": openai_messages, - "api_base": api_base, - "stream": True, - } - # Add litellm_params (contains api_key, client_id, client_secret, tenant_id, etc.) - litellm_params_to_add = { - k: v - for k, v in litellm_params.items() - if k not in ("model", "custom_llm_provider") - } - completion_params.update(litellm_params_to_add) - # Apply forward metadata AFTER the litellm_params merge so an - # agent-configured ``extra_body`` does not overwrite the forwarded - # A2A metadata; the helper merges into any existing ``extra_body``. - A2ACompletionBridgeTransformation.apply_forward_metadata_to_completion_params( - completion_params=completion_params, - a2a_message=message, - params=params, - ) - - # 1. Emit initial task event (kind: "task", status: "submitted") - task_event = A2ACompletionBridgeTransformation.create_task_event(ctx) - yield task_event - - # 2. Emit status update (kind: "status-update", status: "working") - working_event = A2ACompletionBridgeTransformation.create_status_update_event( - ctx=ctx, - state="working", - final=False, - message_text="Processing request...", - ) - yield working_event - - # Call litellm.acompletion with streaming - response = await litellm.acompletion(**completion_params) - - # 3. Accumulate content and emit artifact update - accumulated_text = "" - chunk_count = 0 - async for chunk in response: # type: ignore[union-attr] - chunk_count += 1 - - # Extract delta content - content = "" - if chunk is not None and hasattr(chunk, "choices") and chunk.choices: - choice = chunk.choices[0] - if hasattr(choice, "delta") and choice.delta: - content = choice.delta.content or "" - - if content: - accumulated_text += content - - # Emit artifact update with accumulated content - if accumulated_text: - artifact_event = ( - A2ACompletionBridgeTransformation.create_artifact_update_event( - ctx=ctx, - text=accumulated_text, - ) - ) - yield artifact_event - - # 4. Emit final status update (kind: "status-update", status: "completed", final: true) - completed_event = A2ACompletionBridgeTransformation.create_status_update_event( - ctx=ctx, - state="completed", - final=True, - ) - yield completed_event - - verbose_logger.info( - f"A2A completion bridge streaming completed: request_id={request_id}, chunks={chunk_count}" - ) - - -# Convenience functions that delegate to the class methods -async def handle_a2a_completion( - request_id: str, - params: Dict[str, Any], - litellm_params: Dict[str, Any], - api_base: Optional[str] = None, -) -> Dict[str, Any]: - """Convenience function for non-streaming A2A completion.""" - return await A2ACompletionBridgeHandler.handle_non_streaming( - request_id=request_id, - params=params, - litellm_params=litellm_params, - api_base=api_base, - ) - - -async def handle_a2a_completion_streaming( - request_id: str, - params: Dict[str, Any], - litellm_params: Dict[str, Any], - api_base: Optional[str] = None, -) -> AsyncIterator[Dict[str, Any]]: - """Convenience function for streaming A2A completion.""" - async for chunk in A2ACompletionBridgeHandler.handle_streaming( - request_id=request_id, - params=params, - litellm_params=litellm_params, - api_base=api_base, - ): - yield chunk diff --git a/litellm/a2a_protocol/providers/litellm_completion/transformation.py b/litellm/a2a_protocol/providers/litellm_completion/transformation.py deleted file mode 100644 index c60bec6976..0000000000 --- a/litellm/a2a_protocol/providers/litellm_completion/transformation.py +++ /dev/null @@ -1,286 +0,0 @@ -""" -Transformation utilities for A2A <-> OpenAI message format conversion. - -A2A Message Format: -{ - "role": "user", - "parts": [{"kind": "text", "text": "Hello!"}], - "messageId": "abc123" -} - -OpenAI Message Format: -{"role": "user", "content": "Hello!"} - -A2A Streaming Events: -- Task event (kind: "task") - Initial task creation with status "submitted" -- Status update (kind: "status-update") - Status changes (working, completed) -- Artifact update (kind: "artifact-update") - Content/artifact delivery -""" - -from datetime import datetime, timezone -from typing import Any, Dict, List, Optional -from uuid import uuid4 - -from litellm._logging import verbose_logger - - -class A2AStreamingContext: - """ - Context holder for A2A streaming state. - Tracks task_id, context_id, and message accumulation. - """ - - def __init__(self, request_id: str, input_message: Dict[str, Any]): - self.request_id = request_id - self.task_id = str(uuid4()) - self.context_id = str(uuid4()) - self.input_message = input_message - self.accumulated_text = "" - self.has_emitted_task = False - self.has_emitted_working = False - - -class A2ACompletionBridgeTransformation: - """ - Static methods for transforming between A2A and OpenAI message formats. - """ - - @staticmethod - def a2a_message_to_openai_messages( - a2a_message: Dict[str, Any], - ) -> List[Dict[str, str]]: - """ - Transform an A2A message to OpenAI message format. - - Args: - a2a_message: A2A message with role, parts, and messageId - - Returns: - List of OpenAI-format messages - """ - role = a2a_message.get("role", "user") - parts = a2a_message.get("parts", []) - - # Map A2A roles to OpenAI roles - openai_role = role - if role == "user": - openai_role = "user" - elif role == "assistant": - openai_role = "assistant" - elif role == "system": - openai_role = "system" - - # Extract text content from parts - content_parts = [] - for part in parts: - kind = part.get("kind", "") - if kind == "text": - text = part.get("text", "") - content_parts.append(text) - - content = "\n".join(content_parts) if content_parts else "" - - verbose_logger.debug( - f"A2A -> OpenAI transform: role={role} -> {openai_role}, content_length={len(content)}" - ) - - return [{"role": openai_role, "content": content}] - - @staticmethod - def openai_response_to_a2a_response( - response: Any, - request_id: Optional[str] = None, - ) -> Dict[str, Any]: - """ - Transform a LiteLLM ModelResponse to A2A SendMessageResponse format. - - Args: - response: LiteLLM ModelResponse object - request_id: Original A2A request ID - - Returns: - A2A SendMessageResponse dict - """ - # Extract content from response - content = "" - if hasattr(response, "choices") and response.choices: - choice = response.choices[0] - if hasattr(choice, "message") and choice.message: - content = choice.message.content or "" - - # Build A2A message - a2a_message = { - "kind": "message", - "role": "agent", - "parts": [{"kind": "text", "text": content}], - "messageId": uuid4().hex, - } - - # Build A2A response - a2a_response = { - "jsonrpc": "2.0", - "id": request_id, - "result": a2a_message, - } - - verbose_logger.debug(f"OpenAI -> A2A transform: content_length={len(content)}") - - return a2a_response - - @staticmethod - def _get_timestamp() -> str: - """Get current timestamp in ISO format with timezone.""" - return datetime.now(timezone.utc).isoformat() - - @staticmethod - def create_task_event( - ctx: A2AStreamingContext, - ) -> Dict[str, Any]: - """ - Create the initial task event with status 'submitted'. - - This is the first event emitted in an A2A streaming response. - """ - return { - "id": ctx.request_id, - "jsonrpc": "2.0", - "result": { - "contextId": ctx.context_id, - "history": [ - { - "contextId": ctx.context_id, - "kind": "message", - "messageId": ctx.input_message.get("messageId", uuid4().hex), - "parts": ctx.input_message.get("parts", []), - "role": ctx.input_message.get("role", "user"), - "taskId": ctx.task_id, - } - ], - "id": ctx.task_id, - "kind": "task", - "status": { - "state": "submitted", - }, - }, - } - - @staticmethod - def create_status_update_event( - ctx: A2AStreamingContext, - state: str, - final: bool = False, - message_text: Optional[str] = None, - ) -> Dict[str, Any]: - """ - Create a status update event. - - Args: - ctx: Streaming context - state: Status state ('working', 'completed') - final: Whether this is the final event - message_text: Optional message text for 'working' status - """ - status: Dict[str, Any] = { - "state": state, - "timestamp": A2ACompletionBridgeTransformation._get_timestamp(), - } - - # Add message for 'working' status - if state == "working" and message_text: - status["message"] = { - "contextId": ctx.context_id, - "kind": "message", - "messageId": str(uuid4()), - "parts": [{"kind": "text", "text": message_text}], - "role": "agent", - "taskId": ctx.task_id, - } - - return { - "id": ctx.request_id, - "jsonrpc": "2.0", - "result": { - "contextId": ctx.context_id, - "final": final, - "kind": "status-update", - "status": status, - "taskId": ctx.task_id, - }, - } - - @staticmethod - def create_artifact_update_event( - ctx: A2AStreamingContext, - text: str, - ) -> Dict[str, Any]: - """ - Create an artifact update event with content. - - Args: - ctx: Streaming context - text: The text content for the artifact - """ - return { - "id": ctx.request_id, - "jsonrpc": "2.0", - "result": { - "artifact": { - "artifactId": str(uuid4()), - "name": "response", - "parts": [{"kind": "text", "text": text}], - }, - "contextId": ctx.context_id, - "kind": "artifact-update", - "taskId": ctx.task_id, - }, - } - - @staticmethod - def openai_chunk_to_a2a_chunk( - chunk: Any, - request_id: Optional[str] = None, - is_final: bool = False, - ) -> Optional[Dict[str, Any]]: - """ - Transform a LiteLLM streaming chunk to A2A streaming format. - - NOTE: This method is deprecated for streaming. Use the event-based - methods (create_task_event, create_status_update_event, - create_artifact_update_event) instead for proper A2A streaming. - - Args: - chunk: LiteLLM ModelResponse chunk - request_id: Original A2A request ID - is_final: Whether this is the final chunk - - Returns: - A2A streaming chunk dict or None if no content - """ - # Extract delta content - content = "" - if chunk is not None and hasattr(chunk, "choices") and chunk.choices: - choice = chunk.choices[0] - if hasattr(choice, "delta") and choice.delta: - content = choice.delta.content or "" - - if not content and not is_final: - return None - - # Build A2A streaming chunk (legacy format). ``final`` is an - # envelope-level streaming property per the A2A spec and must live - # alongside ``message`` in ``result``, not inside the message object. - a2a_chunk = { - "jsonrpc": "2.0", - "id": request_id, - "result": { - "message": { - "kind": "message", - "role": "agent", - "parts": [{"kind": "text", "text": content}], - "messageId": uuid4().hex, - }, - "final": is_final, - }, - } - - return a2a_chunk diff --git a/litellm/proxy/a2a/endpoints.py b/litellm/proxy/a2a/endpoints.py index 9723dc6ee6..520fdc9d8c 100644 --- a/litellm/proxy/a2a/endpoints.py +++ b/litellm/proxy/a2a/endpoints.py @@ -56,10 +56,6 @@ class DiscoverAgentRequest(BaseModel): "``{'assistant_id': }``. ``well_known_fallback`` ignores this." ), ) - headers: Optional[Dict[str, str]] = Field( - default=None, - description="Optional headers to send with the discovery request (e.g. auth).", - ) class DiscoverAgentResponse(BaseModel): @@ -106,7 +102,6 @@ async def discover_agent_card( request.url, discovery_mode=request.discovery_mode, params=request.params, - headers=request.headers, ) except AgentCardDiscoveryError as exc: raise HTTPException(status_code=400, detail=str(exc))