Node

A Node is the smallest building block. Each Node has 3 steps prep->exec->post:

  1. prep(shared)
    • Read and preprocess data from shared store.
    • Examples: query DB, read files, or serialize data into a string.
    • Return prep_res, which is used by exec() and post().
  2. exec(prep_res)
    • Execute compute logic, with optional retries and error handling (below).
    • Examples: (mostly) LLM calls, remote APIs, tool use.
    • ⚠️ This shall be only for compute and NOT access shared.
    • ⚠️ If retries enabled, ensure idempotent implementation.
    • Return exec_res, which is passed to post().
  3. post(shared, prep_res, exec_res)
    • Postprocess and write data back to shared.
    • Examples: update DB, change states, log results.
    • Decide the next action by returning a string (action = "default" if None).

Why 3 steps? To enforce the principle of separation of concerns. The data storage and data processing are operated separately.

All steps are optional. E.g., you can only implement prep and post if you just need to process data.

Fault Tolerance & Retries

You can retry exec() if it raises an exception via two parameters when define the Node:

  • max_retries (int): Max times to run exec(). The default is 1 (no retry).
  • wait (int): The time to wait (in seconds) before next retry. By default, wait=0 (no waiting). wait is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.
my_node = SummarizeFile(max_retries=3, wait=10)

When an exception occurs in exec(), the Node automatically retries until:

  • It either succeeds, or
  • The Node has retried max_retries - 1 times already and fails on the last attempt.

You can get the current retry times (0-based) from self.cur_retry.

class RetryNode(Node):
    def exec(self, prep_res):
        print(f"Retry {self.cur_retry} times")
        raise Exception("Failed")

Graceful Fallback

To gracefully handle the exception (after all retries) rather than raising it, override:

def exec_fallback(self, shared, prep_res, exc):
    raise exc

By default, it just re-raises exception. But you can return a fallback result instead, which becomes the exec_res passed to post().

Example: Summarize file

class SummarizeFile(Node):
    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        if not prep_res:
            return "Empty file content"
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)  # might fail
        return summary

    def exec_fallback(self, shared, prep_res, exc):
        # Provide a simple fallback instead of crashing
        return "There was an error processing your request."

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        # Return "default" by not returning

summarize_node = SummarizeFile(max_retries=3)

# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)

print("Action returned:", action_result)  # "default"
print("Summary stored:", shared["summary"])