You have been lied to.

Does it really need to be this complicated?

What if we stripped everything away?

Your LLM Framework Only Needs

100 LINES

https://github.com/The-Pocket/PocketFlow


import asyncio, warnings, copy, time

class BaseNode:
    def __init__(self): self.params,self.successors={},{}
    def set_params(self,params): self.params=params
    def next(self,node,action="default"):
        if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'")
        self.successors[action]=node; return node
    def prep(self,shared): pass
    def exec(self,prep_res): pass
    def post(self,shared,prep_res,exec_res): pass
    def _exec(self,prep_res): return self.exec(prep_res)
    def _run(self,shared): p=self.prep(shared); e=self._exec(p); return self.post(shared,p,e)
    def run(self,shared): 
        if self.successors: warnings.warn("Node won't run successors. Use Flow.")  
        return self._run(shared)
    def __rshift__(self,other): return self.next(other)
    def __sub__(self,action):
        if isinstance(action,str): return _ConditionalTransition(self,action)
        raise TypeError("Action must be a string")

class _ConditionalTransition:
    def __init__(self,src,action): self.src,self.action=src,action
    def __rshift__(self,tgt): return self.src.next(tgt,self.action)

class Node(BaseNode):
    def __init__(self,max_retries=1,wait=0): super().__init__(); self.max_retries,self.wait=max_retries,wait
    def exec_fallback(self,prep_res,exc): raise exc
    def _exec(self,prep_res):
        for self.cur_retry in range(self.max_retries):
            try: return self.exec(prep_res)
            except Exception as e:
                if self.cur_retry==self.max_retries-1: return self.exec_fallback(prep_res,e)
                if self.wait>0: time.sleep(self.wait)

class BatchNode(Node):
    def _exec(self,items): return [super(BatchNode,self)._exec(i) for i in (items or [])]

class Flow(BaseNode):
    def __init__(self,start=None): super().__init__(); self.start_node=start
    def start(self,start): self.start_node=start; return start
    def get_next_node(self,curr,action):
        nxt=curr.successors.get(action or "default")
        if not nxt and curr.successors: warnings.warn(f"Flow ends: '{action}' not found in {list(curr.successors)}")
        return nxt
    def _orch(self,shared,params=None):
        curr,p,last_action =copy.copy(self.start_node),(params or {**self.params}),None
        while curr: curr.set_params(p); last_action=curr._run(shared); curr=copy.copy(self.get_next_node(curr,last_action))
        return last_action
    def _run(self,shared): p=self.prep(shared); o=self._orch(shared); return self.post(shared,p,o)
    def post(self,shared,prep_res,exec_res): return exec_res

class BatchFlow(Flow):
    def _run(self,shared):
        pr=self.prep(shared) or []
        for bp in pr: self._orch(shared,{**self.params,**bp})
        return self.post(shared,pr,None)

class AsyncNode(Node):
    async def prep_async(self,shared): pass
    async def exec_async(self,prep_res): pass
    async def exec_fallback_async(self,prep_res,exc): raise exc
    async def post_async(self,shared,prep_res,exec_res): pass
    async def _exec(self,prep_res): 
        for i in range(self.max_retries):
            try: return await self.exec_async(prep_res)
            except Exception as e:
                if i==self.max_retries-1: return await self.exec_fallback_async(prep_res,e)
                if self.wait>0: await asyncio.sleep(self.wait)
    async def run_async(self,shared): 
        if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.")  
        return await self._run_async(shared)
    async def _run_async(self,shared): p=await self.prep_async(shared); e=await self._exec(p); return await self.post_async(shared,p,e)
    def _run(self,shared): raise RuntimeError("Use run_async.")

class AsyncBatchNode(AsyncNode,BatchNode):
    async def _exec(self,items): return [await super(AsyncBatchNode,self)._exec(i) for i in items]

class AsyncParallelBatchNode(AsyncNode,BatchNode):
    async def _exec(self,items): return await asyncio.gather(*(super(AsyncParallelBatchNode,self)._exec(i) for i in items))

class AsyncFlow(Flow,AsyncNode):
    async def _orch_async(self,shared,params=None):
        curr,p,last_action =copy.copy(self.start_node),(params or {**self.params}),None
        while curr: curr.set_params(p); last_action=await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared); curr=copy.copy(self.get_next_node(curr,last_action))
        return last_action
    async def _run_async(self,shared): p=await self.prep_async(shared); o=await self._orch_async(shared); return await self.post_async(shared,p,o)
    async def post_async(self,shared,prep_res,exec_res): return exec_res

class AsyncBatchFlow(AsyncFlow,BatchFlow):
    async def _run_async(self,shared):
        pr=await self.prep_async(shared) or []
        for bp in pr: await self._orch_async(shared,{**self.params,**bp})
        return await self.post_async(shared,pr,None)

class AsyncParallelBatchFlow(AsyncFlow,BatchFlow):
    async def _run_async(self,shared): 
        pr=await self.prep_async(shared) or []
        await asyncio.gather(*(self._orch_async(shared,{**self.params,**bp}) for bp in pr))
        return await self.post_async(shared,pr,None)
                 
Abstraction App Wrappers Vendor Wrappers Lines Size
LangChain Agent, Chain Many
(e.g., QA, Summarization)
Many
(e.g., OpenAI, Pinecone, etc.)
405K +166MB
CrewAI Agent, Chain Many
(e.g., FileReadTool, SerperDevTool)
Many
(e.g., OpenAI, Anthropic, Pinecone, etc.)
18K +173MB
SmolAgent Agent Some
(e.g., CodeAgent, VisitWebTool)
Some
(e.g., DuckDuckGo, Hugging Face, etc.)
8K +198MB
LangGraph Agent, Graph Some
(e.g., Semantic Search)
Some
(e.g., PostgresStore, SqliteSaver, etc.)
37K +51MB
AutoGen Agent Some
(e.g., Tool Agent, Chat Agent)
Many [Optional]
(e.g., OpenAI, Pinecone, etc.)
7K
(core-only)
+26MB
(core-only)
PocketFlow Graph None None 100 +56KB

TEAR DOWN THE BLACK BOXES

NOT ANOTHER TUTORIAL

A MASTERCLASS IN FIRST PRINCIPLES

You will learn how agents ACTUALLY work.

You will learn how RAG is REALLY built.

You will learn from THE GROUND UP.

If you know a little bit of Python.

You're ready.

THESE 100 LINES.

How does it work?

3 Core Ideas:

The Node The Shared Store The Flow

1. The Node: The Worker

3 Steps:

preprocessing execution postprocessing

Step 1: prep


class BaseNode:
    def prep(self, shared): pass
    def exec(self, prep_res): pass
    def post(self, shared, prep_res, exec_res): pass
    def _run(self,shared): 
        p=self.prep(shared); e=self._exec(p); 
        return self.post(shared,p,e)
                    

prep: Reads from shared store to get ready.

Step 2: exec


class BaseNode:
    def prep(self, shared): pass
    def exec(self, prep_res): pass
    def post(self, shared, prep_res, exec_res): pass
    def _run(self,shared): 
        p=self.prep(shared); e=self._exec(p); 
        return self.post(shared,p,e)
                    

exec: Performs the core task in isolation.

Step 3: post


class BaseNode:
    def prep(self, shared): pass
    def exec(self, prep_res): pass
    def post(self, shared, prep_res, exec_res): pass
    def _run(self,shared): 
        p=self.prep(shared); e=self._exec(p); 
        return self.post(shared,p,e)
                    

post: Writes results back and returns an 'action' string for the next Node.

Node: Built-in Fault Tolerance


class Node(BaseNode):
    def __init__(self, max_retries=1, wait=0):
        super().__init__()
        self.max_retries, self.wait = max_retries, wait
    def _exec(self, prep_res):
        for self.cur_retry in range(self.max_retries):
            try: return self.exec(prep_res)
            except Exception as e:
                if self.cur_retry == self.max_retries - 1: 
                    return self.exec_fallback(prep_res, e)
                if self.wait > 0: time.sleep(self.wait)
    def exec_fallback(self, prep_res, exc): raise exc
                    

# A node that calls an LLM to summarize a file
class SummarizeFile(Node):
    def prep(self, shared):
        return shared.get("file_content")
    
    def exec(self, file_content):
        # This would make a real API call to an LLM
        return call_llm(f"Summarize this: {file_content}")

    def exec_fallback(self, file_content, error):
        # If the LLM call fails after all retries...
        print(f"LLM call failed with error: {error}")
        return "Sorry, summarizing the file failed."

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "next"

# Instantiate the node with retry logic
summarizer = SummarizeFile(
    max_retries=3,
    wait=5
)
                    

Node = A single, reliable worker.

3 Core Ideas:

The Node The Shared Store The Flow

{ }

# The Shared Store is just a dictionary.
shared = {}

# Node A writes to it.
shared["result_from_A"] = 42

# Node B reads from it.
data_for_b = shared["result_from_A"]
                    

Shared Store in Action


class LoadData(Node):
    # This node's only job is to add data to the shared store.
    def post(self, shared, prep_res, exec_res):
        shared["data"] = "Some important text content"
                    

Shared Store in Action


class Summarize(Node):
    def prep(self, shared):
        # Reads the data that the previous node wrote.
        return shared["data"]

    def exec(self, prep_res):
        # Does its work on the prepared data.
        return call_llm(f"Summarize: {prep_res}")

    def post(self, shared, prep_res, exec_res):
        # Writes its own result back to the shared store.
        shared["summary"] = exec_res
                    

3 Core Ideas:

The Node The Shared Store The Flow

Defining the Flow

%%{init: {'theme': 'dark'}}%% graph LR A[Node A] --> B[Node B]

# A simple, default transition
node_a >> node_b
                

Defining the Flow

%%{init: {'theme': 'dark'}}%% graph TD A[Review Node] -->|approved| B[Payment Node] A -->|rejected| C[Finish Node]

# Named transitions for branching logic
review_node - "approved" >> payment_node
review_node - "rejected" >> finish_node
                

How It Works: The Address Book


class BaseNode:
    def __init__(self):
        self.params, self.successors = {}, {}

    def next(self, node, action="default"):
        # Adds an entry to the node's "address book"
        self.successors[action] = node
        return node
                    

How It Works: The Orchestrator


class Flow(BaseNode):
    def _orch(self, shared, params=None):
        # Runs a node, gets the action, finds the next node, repeats
        while curr:
            last_action = curr._run(shared)
            curr = self.get_next_node(curr, last_action)
    
    def get_next_node(self, curr, action):
        # Looks up the action in the current node's successors
        return curr.successors.get(action or "default")
                    

A Flow is a Node

%%{init: {'theme': 'dark'}}%% graph LR subgraph Flow A[Node A] --> B[Node B] end Flow --> C[Node C]

This allows for powerful, nested workflows.

Where is the LLM?

No OpenAI. No Anthropic. No Google.

No Vendor Lock-in.

%%{init: {'theme': 'dark'}}%% graph LR subgraph B[LLM Framework] A[LLM API] end C[You] --> B

Maintenance Nightmare


from openai import OpenAI
import os

def call_llm(messages):
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )

    return response.choices[0].message.content
                

Write the function yourself!

Let's Build a Chatbot.

%%{init: {'theme': 'dark'}}%% graph LR chat[ChatNode] -- "continue" --> chat

A chatbot is just a self-loop.


class ChatNode(Node):
    def prep(self, shared):
        # Initialize messages if this is the first run
        if "messages" not in shared:
            shared["messages"] = []
            print("Welcome to the chat!")
        
        # Get user input
        user_input = input("\nYou: ")
        # Add user message to history
        shared["messages"].append({"role": "user", "content": user_input})
        # Return all messages for the LLM
        return shared["messages"]

    def exec(self, messages):
        # Call LLM with the entire conversation history
        response = call_llm(messages)
        return response

    def post(self, shared, prep_res, exec_res):
        # Print the assistant's response
        print(f"\nAssistant: {exec_res}")
        # Add assistant message to history
        shared["messages"].append({"role": "assistant", "content": exec_res})
        # Loop back to continue the conversation
        return "continue"

# Create the flow with self-loop
chat_node = ChatNode()
chat_node - "continue" >> chat_node

flow = Flow(start=chat_node)

# Start the chat
if __name__ == "__main__":
    shared = {}
    flow.run(shared)
                

Conversation Walkthrough

%%{init: {'theme': 'dark'}}%% graph TD prep["prep()"] --> exec["exec()"] --> post["post()"]

# 1. Initial State
shared = {}
                        

Conversation Walkthrough

%%{init: {'theme': 'dark'}}%% graph TD style prep fill:#4CAF50,stroke:#333 prep["prep()"] --> exec["exec()"] --> post["post()"]

# 2. After prep()
shared = {
    "messages": [
        {"role": "user", "content": "Hello, who are you?"}
    ]
}
                        

Conversation Walkthrough

%%{init: {'theme': 'dark'}}%% graph TD style exec fill:#4CAF50,stroke:#333 prep["prep()"] --> exec["exec()"] --> post["post()"]

# 3. During exec()
# No change to shared store
shared = {
    "messages": [
        {"role": "user", "content": "Hello, who are you?"}
    ]
}
                        

Conversation Walkthrough

%%{init: {'theme': 'dark'}}%% graph TD style post fill:#4CAF50,stroke:#333 prep["prep()"] --> exec["exec()"] --> post["post()"]

# 4. After post()
shared = {
    "messages": [
        {"role": "user", "content": "Hello, who are you?"},
        {"role": "assistant", "content": "I am a helpful AI assistant."}
    ]
}
                        

Chatbot = 1 Node + 1 Loop + 1 Dictionary.

The Next Challenge

Structured Output

What you WANT


name: Jane Doe
email: jane@example.com
                        

What you GET


"Certainly! The name mentioned in the
document is Jane Doe, and you can find
her email at jane@example.com."
                        

Brittle

Unreliable

Frustrating

Solution: Just Ask

Pro Tip: Speak YAML, Not JSON

JSON - Fragile

 
{
  "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\""
}
                        

escaping

YAML - Robust


dialogue: |
  Alice said: "Hello Bob.
  How are you?"
                        

no escaping

Building the Resume Parser


class ResumeParserNode(Node):
    def prep(self, shared):
        # The resume text is expected to be in the shared store
        return shared["resume_text"]

    def exec(self, resume_text):
        prompt = f"""Analyze the resume below. Output ONLY the requested
information in YAML format.

**Resume:**
```
{resume_text}
```

**YAML Output Requirements:**
- Extract `name` (string).
- Extract `email` (string).
- Extract `experience` (list of objects).

**Example Format:**
```yaml
name: Jane Doe
email: jane@example.com
experience:
  - title: Manager
    company: Corp A
```

Generate the YAML output now:
"""
        response = call_llm(prompt)
        # A simple way to extract the YAML block from the response
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        structured_result = yaml.safe_load(yaml_str)

        # Assertions to enforce the output structure
        assert "name" in structured_result, "Name is required"
        assert "email" in structured_result, "Email is required"
        return structured_result

    def post(self, shared, prep_res, exec_res):
        shared["structured_data"] = exec_res
                

Before


!! JOHN SMTIH !!
contact: 123-456-7890.
email me at johnsmtih1983@gnail.com

Work Experience:
- at ABC Corportaion i was the SALES MANAGER
- XYZ Industries, position: ASST. MANAGER
                        

After


# The clean, structured data
{
  "name": "JOHN SMTIH",
  "email": "johnsmtih1983@gnail.com",
  "experience": [
    {"title": "SALES MANAGER", "company": "ABC Corportaion"},
    {"title": "ASST. MANAGER", "company": "XYZ Industries"}
  ]
}
                        

The Next Challenge: Processing in Bulk

📄

1 Resume

🗃️

1000 Resumes?

Solution: The BatchNode

Standard Node


class Node:
    # Returns one item to process
    def prep(self, shared):
        return one_item

    # Processes one item
    def exec(self, one_item):
        return one_result

    # Receives one result
    def post(self, shared, prep_res, exec_res):
        pass
                        

BatchNode


class BatchNode(Node):
    # Returns a LIST of items
    def prep(self, shared):
        return [item1, item2, item3]

    # Processes ONE item at a time
    def exec(self, one_item_from_list):
        return one_result

    # Receives a LIST of all results
    def post(self, shared, prep_res, exec_res_list):
        pass
                        

Solution: The BatchNode

Standard Node


class Node:
    # Returns one item to process
    def prep(self, shared):
        return one_item

    # Processes one item
    def exec(self, one_item):
        return one_result

    # Receives one result
    def post(self, shared, prep_res, exec_res):
        pass
                        

BatchNode


class BatchNode(Node):
    # Returns a LIST of items
    def prep(self, shared):
        return [item1, item2, item3]

    # Processes ONE item at a time
    def exec(self, one_item_from_list):
        return one_result

    # Receives a LIST of all results
    def post(self, shared, prep_res, exec_res_list):
        pass
                        

Solution: The BatchNode

Standard Node


class Node:
    # Returns one item to process
    def prep(self, shared):
        return one_item

    # Processes one item
    def exec(self, one_item):
        return one_result

    # Receives one result
    def post(self, shared, prep_res, exec_res):
        pass
                        

BatchNode


class BatchNode(Node):
    # Returns a LIST of items
    def prep(self, shared):
        return [item1, item2, item3]

    # Processes ONE item at a time
    def exec(self, one_item_from_list):
        return one_result

    # Receives a LIST of all results
    def post(self, shared, prep_res, exec_res_list):
        pass
                        

Solution: The BatchNode

Standard Node


class Node:
    # Returns one item to process
    def prep(self, shared):
        return one_item

    # Processes one item
    def exec(self, one_item):
        return one_result

    # Receives one result
    def post(self, shared, prep_res, exec_res):
        pass
                        

BatchNode


class BatchNode(Node):
    # Returns a LIST of items
    def prep(self, shared):
        return [item1, item2, item3]

    # Processes ONE item at a time
    def exec(self, one_item_from_list):
        return one_result

    # Receives a LIST of all results
    def post(self, shared, prep_res, exec_res_list):
        pass
                        

The Implementation: Dumb Simple


class BatchNode(Node):
    # This is the entire implementation.
    def _exec(self, items):
        # It's just a simple for loop.
        return [super(BatchNode, self)._exec(i) for i in (items or [])]
                

class BatchResumeParserNode(BatchNode):
    def prep(self, shared):
        # Assumes a list of resumes is already in the shared store.
        return shared.get("resumes", [])

    def exec(self, resume_text):
        prompt = f"""Analyze the resume below. Output ONLY the requested
information in YAML format.

**Resume:**
```
{resume_text}
```

**YAML Output Requirements:**
- Extract `name` (string).
- Extract `email` (string).
- Extract `experience` (list of objects).

**Example Format:**
```yaml
name: Jane Doe
email: jane@example.com
experience:
  - title: Manager
    company: Corp A
```

Generate the YAML output now:
"""
        response = call_llm(prompt)
        # A simple way to extract the YAML block from the response
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        structured_result = yaml.safe_load(yaml_str)

        # Assertions to enforce the output structure
        assert "name" in structured_result, "Name is required"
        assert "email" in structured_result, "Email is required"
        return structured_result

    def post(self, shared, prep_res, all_results_list):
        # This runs only ONCE, after all items are processed.
        # It receives a list of all individual results.
        shared["all_parsed_resumes"] = all_results_list
                

Batch Node Walkthrough

%%{init: {'theme': 'dark'}}%% graph TD A["prep():
returns [resume1, resume2, resume3]"] --> exec_loop subgraph exec_loop ["_exec() loop"] C["exec(resume1)"] --> E["exec(resume2)"] --> G["exec(resume3)"] end exec_loop --> H["post():
receives [result1, result2, result3]"]

But... how is this any faster?

It isn't.

%%{init: {'theme': 'dark'}}%% graph TD subgraph exec_loop ["_exec() loop"] C["exec(resume1)"] --> E["exec(resume2)"] --> G["exec(resume3)"] end

Total Time = Sum of All Tasks

Unleashing Parallel Speed with Async

The True Villain: WAITING

%%{init: {'theme': 'dark'}}%% sequenceDiagram participant You participant LLM You->>LLM: Process Resume 1? activate LLM Note over You, LLM: 10+ seconds of waiting... LLM-->>You: Result 1 deactivate LLM You->>LLM: Process Resume 2? activate LLM Note over You, LLM: 10+ seconds of waiting... LLM-->>You: Result 2 deactivate LLM

5-20 seconds of pure, wasted time... per resume.

The Smart Chef Analogy

The Slow Chef

%%{init: {'theme': 'dark'}}%% graph TD A[Start Eggs] --> B[Wait for Eggs] B --> C[Serve Eggs] C --> D[Start Toast] D --> E[Wait for Toast] E --> F[Serve Toast]

The Smart Chef

%%{init: {'theme': 'dark'}}%% graph TD subgraph " " A[Start Eggs] --> B[Wait for Eggs] C[Start Toast] --> D[Wait for Toast] end B --> E[Serve Eggs] D --> F[Serve Toast]

2 Magic Words in Python

async

A label for functions that might wait

await

Pause one task and let others run


import asyncio
import time

async def make_coffee():  # Takes 3s
    print("Start coffee...")
    await asyncio.sleep(3)
    print("Coffee ready!")

async def make_toast():   # Takes 2s
    print("Start toast...")
    await asyncio.sleep(2)
    print("Toast ready!")

async def main():
    start_time = time.time()
    # Run both tasks concurrently
    await asyncio.gather(
        make_coffee(),
        make_toast()
    )
    print(f"Total time: {time.time() - start_time:.2f}s")

asyncio.run(main())
                

The Output

Start coffee...
Start toast...
Toast ready!
Coffee ready!

Total time: 3.01s

Total Time = Time of Longest Task

Upgrading to an Async LLM Call

Before - Synchronous


from openai import OpenAI
import os

def call_llm(messages):
    client = OpenAI(api_key=...)

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )

    return response.choices[0].message.content
                        

After - Asynchronous


from openai import AsyncOpenAI
import os

async def call_llm_async(messages):
    client = AsyncOpenAI(api_key=...)

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )

    return response.choices[0].message.content
                        

Upgrading the Batch Node

Before - BatchNode


# Before
class BatchResumeParserNode(BatchNode):
    def prep(self, shared):
        return shared.get("resumes", [])

    def exec(self, one_resume_text):
        ...
        response = call_llm(prompt)
        ...
    
    def post(self, shared, prep_res, exec_res_list):
        shared["all_parsed_resumes"] = exec_res_list
                        

After - AsyncParallelBatchNode


# After
class BatchResumeParserNode(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        return shared.get("resumes", [])

    async def exec_async(self, one_resume_text):
        ...
        response = await call_llm_async(prompt)
        ...

    async def post_async(self, shared, prep_res, 
                                       exec_res_list):
        shared["all_parsed_resumes"] = exec_res_list
                        

The "Unbelievably Simple" Implementation

BatchNode - Sequential


class BatchNode(Node):
    def _exec(self, items):
        # A simple, sequential for loop
        return [super(BatchNode, self)._exec(i) for i in (items or [])]
                

AsyncParallelBatchNode - Parallel


class AsyncParallelBatchNode(AsyncNode, BatchNode):
    async def _exec(self, items):
        # Creates tasks and runs them all concurrently
        return await asyncio.gather(*(super(AsyncParallelBatchNode, self)._exec(i) for i in items))
                

A loop becomes a gather. That's it.

The Performance Payoff

Sequential Time

10 resumes × 10 seconds

100s

Parallel Time

Time of Longest Task

10s

⚠️ Heads Up!

1. Watch for API Rate Limits

2. Ensure Tasks are Independent

Multi-Step Workflows

"Write a comprehensive blog post about AI safety..."

C+

Generic rambling output

Task Decomposition

Break big tasks into small tasks

%%{init: {'theme': 'dark'}}%% graph LR A[Outliner] --> B[BatchDrafter] --> C[Combiner]

Workflow + Batch = Power

List → Parallel Processing → Combine


class GenerateOutline(Node):
    def prep(self, shared):
        return shared["topic"]

    def exec(self, topic):
        prompt = f"List section titles for {topic}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        shared["section_titles"] = exec_res.strip().split('\n')
                    

class BatchDraftSections(BatchNode):
    def prep(self, shared):
        return shared["section_titles"]

    def exec(self, section_title):
        prompt = f"Write content for: {section_title}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        shared["section_drafts"] = exec_res
                    

class CombineAndRefine(Node):
    def prep(self, shared):
        return shared["section_drafts"]

    def exec(self, section_drafts):
        combined = "\n\n".join(section_drafts)
        prompt = f"Combine and polish: {combined}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        shared["final_article"] = exec_res
                    

# Connect the chain
outline_node >> batch_draft_node >> combine_node

# Create and run
flow = Flow(start=outline_node)
shared = {"topic": "AI Safety"}
flow.run(shared)
                    

Workflow Walkthrough

%%{init: {'theme': 'dark'}}%% graph LR A[Outliner] --> B[BatchDrafter] --> C[Combiner]

# Initial State
shared = {"topic": "AI Safety"}
                        
%%{init: {'theme': 'dark'}}%% graph LR A["[Outliner]"] --> B[BatchDrafter] --> C[Combiner] style A fill:#4CAF50,stroke:#333

# After Outliner - now we have a LIST!
shared = {
    "topic": "AI Safety",
    "section_titles": ["Introduction", "Key Risks", "Solutions"]
}
                        
%%{init: {'theme': 'dark'}}%% graph LR A[Outliner] --> B["[BatchDrafter]"] --> C[Combiner] style B fill:#4CAF50,stroke:#333

# After BatchDrafter - LIST of drafted sections!
shared = {
    "topic": "AI Safety",
    "section_titles": ["Introduction", "Key Risks", "Solutions"],
    "section_drafts": [
        "AI safety introduction content...",
        "Key risks in AI development...", 
        "Proposed solutions and approaches..."
    ]
}
                        
%%{init: {'theme': 'dark'}}%% graph LR A[Outliner] --> B[BatchDrafter] --> C["[Combiner]"] style C fill:#4CAF50,stroke:#333

# Final result - combined and polished
shared = {
    "topic": "AI Safety",
    "section_titles": ["Introduction", "Key Risks", "Solutions"],
    "section_drafts": [...],
    "final_article": "AI Safety: A Comprehensive Guide..."
}
                        

By breaking one complex request into 3 simple requests...

Much higher-quality output!

%%{init: {'theme': 'dark'}}%% graph LR A[Outline] --> B[Draft] --> C[Combine]

The path is fixed. It can't adapt.

What if it could REWRITE its plan?

What if it could ADAPT on the fly?

What if it could actually THINK?

The AGENT

Magic? Consciousness? A Black Box?

An Agent = A Flow with a Loop & a Branch

%%{init: {'theme': 'dark'}}%% graph TD A[Decide Node] -->|action: 'search'| B[Search Node] A -->|action: 'answer'| C[Answer Node] B --> A

Building a Web Search Agent

The Brain: DecideAction Node


class DecideAction(Node):
    def prep(self, shared):
        context = shared.get("context", "No previous search")
        question = shared["question"]
        return question, context

    def exec(self, inputs):
        question, context = inputs
        prompt = f"""Given the context, should you 'search' for more
info or 'answer' now?

Context: {context}
Query: {question}

Actions:
- search
- answer

Respond in YAML:
```yaml
action: <your_choice>
search_query: <if searching>
```"""
        response = call_llm(prompt)
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        decision = yaml.safe_load(yaml_str)
        return decision

    def post(self, shared, prep_res, exec_res):
        if exec_res["action"] == "search":
            shared["search_query"] = exec_res["search_query"]
        return exec_res["action"]
                

The Tool: SearchWeb Node


class SearchWeb(Node):
    def prep(self, shared):
        return shared["search_query"]

    def exec(self, search_query):
        # Search the web for information
        search_client = GoogleSearchAPI(api_key="YOUR_API_KEY")
        results = search_client.search({
            "query": search_query,
            "num_results": 3,
            "language": "en"
        })
        
        formatted_results = f"Results for: {search_query}\n"
        for result in results:
            formatted_results += f"- {result.title}: {result.snippet}\n"
        return formatted_results

    def post(self, shared, prep_res, exec_res):
        previous = shared.get("context", "")
        shared["context"] = previous + "\n\nSEARCH: " + \
                            prep_res + "\nRESULTS: " + exec_res
        return "decide"  # Loop back to the brain
                

The Exit: DirectAnswer Node


class DirectAnswer(Node):
    def prep(self, shared):
        question = shared["question"]
        context = shared.get("context", "")
        return question, context

    def exec(self, inputs):
        question, context = inputs
        prompt = f"""Based on the following information, answer the question.

Question: {question}
Research: {context}

Provide a comprehensive answer using the research results."""
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        shared["answer"] = exec_res
        return "done"  # End the flow
                

Wiring the Graph


# Connect the nodes together
decide_node - "search" >> search_node
decide_node - "answer" >> answer_node
search_node - "decide" >> decide_node  # The loop

flow = Flow(start=decide_node)
                

Tracing a Live Run

Initialization

%%{init: {'theme': 'dark'}}%% graph TD A[DecideAction] -->|search| B[SearchWeb] A -->|answer| C[Answer] B --> A

# Initial State
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?"
}

# Start the flow
flow.run(shared)
                        

Round 1: Decide

%%{init: {'theme': 'dark'}}%% graph TD A["[DecideAction]"] -->|search| B[SearchWeb] A -->|answer| C[Answer] B --> A style A fill:#4CAF50,stroke:#333 linkStyle 0 stroke:#FFD700,stroke-width:4px

# After Round 1: Decide
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner"
}
# LLM Output: 'search'
                        

Round 2: Search

%%{init: {'theme': 'dark'}}%% graph TD A[DecideAction] -->|search| B["[SearchWeb]"] A -->|answer| C[Answer] B --> A style B fill:#4CAF50,stroke:#333

# After Round 2: Search
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner",
    "context": "SEARCH: 2024 Physics Nobel Prize winner\nRESULTS: The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton for foundational discoveries and inventions in machine learning..."
}
# Node returns: 'decide'
                        

Round 3: Decide (with context)

%%{init: {'theme': 'dark'}}%% graph TD A["[DecideAction]"] -->|search| B[SearchWeb] A -->|answer| C[Answer] B --> A style A fill:#4CAF50,stroke:#333 linkStyle 1 stroke:#FFD700,stroke-width:4px

# After Round 3: Decide (with context)
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner", 
    "context": "SEARCH: 2024 Physics Nobel Prize winner\nRESULTS: The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton for foundational discoveries and inventions in machine learning..."
}
# LLM Output: 'answer' (no new data added)
                        

Final Step: Answer

%%{init: {'theme': 'dark'}}%% graph TD A[DecideAction] -->|search| B[SearchWeb] A -->|answer| C["[Answer]"] B --> A style C fill:#4CAF50,stroke:#333

# After Final Step: Answer
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner",
    "context": "SEARCH: 2024 Physics Nobel Prize winner\nRESULTS: The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton...",
    "answer": "The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton for their foundational discoveries and inventions that enable machine learning with artificial neural networks."
}
# Flow ends.
                        

An Agent is Just a Graph.

What We Just Built:

✓ Chatbots

✓ Batch Processing

✓ Parallel Workflows

✓ Agents

...all from 100 lines.

No Black Boxes

3 Core Ideas:

The Node The Shared Store The Flow

Want to try it yourself?

Of course you do.

https://github.com/The-Pocket/PocketFlow

All built on the same 100-line foundation.

If you think these are just toys...

Let me show you something.

Just a more complex workflow. A bigger graph.

https://github.com/The-Pocket/PocketFlow-Tutorial-Codebase-Knowledge

%%{init: {'theme': 'dark'}}%% flowchart TD A[FetchRepo] --> B[IdentifyAbstractions] B --> C[AnalyzeRelationships] C --> D[OrderChapters] D --> E[Batch WriteChapters] E --> F[CombineTutorial]

More popular than the framework itself.

There's one more thing....

This video was generated by ...

a PocketFlow AGENT.

To be continued...