Chapter 6: Checkpointer (BaseCheckpointSaver) - Saving Your Progress
In Chapter 5: Pregel Execution Engine, we saw how the engine runs our graph step-by-step. But what happens if a graph takes hours to run, or if it needs to pause and wait for a human? If the program crashes or we need to stop it, do we lose all the progress?
That’s where Checkpointers come to the rescue!
What Problem Do Checkpointers Solve?
Imagine you’re playing a long video game. You wouldn’t want to start from the very beginning every time you stop playing, right? Games have save points or checkpoints that record your progress.
LangGraph’s Checkpointer does the same thing for your graph execution. It automatically saves the graph’s state at certain points, usually after each step completed by the Pregel Execution Engine.
This is incredibly useful for:
- Long-Running Processes: If your graph involves many steps or calls to slow tools/LLMs, you can stop it and resume later without losing work.
- Resilience: If your program crashes unexpectedly, you can restart it from the last saved checkpoint.
- Human-in-the-Loop (HITL): As we saw with Interrupt in Chapter 4: Control Flow Primitives, pausing the graph requires saving its state so it can be restored exactly when the human provides input. Checkpointers are essential for this.
Analogy: Think of a checkpointer as an automatic “Save” button for your graph’s progress. It takes snapshots of the shared “whiteboard” (Channels) so you can always pick up where you left off.
Key Concepts
- What is Saved? The checkpointer saves the current value and version of every Channel in your graph’s state. It also keeps track of which step the graph was on and any pending tasks (like those created by Send).
- When is it Saved? The Pregel Execution Engine typically triggers the checkpointer to save after each “superstep” (a round of node executions and state updates).
- Where is it Saved? This depends on the specific checkpointer implementation you choose. LangGraph provides several:
  - MemorySaver: Stores checkpoints in your computer’s RAM. Simple for testing, but lost when your script ends.
  - SqliteSaver: Stores checkpoints in a local SQLite database file, making them persistent across script runs. In recent LangGraph versions it ships in the separate langgraph-checkpoint-sqlite package.
  - Other savers might store checkpoints in cloud databases or other persistent storage.
- thread_id (The Save Slot Name): To save and load progress correctly, you need a way to identify which specific run of the graph you want to work with. Think of this like naming your save file in a game. In LangGraph, this identifier is called the thread_id. You provide it in the config when you run the graph. Each unique thread_id represents an independent “conversation” or execution history.
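The ingredients above (channel values, channel versions, and a step counter, all filed under a thread_id) can be modeled with a tiny, stdlib-only store. ToyCheckpoint and ToyStore are hypothetical names used only for illustration; LangGraph’s real Checkpoint carries more fields, as the code peek later in this chapter shows.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ToyCheckpoint:
    # Hypothetical stand-in for a saved snapshot (the real Checkpoint has more fields)
    step: int
    channel_values: dict[str, Any]
    channel_versions: dict[str, int]


@dataclass
class ToyStore:
    # thread_id -> snapshots in the order they were saved (one "save slot" per thread)
    threads: dict[str, list[ToyCheckpoint]] = field(default_factory=dict)

    def save(self, thread_id: str, cp: ToyCheckpoint) -> None:
        self.threads.setdefault(thread_id, []).append(cp)

    def latest(self, thread_id: str) -> Optional[ToyCheckpoint]:
        history = self.threads.get(thread_id)
        return history[-1] if history else None

    def history(self, thread_id: str) -> list[ToyCheckpoint]:
        # Newest snapshot first
        return list(reversed(self.threads.get(thread_id, [])))


store = ToyStore()
store.save("thread-a", ToyCheckpoint(1, {"value": 6}, {"value": 2}))
store.save("thread-a", ToyCheckpoint(2, {"value": 12}, {"value": 3}))
store.save("thread-b", ToyCheckpoint(1, {"value": 101}, {"value": 2}))

print(store.latest("thread-a").channel_values)        # {'value': 12}
print([cp.step for cp in store.history("thread-a")])  # [2, 1]
print(store.latest("thread-b").channel_values)        # {'value': 101}
print(store.latest("never-used"))                     # None
```

Each thread_id gets its own independent history, which is exactly why two runs with different IDs never overwrite each other’s progress.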
How to Use a Checkpointer
Using a checkpointer is straightforward. You just need to tell LangGraph which checkpointer to use when you compile your graph.
Step 1: Import a Checkpointer
Let’s start with the simplest one, MemorySaver.
# Import the simplest checkpointer
from langgraph.checkpoint.memory import MemorySaver
Step 2: Instantiate the Checkpointer
# Create an instance of the memory checkpointer
memory_saver = MemorySaver()
Step 3: Compile Your Graph with the Checkpointer
Let’s reuse our simple adder -> multiplier graph. The graph definition itself doesn’t change.
# --- Define State and Nodes (same as Chapter 1) ---
from typing import TypedDict
from langgraph.graph import StateGraph, END, START
class MyState(TypedDict):
    value: int
def add_one(state: MyState) -> dict:
    print(f"Adder: Adding 1 to {state['value']}")
    return {"value": state['value'] + 1}
def multiply_by_two(state: MyState) -> dict:
    print(f"Multiplier: Doubling {state['value']}")
    return {"value": state['value'] * 2}
# --- Build the Graph (same as Chapter 1) ---
workflow = StateGraph(MyState)
workflow.add_node("adder", add_one)
workflow.add_node("multiplier", multiply_by_two)
workflow.set_entry_point("adder")
workflow.add_edge("adder", "multiplier")
workflow.add_edge("multiplier", END)
# --- Compile WITH the checkpointer ---
# Pass the checkpointer instance to the compile method
app = workflow.compile(checkpointer=memory_saver)
That’s it! By passing checkpointer=memory_saver to compile(), you’ve enabled automatic checkpointing for this graph.
Step 4: Run with a thread_id
To use the checkpointer, you need to provide a configuration dictionary (config) containing a unique identifier for this specific execution thread.
import uuid
# Create a unique ID for this run
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
# Define the initial state
initial_state = {"value": 5}
print("--- Running Graph (First Time) ---")
# Run the graph with the config
final_state = app.invoke(initial_state, config=config)
print("\n--- Final State (First Run) ---")
print(final_state)
Expected Output (First Run):
--- Running Graph (First Time) ---
Adder: Adding 1 to 5
Multiplier: Doubling 6
--- Final State (First Run) ---
{'value': 12}
Behind the scenes, MemorySaver saved checkpoints as the run progressed, including one after the adder step and one after the multiplier step, each associated with the thread_id you provided.
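The save-after-every-step behavior can be modeled in a few lines of plain Python. The run_with_checkpoints function below is a hypothetical toy (a linear runner, not the Pregel engine and not a LangGraph API): it applies each node’s partial update and appends a snapshot to a dict keyed by thread_id, mirroring the two saves described above.

```python
from __future__ import annotations

from typing import Callable

Node = Callable[[dict], dict]


def run_with_checkpoints(
    nodes: list[tuple[str, Node]], state: dict, saver: dict[str, list[dict]], thread_id: str
) -> dict:
    """Toy linear runner: execute nodes in order and save a snapshot after every step."""
    history = saver.setdefault(thread_id, [])
    for step, (name, node) in enumerate(nodes, start=1):
        update = node(state)
        state = {**state, **update}  # merge the node's partial update into the state
        # Store a copy so later changes cannot alter the saved snapshot
        history.append({"step": step, "last_node": name, "values": dict(state)})
    return state


def add_one(state: dict) -> dict:
    return {"value": state["value"] + 1}


def multiply_by_two(state: dict) -> dict:
    return {"value": state["value"] * 2}


saver: dict[str, list[dict]] = {}
final = run_with_checkpoints(
    [("adder", add_one), ("multiplier", multiply_by_two)], {"value": 5}, saver, "t1"
)
print(final)                                 # {'value': 12}
print([cp["values"] for cp in saver["t1"]])  # [{'value': 6}, {'value': 12}]
```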
Step 5: Resume the Graph
Now, let’s imagine we stopped the process. If we run the same graph with the same thread_id, the checkpointer allows the Pregel Execution Engine to load the last saved state and continue. Since the first run finished completely, running invoke again will just load the final state.
print("\n--- Running Graph Again with SAME thread_id ---")
# Use the SAME config (containing the same thread_id)
# Provide NO initial state, as it will be loaded from the checkpoint
resumed_state = app.invoke(None, config=config)
print("\n--- Final State (Resumed Run) ---")
print(resumed_state)
# Let's check the saved states using the checkpointer directly
print("\n--- Checkpoints Saved ---")
for checkpoint in memory_saver.list(config):
    print(checkpoint)
Expected Output (Second Run):
--- Running Graph Again with SAME thread_id ---
# Notice: No node printouts because the graph already finished!
# It just loads the final saved state.
--- Final State (Resumed Run) ---
{'value': 12}
--- Checkpoints Saved ---
# Abbreviated and illustrative: list() yields the newest checkpoint first; the exact
# fields, channel names, and number of checkpoints vary by LangGraph version
CheckpointTuple(config={'configurable': {'thread_id': '...'}}, checkpoint={'v': 1, 'ts': '...', 'id': '...', 'channel_values': {'value': 12}, 'channel_versions': {'adder': 1, 'multiplier': 2}, 'versions_seen': {'adder': {}, 'multiplier': {'adder': 1}}}, metadata={'source': 'loop', 'step': 3, ...}, ...)
CheckpointTuple(config={'configurable': {'thread_id': '...'}}, checkpoint={'v': 1, 'ts': '...', 'id': '...', 'channel_values': {'value': 12}, 'channel_versions': {'adder': 1, 'multiplier': 2}, 'versions_seen': {'adder': {}, 'multiplier': {'adder': 1}}}, metadata={'source': 'loop', 'step': 2, ...}, ...)
CheckpointTuple(config={'configurable': {'thread_id': '...'}}, checkpoint={'v': 1, 'ts': '...', 'id': '...', 'channel_values': {'value': 6}, 'channel_versions': {'adder': 1}, 'versions_seen': {'adder': {}}}, metadata={'source': 'loop', 'step': 1, ...}, ...)
The checkpointer successfully loaded the final state ({'value': 12}) associated with that thread_id.
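Resuming is where checkpoints pay off. The toy run_or_resume below is hypothetical and stdlib-only: it simulates a crash in the multiplier, then resumes from the snapshot saved after the adder so the adder does not run twice. Calling it again on the finished thread runs no nodes, much like the invoke(None, ...) call above. LangGraph decides what to run next from channel versions rather than from a simple step index, so treat this as a model of the behavior only.

```python
from __future__ import annotations

from typing import Callable, Optional

Node = Callable[[dict], dict]


def run_or_resume(
    nodes: list[tuple[str, Node]], initial: Optional[dict], saver: dict, thread_id: str
) -> dict:
    """Toy resume: continue from the latest snapshot for thread_id if one exists."""
    history = saver.setdefault(thread_id, [])
    if history:
        # Restore the saved values and skip every step that already completed
        state, done = dict(history[-1]["values"]), history[-1]["step"]
    elif initial is not None:
        state, done = dict(initial), 0
    else:
        raise ValueError("no checkpoint for this thread and no initial input")
    for step in range(done + 1, len(nodes) + 1):
        name, node = nodes[step - 1]
        state = {**state, **node(state)}
        history.append({"step": step, "last_node": name, "values": dict(state)})
    return state


calls: list[str] = []
crash = True  # flipped to False below to let the multiplier succeed


def add_one(state: dict) -> dict:
    calls.append("adder")
    return {"value": state["value"] + 1}


def multiply_by_two(state: dict) -> dict:
    calls.append("multiplier")
    if crash:
        raise RuntimeError("simulated crash")
    return {"value": state["value"] * 2}


nodes = [("adder", add_one), ("multiplier", multiply_by_two)]
saver: dict = {}
try:
    run_or_resume(nodes, {"value": 5}, saver, "t1")
except RuntimeError:
    pass  # the snapshot saved after the adder ({'value': 6}) survives the crash

crash = False
print(run_or_resume(nodes, None, saver, "t1"))  # {'value': 12}
print(run_or_resume(nodes, None, saver, "t1"))  # {'value': 12} again; no node runs
print(calls)  # ['adder', 'multiplier', 'multiplier']
```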
Checkpointers and Interrupt (Human-in-the-Loop)
Remember the Interrupt example from Chapter 4?
# (Simplified HITL example from Chapter 4)
import uuid
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
# ... (State, Nodes: create_plan, request_approval, execute_plan) ...
# Note: `workflow` here is the Chapter 4 HITL graph, not the adder graph above
# Compile WITH checkpointer (REQUIRED for interrupt)
memory_saver_hitl = MemorySaver()
app_hitl = workflow.compile(checkpointer=memory_saver_hitl)
# Run until the graph pauses at interrupt()
config_hitl = {"configurable": {"thread_id": str(uuid.uuid4())}}
for chunk in app_hitl.stream({"plan": ""}, config=config_hitl):
    # In the default "updates" stream mode, an interrupt arrives as an "__interrupt__" key
    if "__interrupt__" in chunk:
        print("Graph interrupted!")
        break
# Resume after the human's decision, reusing the SAME config (same thread_id)
human_decision = "Approved"
for chunk in app_hitl.stream(Command(resume=human_decision), config=config_hitl):
    print(chunk)  # ... (process remaining steps) ...
print("Graph resumed and finished!")
When interrupt() was called inside the request_approval node, the Pregel Execution Engine automatically used the memory_saver_hitl checkpointer to save the state of the graph at that moment (including the plan), together with the fact that request_approval was waiting for input. When we called stream again with Command(resume=...) and the same config_hitl, the engine loaded that saved state through the checkpointer and continued from the paused node, now with the human’s feedback. Note that the interrupted node is re-executed from its beginning on resume; this time interrupt() returns the resume value instead of pausing.
Without a checkpointer, Interrupt cannot work.
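Stripped of LangGraph specifics, the pause/resume cycle looks like the toy below. ToyInterrupt and run_node are hypothetical names; in LangGraph you call interrupt() inside the node and resume with Command(resume=...). The sketch shows the two properties that matter: the paused state must be stored somewhere keyed by thread_id, and on resume the node runs again from its first line, this time receiving the human’s value instead of pausing.

```python
from __future__ import annotations

from typing import Any, Optional


class ToyInterrupt(Exception):
    """Hypothetical signal a node raises to pause and wait for a human."""

    def __init__(self, question: str) -> None:
        super().__init__(question)
        self.question = question


def request_approval(state: dict, resume_value: Optional[Any]) -> dict:
    # This line runs on the first attempt AND again after a resume
    if resume_value is None:
        raise ToyInterrupt(f"Approve plan {state['plan']!r}?")
    return {"approved": resume_value == "Approved"}


def run_node(state: dict, saver: dict, thread_id: str, resume_value: Any = None) -> Optional[dict]:
    """Run request_approval; on interrupt, save the state under thread_id and return None.

    When resume_value is given, the passed-in state is ignored and the saved one is reloaded.
    """
    if resume_value is not None:
        state = dict(saver[thread_id]["values"])
    try:
        update = request_approval(state, resume_value)
    except ToyInterrupt as exc:
        saver[thread_id] = {"values": dict(state), "waiting_on": exc.question}
        return None
    new_state = {**state, **update}
    saver[thread_id] = {"values": new_state, "waiting_on": None}
    return new_state


saver: dict[str, dict] = {}
print(run_node({"plan": "deploy v2"}, saver, "hitl-1"))  # None (paused)
print(saver["hitl-1"]["waiting_on"])                   # Approve plan 'deploy v2'?
result = run_node({}, saver, "hitl-1", resume_value="Approved")
print(result)  # {'plan': 'deploy v2', 'approved': True}
```

If saver were discarded between the two calls, the resume would have nothing to reload, which is the toy version of “without a checkpointer, Interrupt cannot work.”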
How Checkpointing Works Internally
What happens behind the scenes when a checkpointer is configured?
Saving:
- Step Complete: The Pregel Execution Engine finishes a step (e.g., after running the adder node and updating the state).
- Signal Checkpointer: The engine tells the configured checkpointer (MemorySaver in our example) that it’s time to save.
- Gather State: The checkpointer (or the engine on its behalf) accesses all the active Channels.
- Serialize State: For each channel, it calls the channel’s internal checkpoint() method to get a serializable representation of its current value (e.g., the number 6 for the "value" channel).
- Store Checkpoint: The serialized channel values, their versions, the current step number, and other metadata are bundled into a Checkpoint object, which is stored under the current thread_id provided in the config. MemorySaver stores it in a dictionary in RAM; SqliteSaver writes it to a database table.
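To see why a database-backed saver survives restarts, here is a hypothetical ToySqliteSaver built on Python’s standard sqlite3 and json modules. It is not LangGraph’s SqliteSaver (which has its own schema and serializer); it only illustrates the serialize-then-store step and shows that a fresh connection to the same file still finds the latest checkpoint.

```python
from __future__ import annotations

import json
import os
import sqlite3
import tempfile
from typing import Any, Optional


class ToySqliteSaver:
    """Hypothetical, minimal persistent saver (not LangGraph's SqliteSaver)."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            " thread_id TEXT, step INTEGER, data TEXT,"
            " PRIMARY KEY (thread_id, step))"
        )

    def put(self, thread_id: str, step: int, channel_values: dict[str, Any],
            channel_versions: dict[str, int]) -> None:
        # Serialize the snapshot to JSON text so it fits in a single table column
        data = json.dumps({"channel_values": channel_values, "channel_versions": channel_versions})
        with self.conn:  # commits the transaction on success
            self.conn.execute(
                "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)", (thread_id, step, data)
            )

    def get_latest(self, thread_id: str) -> Optional[dict]:
        row = self.conn.execute(
            "SELECT step, data FROM checkpoints WHERE thread_id = ? ORDER BY step DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
        if row is None:
            return None
        step, data = row
        return {"step": step, **json.loads(data)}


path = os.path.join(tempfile.mkdtemp(), "checkpoints.db")
saver = ToySqliteSaver(path)
saver.put("t1", 1, {"value": 6}, {"value": 2})
saver.put("t1", 2, {"value": 12}, {"value": 3})
saver.conn.close()

# A fresh connection (think: a new process after a crash) still sees the saved progress
reopened = ToySqliteSaver(path)
print(reopened.get_latest("t1"))
# {'step': 2, 'channel_values': {'value': 12}, 'channel_versions': {'value': 3}}
```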
Loading (Resuming):
- Invoke with thread_id: You call app.invoke(None, config=config) where config contains a thread_id that has been previously saved.
- Request Checkpoint: The Pregel Execution Engine asks the checkpointer to load the latest checkpoint for the given thread_id.
- Retrieve Checkpoint: The checkpointer retrieves the saved Checkpoint object (e.g., from its memory dictionary or the database).
- Restore State: The engine takes the saved channel values from the checkpoint. For each channel, it calls the channel’s from_checkpoint() method (or similar internal logic) to restore its state. The “whiteboard” (Channels) is now exactly as it was when the checkpoint was saved.
- Continue Execution: The engine uses the restored channel versions, together with the record of which versions each node has already seen (versions_seen), to work out which nodes still need to run, and prepares those tasks for the next step.
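The “Continue Execution” step relies on the version bookkeeping stored in each checkpoint (channel_versions and versions_seen). The function below is a simplified model under a stated assumption: a node is due to run when some channel it listens to has a newer version than the one that node last saw. The function name nodes_to_run and the channel names input and after_adder are hypothetical; the real engine uses its own internal channel names and additional rules.

```python
from __future__ import annotations


def nodes_to_run(
    triggers: dict[str, list[str]],
    channel_versions: dict[str, int],
    versions_seen: dict[str, dict[str, int]],
) -> list[str]:
    """Return the nodes whose trigger channels changed since the node last ran."""
    ready = []
    for node, channels in triggers.items():
        seen = versions_seen.get(node, {})
        # A missing entry counts as version 0 (never written / never seen)
        if any(channel_versions.get(ch, 0) > seen.get(ch, 0) for ch in channels):
            ready.append(node)
    return ready


# Hypothetical wiring: "input" triggers the adder, "after_adder" triggers the multiplier
triggers = {"adder": ["input"], "multiplier": ["after_adder"]}

# Restored from a checkpoint saved right after the adder step
versions = {"input": 1, "after_adder": 2}
seen = {"adder": {"input": 1}}
print(nodes_to_run(triggers, versions, seen))  # ['multiplier']

# Restored from the final checkpoint: every node is up to date, so nothing runs
seen_final = {"adder": {"input": 1}, "multiplier": {"after_adder": 2}}
print(nodes_to_run(triggers, versions, seen_final))  # []
```

This is also why re-invoking a finished thread produces no node printouts: every node has already seen the latest version of its inputs.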
Here’s a simplified view of the interaction:
sequenceDiagram
participant User
participant App as CompiledGraph
participant Engine as Pregel Engine
participant Saver as Checkpointer (e.g., MemorySaver)
participant Storage as Underlying Storage (RAM, DB)
%% Saving %%
Engine->>Engine: Finishes Step N
Engine->>Saver: Save checkpoint for config (thread_id)
Saver->>Engine: Request current channel states & versions
Engine-->>Saver: Provides states & versions
Saver->>Storage: Store Checkpoint(Step N, states, versions) linked to thread_id
Storage-->>Saver: Acknowledge Save
Saver-->>Engine: Save Complete
%% Loading %%
User->>App: invoke(None, config with thread_id)
App->>Engine: Start/Resume Execution
Engine->>Saver: Get latest checkpoint for config (thread_id)
Saver->>Storage: Retrieve Checkpoint linked to thread_id
Storage-->>Saver: Returns Checkpoint(Step N, states, versions)
Saver-->>Engine: Provides Checkpoint
Engine->>Engine: Restore channel states from checkpoint
Engine->>Engine: Prepare tasks for Step N+1
Engine->>App: Continue execution...
A Peek at the Code (checkpoint/base.py, checkpoint/memory.py, pregel/loop.py)
Let’s look at the core components:
- BaseCheckpointSaver (checkpoint/base.py): This is the abstract base class (like a template) that all checkpointers must implement. It defines the essential methods the engine needs.
# checkpoint/base.py (Highly Simplified)
from abc import ABC, abstractmethod
from typing import Any, Mapping, NamedTuple, Optional, TypedDict
# Represents a saved checkpoint
class Checkpoint(TypedDict):
    channel_values: Mapping[str, Any]  # Saved state of channels
    channel_versions: Mapping[str, int]  # Internal versions
    versions_seen: Mapping[str, Mapping[str, int]]  # Tracking for node execution
    # ... other metadata like v, ts, id, pending_sends ...
# Represents the checkpoint tuple retrieved from storage
class CheckpointTuple(NamedTuple):
    config: dict  # The config used (includes thread_id)
    checkpoint: Checkpoint
    metadata: dict
    # ... other fields like parent_config, pending_writes ...
class BaseCheckpointSaver(ABC):
    # --- Sync Methods ---
    @abstractmethod
    def get_tuple(self, config: dict) -> Optional[CheckpointTuple]:
        """Load the checkpoint tuple for the given config."""
        ...
    @abstractmethod
    def put(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> dict:
        """Save a checkpoint."""
        ...
    # --- Async Methods (similar structure) ---
    @abstractmethod
    async def aget_tuple(self, config: dict) -> Optional[CheckpointTuple]:
        """Async load the checkpoint tuple."""
        ...
    @abstractmethod
    async def aput(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> dict:
        """Async save a checkpoint."""
        ...
    # --- Other methods (list, put_writes) omitted for brevity ---
The key methods are get_tuple (to load) and put (to save), along with their async counterparts (aget_tuple, aput). Any specific checkpointer (like MemorySaver, SqliteSaver) must provide concrete implementations for these methods.
- MemorySaver (checkpoint/memory.py): A simple implementation that uses an in-memory dictionary.
# checkpoint/memory.py (Highly Simplified)
# (Builds on Checkpoint, CheckpointTuple, and BaseCheckpointSaver from the base.py sketch above)
import threading
from collections import defaultdict
from typing import Optional
class MemorySaver(BaseCheckpointSaver):
    def __init__(self):
        # Use a dictionary to store checkpoints in RAM
        # Key: thread_id, Value: List of CheckpointTuples
        self._checkpoints: defaultdict[str, list[CheckpointTuple]] = defaultdict(list)
        self._lock = threading.RLock()  # To handle multiple threads safely
    def get_tuple(self, config: dict) -> Optional[CheckpointTuple]:
        thread_id = config["configurable"]["thread_id"]
        with self._lock:
            if checkpoints := self._checkpoints.get(thread_id):
                # Return the latest checkpoint for this thread_id
                return checkpoints[-1]
            return None
    def put(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> dict:
        thread_id = config["configurable"]["thread_id"]
        with self._lock:
            # Append the new checkpoint to the list for this thread_id
            self._checkpoints[thread_id].append(
                CheckpointTuple(config, checkpoint, metadata)
            )
        return {"configurable": {"thread_id": thread_id}}
    # ... async methods (aget_tuple, aput) are similar using the same dict ...
    # ... list method iterates through the dictionary ...
As you can see, MemorySaver just uses a standard Python dictionary (self._checkpoints) to store the CheckpointTuple for each thread_id. This is simple but not persistent.
- Integration (pregel/loop.py): The Pregel Execution Engine (PregelLoop classes) interacts with the checkpointer during its execution cycle.
# pregel/loop.py (Conceptual Snippets)
class PregelLoop:  # Base class for Sync/Async loops
    def __init__(self, checkpointer: Optional["BaseCheckpointSaver"], **kwargs):  # other arguments omitted
        self.checkpointer = checkpointer
        # ... other init ...
    def _put_checkpoint(self, metadata: "CheckpointMetadata") -> None:
        # Called by the loop after a step or input processing
        if self.checkpointer:
            # 1. Create the Checkpoint object from current channels/state
            checkpoint_data = create_checkpoint(self.checkpoint, self.channels, ...)
            # 2. Call the checkpointer's put method (sync or async)
            # (Uses self.submit to potentially run in background)
            self.submit(self.checkpointer.put, self.checkpoint_config, checkpoint_data, metadata)
            # 3. Update internal config with the new checkpoint ID
            self.checkpoint_config = {"configurable": {"thread_id": ..., "checkpoint_id": checkpoint_data["id"]}}
    def __enter__(self):  # Or __aenter__ for async
        # Called when the loop starts
        if self.checkpointer:
            # 1. Try to load an existing checkpoint tuple
            saved = self.checkpointer.get_tuple(self.checkpoint_config)
        else:
            saved = None
        if saved:
            # 2. Restore state from the loaded checkpoint
            self.checkpoint = saved.checkpoint
            self.checkpoint_config = saved.config
            # ... restore channels from saved.checkpoint['channel_values'] ...
        else:
            # Initialize with an empty checkpoint
            self.checkpoint = empty_checkpoint()
        # ... setup channels based on restored or empty checkpoint ...
        return self
The PregelLoop uses the checkpointer’s get_tuple method when it starts (in __enter__ or __aenter__) to load any existing state. It uses the put method (inside _put_checkpoint) during execution to save progress.
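The sketches above elide the async methods, so here is a complete, runnable toy version of the same interface. ToyBaseSaver and ToyMemorySaver are hypothetical names; having the async methods default to running the sync ones in a worker thread via asyncio.to_thread is a design choice of this sketch, not a claim about how LangGraph implements aget_tuple and aput.

```python
from __future__ import annotations

import asyncio
import threading
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import NamedTuple, Optional


class ToyCheckpointTuple(NamedTuple):
    config: dict
    checkpoint: dict
    metadata: dict


class ToyBaseSaver(ABC):
    """Hypothetical mini-interface echoing get_tuple/put and their async twins."""

    @abstractmethod
    def get_tuple(self, config: dict) -> Optional[ToyCheckpointTuple]: ...

    @abstractmethod
    def put(self, config: dict, checkpoint: dict, metadata: dict) -> dict: ...

    # Default async versions: run the sync methods in a worker thread
    async def aget_tuple(self, config: dict) -> Optional[ToyCheckpointTuple]:
        return await asyncio.to_thread(self.get_tuple, config)

    async def aput(self, config: dict, checkpoint: dict, metadata: dict) -> dict:
        return await asyncio.to_thread(self.put, config, checkpoint, metadata)


class ToyMemorySaver(ToyBaseSaver):
    def __init__(self) -> None:
        # thread_id -> list of saved tuples, oldest first
        self._checkpoints: defaultdict[str, list[ToyCheckpointTuple]] = defaultdict(list)
        self._lock = threading.RLock()  # guards the dict across threads

    def get_tuple(self, config: dict) -> Optional[ToyCheckpointTuple]:
        thread_id = config["configurable"]["thread_id"]
        with self._lock:
            history = self._checkpoints.get(thread_id)
            return history[-1] if history else None

    def put(self, config: dict, checkpoint: dict, metadata: dict) -> dict:
        thread_id = config["configurable"]["thread_id"]
        with self._lock:
            self._checkpoints[thread_id].append(ToyCheckpointTuple(config, checkpoint, metadata))
        return {"configurable": {"thread_id": thread_id}}


saver = ToyMemorySaver()
cfg = {"configurable": {"thread_id": "t1"}}
saver.put(cfg, {"channel_values": {"value": 6}}, {"step": 1})
asyncio.run(saver.aput(cfg, {"channel_values": {"value": 12}}, {"step": 2}))
latest = asyncio.run(saver.aget_tuple(cfg))
print(latest.checkpoint["channel_values"], latest.metadata)  # {'value': 12} {'step': 2}
```

Because the async defaults delegate to the locked sync methods, sync and async callers share one store safely; a database-backed saver would more likely use a native async driver instead of a thread pool.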
Conclusion
You’ve learned about Checkpointers (BaseCheckpointSaver), the mechanism that gives your LangGraph applications memory and resilience.
- Checkpointers save the state of your graph’s Channels periodically, typically after each superstep.
- They load saved states to resume execution.
- This is crucial for long-running graphs, human-in-the-loop workflows (using Interrupt), and recovering from failures.
- You enable checkpointing by passing a checkpointer instance (like MemorySaver or SqliteSaver) to graph.compile().
- You manage different execution histories using a unique thread_id in the config.
- MemorySaver is simple for testing, but its checkpoints are lost when the script ends; use database savers (like SqliteSaver) for true persistence.
This chapter concludes our tour of the core concepts in LangGraph! You now understand the fundamental building blocks: the blueprint (StateGraph), the workers (Nodes), the communication system (Channels), the traffic signals (Control Flow Primitives), the engine room (Pregel Execution Engine), and the save system (Checkpointer).
With these concepts, you’re well-equipped to start building your own sophisticated, stateful applications with LangGraph! Explore the documentation for more examples, advanced patterns, and different checkpointer implementations. Happy building!
Generated by AI Codebase Knowledge Builder