Chapter 2: Messaging System (Topic & Subscription)
In Chapter 1: Agent, we learned about Agents as individual workers. But how do they coordinate when one agent doesn’t know exactly who needs the information it produces? Imagine our Researcher finds some facts. Maybe the Writer needs them, but maybe a Fact-Checker agent or a Summary agent also needs them later. How can the Researcher just announce “Here are the facts!” without needing a specific mailing list?
This is where the Messaging System, specifically Topics and Subscriptions, comes in. It allows agents to broadcast messages to anyone interested, like posting on a company announcement board.
Motivation: Broadcasting Information
Let’s refine our blog post example:
- The
Researcher
agent finds facts about “AutoGen Agents”. - Instead of sending directly to the
Writer
, theResearcher
publishes these facts to a general “research-results” Topic. - The
Writer
agent has previously told the system it’s subscribed to the “research-results” Topic. - The system sees the new message on the Topic and delivers it to the
Writer
(and any other subscribers).
This way, the Researcher
doesn’t need to know who the Writer
is, or even if a Writer
exists! It just broadcasts the results. If we later add a FactChecker
agent that also needs the results, it simply subscribes to the same Topic.
Key Concepts: Topics and Subscriptions
Let’s break down the components of this broadcasting system:
- Topic (
TopicId
): The Announcement Board- A
TopicId
represents a specific channel or category for messages. Think of it like the name of an announcement board (e.g., “Project Updates”, “General Announcements”). - It has two main parts:
type
: What kind of event or information is this? (e.g., “research.completed”, “user.request”). This helps categorize messages.source
: Where or why did this event originate? Often, this relates to the specific task or context (e.g., the specific blog post being researched like “autogen-agents-blog-post”, or the team generating the event like “research-team”).
# From: _topic.py (Simplified) from dataclasses import dataclass @dataclass(frozen=True) # Immutable: can't change after creation class TopicId: type: str source: str def __str__(self) -> str: # Creates an id like "research.completed/autogen-agents-blog-post" return f"{self.type}/{self.source}"
This structure allows for flexible filtering. Agents might subscribe to all topics of a certain
type
, regardless of thesource
, or only to topics with a specificsource
. - A
- Publishing: Posting the Announcement
- When an agent has information to share broadly, it publishes a message to a specific
TopicId
. - This is like pinning a note to the designated announcement board. The agent doesn’t need to know who will read it.
- When an agent has information to share broadly, it publishes a message to a specific
- Subscription (
Subscription
): Signing Up for Updates- A
Subscription
is how an agent declares its interest in certainTopicId
s. - It acts like a rule: “If a message is published to a Topic that matches this pattern, please deliver it to this kind of agent”.
- The
Subscription
links aTopicId
pattern (e.g., “all topics with typeresearch.completed
”) to anAgentId
(or a way to determine theAgentId
).
- A
- Routing: Delivering the Mail
- The
AgentRuntime
(the system manager we’ll meet in Chapter 3: AgentRuntime) keeps track of all activeSubscription
s. - When a message is published to a
TopicId
, theAgentRuntime
checks whichSubscription
s match thatTopicId
. - For each match, it uses the
Subscription
’s rule to figure out which specificAgentId
should receive the message and delivers it.
- The
Use Case Example: Researcher Publishes, Writer Subscribes
Let’s see how our Researcher and Writer can use this system.
Goal: Researcher publishes facts to a topic, Writer receives them via subscription.
1. Define the Topic: We need a TopicId
for research results. Let’s say the type
is “research.facts.available” and the source
identifies the specific research task (e.g., “blog-post-autogen”).
# From: _topic.py
from autogen_core import TopicId
# Define the topic for this specific research task
research_topic_id = TopicId(type="research.facts.available", source="blog-post-autogen")
print(f"Topic ID: {research_topic_id}")
# Output: Topic ID: research.facts.available/blog-post-autogen
This defines the “announcement board” we’ll use.
2. Researcher Publishes: The Researcher
agent, after finding facts, will use its agent_context
(provided by the runtime) to publish the ResearchFacts
message to this topic.
# Simplified concept - Researcher agent logic
# Assume 'agent_context' and 'message' (ResearchTopic) are provided
# Define the facts message (from Chapter 1)
@dataclass
class ResearchFacts:
topic: str
facts: list[str]
async def researcher_publish_logic(agent_context, message: ResearchTopic, msg_context):
print(f"Researcher working on: {message.topic}")
facts_data = ResearchFacts(
topic=message.topic,
facts=[f"Fact A about {message.topic}", f"Fact B about {message.topic}"]
)
# Define the specific topic for this task's results
results_topic = TopicId(type="research.facts.available", source=message.topic) # Use message topic as source
# Publish the facts to the topic
await agent_context.publish_message(message=facts_data, topic_id=results_topic)
print(f"Researcher published facts to topic: {results_topic}")
# No direct reply needed
return None
Notice the agent_context.publish_message
call. The Researcher doesn’t specify a recipient, only the topic.
3. Writer Subscribes: The Writer
agent needs to tell the system it’s interested in messages on topics like “research.facts.available”. We can use a predefined Subscription
type called TypeSubscription
. This subscription typically means: “I am interested in all topics with this exact type. When a message arrives, create/use an agent of my type whose key
matches the topic’s source
.”
# From: _type_subscription.py (Simplified Concept)
from autogen_core import TypeSubscription, BaseAgent
class WriterAgent(BaseAgent):
# ... agent implementation ...
async def on_message_impl(self, message: ResearchFacts, ctx):
# This method gets called when a subscribed message arrives
print(f"Writer ({self.id}) received facts via subscription: {message.facts}")
# ... process facts and write draft ...
# How the Writer subscribes (usually done during runtime setup - Chapter 3)
# This tells the runtime: "Messages on topics with type 'research.facts.available'
# should go to a 'writer' agent whose key matches the topic source."
writer_subscription = TypeSubscription(
topic_type="research.facts.available",
agent_type="writer" # The type of agent that should handle this
)
print(f"Writer subscription created for topic type: {writer_subscription.topic_type}")
# Output: Writer subscription created for topic type: research.facts.available
When the Researcher
publishes to TopicId(type="research.facts.available", source="blog-post-autogen")
, the AgentRuntime
will see that writer_subscription
matches the topic_type
. It will then use the rule: “Find (or create) an agent with AgentId(type='writer', key='blog-post-autogen')
and deliver the message.”
Benefit: Decoupling! The Researcher just broadcasts. The Writer just listens for relevant broadcasts. We can add more listeners (like a FactChecker
subscribing to the same topic_type
) without changing the Researcher
at all.
Under the Hood: How Publishing Works
Let’s trace the journey of a published message.
Conceptual Flow:
sequenceDiagram
participant Publisher as Publisher Agent
participant Runtime as AgentRuntime
participant SubRegistry as Subscription Registry
participant Subscriber as Subscriber Agent
Publisher->>+Runtime: publish_message(message, topic_id)
Runtime->>+SubRegistry: Find subscriptions matching topic_id
SubRegistry-->>-Runtime: Return list of matching Subscriptions
loop For each matching Subscription
Runtime->>Subscription: map_to_agent(topic_id)
Subscription-->>Runtime: Return target AgentId
Runtime->>+Subscriber: Locate/Create Agent instance by AgentId
Runtime->>Subscriber: on_message(message, context)
Subscriber-->>-Runtime: Process message (optional return)
end
Runtime-->>-Publisher: Return (usually None for publish)
- Publish: An agent calls
agent_context.publish_message(message, topic_id)
. This internally calls theAgentRuntime
’s publish method. - Lookup: The
AgentRuntime
takes thetopic_id
and consults its internalSubscription Registry
. - Match: The Registry checks all registered
Subscription
objects. EachSubscription
has anis_match(topic_id)
method. The registry finds all subscriptions whereis_match
returnsTrue
. - Map: For each matching
Subscription
, the Runtime calls itsmap_to_agent(topic_id)
method. This method returns the specificAgentId
that should handle this message based on the subscription rule and the topic details. - Deliver: The
AgentRuntime
finds the agent instance corresponding to the returnedAgentId
(potentially creating it if it doesn’t exist yet, especially withTypeSubscription
). It then calls that agent’son_message
method, delivering the original publishedmessage
.
Code Glimpse:
-
TopicId
(_topic.py
): As shown before, a simple dataclass holdingtype
andsource
. It includes validation to ensure thetype
follows certain naming conventions.# From: _topic.py @dataclass(eq=True, frozen=True) class TopicId: type: str source: str # ... validation and __str__ ... @classmethod def from_str(cls, topic_id: str) -> Self: # Helper to parse "type/source" string # ... implementation ...
-
Subscription
Protocol (_subscription.py
): This defines the contract for any subscription rule.# From: _subscription.py (Simplified Protocol) from typing import Protocol # ... other imports class Subscription(Protocol): @property def id(self) -> str: ... # Unique ID for this subscription instance def is_match(self, topic_id: TopicId) -> bool: """Check if a topic matches this subscription's rule.""" ... def map_to_agent(self, topic_id: TopicId) -> AgentId: """Determine the target AgentId if is_match was True.""" ...
Any class implementing these methods can act as a subscription rule.
-
TypeSubscription
(_type_subscription.py
): A common implementation of theSubscription
protocol.# From: _type_subscription.py (Simplified) class TypeSubscription(Subscription): def __init__(self, topic_type: str, agent_type: str, ...): self._topic_type = topic_type self._agent_type = agent_type # ... generates a unique self._id ... def is_match(self, topic_id: TopicId) -> bool: # Matches if the topic's type is exactly the one we want return topic_id.type == self._topic_type def map_to_agent(self, topic_id: TopicId) -> AgentId: # Maps to an agent of the specified type, using the # topic's source as the agent's unique key. if not self.is_match(topic_id): raise CantHandleException(...) # Should not happen if used correctly return AgentId(type=self._agent_type, key=topic_id.source) # ... id property ...
This implementation provides the “one agent instance per source” behavior for a specific topic type.
-
DefaultSubscription
(_default_subscription.py
): This is often used via a decorator (@default_subscription
) and provides a convenient way to create aTypeSubscription
where theagent_type
is automatically inferred from the agent class being defined, and thetopic_type
defaults to “default” (but can be overridden). It simplifies common use cases.# From: _default_subscription.py (Conceptual Usage) from autogen_core import BaseAgent, default_subscription, ResearchFacts @default_subscription # Uses 'default' topic type, infers agent type 'writer' class WriterAgent(BaseAgent): # Agent logic here... async def on_message_impl(self, message: ResearchFacts, ctx): ... # Or specify the topic type @default_subscription(topic_type="research.facts.available") class SpecificWriterAgent(BaseAgent): # Agent logic here... async def on_message_impl(self, message: ResearchFacts, ctx): ...
The actual sending (publish_message
) and routing logic reside within the AgentRuntime
, which we’ll explore next.
Next Steps
You’ve learned how AutoGen Core uses a publish/subscribe system (TopicId
, Subscription
) to allow agents to communicate without direct coupling. This is crucial for building flexible and scalable multi-agent applications.
- Topic (
TopicId
): Named channels (type
/source
) for broadcasting messages. - Publish: Sending a message to a Topic.
- Subscription: An agent’s declared interest in messages on certain Topics, defining a routing rule.
Now, let’s dive into the orchestrator that manages agents and makes this messaging system work:
- Chapter 3: AgentRuntime: The manager responsible for creating, running, and connecting agents, including handling message publishing and subscription routing.
Generated by AI Codebase Knowledge Builder