https://github.com/The-Pocket/PocketFlow
import asyncio, warnings, copy, time

class BaseNode:
    def __init__(self): self.params,self.successors={},{}
    def set_params(self,params): self.params=params
    def next(self,node,action="default"):
        if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'")
        self.successors[action]=node; return node
    def prep(self,shared): pass
    def exec(self,prep_res): pass
    def post(self,shared,prep_res,exec_res): pass
    def _exec(self,prep_res): return self.exec(prep_res)
    def _run(self,shared): p=self.prep(shared); e=self._exec(p); return self.post(shared,p,e)
    def run(self,shared):
        if self.successors: warnings.warn("Node won't run successors. Use Flow.")
        return self._run(shared)
    def __rshift__(self,other): return self.next(other)
    def __sub__(self,action):
        if isinstance(action,str): return _ConditionalTransition(self,action)
        raise TypeError("Action must be a string")

class _ConditionalTransition:
    def __init__(self,src,action): self.src,self.action=src,action
    def __rshift__(self,tgt): return self.src.next(tgt,self.action)

class Node(BaseNode):
    def __init__(self,max_retries=1,wait=0): super().__init__(); self.max_retries,self.wait=max_retries,wait
    def exec_fallback(self,prep_res,exc): raise exc
    def _exec(self,prep_res):
        for self.cur_retry in range(self.max_retries):
            try: return self.exec(prep_res)
            except Exception as e:
                if self.cur_retry==self.max_retries-1: return self.exec_fallback(prep_res,e)
                if self.wait>0: time.sleep(self.wait)

class BatchNode(Node):
    def _exec(self,items): return [super(BatchNode,self)._exec(i) for i in (items or [])]

class Flow(BaseNode):
    def __init__(self,start=None): super().__init__(); self.start_node=start
    def start(self,start): self.start_node=start; return start
    def get_next_node(self,curr,action):
        nxt=curr.successors.get(action or "default")
        if not nxt and curr.successors: warnings.warn(f"Flow ends: '{action}' not found in {list(curr.successors)}")
        return nxt
    def _orch(self,shared,params=None):
        curr,p,last_action = copy.copy(self.start_node),(params or {**self.params}),None
        while curr: curr.set_params(p); last_action=curr._run(shared); curr=copy.copy(self.get_next_node(curr,last_action))
        return last_action
    def _run(self,shared): p=self.prep(shared); o=self._orch(shared); return self.post(shared,p,o)
    def post(self,shared,prep_res,exec_res): return exec_res

class BatchFlow(Flow):
    def _run(self,shared):
        pr=self.prep(shared) or []
        for bp in pr: self._orch(shared,{**self.params,**bp})
        return self.post(shared,pr,None)

class AsyncNode(Node):
    async def prep_async(self,shared): pass
    async def exec_async(self,prep_res): pass
    async def exec_fallback_async(self,prep_res,exc): raise exc
    async def post_async(self,shared,prep_res,exec_res): pass
    async def _exec(self,prep_res):
        for i in range(self.max_retries):
            try: return await self.exec_async(prep_res)
            except Exception as e:
                if i==self.max_retries-1: return await self.exec_fallback_async(prep_res,e)
                if self.wait>0: await asyncio.sleep(self.wait)
    async def run_async(self,shared):
        if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.")
        return await self._run_async(shared)
    async def _run_async(self,shared): p=await self.prep_async(shared); e=await self._exec(p); return await self.post_async(shared,p,e)
    def _run(self,shared): raise RuntimeError("Use run_async.")

class AsyncBatchNode(AsyncNode,BatchNode):
    async def _exec(self,items): return [await super(AsyncBatchNode,self)._exec(i) for i in items]

class AsyncParallelBatchNode(AsyncNode,BatchNode):
    async def _exec(self,items): return await asyncio.gather(*(super(AsyncParallelBatchNode,self)._exec(i) for i in items))

class AsyncFlow(Flow,AsyncNode):
    async def _orch_async(self,shared,params=None):
        curr,p,last_action = copy.copy(self.start_node),(params or {**self.params}),None
        while curr: curr.set_params(p); last_action = await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared); curr=copy.copy(self.get_next_node(curr,last_action))
        return last_action
    async def _run_async(self,shared): p=await self.prep_async(shared); o=await self._orch_async(shared); return await self.post_async(shared,p,o)
    async def post_async(self,shared,prep_res,exec_res): return exec_res

class AsyncBatchFlow(AsyncFlow,BatchFlow):
    async def _run_async(self,shared):
        pr=await self.prep_async(shared) or []
        for bp in pr: await self._orch_async(shared,{**self.params,**bp})
        return await self.post_async(shared,pr,None)

class AsyncParallelBatchFlow(AsyncFlow,BatchFlow):
    async def _run_async(self,shared):
        pr=await self.prep_async(shared) or []
        await asyncio.gather(*(self._orch_async(shared,{**self.params,**bp}) for bp in pr))
        return await self.post_async(shared,pr,None)
Framework | Abstraction | App Wrappers | Vendor Wrappers | Lines | Size
---|---|---|---|---|---
LangChain | Agent, Chain | Many (e.g., QA, Summarization) | Many (e.g., OpenAI, Pinecone, etc.) | 405K | +166MB
CrewAI | Agent, Chain | Many (e.g., FileReadTool, SerperDevTool) | Many (e.g., OpenAI, Anthropic, Pinecone, etc.) | 18K | +173MB
SmolAgent | Agent | Some (e.g., CodeAgent, VisitWebTool) | Some (e.g., DuckDuckGo, Hugging Face, etc.) | 8K | +198MB
LangGraph | Agent, Graph | Some (e.g., Semantic Search) | Some (e.g., PostgresStore, SqliteSaver, etc.) | 37K | +51MB
AutoGen | Agent | Some (e.g., Tool Agent, Chat Agent) | Many [Optional] (e.g., OpenAI, Pinecone, etc.) | 7K (core-only) | +26MB (core-only)
PocketFlow | Graph | None | None | 100 | +56KB
You will learn how agents ACTUALLY work.
You will learn how RAG is REALLY built.
You will learn from THE GROUND UP.
If you know a little bit of Python.
• preprocessing
• execution
• postprocessing
class BaseNode:
    def prep(self, shared): pass
    def exec(self, prep_res): pass
    def post(self, shared, prep_res, exec_res): pass
    def _run(self, shared):
        p = self.prep(shared); e = self._exec(p)
        return self.post(shared, p, e)

prep: Reads from the shared store to get ready.
exec: Performs the core task in isolation.
post: Writes results back and returns an 'action' string for the next Node.
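To make the lifecycle concrete, here is a minimal sketch of a BaseNode subclass (a toy Shout node, invented for illustration) that exercises all three steps:

```python
# A toy node: prep reads, exec transforms, post writes back.
class Shout(BaseNode):
    def prep(self, shared):
        return shared["text"]            # 1. read from the shared store
    def exec(self, prep_res):
        return prep_res.upper()          # 2. core work, in isolation
    def post(self, shared, prep_res, exec_res):
        shared["text_upper"] = exec_res  # 3. write the result back

shared = {"text": "hello"}
Shout().run(shared)
print(shared["text_upper"])  # HELLO
```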
class Node(BaseNode):
    def __init__(self, max_retries=1, wait=0):
        super().__init__()
        self.max_retries, self.wait = max_retries, wait
    def _exec(self, prep_res):
        for self.cur_retry in range(self.max_retries):
            try: return self.exec(prep_res)
            except Exception as e:
                if self.cur_retry == self.max_retries - 1:
                    return self.exec_fallback(prep_res, e)
                if self.wait > 0: time.sleep(self.wait)
    def exec_fallback(self, prep_res, exc): raise exc
# A node that calls an LLM to summarize a file
class SummarizeFile(Node):
    def prep(self, shared):
        return shared.get("file_content")
    def exec(self, file_content):
        # This would make a real API call to an LLM
        return call_llm(f"Summarize this: {file_content}")
    def exec_fallback(self, file_content, error):
        # If the LLM call fails after all retries...
        print(f"LLM call failed with error: {error}")
        return "Sorry, summarizing the file failed."
    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "next"

# Instantiate the node with retry logic
summarizer = SummarizeFile(
    max_retries=3,
    wait=5
)
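A minimal usage sketch, assuming a `call_llm` helper like the one shown later is available:

```python
# Run the node directly; prep -> exec (with retries) -> post.
shared = {"file_content": "PocketFlow is a 100-line LLM framework."}
summarizer.run(shared)
print(shared["summary"])  # the LLM's summary, or the fallback message
```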
# The Shared Store is just a dictionary.
shared = {}
# Node A writes to it.
shared["result_from_A"] = 42
# Node B reads from it.
data_for_b = shared["result_from_A"]
class LoadData(Node):
    # This node's only job is to add data to the shared store.
    def post(self, shared, prep_res, exec_res):
        shared["data"] = "Some important text content"

class Summarize(Node):
    def prep(self, shared):
        # Reads the data that the previous node wrote.
        return shared["data"]
    def exec(self, prep_res):
        # Does its work on the prepared data.
        return call_llm(f"Summarize: {prep_res}")
    def post(self, shared, prep_res, exec_res):
        # Writes its own result back to the shared store.
        shared["summary"] = exec_res
# A simple, default transition
node_a >> node_b
# Named transitions for branching logic
review_node - "approved" >> payment_node
review_node - "rejected" >> finish_node
class BaseNode:
    def __init__(self):
        self.params, self.successors = {}, {}
    def next(self, node, action="default"):
        # Adds an entry to the node's "address book"
        self.successors[action] = node
        return node
class Flow(BaseNode):
    def _orch(self, shared, params=None):
        # Runs a node, gets the action, finds the next node, repeats
        curr = copy.copy(self.start_node)
        while curr:
            last_action = curr._run(shared)
            curr = self.get_next_node(curr, last_action)
    def get_next_node(self, curr, action):
        # Looks up the action in the current node's successors
        return curr.successors.get(action or "default")
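Here is a small end-to-end sketch of that loop, using toy nodes (names invented for illustration) so no LLM is needed:

```python
# Branching on the action string returned by post().
class CheckNumber(Node):
    def prep(self, shared):
        return shared["n"]
    def post(self, shared, prep_res, exec_res):
        # The returned string selects the successor.
        return "even" if prep_res % 2 == 0 else "odd"

class SayEven(Node):
    def post(self, shared, prep_res, exec_res):
        shared["label"] = "even"

class SayOdd(Node):
    def post(self, shared, prep_res, exec_res):
        shared["label"] = "odd"

check = CheckNumber()
check - "even" >> SayEven()
check - "odd" >> SayOdd()

shared = {"n": 3}
Flow(start=check).run(shared)
print(shared["label"])  # odd
```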
from openai import OpenAI
import os

def call_llm(messages):
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content
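A quick sanity check of the helper (requires an OPENAI_API_KEY in the environment):

```python
print(call_llm([{"role": "user", "content": "Say hello in five words."}]))
```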
class ChatNode(Node):
    def prep(self, shared):
        # Initialize messages if this is the first run
        if "messages" not in shared:
            shared["messages"] = []
            print("Welcome to the chat!")
        # Get user input
        user_input = input("\nYou: ")
        # Add user message to history
        shared["messages"].append({"role": "user", "content": user_input})
        # Return all messages for the LLM
        return shared["messages"]
    def exec(self, messages):
        # Call LLM with the entire conversation history
        response = call_llm(messages)
        return response
    def post(self, shared, prep_res, exec_res):
        # Print the assistant's response
        print(f"\nAssistant: {exec_res}")
        # Add assistant message to history
        shared["messages"].append({"role": "assistant", "content": exec_res})
        # Loop back to continue the conversation
        return "continue"

# Create the flow with self-loop
chat_node = ChatNode()
chat_node - "continue" >> chat_node
flow = Flow(start=chat_node)

# Start the chat
if __name__ == "__main__":
    shared = {}
    flow.run(shared)
# 1. Initial State
shared = {}

# 2. After prep()
shared = {
    "messages": [
        {"role": "user", "content": "Hello, who are you?"}
    ]
}

# 3. During exec()
# No change to shared store
shared = {
    "messages": [
        {"role": "user", "content": "Hello, who are you?"}
    ]
}

# 4. After post()
shared = {
    "messages": [
        {"role": "user", "content": "Hello, who are you?"},
        {"role": "assistant", "content": "I am a helpful AI assistant."}
    ]
}
The structured output we want:
name: Jane Doe
email: jane@example.com

The chatty response we often get:
"Certainly! The name mentioned in the document is Jane Doe, and you can find her email at jane@example.com."
JSON (escaping required):
{
    "dialogue": "Alice said: \"Hello Bob.\nHow are you?\""
}

YAML (no escaping):
dialogue: |
  Alice said: "Hello Bob.
  How are you?"
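A quick way to verify the claim yourself (assumes PyYAML is installed). Both snippets decode to the same two-line string; YAML's `|` block just keeps a trailing newline:

```python
import json
import yaml

json_src = '{"dialogue": "Alice said: \\"Hello Bob.\\nHow are you?\\""}'
yaml_src = 'dialogue: |\n  Alice said: "Hello Bob.\n  How are you?"'

j = json.loads(json_src)["dialogue"]
y = yaml.safe_load(yaml_src)["dialogue"]
assert j == y.rstrip("\n")  # identical content, no escaping needed in YAML
```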
import yaml  # needed for yaml.safe_load below

class ResumeParserNode(Node):
    def prep(self, shared):
        # The resume text is expected to be in the shared store
        return shared["resume_text"]
    def exec(self, resume_text):
        prompt = f"""Analyze the resume below. Output ONLY the requested
information in YAML format.
**Resume:**
```
{resume_text}
```
**YAML Output Requirements:**
- Extract `name` (string).
- Extract `email` (string).
- Extract `experience` (list of objects).
**Example Format:**
```yaml
name: Jane Doe
email: jane@example.com
experience:
  - title: Manager
    company: Corp A
```
Generate the YAML output now:
"""
        response = call_llm(prompt)
        # A simple way to extract the YAML block from the response
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        structured_result = yaml.safe_load(yaml_str)
        # Assertions to enforce the output structure
        assert "name" in structured_result, "Name is required"
        assert "email" in structured_result, "Email is required"
        return structured_result
    def post(self, shared, prep_res, exec_res):
        shared["structured_data"] = exec_res
!! JOHN SMTIH !!
contact: 123-456-7890.
email me at johnsmtih1983@gnail.com
Work Experience:
- at ABC Corportaion i was the SALES MANAGER
- XYZ Industries, position: ASST. MANAGER
# The clean, structured data
{
    "name": "JOHN SMTIH",
    "email": "johnsmtih1983@gnail.com",
    "experience": [
        {"title": "SALES MANAGER", "company": "ABC Corportaion"},
        {"title": "ASST. MANAGER", "company": "XYZ Industries"}
    ]
}
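A usage sketch, assuming `call_llm` is defined and `messy_text` holds the raw resume shown above (the variable name is invented for illustration):

```python
shared = {"resume_text": messy_text}
ResumeParserNode(max_retries=3, wait=2).run(shared)
print(shared["structured_data"]["email"])  # johnsmtih1983@gnail.com
```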
📄
1 Resume
🗃️
1000 Resumes?
BatchNode
class Node:
    # Returns one item to process
    def prep(self, shared):
        return one_item
    # Processes one item
    def exec(self, one_item):
        return one_result
    # Receives one result
    def post(self, shared, prep_res, exec_res):
        pass

class BatchNode(Node):
    # Returns a LIST of items
    def prep(self, shared):
        return [item1, item2, item3]
    # Processes ONE item at a time
    def exec(self, one_item_from_list):
        return one_result
    # Receives a LIST of all results
    def post(self, shared, prep_res, exec_res_list):
        pass
class BatchNode(Node):
    # This is the entire implementation.
    def _exec(self, items):
        # It's just a simple for loop.
        return [super(BatchNode, self)._exec(i) for i in (items or [])]
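To see the contract without any LLM in the loop, here is a toy BatchNode (invented for illustration): prep returns a list, exec runs once per item, and post receives the list of results:

```python
class SquareBatch(BatchNode):
    def prep(self, shared):
        return shared["numbers"]           # a LIST of items
    def exec(self, n):
        return n * n                       # called once PER item
    def post(self, shared, prep_res, exec_res_list):
        shared["squares"] = exec_res_list  # a LIST of results

shared = {"numbers": [1, 2, 3]}
SquareBatch().run(shared)
print(shared["squares"])  # [1, 4, 9]
```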
class BatchResumeParserNode(BatchNode):
    def prep(self, shared):
        # Assumes a list of resumes is already in the shared store.
        return shared.get("resumes", [])
    def exec(self, resume_text):
        prompt = f"""Analyze the resume below. Output ONLY the requested
information in YAML format.
**Resume:**
```
{resume_text}
```
**YAML Output Requirements:**
- Extract `name` (string).
- Extract `email` (string).
- Extract `experience` (list of objects).
**Example Format:**
```yaml
name: Jane Doe
email: jane@example.com
experience:
  - title: Manager
    company: Corp A
```
Generate the YAML output now:
"""
        response = call_llm(prompt)
        # A simple way to extract the YAML block from the response
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        structured_result = yaml.safe_load(yaml_str)
        # Assertions to enforce the output structure
        assert "name" in structured_result, "Name is required"
        assert "email" in structured_result, "Email is required"
        return structured_result
    def post(self, shared, prep_res, all_results_list):
        # This runs only ONCE, after all items are processed.
        # It receives a list of all individual results.
        shared["all_parsed_resumes"] = all_results_list
Total Time = Sum of All Tasks

async: A label for functions that might wait.
await: Pause one task and let others run.
import asyncio
import time

async def make_coffee():  # Takes 3s
    print("Start coffee...")
    await asyncio.sleep(3)
    print("Coffee ready!")

async def make_toast():  # Takes 2s
    print("Start toast...")
    await asyncio.sleep(2)
    print("Toast ready!")

async def main():
    start_time = time.time()
    # Run both tasks concurrently
    await asyncio.gather(
        make_coffee(),
        make_toast()
    )
    print(f"Total time: {time.time() - start_time:.2f}s")

asyncio.run(main())
Start coffee...
Start toast...
Toast ready!
Coffee ready!
Total time: 3.01s
# Synchronous: each call blocks until the response arrives
from openai import OpenAI
import os

def call_llm(messages):
    client = OpenAI(api_key=...)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content

# Asynchronous: await lets other tasks run while waiting
from openai import AsyncOpenAI
import os

async def call_llm_async(messages):
    client = AsyncOpenAI(api_key=...)
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content
# Before
class BatchResumeParserNode(BatchNode):
    def prep(self, shared):
        return shared.get("resumes", [])
    def exec(self, one_resume_text):
        ...
        response = call_llm(prompt)
        ...
    def post(self, shared, prep_res, exec_res_list):
        shared["all_parsed_resumes"] = exec_res_list

# After
class BatchResumeParserNode(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        return shared.get("resumes", [])
    async def exec_async(self, one_resume_text):
        ...
        response = await call_llm_async(prompt)
        ...
    async def post_async(self, shared, prep_res, exec_res_list):
        shared["all_parsed_resumes"] = exec_res_list
class BatchNode(Node):
    def _exec(self, items):
        # A simple, sequential for loop
        return [super(BatchNode, self)._exec(i) for i in (items or [])]

class AsyncParallelBatchNode(AsyncNode, BatchNode):
    async def _exec(self, items):
        # Creates tasks and runs them all concurrently
        return await asyncio.gather(*(super(AsyncParallelBatchNode, self)._exec(i) for i in items))
Sequential: 10 resumes × 10 seconds = 100 seconds.
Parallel: Total Time ≈ Time of the Longest Task.
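A toy demonstration of that claim, using simulated I/O instead of LLM calls (names invented for illustration):

```python
import asyncio, time

class SleepyBatch(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        return shared["delays"]
    async def exec_async(self, delay):
        await asyncio.sleep(delay)  # stand-in for a slow API call
        return delay
    async def post_async(self, shared, prep_res, exec_res_list):
        shared["done"] = exec_res_list

async def main():
    shared = {"delays": [1, 2, 3]}
    start = time.time()
    await SleepyBatch().run_async(shared)
    print(f"Finished in {time.time() - start:.1f}s")  # ~3s, not 6s

asyncio.run(main())
```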
class GenerateOutline(Node):
    def prep(self, shared):
        return shared["topic"]
    def exec(self, topic):
        prompt = f"List section titles for {topic}"
        return call_llm(prompt)
    def post(self, shared, prep_res, exec_res):
        shared["section_titles"] = exec_res.strip().split('\n')

class BatchDraftSections(BatchNode):
    def prep(self, shared):
        return shared["section_titles"]
    def exec(self, section_title):
        prompt = f"Write content for: {section_title}"
        return call_llm(prompt)
    def post(self, shared, prep_res, exec_res):
        shared["section_drafts"] = exec_res

class CombineAndRefine(Node):
    def prep(self, shared):
        return shared["section_drafts"]
    def exec(self, section_drafts):
        combined = "\n\n".join(section_drafts)
        prompt = f"Combine and polish: {combined}"
        return call_llm(prompt)
    def post(self, shared, prep_res, exec_res):
        shared["final_article"] = exec_res
# Connect the chain
outline_node = GenerateOutline()
batch_draft_node = BatchDraftSections()
combine_node = CombineAndRefine()
outline_node >> batch_draft_node >> combine_node

# Create and run
flow = Flow(start=outline_node)
shared = {"topic": "AI Safety"}
flow.run(shared)
# Initial State
shared = {"topic": "AI Safety"}

# After Outliner - now we have a LIST!
shared = {
    "topic": "AI Safety",
    "section_titles": ["Introduction", "Key Risks", "Solutions"]
}

# After BatchDrafter - LIST of drafted sections!
shared = {
    "topic": "AI Safety",
    "section_titles": ["Introduction", "Key Risks", "Solutions"],
    "section_drafts": [
        "AI safety introduction content...",
        "Key risks in AI development...",
        "Proposed solutions and approaches..."
    ]
}

# Final result - combined and polished
shared = {
    "topic": "AI Safety",
    "section_titles": ["Introduction", "Key Risks", "Solutions"],
    "section_drafts": [...],
    "final_article": "AI Safety: A Comprehensive Guide..."
}
class DecideAction(Node):
    def prep(self, shared):
        context = shared.get("context", "No previous search")
        question = shared["question"]
        return question, context
    def exec(self, inputs):
        question, context = inputs
        prompt = f"""Given the context, should you 'search' for more
info or 'answer' now?
Context: {context}
Query: {question}
Actions:
- search
- answer
Respond in YAML:
```yaml
action: <your_choice>
search_query: <if searching>
```"""
        response = call_llm(prompt)
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        decision = yaml.safe_load(yaml_str)
        return decision
    def post(self, shared, prep_res, exec_res):
        if exec_res["action"] == "search":
            shared["search_query"] = exec_res["search_query"]
        return exec_res["action"]
class SearchWeb(Node):
    def prep(self, shared):
        return shared["search_query"]
    def exec(self, search_query):
        # Search the web for information
        search_client = GoogleSearchAPI(api_key="YOUR_API_KEY")
        results = search_client.search({
            "query": search_query,
            "num_results": 3,
            "language": "en"
        })
        formatted_results = f"Results for: {search_query}\n"
        for result in results:
            formatted_results += f"- {result.title}: {result.snippet}\n"
        return formatted_results
    def post(self, shared, prep_res, exec_res):
        previous = shared.get("context", "")
        shared["context"] = previous + "\n\nSEARCH: " + \
            prep_res + "\nRESULTS: " + exec_res
        return "decide"  # Loop back to the brain
class DirectAnswer(Node):
    def prep(self, shared):
        question = shared["question"]
        context = shared.get("context", "")
        return question, context
    def exec(self, inputs):
        question, context = inputs
        prompt = f"""Based on the following information, answer the question.
Question: {question}
Research: {context}
Provide a comprehensive answer using the research results."""
        return call_llm(prompt)
    def post(self, shared, prep_res, exec_res):
        shared["answer"] = exec_res
        return "done"  # End the flow
# Connect the nodes together
decide_node = DecideAction()
search_node = SearchWeb()
answer_node = DirectAnswer()
decide_node - "search" >> search_node
decide_node - "answer" >> answer_node
search_node - "decide" >> decide_node  # The loop
flow = Flow(start=decide_node)
# Initial State
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?"
}

# Start the flow
flow.run(shared)

# After Round 1: Decide
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner"
}
# LLM Output: 'search'

# After Round 2: Search
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner",
    "context": "SEARCH: 2024 Physics Nobel Prize winner\nRESULTS: The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton for foundational discoveries and inventions in machine learning..."
}
# Node returns: 'decide'

# After Round 3: Decide (with context)
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner",
    "context": "SEARCH: 2024 Physics Nobel Prize winner\nRESULTS: The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton for foundational discoveries and inventions in machine learning..."
}
# LLM Output: 'answer' (no new data added)

# After Final Step: Answer
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner",
    "context": "SEARCH: 2024 Physics Nobel Prize winner\nRESULTS: The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton...",
    "answer": "The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton for their foundational discoveries and inventions that enable machine learning with artificial neural networks."
}
# Flow ends.
✓ Chatbots
✓ Batch Processing
✓ Parallel Workflows
✓ Agents
...all from 100 lines.
https://github.com/The-Pocket/PocketFlow
https://github.com/The-Pocket/PocketFlow-Tutorial-Codebase-Knowledge