Flow
A Flow orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the Actions returned from each Node’s post()
.
1. Action-based Transitions
Each Node’s post()
returns an Action string. By default, if post()
doesn’t return anything, we treat that as "default"
.
You define transitions with the syntax:
-
Basic default transition:
node_a >> node_b
This means ifnode_a.post()
returns"default"
, go tonode_b
. (Equivalent tonode_a - "default" >> node_b
) -
Named action transition:
node_a - "action_name" >> node_b
This means ifnode_a.post()
returns"action_name"
, go tonode_b
.
It’s possible to create loops, branching, or multi-step flows.
2. Creating a Flow
A Flow begins with a start node. You call Flow(start=some_node)
to specify the entry point. When you call flow.run(shared)
, it executes the start node, looks at its returned Action from post()
, follows the transition, and continues until there’s no next node.
Example: Simple Sequence
Here’s a minimal flow of two nodes in a chain:
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
- When you run the flow, it executes
node_a
. - Suppose
node_a.post()
returns"default"
. - The flow then sees
"default"
Action is linked tonode_b
and runsnode_b
. node_b.post()
returns"default"
but we didn’t definenode_b >> something_else
. So the flow ends there.
Example: Branching & Looping
Here’s a simple expense approval flow that demonstrates branching and looping. The ReviewExpense
node can return three possible Actions:
"approved"
: expense is approved, move to payment processing"needs_revision"
: expense needs changes, send back for revision"rejected"
: expense is denied, finish the process
We can wire them like this:
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process
flow = Flow(start=review)
Let’s see how it flows:
- If
review.post()
returns"approved"
, the expense moves to thepayment
node - If
review.post()
returns"needs_revision"
, it goes to therevise
node, which then loops back toreview
- If
review.post()
returns"rejected"
, it moves to thefinish
node and stops
flowchart TD
review[Review Expense] -->|approved| payment[Process Payment]
review -->|needs_revision| revise[Revise Report]
review -->|rejected| finish[Finish Process]
revise --> review
payment --> finish
Running Individual Nodes vs. Running a Flow
node.run(shared)
: Just runs that node alone (callsprep->exec->post()
), returns an Action.flow.run(shared)
: Executes from the start node, follows Actions to the next node, and so on until the flow can’t continue.
node.run(shared)
does not proceed to the successor. This is mainly for debugging or testing a single node.Always use
flow.run(...)
in production to ensure the full pipeline runs correctly.
3. Nested Flows
A Flow can act like a Node, which enables powerful composition patterns. This means you can:
- Use a Flow as a Node within another Flow’s transitions.
- Combine multiple smaller Flows into a larger Flow for reuse.
- Node
params
will be a merging of all parents’params
.
Flow’s Node Methods
A Flow is also a Node, so it will run prep()
and post()
. However:
- It won’t run
exec()
, as its main logic is to orchestrate its nodes. post()
always receivesNone
forexec_res
and should instead get the flow execution results from the shared store.
Basic Flow Nesting
Here’s how to connect a flow to another node:
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Connect it to another node
subflow >> node_c
# Create the parent flow
parent_flow = Flow(start=subflow)
When parent_flow.run()
executes:
- It starts
subflow
subflow
runs through its nodes (node_a->node_b
)- After
subflow
completes, execution continues tonode_c
Example: Order Processing Pipeline
Here’s a practical example that breaks down order processing into nested flows:
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)
This creates a clean separation of concerns while maintaining a clear execution path:
flowchart LR
subgraph order_pipeline[Order Pipeline]
subgraph paymentFlow["Payment Flow"]
A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
end
subgraph inventoryFlow["Inventory Flow"]
D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
end
subgraph shippingFlow["Shipping Flow"]
G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
end
paymentFlow --> inventoryFlow
inventoryFlow --> shippingFlow
end