Chapter 6: Task Handling Logic (Server-side)
Welcome back! In Chapter 5: A2A Client Implementation, we learned how to build the “customer” side – the A2A Client – that sends requests to an agent’s server. We saw how it formats messages and talks to the agent’s “embassy” (A2A Server Implementation).
But what happens inside the embassy once a request arrives? Who actually reads the request, does the work, and prepares the response?
This chapter focuses on the Task Handling Logic. It solves the problem: What is the core “brain” inside the A2A Server that performs the requested work?
The Agent’s “Brain” - The Chef in the Kitchen
Imagine our A2A Server (Chapter 4) is like a restaurant’s front desk. It takes orders (Tasks) from customers (A2A Clients) using the standard A2A language (A2A Protocol & Core Types).
But the front desk doesn’t cook the food! It passes the order to the kitchen, where the chef takes over. The chef:
- Reads the Order: Understands what the customer wants (e.g., “Translate ‘hello’ to French”).
- Prepares the Dish: Uses ingredients (data), tools (APIs, databases), and expertise (AI models like Gemini) to fulfill the request.
- Updates the Waiter: Might send updates back like “Order is being prepared” (`working` state).
- Finishes the Dish: Creates the final product (the translated text “Bonjour le monde”).
- Plates the Dish: Packages the result (`Artifacts`) and signals completion (`completed` state).
The Task Handling Logic is the “chef” inside your A2A Server. It’s the core piece of code that contains the agent’s specific skills and business logic.
What Does the Task Handler Do?
When the A2A Server receives a request like `tasks/send`, it hands off the details to the Task Handling Logic. This logic is responsible for:
- Understanding the Request: Receiving the user’s `Message` and any other context associated with the `Task`.
- Executing the Work:
  - Calling AI models (like Gemini via libraries like Genkit) for generation, analysis, etc.
  - Using tools (like calling a weather API, searching a database, or using specific libraries like CrewAI or LangGraph).
  - Performing custom calculations or data manipulation.
- Managing State: Signaling progress by updating the `Task`’s status (e.g., changing from `submitted` to `working`).
- Generating Output: Creating the final results (`Artifacts`) or intermediate updates.
- Handling Errors: Reporting back if something goes wrong (`failed` state).
Implementing the “Brain”
The Google A2A libraries provide structures to help you implement this logic. Let’s look at simplified examples.
JavaScript Example (Async Generator Handler)
In JavaScript, the task handler is often an `async function*` (an asynchronous generator). It receives a `TaskContext` and uses `yield` to send back updates.
Imagine a simple agent that pretends to call an AI to greet the user.
```typescript
// File: samples/js/src/server/handler.ts (Conceptual Example of a Handler)
import * as schema from "../schema.js"; // For types like Task, Message, etc.
import { TaskContext, TaskYieldUpdate } from "./handler.js"; // Handler types

// The Task Handling Logic for our 'Greeter Agent'
async function* greeterAgentHandler(
  context: TaskContext
): AsyncGenerator<TaskYieldUpdate> { // It yields updates
  // 1. Get the user's name from the input message
  const userMessageText = context.userMessage.parts[0].text ?? "there";
  const userName = userMessageText.split(" ").pop(); // Simple extraction

  // 2. Signal that work is starting
  console.log(`[GreeterAgent] Task ${context.task.id}: Starting`);
  yield {
    state: "working", // Update status to 'working'
    message: { role: "agent", parts: [{ text: "Thinking..." }] },
  };

  // 3. Simulate calling an AI (the "chef" uses an "ingredient")
  await new Promise((resolve) => setTimeout(resolve, 500)); // Pretend work
  const greeting = `Hello, ${userName}! Welcome.`;

  // 4. Signal completion and provide the final message
  console.log(`[GreeterAgent] Task ${context.task.id}: Completing`);
  yield {
    state: "completed", // Update status to 'completed'
    message: { role: "agent", parts: [{ text: greeting }] },
  };

  // For more complex results, we could yield Artifacts here too.
}

// This handler function (`greeterAgentHandler`) would be passed
// to the A2AServer constructor, like in Chapter 4.
// const server = new A2AServer(greeterAgentHandler, { card: greeterAgentCard });
```
Explanation:
- Input: The function receives `context`, which contains the current `task` and the `userMessage`. We extract the user’s name.
- Signal Working: It `yield`s an update object setting the `state` to `working` and providing an optional status message. The A2A Server receives this yield.
- Do Work: It simulates calling an AI to generate a greeting. In real agents (like `samples/js/src/agents/coder/index.ts` or `samples/js/src/agents/movie-agent/index.ts`), this is where you’d interact with Genkit, external APIs, or other tools.
- Signal Completion: It `yield`s the final update, setting the `state` to `completed` and including the greeting in the agent’s `message`. (A simplified sketch of the types involved follows this list.)
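To make the shape of these yields concrete, here is a simplified sketch of the handler-facing types. It is indicative only; the authoritative definitions (`TaskContext`, `TaskYieldUpdate`, `TaskHandler`) live in `samples/js/src/server/handler.ts`, and the names `StatusUpdate` and `HandlerYield` below are placeholders for illustration.

```typescript
import * as schema from "../schema.js"; // A2A core types (Message, Artifact, ...)

// Sketch only: the shape of what a handler yields. The authoritative
// TaskYieldUpdate type lives in samples/js/src/server/handler.ts.
type StatusUpdate = {
  state: "submitted" | "working" | "input-required" | "completed" | "failed";
  message?: schema.Message; // optional agent message shown to the client
};

// A handler can yield either a status change or a result Artifact.
type HandlerYield = StatusUpdate | schema.Artifact;
```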
Python Example (TaskManager with Streaming)
In Python, you typically subclass `TaskManager` and implement methods like `on_send_task` or `on_send_task_subscribe`. For streaming responses, `on_send_task_subscribe` can return an async generator.
Let’s create a similar Greeter Agent.
```python
# File: my_agent/task_manager.py (Conceptual Example)
import asyncio
import logging
from typing import Union, AsyncIterable

from common.server.task_manager import InMemoryTaskManager  # Base class
from common.types import (
    Task, TaskSendParams, TaskStatus, TaskState, Message, TextPart,
    SendTaskStreamingRequest, SendTaskStreamingResponse, TaskStatusUpdateEvent,
    JSONRPCResponse,
)

logger = logging.getLogger(__name__)


class GreeterTaskManager(InMemoryTaskManager):  # Inherit from base
    # Handle non-streaming requests (optional)
    async def on_send_task(self, request):
        # ... implementation for non-streaming ...
        raise NotImplementedError()

    # Handle STREAMING requests using an async generator
    async def on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> Union[AsyncIterable[SendTaskStreamingResponse], JSONRPCResponse]:
        task_params: TaskSendParams = request.params
        task_id = task_params.id
        logger.info(f"[GreeterAgent] Task {task_id}: Received")

        # 0. Set up internal queue for SSE events
        #    (Handled by library/base class, conceptually)
        # 1. Update store & get initial Task object
        await self.upsert_task(task_params)  # Store the task initially

        # --- Start the async generator part ---
        async def _process_task() -> AsyncIterable[SendTaskStreamingResponse]:
            try:
                # 2. Get user name from input
                user_message_text = task_params.message.parts[0].text if task_params.message.parts else "there"
                user_name = user_message_text.split(" ").pop()

                # 3. Signal working (yield a status update event)
                working_status = TaskStatus(
                    state=TaskState.WORKING,
                    message=Message(role="agent", parts=[TextPart(text="Thinking...")]),
                )
                working_event = TaskStatusUpdateEvent(id=task_id, status=working_status, final=False)
                yield SendTaskStreamingResponse(id=request.id, result=working_event)
                # Update internal store (optional, depending on base class)
                await self.update_store(task_id, working_status, artifacts=None)

                # 4. Simulate AI call
                await asyncio.sleep(0.5)
                greeting = f"Hello, {user_name}! Welcome from Python."

                # 5. Signal completion (yield final status update event)
                completed_status = TaskStatus(
                    state=TaskState.COMPLETED,
                    message=Message(role="agent", parts=[TextPart(text=greeting)]),
                )
                completed_event = TaskStatusUpdateEvent(id=task_id, status=completed_status, final=True)  # final=True
                yield SendTaskStreamingResponse(id=request.id, result=completed_event)
                # Update internal store
                await self.update_store(task_id, completed_status, artifacts=None)
                logger.info(f"[GreeterAgent] Task {task_id}: Completed")
            except Exception as e:
                logger.error(f"[GreeterAgent] Task {task_id}: Error - {e}")
                # Signal failure
                failed_status = TaskStatus(
                    state=TaskState.FAILED,
                    message=Message(role="agent", parts=[TextPart(text=f"Error: {e}")]),
                )
                failed_event = TaskStatusUpdateEvent(id=task_id, status=failed_status, final=True)
                yield SendTaskStreamingResponse(id=request.id, result=failed_event)
                await self.update_store(task_id, failed_status, artifacts=None)

        # Return the async generator
        return _process_task()

# This GreeterTaskManager class would be passed to the A2AServer
# server = A2AServer(task_manager=GreeterTaskManager(), ...)
```
Explanation:
- Inheritance: We create `GreeterTaskManager`, inheriting from `InMemoryTaskManager` (which provides basic task storage).
- `on_send_task_subscribe`: This method handles streaming requests. It first stores the initial task details.
- Async Generator (`_process_task`): The core logic is inside an inner `async def` that returns an `AsyncIterable`. This allows us to `yield` updates over time, similar to the JavaScript generator.
- Yielding Events: Instead of yielding raw status updates, we yield `SendTaskStreamingResponse` objects containing `TaskStatusUpdateEvent`. The `final=True` flag marks the last event. (Chapter 7: Streaming Communication (SSE) covers SSE in detail.)
- Updating Store: We explicitly call `self.update_store` after yielding events to keep the task’s state consistent in our `InMemoryTaskManager`.
- Error Handling: A `try...except` block handles potential errors and yields a `failed` state event.
Real-world Python agents might use frameworks like CrewAI (`samples/python/agents/crewai/agent.py`) or LangGraph (`samples/python/agents/langgraph/agent.py`) within these handler methods to orchestrate more complex logic.
Key Inputs to the Handler
The handler needs information to do its job. The context typically includes:
- Task Details: The current `Task` object, including its unique `id`, current `status`, and any `metadata`.
- User Message: The specific `Message` from the user that triggered this work (containing `Parts` like text or files).
- History (Optional): Previous `Messages` exchanged within this `Task` for conversational context.
- Cancellation Check: A way to see if the client has requested to cancel the task.
These inputs are bundled in `TaskContext` (JS) or passed as parameters to the `TaskManager` methods (Python), as sketched below.
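As a rough picture, the JS `TaskContext` can be thought of as an object like the one below. The field names follow the list above; treat this as illustrative and check `samples/js/src/server/handler.ts` for the real interface (the name `TaskContextSketch` is used here only to mark it as a sketch).

```typescript
import * as schema from "../schema.js"; // A2A core types

// Illustrative sketch of the context a JS handler receives (not the exact interface;
// see samples/js/src/server/handler.ts for the real TaskContext).
interface TaskContextSketch {
  task: schema.Task;            // current Task: id, status, metadata, ...
  userMessage: schema.Message;  // the Message that triggered this work
  history?: schema.Message[];   // prior Messages in this Task, for conversational context
  isCancelled(): boolean;       // lets the handler check for a client-requested cancellation
}
```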
Signaling Progress and Delivering Results
- Status Updates: Yielding status changes (`working`, `input-required`, `completed`, `failed`) keeps the client informed, especially for long-running tasks. This often includes a `Message` from the agent (e.g., “Looking up information…”, “Please provide the city name.”).
- Artifacts (Results): For tasks that produce distinct outputs (like files, structured data, or images), the handler yields `Artifact` objects. These artifacts are collected and associated with the `Task` (see the sketch after this list).
  - JS: Yield `schema.Artifact` objects directly. (`samples/js/src/agents/coder/index.ts`)
  - Python (Streaming): Yield `SendTaskStreamingResponse` containing `TaskArtifactUpdateEvent`. (`demo/ui/service/server/adk_host_manager.py` shows `process_artifact_event`)
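For example, a JS handler that produces a file-like result might yield an `Artifact` between its status updates. This is a minimal sketch assuming the `Artifact` shape from the core types (a `name` plus `parts`); the coder agent in `samples/js/src/agents/coder/index.ts` does something similar with real generated code, and `reportAgentHandler` here is purely illustrative.

```typescript
import { TaskContext, TaskYieldUpdate } from "./handler.js"; // handler types (see samples/js/src/server/handler.ts)

// Sketch: yield a distinct output as an Artifact, then signal completion.
async function* reportAgentHandler(
  context: TaskContext
): AsyncGenerator<TaskYieldUpdate> {
  yield {
    state: "working",
    message: { role: "agent", parts: [{ text: "Generating report..." }] },
  };

  // The server collects this Artifact and associates it with the Task.
  yield {
    name: "report.txt",
    parts: [{ type: "text", text: "Weekly summary: all systems nominal." }],
  };

  yield { state: "completed" };
}
```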
Connecting to the Server
As shown in Chapter 4, you connect your Task Handling Logic to the `A2AServer` during its setup:
- JS: Pass the async generator function (`greeterAgentHandler`) to the `A2AServer` constructor.
- Python: Pass an instance of your `TaskManager` subclass (`GreeterTaskManager()`) to the `A2AServer` constructor.
The server then knows exactly which “chef” to call when an order comes in.
Under the Hood: Server Invoking the Handler
Let’s visualize how the server uses the handler when a streaming `tasks/sendSubscribe` request arrives:
```mermaid
sequenceDiagram
    participant C as A2A Client
    participant S as A2A Server
    participant TH as Task Handler (e.g., greeterAgentHandler)
    participant AI as AI Model/Tool (Optional)
    participant TS as Task Store

    C->>S: POST / (JSON-RPC: method="tasks/sendSubscribe", params={...})
    Note right of S: Receives request, parses JSON-RPC
    S->>TS: Create/Get Task Record (ID: task-123)
    TS-->>S: Task Object (state: submitted)
    S->>TH: Invoke handler(context) / Call on_send_task_subscribe()
    Note right of TH: Handler starts executing
    TH->>TS: Update Task (state: working)
    TH-->>S: yield {state: "working", ...} / yield TaskStatusUpdateEvent(working)
    Note right of S: Receives yielded update
    S-->>C: Send SSE Event (data: TaskStatusUpdateEvent - working)
    Note left of C: Client receives 'working' status
    alt Handler needs AI/Tool
        TH->>AI: Request generation("greet user")
        AI-->>TH: Response ("Hello there!")
    end
    TH->>TS: Update Task (state: completed, message: "Hello...")
    TH-->>S: yield {state: "completed", ...} / yield TaskStatusUpdateEvent(completed, final=True)
    Note right of S: Receives final yielded update
    S-->>C: Send SSE Event (data: TaskStatusUpdateEvent - completed, final=True)
    Note left of C: Client receives 'completed' status, stream ends
```
Steps:
- Request In: The A2A Server receives the `tasks/sendSubscribe` request.
- Task Prep: It looks up or creates the `Task` in the Task Store.
- Invoke Handler: It calls your registered Task Handling Logic (e.g., `greeterAgentHandler` or `GreeterTaskManager.on_send_task_subscribe`), providing the necessary context.
- Handler Executes & Yields: Your handler runs. When it `yield`s a status update (like `working`):
  - It might update the Task Store.
  - It returns the update to the `A2AServer`.
- Server Sends Update: The `A2AServer` formats the update as a Server-Sent Event (SSE) and sends it to the A2A Client.
- (Optional) External Calls: The handler might call external services (AI, tools).
- Handler Yields Final Result: When the handler is done, it `yield`s the final `completed` (or `failed`) status update (often marked as `final=True` in streaming).
- Server Sends Final Update: The `A2AServer` sends the final SSE event to the client, closing the stream. (A simplified driver loop is sketched below.)
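Conceptually, the JS server drives the handler with a loop like the sketch below: each yielded update is persisted and forwarded as an SSE event. This is a simplification; the real logic in `samples/js/src/server/server.ts` also validates requests, formats JSON-RPC responses, and handles cancellation. `persistUpdate` and `sendSseEvent` are placeholder callbacks, not library functions.

```typescript
import { TaskContext, TaskYieldUpdate } from "./handler.js"; // handler-facing types

// Simplified sketch of the server-side driver loop (illustrative only).
// persistUpdate and sendSseEvent are placeholder callbacks, not library APIs.
async function runHandler(
  handler: (ctx: TaskContext) => AsyncGenerator<TaskYieldUpdate>,
  context: TaskContext,
  persistUpdate: (update: TaskYieldUpdate) => Promise<void>, // e.g., write to the Task Store
  sendSseEvent: (update: TaskYieldUpdate) => void            // e.g., stream to the A2A Client
): Promise<void> {
  for await (const update of handler(context)) {
    await persistUpdate(update); // keep the stored Task in sync with each yielded state/artifact
    sendSseEvent(update);        // push the update to the client as it happens
  }
}
```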
Key files involved:
- JS Handler Definition: `samples/js/src/server/handler.ts` (defines `TaskContext`, `TaskYieldUpdate`, `TaskHandler`)
- JS Agent Examples: `samples/js/src/agents/coder/index.ts`, `samples/js/src/agents/movie-agent/index.ts`
- Python Base Manager: `samples/python/common/server/task_manager.py` (defines `TaskManager`, `InMemoryTaskManager`)
- Python Agent Examples: `samples/python/agents/crewai/agent.py`, `samples/python/agents/langgraph/agent.py`, `demo/ui/service/server/adk_host_manager.py` (more complex, uses ADK)
Conclusion
The Task Handling Logic is the heart of your A2A agent – the “chef” that actually does the work. It receives requests via the `A2AServer`, interacts with AI models or tools, manages the task’s state transitions, and generates the final response or intermediate updates.
By implementing this logic (often as an async generator in JS or a `TaskManager` subclass in Python) and connecting it to your server, you define your agent’s unique capabilities and how it fulfills the tasks requested by clients.
We saw how handlers can `yield` updates. But how do these updates actually get sent back to the client in real time? Let’s dive into the mechanism used for that: Streaming Communication using Server-Sent Events (SSE).
Next: Chapter 7: Streaming Communication (SSE)
Generated by AI Codebase Knowledge Builder