Chapter 2: Node (`BaseNode`, `Node`, `AsyncNode`)
In Chapter 1: Shared State (`shared` dictionary), we learned how different parts of a PocketFlow workflow can communicate using the `shared` dictionary. Now, let’s meet the actual “workers” that perform the tasks and use this shared information: Nodes.
What are Nodes and Why Do We Need Them?
Imagine you’re building an AI that helps you write a story. This process might involve several steps:
- Generate a story idea.
- Write an outline based on the idea.
- Write the first draft of a chapter using the outline.
- Review and edit the chapter.
Each of these steps is a distinct task. In PocketFlow, each such task would be handled by a Node.
A Node is the fundamental building block in PocketFlow. It represents a single, atomic step in your workflow. Think of it as a highly specialized worker on an assembly line, responsible for one specific job. This job could be:
- Calling a Large Language Model (LLM) to generate text.
- Searching the web for information.
- Making a decision based on some data.
- Reading user input.
- Saving results to a file.
By breaking down a complex process into a series of Nodes, we make our AI applications:
- Modular: Each Node focuses on one thing, making it easier to develop, test, and understand.
- Reusable: A Node designed for web search can be used in many different AI applications.
- Manageable: It’s easier to build and debug a sequence of simple steps than one giant, monolithic piece of code.
The Anatomy of a Node: `prep`, `exec`, and `post`
Most Nodes in PocketFlow have a similar structure, typically involving three key methods:
`prep(self, shared)` (Prepare):
- Purpose: This method is called before the Node does its main work. Its job is to get any necessary input data from the shared dictionary.
- Analogy: An assembly line worker picking up the necessary parts from a shared bin before starting their task.
- Input: It receives the `shared` dictionary.
- Output: It usually returns the specific data the Node needs for its core logic.
`exec(self, prep_res)` (Execute):
- Purpose: This is where the Node performs its core task. This is the “brain” or “muscle” of the Node.
- Analogy: The assembly line worker actually assembling the parts or performing their specialized action.
- Input: It receives the result from the `prep` method (`prep_res`).
- Output: It returns the result of its execution (e.g., a summary, search results, a decision).
`post(self, shared, prep_res, exec_res)` (Post-process):
- Purpose: This method is called after the Node has finished its main work. Its jobs are:
  - To process the results from `exec`.
  - To update the shared dictionary with these results or any other new information.
  - To decide what should happen next in the workflow (this is crucial for Actions / Transitions, which we’ll cover in the next chapter).
- Analogy: The assembly line worker placing the finished component onto the conveyor belt (updating `shared`) and signaling if the item needs to go to a different station next (deciding the next action).
- Input: It receives the `shared` dictionary, the result from `prep` (`prep_res`), and the result from `exec` (`exec_res`).
- Output: It can return an “action” string that tells the Flow (`Flow`, `AsyncFlow`) which Node to execute next. If it returns nothing (or `None`), a default transition is usually followed.
Let’s make this concrete with a simple example: a `SummarizeNode` whose job is to take some text and produce a short summary.
# This is a conceptual Node, actual implementation details might vary slightly
from pocketflow import Node  # We'll import the base class

class SummarizeNode(Node):
    def prep(self, shared):
        # 1. Prepare: Get the text to summarize from 'shared'
        print("SummarizeNode: Preparing...")
        text_to_summarize = shared.get("document_text", "No text found.")
        return text_to_summarize

    def exec(self, text_input):
        # 2. Execute: Perform the summarization (e.g., call an LLM)
        print(f"SummarizeNode: Executing with text: '{text_input[:30]}...'")
        if not text_input or text_input == "No text found.":
            return "Cannot summarize empty or missing text."
        # In a real scenario, this would call an LLM or a summarization library
        summary = f"This is a summary of: {text_input[:20]}..."
        return summary

    def post(self, shared, prep_res, exec_res):
        # 3. Post-process: Store the summary in 'shared'
        print(f"SummarizeNode: Posting summary: '{exec_res}'")
        shared["summary_output"] = exec_res
        # We might decide the next step here, e.g., return "summarization_done"
        # For now, we'll just let it end by returning nothing (None)
Let’s imagine how this `SummarizeNode` would work:
- Initialization: You’d start with some text in the `shared` dictionary: `shared_data = {"document_text": "PocketFlow is a cool framework for building AI."}`
- Running the Node (simplified):
  - `prep(shared_data)` is called: It looks into `shared_data` and finds `"PocketFlow is a cool framework for building AI."`. It returns this text.
  - `exec("PocketFlow is a cool framework...")` is called: It takes the text and (in our simplified example) creates a summary like `"This is a summary of: PocketFlow is a cool..."`. It returns this summary.
  - `post(shared_data, text_from_prep, summary_from_exec)` is called: It takes the `shared_data` and the `summary_from_exec`. It then adds a new entry: `shared_data["summary_output"] = "This is a summary of: PocketFlow is a cool..."`.
After the Node runs, `shared_data` would look like this:
{
    "document_text": "PocketFlow is a cool framework for building AI.",
    "summary_output": "This is a summary of: PocketFlow is a cool..."
}
The summary is now available in the `shared` dictionary for other Nodes or for final output!
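The walkthrough above can be reproduced by calling the three methods by hand. The sketch below is a plain Python class that only mirrors the `prep`/`exec`/`post` shape, so it runs without PocketFlow installed; `PlainSummarizeNode` and `run_once` are hypothetical names for illustration, not part of the library.

```python
class PlainSummarizeNode:
    """Stand-in with the same three methods as the SummarizeNode example (no base class)."""

    def prep(self, shared):
        # Read the input text from the shared dictionary.
        return shared.get("document_text", "No text found.")

    def exec(self, text_input):
        # Placeholder for a real summarization call (e.g., an LLM).
        if not text_input or text_input == "No text found.":
            return "Cannot summarize empty or missing text."
        return f"This is a summary of: {text_input[:20]}..."

    def post(self, shared, prep_res, exec_res):
        # Write the result back; returning None means "no explicit action".
        shared["summary_output"] = exec_res
        return None


def run_once(node, shared):
    """Hypothetical helper: call prep, exec, and post in order, passing results along."""
    prep_res = node.prep(shared)
    exec_res = node.exec(prep_res)
    return node.post(shared, prep_res, exec_res)


shared_data = {"document_text": "PocketFlow is a cool framework for building AI."}
action = run_once(PlainSummarizeNode(), shared_data)
print(shared_data["summary_output"])
print(action)
```

Because each method only sees what the previous one returned (plus `shared`), the core logic in `exec` can be tested on its own with a plain string, without building a dictionary first.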
Types of Nodes: `BaseNode`, `Node`, `AsyncNode`
PocketFlow provides a few variations of Nodes, built on top of each other:
`BaseNode`:
- This is the most fundamental type of Node. It provides the basic structure with `prep`, `exec`, and `post` methods.
- It’s like the basic blueprint for any worker.
`Node` (inherits from `BaseNode`):
- This is the standard synchronous Node you’ll often use. “Synchronous” means it performs its task and waits for it to complete before anything else happens.
- It adds helpful features on top of `BaseNode`, like automatic retries if the `exec` method fails (e.g., a network error when calling an LLM) and an `exec_fallback` method that can be called if all retries fail.
- From `cookbook/pocketflow-node/flow.py`, our `Summarize` Node is an example of `Node`:

```python
from pocketflow import Node
# … other imports …

class Summarize(Node):  # Inherits from Node
    # … prep, exec, post methods …

    # Signature matches the exec_fallback(prep_res, exc) call made by Node._exec
    def exec_fallback(self, prep_res, exc):
        """Provide a simple fallback instead of crashing."""
        return "There was an error processing your request."
```

If this `Summarize` Node’s `exec` method fails (e.g., `call_llm` raises an error), it is tried again until `max_retries` attempts have been made in total. The default is `max_retries=1`, which means a single attempt with no retry; you can allow more with, for example, `Summarize(max_retries=3)`. If every attempt fails, `exec_fallback` is called.
`AsyncNode`
(inherits from `Node`):
- This type of Node is for asynchronous tasks. Asynchronous tasks are those that might take some time to complete (like waiting for a web request or a user to type something) but don’t need to block the entire program while they wait. They can “pause” and let other things run.
- `AsyncNode` uses Python’s `async` and `await` keywords together with the `asyncio` library.
- It has asynchronous versions of the core methods: `prep_async`, `exec_async`, and `post_async`.
- We’ll dive much deeper into asynchronous operations in Chapter 5: Asynchronous Processing (`AsyncNode`, `AsyncFlow`). For now, just know it exists for tasks that involve waiting.
- Example from `cookbook/pocketflow-async-basic/nodes.py`:

```python
from pocketflow import AsyncNode
# … other imports …

class FetchRecipes(AsyncNode):  # Inherits from AsyncNode
    async def prep_async(self, shared):
        # … prepare input asynchronously …
        ingredient = await get_user_input("Enter ingredient: ")  # get_user_input is async
        return ingredient

    async def exec_async(self, ingredient):
        # ... execute task asynchronously ...
        recipes = await fetch_recipes(ingredient)  # fetch_recipes is async
        return recipes

    async def post_async(self, shared, prep_res, recipes):
        # ... post-process asynchronously ...
        shared["recipes"] = recipes
        return "suggest"  # Action for the next step
```

Notice the `async def` and `await` keywords. This `FetchRecipes` Node can wait for user input and web requests without freezing the application.
How a Node Runs: Under the Hood (Simplified)
When PocketFlow decides it’s time for a particular Node to run (as part of a Flow (`Flow`, `AsyncFlow`)), it essentially orchestrates the calling of its `prep`, `exec`, and `post` methods in sequence.
Here’s a simplified view of what happens when a synchronous `Node`’s internal `_run` method is invoked:
- Call `prep`: `prep_result = self.prep(shared)`
  - Your Node’s `prep` method is called with the current `shared` dictionary.
  - Whatever `prep` returns is stored.
- Call `_exec` (which internally calls your `exec` with retries): `exec_result = self._exec(prep_result)`
  - The Node’s `_exec` method is called with the `prep_result`.
  - This `_exec` method in the `Node` class handles the retry logic. It will try to call your `exec(prep_result)` method.
  - If your `exec` succeeds, its result is stored.
  - If your `exec` raises an exception, `_exec` might wait and try again, until `max_retries` attempts have been made in total.
  - If all attempts fail, `exec_fallback(prep_result, exception)` is called, and its result is used as `exec_result`.
- Call `post`: `action = self.post(shared, prep_result, exec_result)`
  - Your Node’s `post` method is called with the `shared` dictionary, the `prep_result`, and the `exec_result`.
  - `post` can modify `shared` and returns an action string (or `None`).
- Return Action: The `action` returned by `post` is then used by the Flow (`Flow`, `AsyncFlow`) to determine the next Node to run.
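To illustrate the last step, here is a minimal sketch of a `post` method that returns different action strings depending on the result of `exec`. The `ReviewNode` class, the action names `"approve"` and `"revise"`, and the `next_step_for` lookup table are all hypothetical. How PocketFlow itself maps actions to Nodes is the subject of the next chapter, so treat the lookup as a stand-in only.

```python
class ReviewNode:
    """Hypothetical stand-in node: decides whether a draft is long enough."""

    def prep(self, shared):
        return shared.get("draft", "")

    def exec(self, draft):
        # Core logic: a toy quality check based on word count.
        return len(draft.split()) >= 5

    def post(self, shared, prep_res, exec_res):
        shared["draft_ok"] = exec_res
        # The returned string is the "action" a flow could branch on.
        return "approve" if exec_res else "revise"


# Illustrative lookup only; this is not PocketFlow's transition mechanism.
next_step_for = {"approve": "publish", "revise": "rewrite"}


def run_and_route(node, shared):
    """Run prep -> exec -> post, then look up a next step for the action."""
    prep_res = node.prep(shared)
    exec_res = node.exec(prep_res)
    action = node.post(shared, prep_res, exec_res)
    return action, next_step_for[action]


print(run_and_route(ReviewNode(), {"draft": "Too short"}))
print(run_and_route(ReviewNode(), {"draft": "A draft with more than five words"}))
```

Keeping the branching decision in `post` means `exec` stays a pure computation, while everything that touches `shared` or affects control flow lives in one place.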
Let’s visualize this with a sequence diagram:
sequenceDiagram
participant FlowEngine as PocketFlow Engine
participant YourNode as Your Node Instance
participant SharedDict as Shared Dictionary
FlowEngine->>YourNode: _run(SharedDict)
YourNode->>YourNode: prep(SharedDict)
Note right of YourNode: Reads from SharedDict
YourNode-->>SharedDict: Access data (e.g., shared['input'])
YourNode->>YourNode: _exec(prep_result)
Note right of YourNode: Calls your exec(), handles retries/fallback
YourNode->>YourNode: post(SharedDict, prep_result, exec_result)
Note right of YourNode: Writes to SharedDict, decides next action
YourNode-->>SharedDict: Update data (e.g., shared['output'] = ...)
YourNode-->>FlowEngine: Returns action string
Code Glimpse (from `pocketflow/__init__.py`):
The `BaseNode` class defines the fundamental execution flow in its `_run` method (this is a direct, slightly simplified version):
# Inside BaseNode class from pocketflow/__init__.py
def _run(self, shared):
    prep_output = self.prep(shared)
    exec_output = self._exec(prep_output)  # _exec calls self.exec
    action = self.post(shared, prep_output, exec_output)
    return action
This is the core loop for a single Node’s execution.
The `Node` class (which inherits from `BaseNode`) overrides `_exec` to add retry and fallback logic:
# Simplified concept from Node class in pocketflow/__init__.py
import time  # needed for time.sleep below

def _exec(self, prep_res):
    for self.cur_retry in range(self.max_retries):  # One iteration per attempt
        try:
            return self.exec(prep_res)  # Call your Node's exec method
        except Exception as e:
            if self.cur_retry == self.max_retries - 1:  # If this was the last attempt
                return self.exec_fallback(prep_res, e)  # Call fallback
            if self.wait > 0:
                time.sleep(self.wait)  # Wait before retrying
This shows how `Node` makes your worker more robust by automatically handling temporary failures.
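The retry behaviour can be tried out without PocketFlow by using a small stand-in that copies the loop shown above. `RetryingNode`, `FlakyNode`, and the `attempts` counter are hypothetical names for this sketch; only the loop structure is taken from the snippet above, and the re-raising default for `exec_fallback` is an assumption.

```python
import time


class RetryingNode:
    """Stand-in that mirrors the retry loop from the snippet above."""

    def __init__(self, max_retries=1, wait=0):
        self.max_retries = max_retries  # total number of attempts
        self.wait = wait                # seconds to sleep between attempts
        self.cur_retry = 0

    def exec(self, prep_res):
        raise NotImplementedError

    def exec_fallback(self, prep_res, exc):
        # Assumed default for this sketch: re-raise when no fallback is defined.
        raise exc

    def _exec(self, prep_res):
        for self.cur_retry in range(self.max_retries):
            try:
                return self.exec(prep_res)
            except Exception as e:
                if self.cur_retry == self.max_retries - 1:
                    return self.exec_fallback(prep_res, e)
                if self.wait > 0:
                    time.sleep(self.wait)


class FlakyNode(RetryingNode):
    """Fails on the first two calls to exec, then succeeds."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.attempts = 0

    def exec(self, prep_res):
        self.attempts += 1
        if self.attempts < 3:
            raise ConnectionError("simulated network error")
        return f"ok: {prep_res}"

    def exec_fallback(self, prep_res, exc):
        return f"fallback after {self.attempts} attempt(s)"


patient = FlakyNode(max_retries=3)
print(patient._exec("hello"))    # third attempt succeeds

impatient = FlakyNode(max_retries=1)
print(impatient._exec("hello"))  # single attempt fails, fallback is used
```

With `max_retries=1` the loop body runs exactly once, so the first exception goes straight to `exec_fallback`; raising the limit to 3 gives the flaky call enough attempts to succeed.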
For `AsyncNode`, the methods are `prep_async`, `exec_async`, and `post_async`, and they are `await`ed, allowing other tasks to run while waiting for I/O operations. This will be detailed in Chapter 5.
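As a rough illustration of why awaiting matters, the sketch below runs two stand-in async workers concurrently with `asyncio.gather`. `SlowLookup`, its `delay` parameter, and `run_async` are hypothetical; `asyncio.sleep` stands in for real I/O, and nothing here uses PocketFlow’s own `AsyncNode`.

```python
import asyncio
import time


class SlowLookup:
    """Hypothetical stand-in using the async method names described above."""

    def __init__(self, key, delay):
        self.key = key
        self.delay = delay

    async def prep_async(self, shared):
        return shared["query"]

    async def exec_async(self, query):
        # asyncio.sleep simulates waiting on a network call without blocking.
        await asyncio.sleep(self.delay)
        return f"{self.key} result for {query!r}"

    async def post_async(self, shared, prep_res, exec_res):
        shared[self.key] = exec_res
        return "done"


async def run_async(node, shared):
    """Hypothetical helper: await the three async methods in order."""
    prep_res = await node.prep_async(shared)
    exec_res = await node.exec_async(prep_res)
    return await node.post_async(shared, prep_res, exec_res)


async def main():
    shared = {"query": "pasta"}
    start = time.perf_counter()
    # Both workers wait at the same time instead of one after the other.
    actions = await asyncio.gather(
        run_async(SlowLookup("recipes", 0.2), shared),
        run_async(SlowLookup("reviews", 0.2), shared),
    )
    elapsed = time.perf_counter() - start
    return shared, actions, elapsed


shared, actions, elapsed = asyncio.run(main())
print(actions)
print(f"elapsed: {elapsed:.2f}s")  # the two 0.2s waits overlap
```

Run sequentially, the two lookups would need at least 0.4 seconds; because each `await` hands control back to the event loop, the total stays close to the longer of the two waits.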
Conclusion
You’ve now been introduced to Nodes, the workhorses of PocketFlow!
- They represent single, atomic steps in your workflow.
- They typically follow a `prep` -> `exec` -> `post` lifecycle.
- `prep` gets data from the shared dictionary.
- `exec` performs the core logic.
- `post` updates the `shared` dictionary and can decide what happens next.
- `Node` provides synchronous execution with retries and fallbacks.
- `AsyncNode` provides asynchronous execution for I/O-bound tasks.
Nodes are the building blocks you’ll use to define the individual capabilities of your AI agents and applications. But how do these Nodes connect to form a sequence or a more complex workflow? And how does the `post` method’s return value actually control the flow? That’s where Actions / Transitions come in, which we’ll explore in the next chapter!
Next up: Chapter 3: Actions / Transitions
Generated by AI Codebase Knowledge Builder