Chapter 4: Flow (Flow, AsyncFlow)
In Chapter 3: Actions / Transitions, we saw how individual Nodes (BaseNode, Node, AsyncNode) can decide what should happen next by returning “action” strings, and how these actions lead to “transitions” between Nodes. But what actually manages this sequence? What’s the conductor of this orchestra of Nodes? That’s where Flows come in!
What Problem Do Flows Solve? Meet the Orchestrator!
Imagine you’re building a simple AI application that interacts with a user:
- Greet User Node: Displays a welcome message.
- Get Name Node: Asks the user for their name and stores it.
- Personalized Message Node: Uses the name to give a personalized response.
Each step is a Node. But how do you ensure they run in the correct order? How does the “Get Name Node” know to run after the “Greet User Node”, and how is the name passed along? This is the job of a Flow.
A Flow is like the blueprint or the manager of an assembly line. It defines the sequence of operations by connecting multiple Nodes into a complete workflow. It dictates:
- Which Node starts the process.
- How to move from one Node to another, based on the Actions / Transitions we learned about.
- How the shared dictionary is passed along, so that every Node has access to the data it needs.
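To make those three responsibilities concrete, here is a minimal sketch of the greeting example in plain Python rather than PocketFlow. The step functions, the TRANSITIONS table, and run_flow are hypothetical names chosen for illustration. get_name reads a pre-seeded scripted_name from the shared dictionary instead of calling input(), so the sketch runs unattended.

```python
# Toy orchestrator (hypothetical, not PocketFlow's API): each step reads and
# writes the shared dict and returns an action string that picks the next step.

def greet(shared):
    shared["greeting"] = "Welcome!"
    return "default"

def get_name(shared):
    # Stand-in for input(): the name is pre-seeded under "scripted_name".
    shared["name"] = shared.pop("scripted_name")
    return "default"

def personalize(shared):
    shared["message"] = f"Nice to meet you, {shared['name']}!"
    return "default"

# (current step, action) -> next step; a missing key ends the flow.
TRANSITIONS = {
    (greet, "default"): get_name,
    (get_name, "default"): personalize,
}

def run_flow(start, shared):
    current = start  # which step starts the process
    while current is not None:
        action = current(shared)  # the same dict is handed to every step
        current = TRANSITIONS.get((current, action))
    return shared

shared = run_flow(greet, {"scripted_name": "Ada"})
print(shared["message"])  # Nice to meet you, Ada!
```

Starting at greet and following (step, action) pairs until no entry matches is, in miniature, what a Flow does with its start node and transitions.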
PocketFlow offers two main types of Flows:
- Flow: for workflows that consist primarily of synchronous Nodes (BaseNode, Node), i.e. tasks that run one after another, each blocking until it completes.
- AsyncFlow: for workflows that include asynchronous Nodes (AsyncNode), i.e. tasks that can “pause” and let other operations run, such as waiting for user input or a network request.
Let’s see how to build and use them!
Building Your First Flow
Let’s create a simple text transformation workflow using Flow. It will:
- Get text input from the user.
- Offer transformation choices (uppercase, lowercase, etc.).
- Transform the text.
- Ask if the user wants to do another transformation or exit.
This example is inspired by cookbook/pocketflow-flow/flow.py.
Step 1: Define Your Nodes
First, we need our worker Nodes. (We’ll use conceptual Node definitions here for brevity; refer to Chapter 2 for Node details.)
```python
# Assume these Nodes are defined (simplified from cookbook/pocketflow-flow/flow.py)
from pocketflow import Node

class TextInput(Node):  # Gets input and choice
    def post(self, shared, prep_res, exec_res):
        # ... (gets user input for text and choice) ...
        # shared["text"] = user_text
        # shared["choice"] = user_choice
        if shared["choice"] == "5":  # Exit choice
            return "exit"
        return "transform"  # Action to proceed to transformation

class TextTransform(Node):  # Transforms text based on choice
    def post(self, shared, prep_res, exec_res):
        # ... (transforms text, prints result) ...
        # shared["transformed_text"] = result
        if input("Convert another? (y/n): ") == 'y':
            shared.pop("text", None)  # Clear for next input
            return "input"  # Action to go back to TextInput
        return "exit"  # Action to end

class EndNode(Node):  # A simple Node to mark the end
    pass
```
- TextInput: its post method returns "transform" to move to the TextTransform Node, or "exit".
- TextTransform: its post method returns "input" to loop back to TextInput, or "exit".
Step 2: Instantiate Your Nodes
Create an instance of each Node class:
```python
text_input = TextInput()
text_transform = TextTransform()
end_node = EndNode()
```
Step 3: Connect Nodes Using Transitions
Now, tell PocketFlow how these Nodes connect based on the actions they return. We learned this in Chapter 3: Actions / Transitions.
```python
# If text_input returns "transform", go to text_transform
text_input - "transform" >> text_transform
# If text_input returns "exit", go to a dedicated end_node
# (without this line, an unmatched "exit" action would simply end the flow)
text_input - "exit" >> end_node
# If text_transform returns "input", go back to text_input
text_transform - "input" >> text_input
# If text_transform returns "exit", go to end_node
text_transform - "exit" >> end_node
```
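The node - "action" >> next_node syntax is plain Python operator overloading. The sketch below shows one way such syntax can be built. ToyNode and _Transition are hypothetical stand-ins written for this example, not PocketFlow's classes, though the idea is the same: the minus operator captures the action string, and the right-shift operator records the successor in a successors dictionary.

```python
# Hypothetical toy classes showing how `a - "action" >> b` can be implemented.

class ToyNode:
    def __init__(self, name):
        self.name = name
        self.successors = {}  # action string -> next node

    def next(self, node, action="default"):
        self.successors[action] = node
        return node  # returning the target allows chaining: a >> b >> c

    def __rshift__(self, other):  # a >> b  means  a --"default"--> b
        return self.next(other)

    def __sub__(self, action):  # a - "x"  starts a labelled transition
        if not isinstance(action, str):
            raise TypeError("Action must be a string")
        return _Transition(self, action)


class _Transition:
    def __init__(self, src, action):
        self.src, self.action = src, action

    def __rshift__(self, target):  # (a - "x") >> b
        return self.src.next(target, self.action)


text_input = ToyNode("text_input")
text_transform = ToyNode("text_transform")
end_node = ToyNode("end_node")

text_input - "transform" >> text_transform
text_input - "exit" >> end_node
text_transform - "input" >> text_input
text_transform - "exit" >> end_node
```

Python's - binds more tightly than >>, so text_input - "transform" >> text_transform parses as (text_input - "transform") >> text_transform. That is why no parentheses are needed.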
Step 4: Create the Flow
Now, create an instance of the Flow class, telling it which Node to start with.
```python
from pocketflow import Flow

# Create the flow, starting with the text_input node
app_flow = Flow(start=text_input)
```
And that’s it! app_flow is now a complete, runnable workflow.
Step 5: Run the Flow
To execute your workflow, call its run method, usually with an initial shared dictionary.
```python
initial_shared_data = {}  # Start with an empty shared dictionary
app_flow.run(initial_shared_data)
# After the flow finishes, initial_shared_data may contain final results
# if your nodes were designed to store them there.
print("Flow finished!")
```
When you run this:
- app_flow starts with text_input.
- text_input executes (prompting you for text and a choice).
- Based on the action returned by text_input (e.g., "transform"), the Flow looks at the transitions you defined and executes the next Node (e.g., text_transform).
- This continues until a Node returns an action for which no transition is defined, or the flow transitions to a Node like end_node that doesn’t lead anywhere else.
Orchestrating Asynchronous Tasks with AsyncFlow
What if your workflow involves tasks that wait for external operations, like fetching data from a website or waiting for a user to type something slowly? If you use a regular Flow and synchronous Nodes for these, each wait blocks the running thread, so nothing else in your application can make progress until it finishes.
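You can see the cost of blocking waits with the standard library alone. In this sketch, two simulated 0.1-second I/O waits run on one asyncio event loop. wait_blocking uses time.sleep, which blocks the loop. wait_async uses asyncio.sleep, which yields control while it waits. The helper names are hypothetical, and the sleeps only stand in for network or user I/O.

```python
import asyncio
import time

async def wait_blocking(log, name):
    time.sleep(0.1)  # blocks the whole event loop while "waiting"
    log.append(name)

async def wait_async(log, name):
    await asyncio.sleep(0.1)  # yields control so other tasks can run
    log.append(name)

async def timed(worker):
    # Start two waits "at the same time" and measure the total elapsed time.
    log = []
    start = time.perf_counter()
    await asyncio.gather(worker(log, "a"), worker(log, "b"))
    return log, time.perf_counter() - start

blocking_log, blocking_secs = asyncio.run(timed(wait_blocking))
async_log, async_secs = asyncio.run(timed(wait_async))
print(f"blocking: {blocking_secs:.2f}s, non-blocking: {async_secs:.2f}s")
```

On a typical machine the blocking version takes roughly twice as long, because the second wait cannot start until the first one finishes.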
This is where AsyncFlow and Asynchronous Processing (AsyncNode, AsyncFlow) come in. AsyncFlow is designed to work with AsyncNodes, which can perform tasks asynchronously.
Let’s look at a conceptual recipe finder flow (inspired by cookbook/pocketflow-async-basic/flow.py).
Step 1: Define Your AsyncNodes
You define asynchronous Nodes by subclassing AsyncNode and writing async def methods such as post_async. Ordinary synchronous Nodes can still be mixed into the same flow.
```python
from pocketflow import AsyncNode, Node

class FetchRecipes(AsyncNode):  # Gets ingredient & fetches recipes (async)
    async def post_async(self, shared, prep_res, exec_res):
        # ... (stores recipes in shared) ...
        return "suggest"  # Action to suggest a recipe

class SuggestRecipe(Node):  # Suggests a recipe (can be sync)
    def post(self, shared, prep_res, exec_res):
        # ... (prints suggestion) ...
        return "approve"  # Action to get approval

class GetApproval(AsyncNode):  # Gets user approval (async)
    async def post_async(self, shared, prep_res, exec_res):
        # ... (gets approval) ...
        approved = shared.get("approved", False)  # hypothetical key set by the elided step
        if approved:
            return "accept"
        return "retry"  # Action to suggest another

class EndFlowNode(Node):  # Simple synchronous end node
    pass
```
Step 2 & 3: Instantiate and Connect
This is very similar to Flow:
```python
fetch_recipes = FetchRecipes()
suggest_recipe = SuggestRecipe()
get_approval = GetApproval()
end_node = EndFlowNode()

fetch_recipes - "suggest" >> suggest_recipe
suggest_recipe - "approve" >> get_approval
get_approval - "retry" >> suggest_recipe  # Loop back
get_approval - "accept" >> end_node
```
Step 4: Create the AsyncFlow
```python
from pocketflow import AsyncFlow

recipe_flow = AsyncFlow(start=fetch_recipes)
```
Notice we use AsyncFlow here.
Step 5: Run the AsyncFlow
Running an AsyncFlow involves async and await because the flow itself is asynchronous.
```python
import asyncio

async def main():
    initial_shared = {}
    await recipe_flow.run_async(initial_shared)  # Use run_async()
    print("Recipe flow finished!")

# To run the main async function:
if __name__ == "__main__":
    asyncio.run(main())
```
The AsyncFlow manages the AsyncNodes, letting them await their operations without blocking the event loop, so any other async tasks you are running can make progress in the meantime. We’ll explore this more in Chapter 5: Asynchronous Processing (AsyncNode, AsyncFlow).
Nesting Flows: Managing Complexity
What if your workflow becomes very large and complex? You can break it down! A Flow can itself be treated as a Node and nested within another Flow. This is like having a project manager who oversees several team leads, and each team lead manages their own team’s tasks.
Consider the cookbook/pocketflow-supervisor/flow.py example. It has an agent_inner_flow that handles research, and an outer Flow that uses this agent_inner_flow as a step, followed by a SupervisorNode that checks the agent’s work.
```python
# Conceptual: from cookbook/pocketflow-supervisor/flow.py
# agent_inner_flow is a complete Flow instance itself
agent_inner_flow = create_agent_inner_flow()
supervisor = SupervisorNode()

# The inner flow is treated like a node in the outer flow's transitions
agent_inner_flow >> supervisor  # Default transition
supervisor - "retry" >> agent_inner_flow

supervised_flow = Flow(start=agent_inner_flow)
```
Here, agent_inner_flow runs to completion. When it finishes, supervised_flow transitions to the supervisor Node, and if the supervisor returns "retry", the whole inner flow runs again. This is a powerful way to create hierarchical and modular workflows.
Under the Hood: How Do Flows Orchestrate?
At its core, a Flow (or AsyncFlow) runs a loop that:
- Identifies the current Node to run (starting with its start_node).
- Executes this Node (which involves its prep, exec, and post methods).
- Gets the “action” string returned by the Node’s post method.
- Uses this action string to look up the next Node based on the transitions you defined (e.g., current_node - "action" >> next_node).
- If a next Node is found, it becomes the current Node, and the loop continues.
- If no next Node is found (no matching transition), the flow ends; for a nested flow, control then returns to the parent flow.
Here’s a simplified sequence diagram:
```mermaid
sequenceDiagram
    participant You
    participant MyFlow as Flow Object
    participant NodeA as Start Node
    participant NodeB as Next Node
    participant SharedDict as Shared Dictionary

    You->>MyFlow: flow.run(initial_shared)
    MyFlow->>SharedDict: Initialize with initial_shared
    MyFlow->>NodeA: _run(SharedDict)
    NodeA-->>MyFlow: returns action_A (from NodeA's post method)
    MyFlow->>MyFlow: get_next_node(NodeA, action_A)
    Note right of MyFlow: Finds NodeB based on NodeA's transitions
    MyFlow->>NodeB: _run(SharedDict)
    NodeB-->>MyFlow: returns action_B (from NodeB's post method)
    MyFlow->>MyFlow: get_next_node(NodeB, action_B)
    Note right of MyFlow: No more nodes or no transition found. Flow ends.
    MyFlow-->>You: Flow execution complete
```
A Glimpse into the Code (pocketflow/__init__.py):
The Flow class inherits from BaseNode, so it also has prep, exec, and post methods. Its main job is done in its orchestration logic.
- Initialization: When you create a Flow, you give it a starting Node.
```python
# Inside Flow class
def __init__(self, start=None):
    super().__init__()  # Initialize BaseNode parts
    self.start_node = start  # Store the starting node
```
- Getting the Next Node: The get_next_node method is crucial. It checks the current node’s successors dictionary, which was populated by your transition definitions like nodeA - "action" >> nodeB.
```python
# Inside Flow class
def get_next_node(self, current_node, action_str):
    # Try specific action, then "default"
    next_node = current_node.successors.get(action_str)
    if not next_node:  # If specific action's successor not found
        next_node = current_node.successors.get("default")
    # ... (warnings if no successor found but some exist) ...
    return next_node
```
- The Orchestration Loop (_orch): This is the heart of the Flow.
```python
# Inside Flow class (simplified)
def _orch(self, shared, params=None):
    current_node = self.start_node
    last_action = None
    while current_node:
        # ... (set parameters for current_node if any) ...
        last_action = current_node._run(shared)  # Run the node
        # Get the next node based on the action from the current one
        current_node = self.get_next_node(current_node, last_action)
    return last_action  # Returns the very last action from the flow
```
The current_node._run(shared) call is what executes the prep -> exec -> post cycle of that Node.
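These pieces fit in a few dozen lines of plain Python. The following self-contained sketch mirrors the simplified get_next_node and _orch shown above (specific action first, then "default") and a prep -> exec -> post _run cycle. MiniNode, MiniFlow, the scripted answers that replace input(), and the two transformations (uppercase and lowercase) are assumptions for illustration, not PocketFlow's source.

```python
import warnings

# Hypothetical minimal classes mirroring this chapter's simplified code;
# not PocketFlow's actual source.

class MiniNode:
    def __init__(self):
        self.successors = {}  # action string -> next node

    def prep(self, shared):
        return None

    def exec(self, prep_res):
        return None

    def post(self, shared, prep_res, exec_res):
        return None  # the returned value is the action

    def _run(self, shared):
        # The prep -> exec -> post cycle; post's return value is the action.
        prep_res = self.prep(shared)
        exec_res = self.exec(prep_res)
        return self.post(shared, prep_res, exec_res)


class MiniFlow(MiniNode):
    def __init__(self, start):
        super().__init__()
        self.start_node = start

    def get_next_node(self, current_node, action):
        # Try the specific action, then fall back to "default".
        nxt = current_node.successors.get(action) or current_node.successors.get("default")
        if nxt is None and current_node.successors:
            warnings.warn(f"Flow ends: {action!r} not in {list(current_node.successors)}")
        return nxt

    def run(self, shared):  # plays the role of _orch
        current, last_action = self.start_node, None
        while current is not None:
            last_action = current._run(shared)
            current = self.get_next_node(current, last_action)
        return last_action


class TextInput(MiniNode):
    def prep(self, shared):
        shared["visited"].append("input")
        return shared["answers"].pop(0)  # scripted choice instead of input()

    def post(self, shared, prep_res, exec_res):
        shared["choice"] = prep_res
        return "exit" if prep_res == "5" else "transform"


class TextTransform(MiniNode):
    def prep(self, shared):
        shared["visited"].append("transform")
        return shared["text"], shared["choice"]

    def exec(self, prep_res):
        text, choice = prep_res
        return text.upper() if choice == "1" else text.lower()  # assumed choices

    def post(self, shared, prep_res, exec_res):
        shared["transformed_text"] = exec_res
        again = shared["answers"].pop(0)  # scripted "Convert another?" answer
        return "input" if again == "y" else "exit"


class EndNode(MiniNode):
    def prep(self, shared):
        shared["visited"].append("end")


text_input, text_transform, end_node = TextInput(), TextTransform(), EndNode()
text_input.successors.update({"transform": text_transform, "exit": end_node})
text_transform.successors.update({"input": text_input, "exit": end_node})

shared = {"text": "Hello", "answers": ["1", "y", "2", "n"], "visited": []}
MiniFlow(text_input).run(shared)
print(shared["visited"])           # ['input', 'transform', 'input', 'transform', 'end']
print(shared["transformed_text"])  # hello
```

The recorded path shows the loop from TextTransform back to TextInput, and the run ends at EndNode, which has no successors.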
For AsyncFlow, the structure is very similar. It has an _orch_async method:
```python
# Inside AsyncFlow class (conceptual)
async def _orch_async(self, shared, params=None):
    current_node = self.start_node
    last_action = None
    while current_node:
        # ...
        if isinstance(current_node, AsyncNode):
            last_action = await current_node._run_async(shared)  # Await async nodes
        else:
            last_action = current_node._run(shared)  # Run sync nodes normally
        current_node = self.get_next_node(current_node, last_action)
    return last_action
```
The key difference is that it awaits the _run_async method of AsyncNodes, allowing non-blocking execution, while synchronous Nodes are still run with a plain _run call.
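You can try this isinstance dispatch with asyncio alone. In this sketch, MiniNode, MiniAsyncNode, orch_async, and the recipe nodes are hypothetical. Each node has a single post or post_async method that takes only shared, a simplification of PocketFlow's prep/exec/post signatures. Scripted answers replace user input, and await asyncio.sleep(0) stands in for real I/O.

```python
import asyncio

# Hypothetical toy classes; simplified signatures, not PocketFlow's API.

class MiniNode:
    def __init__(self):
        self.successors = {}  # action string -> next node

    def _run(self, shared):  # synchronous nodes: plain call
        return self.post(shared)


class MiniAsyncNode(MiniNode):
    async def _run_async(self, shared):  # asynchronous nodes: must be awaited
        return await self.post_async(shared)


def get_next_node(node, action):
    return node.successors.get(action) or node.successors.get("default")


async def orch_async(start, shared):
    current, last_action = start, None
    while current is not None:
        if isinstance(current, MiniAsyncNode):
            last_action = await current._run_async(shared)
        else:
            last_action = current._run(shared)
        current = get_next_node(current, last_action)
    return last_action


class FetchRecipes(MiniAsyncNode):
    async def post_async(self, shared):
        await asyncio.sleep(0)  # stand-in for a network call
        shared["recipes"] = ["soup", "salad"]
        return "suggest"


class SuggestRecipe(MiniNode):  # a sync node inside an async flow
    def post(self, shared):
        shared["suggestion"] = shared["recipes"].pop(0)
        return "approve"


class GetApproval(MiniAsyncNode):
    async def post_async(self, shared):
        await asyncio.sleep(0)  # stand-in for waiting on the user
        approved = shared["answers"].pop(0) == "y"
        return "accept" if approved else "retry"


fetch, suggest, approve = FetchRecipes(), SuggestRecipe(), GetApproval()
fetch.successors["suggest"] = suggest
suggest.successors["approve"] = approve
approve.successors["retry"] = suggest
# "accept" has no successor in this sketch, so the flow simply ends there.

shared = {"answers": ["n", "y"]}
result = asyncio.run(orch_async(fetch, shared))
print(shared["suggestion"], result)  # salad accept
```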
Conclusion
You’ve now learned about Flow and AsyncFlow, the orchestrators that bring your Nodes (BaseNode, Node, AsyncNode) together to form complete, dynamic workflows!
- Flows define the sequence and logic of how Nodes are executed.
- They use the “action” strings returned by Nodes and the transition rules you define (e.g., nodeA - "action" >> nodeB) to decide the path of execution.
- Flow is for synchronous workflows, while AsyncFlow handles workflows with asynchronous tasks using AsyncNodes.
- Flows can be nested to manage complexity.
With Flows, you can build anything from simple linear sequences to complex, branching, and looping AI applications.
In the next chapter, we’ll take a much deeper dive into asynchronous operations, exploring how AsyncNode and AsyncFlow let you build responsive, I/O-bound applications efficiently.
Next up: Chapter 5: Asynchronous Processing (AsyncNode, AsyncFlow)
Generated by AI Codebase Knowledge Builder