Batch
Batch makes it easier to handle large inputs in one Node or rerun a Flow multiple times. Example use cases:
- Chunk-based processing (e.g., splitting large texts).
- Iterative processing over lists of input items (e.g., user queries, files, URLs).
1. BatchNode
A BatchNode extends Node but changes prep() and exec():
- prep(shared): returns an iterable (e.g., list, generator).
- exec(item): called once per item in that iterable.
- post(shared, prep_res, exec_res_list): after all items are processed, receives a list of results (exec_res_list) and returns an Action.
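The contract above can be tried out without the framework. The following is a minimal sketch, assuming a hypothetical stand-in class ToyBatchNode (it is not the library's implementation, and SquareAll is an invented example) that calls prep() once, exec() once per item, and post() once with all collected results:

```python
class ToyBatchNode:
    """Hypothetical stand-in that mimics the BatchNode contract described above."""

    def prep(self, shared):
        return []

    def exec(self, item):
        return item

    def post(self, shared, prep_res, exec_res_list):
        return "default"

    def run(self, shared):
        items = self.prep(shared)                      # an iterable of items
        results = [self.exec(item) for item in items]  # exec() runs once per item
        return self.post(shared, items, results)       # post() sees every result at once


class SquareAll(ToyBatchNode):
    def prep(self, shared):
        return shared["numbers"]

    def exec(self, number):
        return number * number

    def post(self, shared, prep_res, exec_res_list):
        shared["squares"] = exec_res_list
        return "default"


shared = {"numbers": [1, 2, 3]}
action = SquareAll().run(shared)
print(action, shared["squares"])
```

The point of the sketch is the division of labor: exec() only ever sees a single item, while reading from and writing to the shared store stays in prep() and post().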
Example: Summarize a Large File
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Suppose we have a big file; chunk it
        content = shared["data"]
        chunk_size = 10000
        chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
        return chunks

    def exec(self, chunk):
        prompt = f"Summarize this chunk in 10 words: {chunk}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res_list):
        combined = "\n".join(exec_res_list)
        shared["summary"] = combined
        return "default"

map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
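The slicing used in prep() can be checked in isolation. A small sketch, assuming a hypothetical helper named chunk_text and a tiny chunk size chosen only for readability:

```python
def chunk_text(content, chunk_size):
    """Split content into consecutive slices of at most chunk_size characters."""
    return [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]


chunks = chunk_text("abcdefghij", 4)
print(chunks)               # ['abcd', 'efgh', 'ij']
print(chunk_text("", 4))    # []
```

The last chunk may be shorter than chunk_size, and an empty input yields an empty list, in which case there are no items to process.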
2. BatchFlow
A BatchFlow runs a Flow multiple times, each time with different params. Think of it as a loop that replays the Flow for each parameter set.
Key Differences from BatchNode
Important: Unlike BatchNode, which processes items and modifies the shared store:
- BatchFlow returns parameters to pass to the child Flow, not data to process
- These parameters are accessed in child nodes via self.params, not from the shared store
- Each child Flow runs independently with a different set of parameters
- Child nodes can be regular Nodes, not BatchNodes (the batching happens at the Flow level)
Example: Summarize Many Files
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # IMPORTANT: Return a list of param dictionaries (not data for processing)
        filenames = list(shared["data"].keys())  # e.g., ["file1.txt", "file2.txt", ...]
        return [{"filename": fn} for fn in filenames]

# Child node that accesses filename from params, not shared store
class LoadFile(Node):
    def prep(self, shared):
        # Access filename from params (not from shared)
        filename = self.params["filename"]  # Important! Use self.params, not shared
        return filename

    def exec(self, filename):
        with open(filename, 'r') as f:
            return f.read()

    def post(self, shared, prep_res, exec_res):
        # Store file content in shared
        shared["current_file_content"] = exec_res
        return "default"

# Summarize node that works on the currently loaded file
class Summarize(Node):
    def prep(self, shared):
        return shared["current_file_content"]

    def exec(self, content):
        prompt = f"Summarize this file in 50 words: {content}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        # Store summary in shared, indexed by current filename
        filename = self.params["filename"]  # Again, using params
        if "summaries" not in shared:
            shared["summaries"] = {}
        shared["summaries"][filename] = exec_res
        return "default"

# Create a per-file flow
load_file = LoadFile()
summarize = Summarize()
load_file >> summarize
summarize_file = Flow(start=load_file)

# Wrap in a BatchFlow to process all files
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
Under the Hood
- prep(shared) in the BatchFlow returns a list of param dicts, e.g., [{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...].
- The BatchFlow loops through each dict. For each one:
  - It merges the dict with the BatchFlow’s own params (if any): {**batch_flow.params, **dict_from_prep}
  - It calls flow.run(shared) using the merged parameters
  - IMPORTANT: These parameters are passed to the child Flow’s nodes via self.params, NOT via the shared store
- This means the sub-Flow is run repeatedly, once for every param dict, with each node in the flow accessing the parameters via self.params.
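The loop described above can be sketched with plain functions. This is a minimal illustration under stated assumptions, not the library's code: run_batch and summarize_child are hypothetical names, the "style" key is invented, and the child Flow is reduced to a function that receives the merged params directly.

```python
def run_batch(batch_params, own_params, child_run, shared):
    """Run child_run once per param dict, merged with the batch's own params."""
    for dict_from_prep in batch_params:
        # Same merge expression as in the text: per-item values win on key clashes
        merged = {**own_params, **dict_from_prep}
        child_run(shared, merged)


def summarize_child(shared, params):
    # Stand-in for a child Flow whose nodes would read these values via self.params
    filename = params["filename"]
    summaries = shared.setdefault("summaries", {})
    summaries[filename] = f"{params['style']} summary of {filename}"


shared = {}
run_batch(
    batch_params=[{"filename": "file1.txt"}, {"filename": "file2.txt"}],
    own_params={"style": "short"},  # hypothetical param set on the batch itself
    child_run=summarize_child,
    shared=shared,
)
print(shared["summaries"])
```

Because the per-item dict is unpacked last, a key returned from prep() overrides a key of the same name in the batch's own params, while the shared store is the same object across every run.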
3. Nested or Multi-Level Batches
You can nest a BatchFlow in another BatchFlow. For instance:
- Outer batch: returns a list of directory param dicts (e.g., {"directory": "/pathA"}, {"directory": "/pathB"}, …).
- Inner batch: returns a list of per-file param dicts.
At each level, BatchFlow merges its own param dict with the parent’s. By the time you reach the innermost node, the final params is the merged result of all parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
import os

class FileBatchFlow(BatchFlow):
    def prep(self, shared):
        # Access directory from params (set by parent)
        directory = self.params["directory"]
        # e.g., files = ["file1.txt", "file2.txt", ...]
        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
        return [{"filename": f} for f in files]

class DirectoryBatchFlow(BatchFlow):
    def prep(self, shared):
        directories = ["/path/to/dirA", "/path/to/dirB"]
        return [{"directory": d} for d in directories]

# The actual processing node
class ProcessFile(Node):
    def prep(self, shared):
        # Access both directory and filename from params
        directory = self.params["directory"]  # From outer batch
        filename = self.params["filename"]    # From inner batch
        full_path = os.path.join(directory, filename)
        return full_path

    def exec(self, full_path):
        # Process the file...
        return f"Processed {full_path}"

    def post(self, shared, prep_res, exec_res):
        # Store results, perhaps indexed by path
        if "results" not in shared:
            shared["results"] = {}
        shared["results"][prep_res] = exec_res
        return "default"

# Set up the nested batch structure
process_node = ProcessFile()
inner_flow = FileBatchFlow(start=process_node)
outer_flow = DirectoryBatchFlow(start=inner_flow)

# Run it
outer_flow.run(shared)
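The way params accumulate across levels can be traced with plain dicts. The sketch below assumes a made-up directory listing (files_by_directory) in place of os.listdir and only records what the innermost node would see in its params; it does not use the framework.

```python
# Hypothetical listing standing in for os.listdir(); the file names are invented
files_by_directory = {
    "/pathA": ["a.txt"],
    "/pathB": ["b.txt", "c.txt"],
}

outer_params = [{"directory": d} for d in files_by_directory]

seen = []  # params as the innermost node would receive them
for outer in outer_params:
    # The inner batch reads the directory set by its parent
    inner_params = [{"filename": f} for f in files_by_directory[outer["directory"]]]
    for inner in inner_params:
        merged = {**outer, **inner}  # parent context plus this level's dict
        seen.append(merged)

for params in seen:
    print(params)
```

Each innermost run receives both keys, so a node can rebuild the full path without either level writing path information to the shared store.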