Chapter 3: AgentRuntime - The Office Manager
In Chapter 1: Agent, we met the workers (Agent
) of our system. In Chapter 2: Messaging System, we saw how they can communicate broadly using topics and subscriptions. But who hires these agents? Who actually delivers the messages, whether direct or published? And who keeps the whole system running smoothly?
This is where the AgentRuntime
comes in. It’s the central nervous system, the operating system, or perhaps the most fitting analogy: the office manager for all your agents.
Motivation: Why Do We Need an Office Manager?
Imagine an office full of employees (Agents). You have researchers, writers, maybe coders.
- How does a new employee get hired and set up?
- When one employee wants to send a memo directly to another, who makes sure it gets to the right desk?
- When someone posts an announcement on the company bulletin board (publishes to a topic), who ensures everyone who signed up for that type of announcement sees it?
- Who starts the workday and ensures everything keeps running?
Without an office manager, it would be chaos! The AgentRuntime
serves this crucial role in AutoGen Core. It handles:
- Agent Creation: “Onboarding” new agents when they are needed.
- Message Routing: Delivering direct messages (
send_message
) and published messages (publish_message
). - Lifecycle Management: Starting, running, and stopping the whole system.
- State Management: Keeping track of the overall system state (optional).
Key Concepts: Understanding the Manager’s Job
Let’s break down the main responsibilities of the AgentRuntime
:
- Agent Instantiation (Hiring):
- You don’t usually create agent objects directly (like
my_agent = ResearcherAgent()
). Why? Because the agent needs to know about the runtime (the office it works in) to send messages, publish announcements, etc. - Instead, you tell the
AgentRuntime
: “I need an agent of type ‘researcher’. Here’s a recipe (a factory function) for how to create one.” This is done usingruntime.register_factory(...)
. - When a message needs to go to a ‘researcher’ agent with a specific key (e.g., ‘researcher-01’), the runtime checks if it already exists. If not, it uses the registered factory function to create (instantiate) the agent.
- Crucially, while creating the agent, the runtime provides special context (
AgentInstantiationContext
) so the new agent automatically gets its uniqueAgentId
and a reference to theAgentRuntime
itself. This is like giving a new employee their ID badge and telling them who the office manager is.
# Simplified Concept - How a BaseAgent gets its ID and runtime access # From: _agent_instantiation.py and _base_agent.py # Inside the agent's __init__ method (when inheriting from BaseAgent): class MyAgent(BaseAgent): def __init__(self, description: str): # This magic happens *because* the AgentRuntime is creating the agent # inside a special context. self._runtime = AgentInstantiationContext.current_runtime() # Gets the manager self._id = AgentInstantiationContext.current_agent_id() # Gets its own ID self._description = description # ... rest of initialization ...
This ensures agents are properly integrated into the system from the moment they are created.
- You don’t usually create agent objects directly (like
- Message Delivery (Mail Room):
- Direct Send (
send_message
): When an agent callsawait agent_context.send_message(message, recipient_id)
, it’s actually telling theAgentRuntime
, “Please deliver thismessage
directly to the agent identified byrecipient_id
.” The runtime finds the recipient agent (creating it if necessary) and calls itson_message
method. It’s like putting a specific name on an envelope and handing it to the mail room. - Publish (
publish_message
): When an agent callsawait agent_context.publish_message(message, topic_id)
, it tells the runtime, “Post thismessage
to the announcement board namedtopic_id
.” The runtime then checks its list of subscriptions (who signed up for which boards). For every matching subscription, it figures out the correct recipient agent(s) (based on the subscription rule) and delivers the message to theiron_message
method.
- Direct Send (
- Lifecycle Management (Opening/Closing the Office):
- The runtime needs to be started to begin processing messages. Typically, you call
runtime.start()
. This usually kicks off a background process or loop that watches for incoming messages. - When work is done, you need to stop the runtime gracefully.
runtime.stop_when_idle()
is common – it waits until all messages currently in the queue have been processed, then stops.runtime.stop()
stops more abruptly.
- The runtime needs to be started to begin processing messages. Typically, you call
- State Management (Office Records):
- The runtime can save the state of all the agents it manages (
runtime.save_state()
) and load it back later (runtime.load_state()
). This is useful for pausing and resuming complex multi-agent interactions. It can also save/load state for individual agents (runtime.agent_save_state()
/runtime.agent_load_state()
). We’ll touch more on state in Chapter 7: Memory.
- The runtime can save the state of all the agents it manages (
Use Case Example: Running Our Researcher and Writer
Let’s finally run the Researcher/Writer scenario from Chapters 1 and 2. We need the AgentRuntime
to make it happen.
Goal:
- Create a runtime.
- Register factories for a ‘researcher’ and a ‘writer’ agent.
- Tell the runtime that ‘writer’ agents are interested in “research.facts.available” topics (add subscription).
- Start the runtime.
- Send an initial
ResearchTopic
message to a ‘researcher’ agent. - Let the system run (Researcher publishes facts, Runtime delivers to Writer via subscription, Writer processes).
- Stop the runtime when idle.
Code Snippets (Simplified):
# 0. Imports and Message Definitions (from previous chapters)
import asyncio
from dataclasses import dataclass
from autogen_core import (
AgentId, BaseAgent, SingleThreadedAgentRuntime, TopicId,
MessageContext, TypeSubscription, AgentInstantiationContext
)
@dataclass
class ResearchTopic: topic: str
@dataclass
class ResearchFacts: topic: str; facts: list[str]
These are the messages our agents will exchange.
# 1. Define Agent Logic (using BaseAgent)
class ResearcherAgent(BaseAgent):
async def on_message_impl(self, message: ResearchTopic, ctx: MessageContext):
print(f"Researcher ({self.id}) got topic: {message.topic}")
facts = [f"Fact 1 about {message.topic}", f"Fact 2"]
results_topic = TopicId("research.facts.available", message.topic)
# Use the runtime (via self.publish_message helper) to publish
await self.publish_message(
ResearchFacts(topic=message.topic, facts=facts), results_topic
)
print(f"Researcher ({self.id}) published facts to {results_topic}")
class WriterAgent(BaseAgent):
async def on_message_impl(self, message: ResearchFacts, ctx: MessageContext):
print(f"Writer ({self.id}) received facts via topic '{ctx.topic_id}': {message.facts}")
draft = f"Draft for {message.topic}: {'; '.join(message.facts)}"
print(f"Writer ({self.id}) created draft: '{draft}'")
# This agent doesn't send further messages in this example
Here we define the behavior of our two agent types, inheriting from BaseAgent
which gives us self.id
, self.publish_message
, etc.
# 2. Define Agent Factories
def researcher_factory():
# Gets runtime/id via AgentInstantiationContext inside BaseAgent.__init__
print("Runtime is creating a ResearcherAgent...")
return ResearcherAgent(description="I research topics.")
def writer_factory():
print("Runtime is creating a WriterAgent...")
return WriterAgent(description="I write drafts from facts.")
These simple functions tell the runtime how to create instances of our agents when needed.
# 3. Setup and Run the Runtime
async def main():
# Create the runtime (the office manager)
runtime = SingleThreadedAgentRuntime()
# Register the factories (tell the manager how to hire)
await runtime.register_factory("researcher", researcher_factory)
await runtime.register_factory("writer", writer_factory)
print("Registered agent factories.")
# Add the subscription (tell manager who listens to which announcements)
# Rule: Messages to topics of type "research.facts.available"
# should go to a "writer" agent whose key matches the topic source.
writer_sub = TypeSubscription(topic_type="research.facts.available", agent_type="writer")
await runtime.add_subscription(writer_sub)
print(f"Added subscription: {writer_sub.id}")
# Start the runtime (open the office)
runtime.start()
print("Runtime started.")
# Send the initial message to kick things off
research_task_topic = "AutoGen Agents"
researcher_instance_id = AgentId(type="researcher", key=research_task_topic)
print(f"Sending initial topic '{research_task_topic}' to {researcher_instance_id}")
await runtime.send_message(
message=ResearchTopic(topic=research_task_topic),
recipient=researcher_instance_id,
)
# Wait until all messages are processed (wait for work day to end)
print("Waiting for runtime to become idle...")
await runtime.stop_when_idle()
print("Runtime stopped.")
# Run the main function
asyncio.run(main())
This script sets up the SingleThreadedAgentRuntime
, registers the blueprints (factories) and communication rules (subscription), starts the process, and then shuts down cleanly.
Expected Output (Conceptual Order):
Registered agent factories.
Added subscription: type=research.facts.available=>agent=writer
Runtime started.
Sending initial topic 'AutoGen Agents' to researcher/AutoGen Agents
Waiting for runtime to become idle...
Runtime is creating a ResearcherAgent... # First time researcher/AutoGen Agents is needed
Researcher (researcher/AutoGen Agents) got topic: AutoGen Agents
Researcher (researcher/AutoGen Agents) published facts to research.facts.available/AutoGen Agents
Runtime is creating a WriterAgent... # First time writer/AutoGen Agents is needed (due to subscription)
Writer (writer/AutoGen Agents) received facts via topic 'research.facts.available/AutoGen Agents': ['Fact 1 about AutoGen Agents', 'Fact 2']
Writer (writer/AutoGen Agents) created draft: 'Draft for AutoGen Agents: Fact 1 about AutoGen Agents; Fact 2'
Runtime stopped.
You can see the runtime orchestrating the creation of agents and the flow of messages based on the initial request and the subscription rule.
Under the Hood: How the Manager Works
Let’s peek inside the SingleThreadedAgentRuntime
(a common implementation provided by AutoGen Core) to understand the flow.
Core Idea: It uses an internal queue (_message_queue
) to hold incoming requests (send_message
, publish_message
). A background task continuously takes items from the queue and processes them one by one (though the handling of a message might involve await
and allow other tasks to run).
1. Agent Creation (_get_agent
, _invoke_agent_factory
)
When the runtime needs an agent instance (e.g., to deliver a message) that hasn’t been created yet:
sequenceDiagram
participant Runtime as AgentRuntime
participant Factory as Agent Factory Func
participant AgentCtx as AgentInstantiationContext
participant Agent as New Agent Instance
Runtime->>Runtime: Check if agent instance exists (e.g., in `_instantiated_agents` dict)
alt Agent Not Found
Runtime->>Runtime: Find registered factory for agent type
Runtime->>AgentCtx: Set current runtime & agent_id
activate AgentCtx
Runtime->>Factory: Call factory function()
activate Factory
Factory->>AgentCtx: (Inside Agent.__init__) Get current runtime
AgentCtx-->>Factory: Return runtime
Factory->>AgentCtx: (Inside Agent.__init__) Get current agent_id
AgentCtx-->>Factory: Return agent_id
Factory-->>Runtime: Return new Agent instance
deactivate Factory
Runtime->>AgentCtx: Clear context
deactivate AgentCtx
Runtime->>Runtime: Store new agent instance
end
Runtime->>Runtime: Return agent instance
- The runtime looks up the factory function registered for the required
AgentId.type
. - It uses
AgentInstantiationContext.populate_context
to temporarily store its own reference and the targetAgentId
. - It calls the factory function.
- Inside the agent’s
__init__
(usually viaBaseAgent
),AgentInstantiationContext.current_runtime()
andAgentInstantiationContext.current_agent_id()
are called to retrieve the context set by the runtime. - The factory returns the fully initialized agent instance.
- The runtime stores this instance for future use.
# From: _agent_instantiation.py (Simplified)
class AgentInstantiationContext:
_CONTEXT_VAR = ContextVar("agent_context") # Stores (runtime, agent_id)
@classmethod
@contextmanager
def populate_context(cls, ctx: tuple[AgentRuntime, AgentId]):
token = cls._CONTEXT_VAR.set(ctx) # Store context for this block
try:
yield # Code inside the 'with' block runs here
finally:
cls._CONTEXT_VAR.reset(token) # Clean up context
@classmethod
def current_runtime(cls) -> AgentRuntime:
return cls._CONTEXT_VAR.get()[0] # Retrieve runtime from context
@classmethod
def current_agent_id(cls) -> AgentId:
return cls._CONTEXT_VAR.get()[1] # Retrieve agent_id from context
This context manager pattern ensures the correct runtime and ID are available only during the agent’s creation by the runtime.
2. Direct Messaging (send_message
-> _process_send
)
sequenceDiagram
participant Sender as Sending Agent/Code
participant Runtime as AgentRuntime
participant Queue as Internal Queue
participant Recipient as Recipient Agent
Sender->>+Runtime: send_message(msg, recipient_id, ...)
Runtime->>Runtime: Create Future (for response)
Runtime->>+Queue: Put SendMessageEnvelope(msg, recipient_id, future)
Runtime-->>-Sender: Return awaitable Future
Note over Queue, Runtime: Background task picks up envelope
Runtime->>Runtime: _process_send(envelope)
Runtime->>+Recipient: _get_agent(recipient_id) (creates if needed)
Recipient-->>-Runtime: Return Agent instance
Runtime->>+Recipient: on_message(msg, context)
Recipient->>Recipient: Process message...
Recipient-->>-Runtime: Return response value
Runtime->>Runtime: Set Future result with response value
send_message
creates aFuture
object (a placeholder for the eventual result) and wraps the message details in aSendMessageEnvelope
.- This envelope is put onto the internal
_message_queue
. - The background task picks up the envelope.
_process_send
gets the recipient agent instance (using_get_agent
).- It calls the recipient’s
on_message
method. - When
on_message
returns a result,_process_send
sets the result on theFuture
object, which makes the originalawait runtime.send_message(...)
call return the value.
3. Publish/Subscribe (publish_message
-> _process_publish
)
sequenceDiagram
participant Publisher as Publishing Agent/Code
participant Runtime as AgentRuntime
participant Queue as Internal Queue
participant SubManager as SubscriptionManager
participant Subscriber as Subscribed Agent
Publisher->>+Runtime: publish_message(msg, topic_id, ...)
Runtime->>+Queue: Put PublishMessageEnvelope(msg, topic_id)
Runtime-->>-Publisher: Return (None for publish)
Note over Queue, Runtime: Background task picks up envelope
Runtime->>Runtime: _process_publish(envelope)
Runtime->>+SubManager: get_subscribed_recipients(topic_id)
SubManager->>SubManager: Find matching subscriptions
SubManager->>SubManager: Map subscriptions to AgentIds
SubManager-->>-Runtime: Return list of recipient AgentIds
loop For each recipient AgentId
Runtime->>+Subscriber: _get_agent(recipient_id) (creates if needed)
Subscriber-->>-Runtime: Return Agent instance
Runtime->>+Subscriber: on_message(msg, context with topic_id)
Subscriber->>Subscriber: Process message...
Subscriber-->>-Runtime: Return (usually None for publish)
end
publish_message
wraps the message in aPublishMessageEnvelope
and puts it on the queue.- The background task picks it up.
_process_publish
asks theSubscriptionManager
(_subscription_manager
) for allAgentId
s that are subscribed to the giventopic_id
.- The
SubscriptionManager
checks its registeredSubscription
objects (_subscriptions
list, added viaadd_subscription
). For eachSubscription
whereis_match(topic_id)
is true, it callsmap_to_agent(topic_id)
to get the targetAgentId
. - For each resulting
AgentId
, the runtime gets the agent instance and calls itson_message
method, providing thetopic_id
in theMessageContext
.
# From: _runtime_impl_helpers.py (SubscriptionManager simplified)
class SubscriptionManager:
def __init__(self):
self._subscriptions: List[Subscription] = []
# Optimization cache can be added here
async def add_subscription(self, subscription: Subscription):
self._subscriptions.append(subscription)
# Clear cache if any
async def get_subscribed_recipients(self, topic: TopicId) -> List[AgentId]:
recipients = []
for sub in self._subscriptions:
if sub.is_match(topic):
recipients.append(sub.map_to_agent(topic))
return recipients
The SubscriptionManager
simply iterates through registered subscriptions to find matches when a message is published.
Next Steps
You now understand the AgentRuntime
- the essential coordinator that brings Agents to life, manages their communication, and runs the entire show. It handles agent creation via factories, routes direct and published messages, and manages the system’s lifecycle.
With the core concepts of Agent
, Messaging
, and AgentRuntime
covered, we can start looking at more specialized building blocks. Next, we’ll explore how agents can use external capabilities:
- Chapter 4: Tool: How to give agents tools (like functions or APIs) to perform specific actions beyond just processing messages.
Generated by AI Codebase Knowledge Builder