Chapter 6: Task Handling Logic (Server-side)
Welcome back! In Chapter 5: A2A Client Implementation, we learned how to build the “customer” side – the A2A Client – that sends requests to an agent’s server. We saw how it formats messages and talks to the agent’s “embassy” (A2A Server Implementation).
But what happens inside the embassy once a request arrives? Who actually reads the request, does the work, and prepares the response?
This chapter focuses on the Task Handling Logic. It solves the problem: What is the core “brain” inside the A2A Server that performs the requested work?
The Agent’s “Brain” - The Chef in the Kitchen
Imagine our A2A Server (Chapter 4) is like a restaurant’s front desk. It takes orders (Tasks) from customers (A2A Clients) using the standard A2A language (A2A Protocol & Core Types).
But the front desk doesn’t cook the food! It passes the order to the kitchen, where the chef takes over. The chef:
- Reads the Order: Understands what the customer wants (e.g., “Translate ‘hello’ to French”).
- Prepares the Dish: Uses ingredients (data), tools (APIs, databases), and expertise (AI models like Gemini) to fulfill the request.
- Updates the Waiter: Might send updates back like “Order is being prepared” (working state).
- Finishes the Dish: Creates the final product (the translated text “Bonjour”).
- Plates the Dish: Packages the result (Artifacts) and signals completion (completed state).
The Task Handling Logic is the “chef” inside your A2A Server. It’s the core piece of code that contains the agent’s specific skills and business logic.
What Does the Task Handler Do?
When the A2A Server receives a request like tasks/send, it hands off the details to the Task Handling Logic. This logic is responsible for:
- Understanding the Request: Receiving the user’s Message and any other context associated with the Task.
- Executing the Work:
  - Calling AI models (like Gemini via libraries like Genkit) for generation, analysis, etc.
  - Using tools (like calling a weather API, searching a database, or using specific libraries like CrewAI or LangGraph).
  - Performing custom calculations or data manipulation.
- Managing State: Signaling progress by updating the Task’s status (e.g., changing from submitted to working).
- Generating Output: Creating the final results (Artifacts) or intermediate updates.
- Handling Errors: Reporting back if something goes wrong (failed state).
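The responsibilities listed above can be sketched without any A2A library at all. The block below is a minimal illustration, not the library's API: StatusUpdate, do_work, handle_task, and collect are hypothetical names invented for this sketch, and do_work merely stands in for a real model or tool call.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional


# Hypothetical stand-in for an A2A status update (not a library type).
@dataclass
class StatusUpdate:
    state: str                  # "working", "completed", or "failed"
    text: Optional[str] = None  # optional message from the agent


async def do_work(user_text: str) -> str:
    """Placeholder for an AI model or tool call (assumption for this sketch)."""
    await asyncio.sleep(0)      # hand control back to the event loop, like real I/O
    if not user_text.strip():
        raise ValueError("empty request")
    return user_text.upper()


async def handle_task(user_text: str) -> AsyncIterator[StatusUpdate]:
    # Managing state: announce that work has begun.
    yield StatusUpdate("working", "Processing...")
    try:
        # Executing the work.
        result = await do_work(user_text)
    except Exception as exc:
        # Handling errors: report a failed state instead of raising.
        yield StatusUpdate("failed", f"Error: {exc}")
        return
    # Generating output: the final update carries the result.
    yield StatusUpdate("completed", result)


async def collect(user_text: str) -> List[StatusUpdate]:
    """Drain the handler, as a server would, and return every update."""
    return [update async for update in handle_task(user_text)]
```

Calling asyncio.run(collect("hello")) yields a working update followed by a completed one, while a blank request ends in a failed update. The point of the shape is that every exit path produces a terminal state the client can observe.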
Implementing the “Brain”
The Google A2A libraries provide structures to help you implement this logic. Let’s look at simplified examples.
JavaScript Example (Async Generator Handler)
In JavaScript, the task handler is often an async function* (an asynchronous generator). It receives TaskContext and uses yield to send back updates.
Imagine a simple agent that pretends to call an AI to greet the user.
// File: samples/js/src/server/handler.ts (Conceptual Example of a Handler)
import * as schema from "../schema.js"; // For types like Task, Message, etc.
import { TaskContext, TaskYieldUpdate } from "./handler.js"; // Handler types

// The Task Handling Logic for our 'Greeter Agent'
async function* greeterAgentHandler(
  context: TaskContext
): AsyncGenerator<TaskYieldUpdate> { // It yields updates
  // 1. Get the user's name from the input message
  const userMessageText = context.userMessage.parts[0].text ?? "there";
  const userName = userMessageText.split(" ").pop(); // Simple extraction

  // 2. Signal that work is starting
  console.log(`[GreeterAgent] Task ${context.task.id}: Starting`);
  yield {
    state: "working", // Update status to 'working'
    message: { role: "agent", parts: [{ text: "Thinking..." }] }
  };

  // 3. Simulate calling an AI (the "chef" uses an "ingredient")
  await new Promise(resolve => setTimeout(resolve, 500)); // Pretend work
  const greeting = `Hello, ${userName}! Welcome.`;

  // 4. Signal completion and provide the final message
  console.log(`[GreeterAgent] Task ${context.task.id}: Completing`);
  yield {
    state: "completed", // Update status to 'completed'
    message: { role: "agent", parts: [{ text: greeting }] }
  };
  // For more complex results, we could yield Artifacts here too.
}

// This handler function (`greeterAgentHandler`) would be passed
// to the A2AServer constructor, like in Chapter 4.
// const server = new A2AServer(greeterAgentHandler, { card: greeterAgentCard });
Explanation:
- Input: The function receives context, which contains the current task and the userMessage. We extract the user’s name.
- Signal Working: It yields an update object setting the state to working and providing an optional status message. The A2A Server receives this yield.
- Do Work: It simulates calling an AI to generate a greeting. In real agents (like samples/js/src/agents/coder/index.ts or samples/js/src/agents/movie-agent/index.ts), this is where you’d interact with Genkit, external APIs, or other tools.
- Signal Completion: It yields the final update, setting the state to completed and including the greeting in the agent’s message.
Python Example (TaskManager with Streaming)
In Python, you typically subclass TaskManager and implement methods like on_send_task or on_send_task_subscribe. For streaming responses, on_send_task_subscribe can also be an async generator.
Let’s create a similar Greeter Agent.
# File: my_agent/task_manager.py (Conceptual Example)
import asyncio
from typing import Union, AsyncIterable
from common.server.task_manager import InMemoryTaskManager  # Base class
from common.types import (
    Task, TaskSendParams, TaskStatus, TaskState, Message, TextPart,
    SendTaskStreamingRequest, SendTaskStreamingResponse, TaskStatusUpdateEvent,
    JSONRPCResponse
)
import logging

logger = logging.getLogger(__name__)


class GreeterTaskManager(InMemoryTaskManager):  # Inherit from base
    # Handle non-streaming requests (optional)
    async def on_send_task(self, request):
        # ... implementation for non-streaming ...
        raise NotImplementedError()

    # Handle STREAMING requests using an async generator
    async def on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> Union[AsyncIterable[SendTaskStreamingResponse], JSONRPCResponse]:
        task_params: TaskSendParams = request.params
        task_id = task_params.id
        logger.info(f"[GreeterAgent] Task {task_id}: Received")

        # 0. Set up internal queue for SSE events
        # (Handled by library/base class, conceptually)

        # 1. Update store & get initial Task object
        await self.upsert_task(task_params)  # Store the task initially

        # --- Start the async generator part ---
        async def _process_task() -> AsyncIterable[SendTaskStreamingResponse]:
            try:
                # 2. Get user name from input
                user_message_text = task_params.message.parts[0].text if task_params.message.parts else "there"
                user_name = user_message_text.split(" ").pop()

                # 3. Signal working (Yield a status update event)
                working_status = TaskStatus(state=TaskState.WORKING, message=Message(role="agent", parts=[TextPart(text="Thinking...")]))
                working_event = TaskStatusUpdateEvent(id=task_id, status=working_status, final=False)
                yield SendTaskStreamingResponse(id=request.id, result=working_event)
                # Update internal store (optional, depending on base class)
                await self.update_store(task_id, working_status, artifacts=None)

                # 4. Simulate AI call
                await asyncio.sleep(0.5)
                greeting = f"Hello, {user_name}! Welcome from Python."

                # 5. Signal completion (Yield final status update event)
                completed_status = TaskStatus(state=TaskState.COMPLETED, message=Message(role="agent", parts=[TextPart(text=greeting)]))
                completed_event = TaskStatusUpdateEvent(id=task_id, status=completed_status, final=True)  # final=True
                yield SendTaskStreamingResponse(id=request.id, result=completed_event)
                # Update internal store
                await self.update_store(task_id, completed_status, artifacts=None)
                logger.info(f"[GreeterAgent] Task {task_id}: Completed")
            except Exception as e:
                logger.error(f"[GreeterAgent] Task {task_id}: Error - {e}")
                # Signal failure
                failed_status = TaskStatus(state=TaskState.FAILED, message=Message(role="agent", parts=[TextPart(text=f"Error: {e}")]))
                failed_event = TaskStatusUpdateEvent(id=task_id, status=failed_status, final=True)
                yield SendTaskStreamingResponse(id=request.id, result=failed_event)
                await self.update_store(task_id, failed_status, artifacts=None)

        # Return the async generator
        return _process_task()

# This GreeterTaskManager class would be passed to the A2AServer
# server = A2AServer(task_manager=GreeterTaskManager(), ...)
Explanation:
- Inheritance: We create GreeterTaskManager inheriting from InMemoryTaskManager (which provides basic task storage).
- on_send_task_subscribe: This method handles streaming requests. It first stores the initial task details.
- Async Generator (_process_task): The core logic is inside an inner async def that returns an AsyncIterable. This allows us to yield updates over time, similar to the JavaScript generator.
- Yielding Events: Instead of yielding raw status updates, we yield SendTaskStreamingResponse objects containing TaskStatusUpdateEvent. The final=True flag marks the last event. (Chapter 7: Streaming Communication (SSE) covers SSE in detail.)
- Updating Store: We explicitly call self.update_store after yielding events to keep the task’s state consistent in our InMemoryTaskManager.
- Error Handling: A try...except block handles potential errors and yields a failed state event.
Real-world Python agents might use frameworks like CrewAI (samples/python/agents/crewai/agent.py) or LangGraph (samples/python/agents/langgraph/agent.py) within these handler methods to orchestrate more complex logic.
Key Inputs to the Handler
The handler needs information to do its job. The context typically includes:
- Task Details: The current Task object, including its unique id, current status, and any metadata.
- User Message: The specific Message from the user that triggered this work (containing Parts like text or files).
- History (Optional): Previous Messages exchanged within this Task for conversational context.
- Cancellation Check: A way to see if the client has requested to cancel the task.
These inputs are bundled in TaskContext (JS) or passed as parameters to the TaskManager methods (Python).
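To make these inputs concrete, here is a minimal, library-independent sketch of a context object with a cancellation check. HandlerContext, never_cancelled, and counting_handler are hypothetical names for illustration only; the "canceled" state string is assumed to match the protocol's cancelled-task state and does not appear elsewhere in this chapter.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator, Callable, Dict, List


def never_cancelled() -> bool:
    """Default cancellation check: the client never asked to cancel."""
    return False


# Hypothetical bundle of handler inputs (not the library's TaskContext).
@dataclass
class HandlerContext:
    task_id: str                                      # task details
    user_text: str                                    # the triggering user message
    history: List[str] = field(default_factory=list)  # earlier messages, if any
    is_cancelled: Callable[[], bool] = never_cancelled


async def counting_handler(
    ctx: HandlerContext, steps: int = 3
) -> AsyncIterator[Dict[str, str]]:
    for step in range(1, steps + 1):
        # Check for cancellation before each unit of work.
        if ctx.is_cancelled():
            yield {"state": "canceled", "text": f"Stopped before step {step}"}
            return
        yield {"state": "working", "text": f"Step {step} of {steps}"}
        await asyncio.sleep(0)  # stand-in for real work
    yield {
        "state": "completed",
        "text": f"Done after {len(ctx.history)} earlier messages",
    }
```

Checking is_cancelled between steps, rather than only at the start, is what lets a long-running handler stop promptly once the client changes its mind.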
Signaling Progress and Delivering Results
- Status Updates: Yielding status changes (working, input-required, completed, failed) keeps the client informed, especially for long-running tasks. This often includes a Message from the agent (e.g., “Looking up information…”, “Please provide the city name.”).
- Artifacts (Results): For tasks that produce distinct outputs (like files, structured data, or images), the handler yields Artifact objects. These artifacts are collected and associated with the Task.
  - JS: Yield schema.Artifact objects directly. (samples/js/src/agents/coder/index.ts)
  - Python (Streaming): Yield SendTaskStreamingResponse containing TaskArtifactUpdateEvent. (demo/ui/service/server/adk_host_manager.py shows process_artifact_event)
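The difference between a status update and an artifact can be sketched as two event types flowing through the same generator. This is an illustration under assumptions: StatusEvent, ArtifactEvent, TaskRecord, report_handler, and run_task are hypothetical names, not the library's TaskStatusUpdateEvent or TaskArtifactUpdateEvent.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator, List, Union


@dataclass
class StatusEvent:        # hypothetical: a change of task state
    state: str
    text: str = ""


@dataclass
class ArtifactEvent:      # hypothetical: a distinct output of the task
    name: str
    content: str


@dataclass
class TaskRecord:         # hypothetical: what a task store might keep
    state: str = "submitted"
    artifacts: List[ArtifactEvent] = field(default_factory=list)


Event = Union[StatusEvent, ArtifactEvent]


async def report_handler(topic: str) -> AsyncIterator[Event]:
    yield StatusEvent("working", "Drafting report...")
    await asyncio.sleep(0)  # stand-in for model or tool calls
    yield ArtifactEvent(name="report.md", content=f"# {topic}\n\nDraft body.")
    yield StatusEvent("completed", "Report ready.")


async def run_task(topic: str) -> TaskRecord:
    """Collect artifacts on the task and track its latest state."""
    record = TaskRecord()
    async for event in report_handler(topic):
        if isinstance(event, ArtifactEvent):
            record.artifacts.append(event)  # artifacts accumulate
        else:
            record.state = event.state      # status is overwritten
    return record
```

Note the asymmetry in run_task: each status replaces the previous one, while artifacts accumulate on the task, which mirrors the description above of artifacts being collected and associated with the Task.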
Connecting to the Server
As shown in Chapter 4, you connect your Task Handling Logic to the A2AServer during its setup:
- JS: Pass the async generator function (greeterAgentHandler) to the A2AServer constructor.
- Python: Pass an instance of your TaskManager subclass (GreeterTaskManager()) to the A2AServer constructor.
The server then knows exactly which “chef” to call when an order comes in.
Under the Hood: Server Invoking the Handler
Let’s visualize how the server uses the handler when a streaming tasks/sendSubscribe request arrives:
sequenceDiagram
    participant C as A2A Client
    participant S as A2A Server
    participant TH as Task Handler (e.g., greeterAgentHandler)
    participant AI as AI Model/Tool (Optional)
    participant TS as Task Store

    C->>S: POST / (JSON-RPC: method="tasks/sendSubscribe", params={...})
    Note right of S: Receives request, parses JSON-RPC
    S->>TS: Create/Get Task Record (ID: task-123)
    TS-->>S: Task Object (state: submitted)
    S->>TH: Invoke handler(context) / Call on_send_task_subscribe()
    Note right of TH: Handler starts executing
    TH->>TS: Update Task (state: working)
    TH-->>S: yield {state: "working", ...} / yield TaskStatusUpdateEvent(working)
    Note right of S: Receives yielded update
    S-->>C: Send SSE Event (data: TaskStatusUpdateEvent - working)
    Note left of C: Client receives 'working' status

    alt Handler needs AI/Tool
        TH->>AI: Request generation("greet user")
        AI-->>TH: Response ("Hello there!")
    end

    TH->>TS: Update Task (state: completed, message: "Hello...")
    TH-->>S: yield {state: "completed", ...} / yield TaskStatusUpdateEvent(completed, final=True)
    Note right of S: Receives final yielded update
    S-->>C: Send SSE Event (data: TaskStatusUpdateEvent - completed, final=True)
    Note left of C: Client receives 'completed' status, stream ends
Steps:
- Request In: The A2A Server receives the tasks/sendSubscribe request.
- Task Prep: It looks up or creates the Task in the Task Store.
- Invoke Handler: It calls your registered Task Handling Logic (e.g., greeterAgentHandler or GreeterTaskManager.on_send_task_subscribe), providing the necessary context.
- Handler Executes & Yields: Your handler runs. When it yields a status update (like working):
  - It might update the Task Store.
  - It returns the update to the A2AServer.
- Server Sends Update: The A2AServer formats the update as a Server-Sent Event (SSE) and sends it to the A2A Client.
- (Optional) External Calls: The handler might call external services (AI, tools).
- Handler Yields Final Result: When the handler is done, it yields the final completed (or failed) status update (often marked as final=True in streaming).
- Server Sends Final Update: The A2AServer sends the final SSE event to the client, closing the stream.
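The steps above can be sketched as a small driver loop. This is a simplified model, not the actual A2AServer code: greeter_handler, to_sse, and serve_stream are hypothetical names, the task store is a plain dict, and the frames are returned as strings instead of being written to an HTTP response. The only wire-format detail relied on is that an SSE event is a "data:" line terminated by a blank line.

```python
import asyncio
import json
from typing import AsyncIterator, Dict, List


async def greeter_handler(name: str) -> AsyncIterator[Dict[str, object]]:
    """Hypothetical handler: yields plain dicts instead of library events."""
    yield {"state": "working", "text": "Thinking...", "final": False}
    await asyncio.sleep(0)  # stand-in for an AI or tool call
    yield {"state": "completed", "text": f"Hello, {name}!", "final": True}


def to_sse(request_id: str, update: Dict[str, object]) -> str:
    """Wrap an update in a JSON-RPC response and frame it as one SSE event."""
    payload = {"jsonrpc": "2.0", "id": request_id, "result": update}
    return f"data: {json.dumps(payload)}\n\n"


async def serve_stream(
    request_id: str, name: str, store: Dict[str, object]
) -> List[str]:
    frames: List[str] = []
    store["state"] = "submitted"                  # Task Prep
    async for update in greeter_handler(name):    # Invoke Handler
        store["state"] = update["state"]          # keep the store in sync
        frames.append(to_sse(request_id, update)) # Server Sends Update
        if update["final"]:                       # final event closes the stream
            break
    return frames
```

Running asyncio.run(serve_stream("req-1", "Ada", {})) produces two frames, one per yielded update. The handler never touches the transport; it only yields, and the loop around it decides how each update is delivered.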
Key files involved:
- JS Handler Definition: samples/js/src/server/handler.ts (defines TaskContext, TaskYieldUpdate, TaskHandler)
- JS Agent Example: samples/js/src/agents/coder/index.ts, samples/js/src/agents/movie-agent/index.ts
- Python Base Manager: samples/python/common/server/task_manager.py (defines TaskManager, InMemoryTaskManager)
- Python Agent Examples: samples/python/agents/crewai/agent.py, samples/python/agents/langgraph/agent.py, demo/ui/service/server/adk_host_manager.py (more complex, uses ADK)
Conclusion
The Task Handling Logic is the heart of your A2A agent – the “chef” that actually does the work. It receives requests via the A2AServer, interacts with AI models or tools, manages the task’s state transitions, and generates the final response or intermediate updates.
By implementing this logic (often as an async generator in JS or a TaskManager subclass in Python) and connecting it to your server, you define your agent’s unique capabilities and how it fulfills the tasks requested by clients.
We saw how handlers can yield updates. But how do these updates actually get sent back to the client in real-time? Let’s dive into the mechanism used for that: Streaming Communication using Server-Sent Events (SSE).
Next: Chapter 7: Streaming Communication (SSE)