Chapter 2: Node (BaseNode, Node, AsyncNode)
In Chapter 1: Shared State (shared dictionary), we learned how different parts of a PocketFlow workflow can communicate using the shared dictionary. Now, let’s meet the actual “workers” that perform the tasks and use this shared information: Nodes.
What are Nodes and Why Do We Need Them?
Imagine you’re building an AI that helps you write a story. This process might involve several steps:
- Generate a story idea.
- Write an outline based on the idea.
- Write the first draft of a chapter using the outline.
- Review and edit the chapter.
Each of these steps is a distinct task. In PocketFlow, each such task would be handled by a Node.
A Node is the fundamental building block in PocketFlow. It represents a single, atomic step in your workflow. Think of it as a highly specialized worker on an assembly line, responsible for one specific job. This job could be:
- Calling a Large Language Model (LLM) to generate text.
- Searching the web for information.
- Making a decision based on some data.
- Reading user input.
- Saving results to a file.
By breaking down a complex process into a series of Nodes, we make our AI applications:
- Modular: Each Node focuses on one thing, making it easier to develop, test, and understand.
- Reusable: A Node designed for web search can be used in many different AI applications.
- Manageable: It’s easier to build and debug a sequence of simple steps than one giant, monolithic piece of code.
The Anatomy of a Node: prep, exec, and post
Most Nodes in PocketFlow have a similar structure, typically involving three key methods:
prep(self, shared)(Prepare):- Purpose: This method is called before the Node does its main work. Its job is to get any necessary input data from the shared dictionary.
- Analogy: An assembly line worker picking up the necessary parts from a shared bin before starting their task.
- Input: It receives the
shareddictionary. - Output: It usually returns the specific data the Node needs for its core logic.
exec(self, prep_res)(Execute):- Purpose: This is where the Node performs its core task. This is the “brain” or “muscle” of the Node.
- Analogy: The assembly line worker actually assembling the parts or performing their specialized action.
- Input: It receives the result from the
prepmethod (prep_res). - Output: It returns the result of its execution (e.g., a summary, search results, a decision).
post(self, shared, prep_res, exec_res)(Post-process):- Purpose: This method is called after the Node has finished its main work. Its jobs are:
- To process the results from
exec. - To update the shared dictionary with these results or any other new information.
- To decide what should happen next in the workflow (this is crucial for Actions / Transitions, which we’ll cover in the next chapter).
- To process the results from
- Analogy: The assembly line worker placing the finished component onto the conveyor belt (updating
shared) and signaling if the item needs to go to a different station next (deciding the next action). - Input: It receives the
shareddictionary, the result fromprep(prep_res), and the result fromexec(exec_res). - Output: It can return an “action” string that tells the Flow (
Flow,AsyncFlow) which Node to execute next. If it returns nothing (orNone), a default transition is usually followed.
- Purpose: This method is called after the Node has finished its main work. Its jobs are:
Let’s make this concrete with a simple example: a SummarizeNode whose job is to take some text and produce a short summary.
# This is a conceptual Node, actual implementation details might vary slightly
from pocketflow import Node # We'll import the base class
class SummarizeNode(Node):
def prep(self, shared):
# 1. Prepare: Get the text to summarize from 'shared'
print("SummarizeNode: Preparing...")
text_to_summarize = shared.get("document_text", "No text found.")
return text_to_summarize
def exec(self, text_input):
# 2. Execute: Perform the summarization (e.g., call an LLM)
print(f"SummarizeNode: Executing with text: '{text_input[:30]}...'")
if not text_input or text_input == "No text found.":
return "Cannot summarize empty or missing text."
# In a real scenario, this would call an LLM or a summarization library
summary = f"This is a summary of: {text_input[:20]}..."
return summary
def post(self, shared, prep_res, exec_res):
# 3. Post-process: Store the summary in 'shared'
print(f"SummarizeNode: Posting summary: '{exec_res}'")
shared["summary_output"] = exec_res
# We might decide the next step here, e.g., return "summarization_done"
# For now, we'll just let it end by returning nothing (None)
Let’s imagine how this SummarizeNode would work:
- Initialization: You’d start with some text in the
shareddictionary.shared_data = {"document_text": "PocketFlow is a cool framework for building AI."} - Running the Node (simplified):
prep(shared_data)is called: It looks intoshared_dataand finds"PocketFlow is a cool framework for building AI.". It returns this text.exec("PocketFlow is a cool framework...")is called: It takes the text and (in our simplified example) creates a summary like"This is a summary of: PocketFlow is a cool...". It returns this summary.post(shared_data, text_from_prep, summary_from_exec)is called: It takes theshared_dataand thesummary_from_exec. It then adds a new entry:shared_data["summary_output"] = "This is a summary of: PocketFlow is a cool...".
After the Node runs, shared_data would look like this:
{
"document_text": "PocketFlow is a cool framework for building AI.",
"summary_output": "This is a summary of: PocketFlow is a cool..."
}
The summary is now available in the shared dictionary for other Nodes or for final output!
Types of Nodes: BaseNode, Node, AsyncNode
PocketFlow provides a few variations of Nodes, built on top of each other:
BaseNode:- This is the most fundamental type of Node. It provides the basic structure with
prep,exec, andpostmethods. - It’s like the basic blueprint for any worker.
- This is the most fundamental type of Node. It provides the basic structure with
Node(inherits fromBaseNode):- This is the standard synchronous Node you’ll often use. “Synchronous” means it performs its task and waits for it to complete before anything else happens.
- It adds helpful features on top of
BaseNode, like automatic retries if theexecmethod fails (e.g., a network error when calling an LLM) and anexec_fallbackmethod that can be called if all retries fail. - From
cookbook/pocketflow-node/flow.py, ourSummarizeNode is an example ofNode: ```python from pocketflow import Node # … other imports …
class Summarize(Node): # Inherits from Node # … prep, exec, post methods … def exec_fallback(self, shared, prep_res, exc): “"”Provide a simple fallback instead of crashing.””” return “There was an error processing your request.” ``` This
SummarizeNode, if itsexecmethod fails (e.g.,call_llmraises an error), will retry (default is 1 retry, but can be configured likeSummarize(max_retries=3)). If all retries fail,exec_fallbackis called.AsyncNode(inherits fromNode):- This type of Node is for asynchronous tasks. Asynchronous tasks are those that might take some time to complete (like waiting for a web request or a user to type something) but don’t need to block the entire program while they wait. They can “pause” and let other things run.
AsyncNodeusesasyncandawaitkeywords from Python’sasynciolibrary.- It has asynchronous versions of the core methods:
prep_async,exec_async, andpost_async. - We’ll dive much deeper into asynchronous operations in Chapter 5: Asynchronous Processing (
AsyncNode,AsyncFlow). For now, just know it exists for tasks that involve waiting. - Example from
cookbook/pocketflow-async-basic/nodes.py: ```python from pocketflow import AsyncNode # … other imports …
class FetchRecipes(AsyncNode): # Inherits from AsyncNode async def prep_async(self, shared): # … prepare input asynchronously … ingredient = await get_user_input(“Enter ingredient: “) # get_user_input is async return ingredient
async def exec_async(self, ingredient): # ... execute task asynchronously ... recipes = await fetch_recipes(ingredient) # fetch_recipes is async return recipes async def post_async(self, shared, prep_res, recipes): # ... post-process asynchronously ... shared["recipes"] = recipes return "suggest" # Action for the next step ``` Notice the `async def` and `await` keywords. This `FetchRecipes` Node can wait for user input and web requests without freezing the application.
How a Node Runs: Under the Hood (Simplified)
When PocketFlow decides it’s time for a particular Node to run (as part of a Flow (Flow, AsyncFlow)), it essentially orchestrates the calling of its prep, exec, and post methods in sequence.
Here’s a simplified view of what happens when a synchronous Node’s internal _run method is invoked:
- Call
prep:prep_result = self.prep(shared)- Your Node’s
prepmethod is called with the currentshareddictionary. - Whatever
prepreturns is stored.
- Your Node’s
- Call
_exec(which internally calls yourexecwith retries):exec_result = self._exec(prep_result)- The Node’s
_execmethod is called with theprep_result. - This
_execmethod in theNodeclass handles the retry logic. It will try to call yourexec(prep_result)method. - If your
execsucceeds, its result is stored. - If your
execraises an exception,_execmight wait and try again (up tomax_retries). - If all retries fail,
exec_fallback(prep_result, exception)is called, and its result is used asexec_result.
- The Node’s
- Call
post:action = self.post(shared, prep_result, exec_result)- Your Node’s
postmethod is called with theshareddictionary, theprep_result, and theexec_result. postcan modifysharedand returns an action string (orNone).
- Your Node’s
- Return Action: The
actionreturned bypostis then used by the Flow (Flow,AsyncFlow) to determine the next Node to run.
Let’s visualize this with a sequence diagram:
sequenceDiagram
participant FlowEngine as PocketFlow Engine
participant YourNode as Your Node Instance
participant SharedDict as Shared Dictionary
FlowEngine->>YourNode: _run(SharedDict)
YourNode->>YourNode: prep(SharedDict)
Note right of YourNode: Reads from SharedDict
YourNode-->>SharedDict: Access data (e.g., shared['input'])
YourNode->>YourNode: _exec(prep_result)
Note right of YourNode: Calls your exec(), handles retries/fallback
YourNode->>YourNode: post(SharedDict, prep_result, exec_result)
Note right of YourNode: Writes to SharedDict, decides next action
YourNode-->>SharedDict: Update data (e.g., shared['output'] = ...)
YourNode-->>FlowEngine: Returns action string
Code Glimpse (from pocketflow/__init__.py):
The BaseNode class defines the fundamental execution flow in its _run method (this is a direct, slightly simplified version):
# Inside BaseNode class from pocketflow/__init__.py
def _run(self, shared):
prep_output = self.prep(shared)
exec_output = self._exec(prep_output) # _exec calls self.exec
action = self.post(shared, prep_output, exec_output)
return action
This is the core loop for a single Node’s execution.
The Node class (which inherits from BaseNode) overrides _exec to add retry and fallback logic:
# Simplified concept from Node class in pocketflow/__init__.py
def _exec(self, prep_res):
for self.cur_retry in range(self.max_retries): # Loop for retries
try:
return self.exec(prep_res) # Call your Node's exec method
except Exception as e:
if self.cur_retry == self.max_retries - 1: # If last retry
return self.exec_fallback(prep_res, e) # Call fallback
if self.wait > 0:
time.sleep(self.wait) # Wait before retrying
This shows how Node makes your worker more robust by automatically handling temporary failures.
For AsyncNode, the methods are prep_async, exec_async, post_async, and they are awaited, allowing other tasks to run while waiting for I/O operations. This will be detailed in Chapter 5.
Conclusion
You’ve now been introduced to Nodes, the workhorses of PocketFlow!
- They represent single, atomic steps in your workflow.
- They typically follow a
prep->exec->postlifecycle. prepgets data from the shared dictionary.execperforms the core logic.postupdates theshareddictionary and can decide what happens next.Nodeprovides synchronous execution with retries and fallbacks.AsyncNodeprovides asynchronous execution for I/O-bound tasks.
Nodes are the building blocks you’ll use to define the individual capabilities of your AI agents and applications. But how do these Nodes connect to form a sequence or a more complex workflow? And how does the post method’s return value actually control the flow? That’s where Actions / Transitions come in, which we’ll explore in the next chapter!
Next up: Chapter 3: Actions / Transitions
Generated by AI Codebase Knowledge Builder