Communication
Nodes and Flows communicate in two ways:
- Shared Store (recommended)
  - A global data structure (often an in-memory dict) that all nodes can read and write via prep() and post().
  - Great for data results, large content, or anything multiple nodes need.
  - Design the data structure and populate it ahead of time.
- Params (only for Batch)
  - Each node has a local, ephemeral params dict passed in by the parent Flow, used as an identifier for tasks. Parameter keys and values must be immutable.
  - Good for identifiers like filenames or numeric IDs, in Batch mode.
If you know memory management, think of the Shared Store like a heap (shared by all function calls), and Params like a stack (assigned by the caller).
Use Shared Store for almost all cases. It’s flexible and easy to manage, and it separates Data Schema from Compute Logic, making the code easier to maintain. Params is mostly syntactic sugar for Batch.
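To make the heap/stack analogy concrete, here is a framework-free sketch in plain Python. The function names load and summarize_one and the sample data are hypothetical; the sketch only illustrates the division of labor: the shared dict holds the data, while a small read-only params mapping tells a task which slice to work on.

```python
from types import MappingProxyType

# Shared store: one mutable dict visible to every step (the "heap").
shared = {"data": {}, "summary": {}}

def load(shared):
    # Populate the shared store up front.
    shared["data"]["doc1.txt"] = "alpha beta gamma"
    shared["data"]["doc2.txt"] = "delta epsilon"

def summarize_one(shared, params):
    # Params only identify the task (the "stack"); results go to the shared store.
    filename = params["filename"]
    text = shared["data"][filename]
    shared["summary"][filename] = f"{len(text.split())} words"

load(shared)
for name in ("doc1.txt", "doc2.txt"):
    # MappingProxyType gives a read-only view, mirroring the advice
    # that params stay immutable while a task runs.
    summarize_one(shared, MappingProxyType({"filename": name}))

print(shared["summary"])
```

Note that the params mapping carries only an identifier; all content, input and output alike, lives in the shared store.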
1. Shared Store
Overview
A shared store is typically an in-memory dictionary, like:
shared = {"data": {}, "summary": {}, "config": {...}, ...}
It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements.
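As one possible illustration of mixing in-memory data with persistence, the sketch below places a sqlite3 connection next to plain data in the shared store. The table name, the "db" key, and the save_summary helper are assumptions made for this example, not part of any framework.

```python
import sqlite3

# Hypothetical schema, decided before writing any node logic.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE summaries (filename TEXT PRIMARY KEY, summary TEXT)")

shared = {
    "data": {"doc1.txt": "Some text content"},  # in-memory working data
    "db": conn,                                   # connection used for persistence
}

def save_summary(shared, filename, summary):
    # Any step holding the shared store can persist results through the connection.
    with shared["db"]:  # commits on success, rolls back on error
        shared["db"].execute(
            "INSERT OR REPLACE INTO summaries VALUES (?, ?)", (filename, summary)
        )

save_summary(shared, "doc1.txt", "A short summary")
rows = shared["db"].execute("SELECT filename, summary FROM summaries").fetchall()
print(rows)
```

Fixing the schema first, as recommended above, means every step that reads or writes through shared["db"] agrees on the same columns.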
Example
# Assumes Node and Flow come from the PocketFlow package, and that
# call_llm(prompt) is a user-supplied helper returning a string.
from pocketflow import Node, Flow

class LoadData(Node):
    def post(self, shared, prep_res, exec_res):
        # We write data to shared store
        shared["data"] = "Some text content"
        return None

class Summarize(Node):
    def prep(self, shared):
        # We read data from shared store
        return shared["data"]

    def exec(self, prep_res):
        # Call LLM to summarize
        prompt = f"Summarize: {prep_res}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # We write summary to shared store
        shared["summary"] = exec_res
        return "default"

load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)

shared = {}
flow.run(shared)
Here:
- LoadData writes to shared["data"].
- Summarize reads from shared["data"], summarizes, and writes to shared["summary"].
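The data flow described here can be traced without the framework. The sketch below uses a tiny stand-in class, MiniNode, and a run_chain helper (both hypothetical, not the library's implementation) that call prep, exec, and post in order, with a fake summarizer in place of call_llm.

```python
class MiniNode:
    """Hypothetical stand-in that only mimics the prep -> exec -> post order."""

    def prep(self, shared):
        return None

    def exec(self, prep_res):
        return None

    def post(self, shared, prep_res, exec_res):
        return None

    def run(self, shared):
        prep_res = self.prep(shared)      # read from the shared store
        exec_res = self.exec(prep_res)    # compute; exec never sees shared
        return self.post(shared, prep_res, exec_res)  # write back


class LoadData(MiniNode):
    def post(self, shared, prep_res, exec_res):
        shared["data"] = "Some text content"


class Summarize(MiniNode):
    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        # Fake summarizer standing in for an LLM call: keep the first word.
        return prep_res.split()[0]

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res


def run_chain(nodes, shared):
    # Run nodes one after another against the same shared dict.
    for node in nodes:
        node.run(shared)


shared = {}
run_chain([LoadData(), Summarize()], shared)
print(shared)
```

Because exec receives only prep_res, the compute step stays independent of the store's layout, which is the separation of data schema from compute logic mentioned earlier.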
2. Params
Params let you store per-Node or per-Flow config that doesn’t need to live in the shared store. They are:
- Immutable during a Node’s run cycle (i.e., they don’t change mid-prep->exec->post).
- Set via set_params().
- Cleared and updated each time a parent Flow calls it.
Set params only on the uppermost Flow; params set on anything below it will be overwritten by the parent Flow.
If you need to set child node params, see Batch.
Typically, Params are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
Example
# 1) Create a Node that uses params
class SummarizeFile(Node):
    def prep(self, shared):
        # Access the node's param
        filename = self.params["filename"]
        return shared["data"].get(filename, "")

    def exec(self, prep_res):
        prompt = f"Summarize: {prep_res}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        filename = self.params["filename"]
        shared["summary"][filename] = exec_res
        return "default"

# Illustrative shared store; "summary" must exist before post() writes into it
shared = {"data": {"doc1.txt": "...", "doc2.txt": "..."}, "summary": {}}

# 2) Instantiate the node
node = SummarizeFile()

# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)

# 4) Create Flow
flow = Flow(start=node)

# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared)  # The node summarizes doc2, not doc1
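The overwrite in step 5 can be reproduced with small stand-ins. MiniNode and MiniFlow below are hypothetical classes written for this sketch; they assume the parent simply replaces the child's params before each run, which matches the behavior described above but is not the library's actual code.

```python
class MiniNode:
    """Hypothetical stand-in: holds a params dict set via set_params()."""

    def __init__(self):
        self.params = {}

    def set_params(self, params):
        self.params = params

    def run(self, shared):
        # Use the param as an identifier into the shared store.
        filename = self.params["filename"]
        shared["summary"][filename] = shared["data"][filename].upper()


class MiniFlow:
    """Hypothetical parent that pushes its own params onto the child before running it."""

    def __init__(self, start):
        self.start = start
        self.params = {}

    def set_params(self, params):
        self.params = params

    def run(self, shared):
        # The child's previous params are replaced on every call.
        self.start.set_params(dict(self.params))
        self.start.run(shared)


shared = {"data": {"doc1.txt": "one", "doc2.txt": "two"}, "summary": {}}

node = MiniNode()
node.set_params({"filename": "doc1.txt"})  # will be overwritten by the flow

flow = MiniFlow(start=node)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared)

print(shared["summary"])  # only doc2.txt was processed
```

Setting params on the node alone is still useful when running it in isolation, as in step 3; once a parent runs the node, the parent's params win.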