Node
A Node is the smallest building block. Each Node has 3 steps: prep->exec->post.

prep(shared)
- Read and preprocess data from the shared store.
- Examples: query DB, read files, or serialize data into a string.
- Return prep_res, which is used by exec() and post().
exec(prep_res)
- Execute compute logic, with optional retries and error handling (below).
- Examples: (mostly) LLM calls, remote APIs, tool use.
- ⚠️ This step is for compute only and must NOT access shared.
- ⚠️ If retries are enabled, make sure the implementation is idempotent.
- Return exec_res, which is passed to post().
post(shared, prep_res, exec_res)
- Postprocess and write data back to shared.
- Examples: update DB, change states, log results.
- Decide the next action by returning a string (action = "default" if None).
Why 3 steps? To enforce the principle of separation of concerns: data storage and data processing are handled separately.
All steps are optional. E.g., you can implement only prep and post if you just need to process data.
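The three-step lifecycle can be sketched with a small stand-in base class. `MiniNode`, `WordCount`, and `CopyTitle` below are hypothetical names used only for illustration, not the library's actual Node implementation. The sketch assumes only what is described above: run() calls prep, exec, and post in order, unimplemented steps are no-ops, and a None action is treated as "default".

```python
class MiniNode:
    """Hypothetical stand-in for Node; illustrates prep -> exec -> post only."""

    def prep(self, shared):
        return None  # optional step: no-op unless overridden

    def exec(self, prep_res):
        return None  # optional step: no-op unless overridden

    def post(self, shared, prep_res, exec_res):
        return None  # optional step: None is treated as "default"

    def run(self, shared):
        prep_res = self.prep(shared)    # read from the shared store
        exec_res = self.exec(prep_res)  # compute only, never touches shared
        action = self.post(shared, prep_res, exec_res)  # write back, pick action
        return "default" if action is None else action


class WordCount(MiniNode):
    """Implements all three steps."""

    def prep(self, shared):
        return shared["text"]

    def exec(self, prep_res):
        return len(prep_res.split())

    def post(self, shared, prep_res, exec_res):
        shared["word_count"] = exec_res


class CopyTitle(MiniNode):
    """Implements only prep and post; exec stays a no-op."""

    def prep(self, shared):
        return shared["text"].split(".")[0]

    def post(self, shared, prep_res, exec_res):
        shared["title"] = prep_res
        return "titled"


shared = {"text": "Nodes are small. They do one thing."}
count_action = WordCount().run(shared)
title_action = CopyTitle().run(shared)
```

Note that exec() only ever sees prep_res, so the compute step cannot read or write the shared store by accident.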
Fault Tolerance & Retries
You can retry exec() if it raises an exception by setting two parameters when you define the Node:
- max_retries (int): Max times to run exec(). The default is 1 (no retry).
- wait (int): The time to wait (in seconds) before the next retry. By default, wait=0 (no waiting). wait is helpful when you encounter rate limits or quota errors from your LLM provider and need to back off.
my_node = SummarizeFile(max_retries=3, wait=10)
When an exception occurs in exec(), the Node automatically retries until:
- It either succeeds, or
- The Node has retried max_retries - 1 times already and fails on the last attempt.
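The stopping rule above can be sketched as a plain loop. `run_with_retries` and `flaky` are hypothetical helpers written for this illustration, not the library's internal code. The sketch assumes that max_retries counts total attempts and that wait is applied between attempts.

```python
import time


def run_with_retries(fn, max_retries=1, wait=0):
    """Call fn(cur_retry) up to max_retries times in total (hypothetical helper)."""
    for cur_retry in range(max_retries):
        try:
            return fn(cur_retry)
        except Exception:
            if cur_retry == max_retries - 1:
                raise  # the last attempt failed: give up and propagate
            if wait > 0:
                time.sleep(wait)  # back off before the next attempt


attempts = []


def flaky(cur_retry):
    """Fails on the first two attempts, succeeds on the third."""
    attempts.append(cur_retry)
    if cur_retry < 2:
        raise RuntimeError("rate limited")
    return "ok"


result = run_with_retries(flaky, max_retries=3, wait=0)
```

With max_retries=3 the function is attempted three times in total (retry indices 0, 1, and 2), which is one initial attempt plus max_retries - 1 retries.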
You can get the current retry count (0-based) from self.cur_retry.
class RetryNode(Node):
    def exec(self, prep_res):
        print(f"Retry {self.cur_retry} times")
        raise Exception("Failed")
Graceful Fallback
To gracefully handle the exception (after all retries) rather than raising it, override:
def exec_fallback(self, shared, prep_res, exc):
    raise exc
By default, it just re-raises the exception. But you can return a fallback result instead, which becomes the exec_res passed to post().
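The difference between re-raising and returning a fallback can be sketched as follows. `exec_with_fallback` and the small functions around it are hypothetical, written only for this illustration; the helper omits the shared argument for brevity and is not the library's implementation.

```python
def exec_with_fallback(exec_fn, fallback_fn, prep_res, max_retries=1):
    """Try exec_fn up to max_retries times, then call fallback_fn (hypothetical)."""
    for cur_retry in range(max_retries):
        try:
            return exec_fn(prep_res)
        except Exception as exc:
            if cur_retry == max_retries - 1:
                # All attempts failed: whatever the fallback returns
                # takes the place of exec_res.
                return fallback_fn(prep_res, exc)


def always_fails(prep_res):
    raise ValueError("LLM unavailable")


def reraise(prep_res, exc):
    raise exc  # mirrors the default behaviour: propagate the error


def safe_default(prep_res, exc):
    return f"fallback for {prep_res!r}: {exc}"


exec_res = exec_with_fallback(always_fails, safe_default, "doc.txt", max_retries=2)
```

Passing `reraise` instead of `safe_default` lets the ValueError propagate to the caller, which corresponds to leaving exec_fallback() at its default.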
Example: Summarize file
class SummarizeFile(Node):
    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        if not prep_res:
            return "Empty file content"
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)  # might fail
        return summary

    def exec_fallback(self, shared, prep_res, exc):
        # Provide a simple fallback instead of crashing
        return "There was an error processing your request."

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        # Return "default" by not returning

summarize_node = SummarizeFile(max_retries=3)

# node.run() calls prep->exec->post
# If exec() fails, it is attempted up to 3 times in total before exec_fallback() is called
action_result = summarize_node.run(shared)

print("Action returned:", action_result)  # "default"
print("Summary stored:", shared["summary"])