Chapter 6: Checkpointer (BaseCheckpointSaver) - Saving Your Progress

In Chapter 5: Pregel Execution Engine, we saw how the engine runs our graph step-by-step. But what happens if a graph takes hours to run, or if it needs to pause and wait for a human? If the program crashes or we need to stop it, do we lose all the progress?

That’s where Checkpointers come to the rescue!

What Problem Do Checkpointers Solve?

Imagine you’re playing a long video game. You wouldn’t want to start from the very beginning every time you stop playing, right? Games have save points or checkpoints that record your progress.

LangGraph’s Checkpointer does the same thing for your graph execution. It automatically saves the graph’s state at certain points, usually after each step completed by the Pregel Execution Engine.

This is incredibly useful for:

  1. Long-Running Processes: If your graph involves many steps or calls to slow tools/LLMs, you can stop it and resume later without losing work.
  2. Resilience: If your program crashes unexpectedly, you can restart it from the last saved checkpoint.
  3. Human-in-the-Loop (HITL): As we saw with Interrupt in Chapter 4: Control Flow Primitives, pausing the graph requires saving its state so it can be perfectly restored when the human provides input. Checkpointers are essential for this.

Analogy: Think of a checkpointer as an automatic “Save” button for your graph’s progress. It takes snapshots of the shared “whiteboard” (Channels) so you can always pick up where you left off.

Key Concepts

  1. What is Saved? The checkpointer saves the current value and version of every Channel in your graph’s state. It also keeps track of which step the graph was on and any pending tasks (like those created by Send).
  2. When is it Saved? The Pregel Execution Engine typically triggers the checkpointer to save after each “superstep” (a round of node executions and state updates).
  3. Where is it Saved? This depends on the specific checkpointer implementation you choose. LangGraph provides several:
    • MemorySaver: Stores checkpoints in your computer’s RAM. Simple for testing, but the checkpoints are lost when your script ends.
    • SqliteSaver: Stores checkpoints in a local SQLite database file, making them persistent across script runs.
    • Other savers might store checkpoints in cloud databases or other persistent storage.
  4. thread_id (The Save Slot Name): To save and load progress correctly, you need a way to identify which specific run of the graph you want to work with. Think of this like naming your save file in a game. In LangGraph, this identifier is called the thread_id. You provide it in the config when you run the graph. Each unique thread_id represents an independent “conversation” or execution history.
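To make the "where" and the "save slot" ideas concrete, here is a minimal sketch that uses only Python's standard library. It is not LangGraph's SqliteSaver: the table layout and the helper names (open_store, save, load_latest) are assumptions made up for illustration. It shows that checkpoints written to a database file survive closing and reopening the connection, and that each thread_id keeps its own independent history.

```python
import json
import os
import sqlite3
import tempfile
from typing import Optional

# Hypothetical helpers for illustration only; NOT LangGraph's SqliteSaver.
def open_store(path: str) -> sqlite3.Connection:
    """Open (or create) a tiny checkpoint table in a SQLite file."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "thread_id TEXT NOT NULL, "
        "step INTEGER NOT NULL, "
        "state TEXT NOT NULL)"
    )
    return conn

def save(conn: sqlite3.Connection, thread_id: str, step: int, state: dict) -> None:
    """Append one snapshot to the history of the given thread_id."""
    conn.execute(
        "INSERT INTO checkpoints (thread_id, step, state) VALUES (?, ?, ?)",
        (thread_id, step, json.dumps(state)),
    )
    conn.commit()

def load_latest(conn: sqlite3.Connection, thread_id: str) -> Optional[tuple]:
    """Return (step, state) for the newest snapshot, or None if there is none."""
    row = conn.execute(
        "SELECT step, state FROM checkpoints WHERE thread_id = ? "
        "ORDER BY id DESC LIMIT 1",
        (thread_id,),
    ).fetchone()
    return None if row is None else (row[0], json.loads(row[1]))

db_path = os.path.join(tempfile.mkdtemp(), "checkpoints.db")

# "First run": two independent threads save their progress.
conn = open_store(db_path)
save(conn, "thread-a", 1, {"value": 6})
save(conn, "thread-a", 2, {"value": 12})
save(conn, "thread-b", 1, {"value": 100})
conn.close()

# "Second run": a brand-new connection still sees the saved checkpoints.
conn = open_store(db_path)
latest_a = load_latest(conn, "thread-a")
latest_b = load_latest(conn, "thread-b")
missing = load_latest(conn, "thread-c")
conn.close()

print(latest_a)  # (2, {'value': 12})
print(latest_b)  # (1, {'value': 100})
print(missing)   # None
```

An in-memory dictionary would behave the same way within a single run, but everything in it would disappear once the process exits.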

How to Use a Checkpointer

Using a checkpointer is straightforward. You just need to tell LangGraph which checkpointer to use when you compile your graph.

Step 1: Import a Checkpointer

Let’s start with the simplest one, MemorySaver.

# Import the simplest checkpointer
from langgraph.checkpoint.memory import MemorySaver

Step 2: Instantiate the Checkpointer

# Create an instance of the memory checkpointer
memory_saver = MemorySaver()

Step 3: Compile Your Graph with the Checkpointer

Let’s reuse our simple adder -> multiplier graph. The graph definition itself doesn’t change.

# --- Define State and Nodes (same as Chapter 1) ---
from typing import TypedDict
from langgraph.graph import StateGraph, END, START

class MyState(TypedDict):
    value: int

def add_one(state: MyState) -> dict:
    print(f"Adder: Adding 1 to {state['value']}")
    return {"value": state['value'] + 1}

def multiply_by_two(state: MyState) -> dict:
    print(f"Multiplier: Doubling {state['value']}")
    return {"value": state['value'] * 2}

# --- Build the Graph (same as Chapter 1) ---
workflow = StateGraph(MyState)
workflow.add_node("adder", add_one)
workflow.add_node("multiplier", multiply_by_two)
workflow.set_entry_point("adder")
workflow.add_edge("adder", "multiplier")
workflow.add_edge("multiplier", END)

# --- Compile WITH the checkpointer ---
# Pass the checkpointer instance to the compile method
app = workflow.compile(checkpointer=memory_saver)

That’s it! By passing checkpointer=memory_saver to compile(), you’ve enabled automatic checkpointing for this graph.

Step 4: Run with a thread_id

To use the checkpointer, you need to provide a configuration dictionary (config) containing a unique identifier for this specific execution thread.

import uuid

# Create a unique ID for this run
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

# Define the initial state
initial_state = {"value": 5}

print("--- Running Graph (First Time) ---")
# Run the graph with the config
final_state = app.invoke(initial_state, config=config)

print("\n--- Final State (First Run) ---")
print(final_state)

Expected Output (First Run):

--- Running Graph (First Time) ---
Adder: Adding 1 to 5
Multiplier: Doubling 6

--- Final State (First Run) ---
{'value': 12}

Behind the scenes, MemorySaver saved a checkpoint after each step (including after the adder step and after the multiplier step), associating each one with the thread_id you provided.

Step 5: Resume the Graph

Now let’s run the same graph again with the same thread_id. (With MemorySaver this has to happen in the same Python process, because its checkpoints live in RAM.) The checkpointer allows the Pregel Execution Engine to load the last saved state and continue from there. Since the first run finished completely, calling invoke again simply returns the final saved state without re-running any node.

print("\n--- Running Graph Again with SAME thread_id ---")
# Use the SAME config (containing the same thread_id)
# Provide NO initial state, as it will be loaded from the checkpoint
resumed_state = app.invoke(None, config=config)

print("\n--- Final State (Resumed Run) ---")
print(resumed_state)

# Let's check the saved states using the checkpointer directly
print("\n--- Checkpoints Saved ---")
for checkpoint in memory_saver.list(config):
    print(checkpoint)

Expected Output (Second Run):

--- Running Graph Again with SAME thread_id ---
# Notice: No node printouts because the graph already finished!
# It just loads the final saved state.

--- Final State (Resumed Run) ---
{'value': 12}

--- Checkpoints Saved ---
# You'll see checkpoint objects representing saved states (abbreviated here;
# the exact fields, count, and order vary by LangGraph version)
CheckpointTuple(config={'configurable': {'thread_id': '...'}}, checkpoint={'v': 1, 'ts': '...', 'id': '...', 'channel_values': {'value': 6}, 'channel_versions': {'adder': 1}, 'versions_seen': {'adder': {}}}, metadata={'source': 'loop', 'step': 1, ...}, ...)
CheckpointTuple(config={'configurable': {'thread_id': '...'}}, checkpoint={'v': 1, 'ts': '...', 'id': '...', 'channel_values': {'value': 12}, 'channel_versions': {'adder': 1, 'multiplier': 2}, 'versions_seen': {'adder': {}, 'multiplier': {'adder': 1}}}, metadata={'source': 'loop', 'step': 2, ...}, ...)
CheckpointTuple(config={'configurable': {'thread_id': '...'}}, checkpoint={'v': 1, 'ts': '...', 'id': '...', 'channel_values': {'value': 12}, 'channel_versions': {'adder': 1, 'multiplier': 2}, 'versions_seen': {'adder': {}, 'multiplier': {'adder': 1}}}, metadata={'source': 'loop', 'step': 3, ...}, ...)

The checkpointer successfully loaded the final state ({'value': 12}) associated with that thread_id.

Checkpointers and Interrupt (Human-in-the-Loop)

Remember the Interrupt example from Chapter 4?

# (Simplified HITL example from Chapter 4)
from langgraph.types import interrupt, Command
# ... (State, Nodes: create_plan, request_approval, execute_plan) ...

# Compile WITH checkpointer (REQUIRED for interrupt)
memory_saver_hitl = MemorySaver()
app_hitl = workflow.compile(checkpointer=memory_saver_hitl)

# Run until the graph pauses at interrupt()
config_hitl = {"configurable": {"thread_id": str(uuid.uuid4())}}
for chunk in app_hitl.stream({"plan": ""}, config=config_hitl):
    # In recent LangGraph versions an interrupt surfaces as a chunk keyed "__interrupt__"
    if "__interrupt__" in chunk:
        print("Graph interrupted!")
        break

# Resume after human decision, reusing the SAME config (same thread_id)
human_decision = "Approved"
for chunk in app_hitl.stream(Command(resume=human_decision), config=config_hitl):
    pass  # ... (process remaining steps) ...
print("Graph resumed and finished!")

When interrupt() was called inside the request_approval node, the Pregel Execution Engine automatically used the memory_saver_hitl checkpointer to save the exact state of the graph at that moment (including the plan). When we called stream again with Command(resume=...) and the same config_hitl, the engine loaded that saved state using the checkpointer, allowing the graph to continue exactly where it left off, now with the human’s feedback.

Without a checkpointer, Interrupt cannot work.
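Why is saved state a hard requirement for pausing? The following toy sketch, which does not use LangGraph at all, may help. The function run_approval_flow and the saved dictionary are hypothetical stand-ins: the first call does the work up to the pause point and stores the state under a thread_id, and the second call can only continue because that stored state is still there.

```python
from typing import Any, Optional

# Hypothetical in-memory "save slots", keyed by thread_id (illustration only).
saved: dict[str, dict[str, Any]] = {}

def run_approval_flow(thread_id: str, resume: Optional[str] = None) -> dict[str, Any]:
    """Toy flow: create a plan, pause for a human, then finish once resumed."""
    state = saved.get(thread_id)
    if state is None:
        # Fresh run: work up to the pause point, save the state, and stop.
        state = {"plan": "Deploy version 2", "status": "waiting_for_human"}
        saved[thread_id] = state
        return state
    if state["status"] == "waiting_for_human":
        if resume is None:
            return state  # still paused, nothing to do yet
        # Resumed: continue from the saved state using the human's answer.
        state = {**state, "decision": resume, "status": "done"}
        saved[thread_id] = state
    return state

paused = run_approval_flow("thread-1")
finished = run_approval_flow("thread-1", resume="Approved")

print(paused["status"])    # waiting_for_human
print(finished["status"])  # done
print(finished["plan"])    # Deploy version 2
```

If the saved dictionary were cleared between the two calls, the second call would have no plan to approve and would start over from scratch. That is the situation an interrupt would be in without a checkpointer.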

How Checkpointing Works Internally

What happens behind the scenes when a checkpointer is configured?

Saving:

  1. Step Complete: The Pregel Execution Engine finishes a step (e.g., after running the adder node and updating the state).
  2. Signal Checkpointer: The engine tells the configured checkpointer (MemorySaver in our example) that it’s time to save.
  3. Gather State: The checkpointer (or the engine on its behalf) accesses all the active Channels.
  4. Serialize State: For each channel, it calls the channel’s internal checkpoint() method to get a serializable representation of its current value (e.g., the number 6 for the "value" channel).
  5. Store Checkpoint: The checkpointer bundles the serialized channel values, their versions, the current step number, and other metadata into a Checkpoint object. It then stores this Checkpoint associated with the current thread_id provided in the config. MemorySaver stores it in a dictionary in RAM; SqliteSaver writes it to a database table.

Loading (Resuming):

  1. Invoke with thread_id: You call app.invoke(None, config=config) where config contains a thread_id that has been previously saved.
  2. Request Checkpoint: The Pregel Execution Engine asks the checkpointer to load the latest checkpoint for the given thread_id.
  3. Retrieve Checkpoint: The checkpointer retrieves the saved Checkpoint object (e.g., from its memory dictionary or the database).
  4. Restore State: The engine takes the saved channel values from the checkpoint. For each channel, it calls the channel’s from_checkpoint() method (or similar internal logic) to restore its state. The “whiteboard” (Channels) is now exactly as it was when the checkpoint was saved.
  5. Continue Execution: The engine looks at the saved step number and metadata to figure out where to resume execution, typically by preparing the tasks for the next step.
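The saving and loading steps above can be sketched with a toy engine. This is a simplified model under stated assumptions, not the real Pregel loop: STEPS, checkpoints, and run are hypothetical names, the state is a single integer, and a "crash" is simulated with an exception. The point is the shape of the cycle: snapshot after every step, and on restart restore the latest snapshot and continue with the next step.

```python
from typing import Callable, Optional

# Hypothetical stand-ins: each "step" transforms an integer state.
STEPS: list[Callable[[int], int]] = [
    lambda v: v + 1,  # like the adder node
    lambda v: v * 2,  # like the multiplier node
    lambda v: v - 3,  # an extra step, added for illustration
]

# thread_id -> list of (next_step_index, value) snapshots
checkpoints: dict[str, list[tuple[int, int]]] = {}

def run(thread_id: str, initial: Optional[int] = None,
        crash_before: Optional[int] = None) -> int:
    history = checkpoints.setdefault(thread_id, [])
    if history:
        # Loading: restore the latest snapshot and resume from its step.
        step, value = history[-1]
    else:
        if initial is None:
            raise ValueError("no checkpoint found and no initial value given")
        step, value = 0, initial
    while step < len(STEPS):
        if crash_before == step:
            raise RuntimeError(f"simulated crash before step {step}")
        value = STEPS[step](value)
        step += 1
        # Saving: snapshot the state after every completed step.
        history.append((step, value))
    return value

try:
    run("t1", initial=5, crash_before=2)
except RuntimeError as exc:
    print(exc)  # simulated crash before step 2

after_crash = list(checkpoints["t1"])
print(after_crash)  # [(1, 6), (2, 12)]

result = run("t1")  # no input needed: resumes at step 2 with value 12
print(result)       # 9
```

Note that the resumed call does not repeat the first two steps. It only runs the step that had not completed before the crash.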

Here’s a simplified view of the interaction:

sequenceDiagram
    participant User
    participant App as CompiledGraph
    participant Engine as Pregel Engine
    participant Saver as Checkpointer (e.g., MemorySaver)
    participant Storage as Underlying Storage (RAM, DB)

    %% Saving %%
    Engine->>Engine: Finishes Step N
    Engine->>Saver: Save checkpoint for config (thread_id)
    Saver->>Engine: Request current channel states & versions
    Engine-->>Saver: Provides states & versions
    Saver->>Storage: Store Checkpoint(Step N, states, versions) linked to thread_id
    Storage-->>Saver: Acknowledge Save
    Saver-->>Engine: Save Complete

    %% Loading %%
    User->>App: invoke(None, config with thread_id)
    App->>Engine: Start/Resume Execution
    Engine->>Saver: Get latest checkpoint for config (thread_id)
    Saver->>Storage: Retrieve Checkpoint linked to thread_id
    Storage-->>Saver: Returns Checkpoint(Step N, states, versions)
    Saver-->>Engine: Provides Checkpoint
    Engine->>Engine: Restore channel states from checkpoint
    Engine->>Engine: Prepare tasks for Step N+1
    Engine->>App: Continue execution...

A Peek at the Code (checkpoint/base.py, checkpoint/memory.py, pregel/loop.py)

Let’s look at the core components:

  • BaseCheckpointSaver (checkpoint/base.py): This is the abstract base class (like a template) that all checkpointers must implement. It defines the essential methods the engine needs.

    # checkpoint/base.py (Highly Simplified)
    from abc import ABC, abstractmethod
    from typing import Any, Mapping, NamedTuple, Optional, TypedDict
    
    # Represents a saved checkpoint
    class Checkpoint(TypedDict):
        channel_values: Mapping[str, Any] # Saved state of channels
        channel_versions: Mapping[str, int] # Internal versions
        versions_seen: Mapping[str, Mapping[str, int]] # Tracking for node execution
        # ... other metadata like v, ts, id, pending_sends ...
    
    # Represents the checkpoint tuple retrieved from storage
    class CheckpointTuple(NamedTuple):
        config: dict # The config used (includes thread_id)
        checkpoint: Checkpoint
        metadata: dict
        # ... other fields like parent_config, pending_writes ...
    
    class BaseCheckpointSaver(ABC):
        # --- Sync Methods ---
        @abstractmethod
        def get_tuple(self, config: dict) -> Optional[CheckpointTuple]:
            """Load the checkpoint tuple for the given config."""
            ...
    
        @abstractmethod
        def put(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> dict:
            """Save a checkpoint."""
            ...
    
        # --- Async Methods (similar structure) ---
        @abstractmethod
        async def aget_tuple(self, config: dict) -> Optional[CheckpointTuple]:
            """Async load the checkpoint tuple."""
            ...
    
        @abstractmethod
        async def aput(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> dict:
            """Async save a checkpoint."""
            ...
    
        # --- Other methods (list, put_writes) omitted for brevity ---
    

    The key methods are get_tuple (to load) and put (to save), along with their async counterparts (aget_tuple, aput). Any specific checkpointer (like MemorySaver, SqliteSaver) must provide concrete implementations for these methods.

  • MemorySaver (checkpoint/memory.py): A simple implementation that uses an in-memory dictionary.

    # checkpoint/memory.py (Highly Simplified)
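    import threading
    from collections import defaultdict
    from typing import Optional
    # BaseCheckpointSaver, Checkpoint, and CheckpointTuple come from checkpoint/base.py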
    
    class MemorySaver(BaseCheckpointSaver):
        def __init__(self):
            # Use a dictionary to store checkpoints in RAM
            # Key: thread_id, Value: List of CheckpointTuples
            self._checkpoints: defaultdict[str, list[CheckpointTuple]] = defaultdict(list)
            self._lock = threading.RLock() # To handle multiple threads safely
    
        def get_tuple(self, config: dict) -> Optional[CheckpointTuple]:
            thread_id = config["configurable"]["thread_id"]
            with self._lock:
                if checkpoints := self._checkpoints.get(thread_id):
                    # Return the latest checkpoint for this thread_id
                    return checkpoints[-1]
                return None
    
        def put(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> dict:
            thread_id = config["configurable"]["thread_id"]
            with self._lock:
                # Append the new checkpoint to the list for this thread_id
                self._checkpoints[thread_id].append(
                    CheckpointTuple(config, checkpoint, metadata)
                )
            return {"configurable": {"thread_id": thread_id}}
    
        # ... async methods (aget_tuple, aput) are similar using the same dict ...
        # ... list method iterates through the dictionary ...
    

    As you can see, this simplified MemorySaver just uses a Python dictionary (self._checkpoints) to store a list of CheckpointTuples for each thread_id, and get_tuple returns the most recent one. This is simple but not persistent: nothing survives once the process exits.

  • Integration (pregel/loop.py): The Pregel Execution Engine (PregelLoop classes) interacts with the checkpointer during its execution cycle.

    # pregel/loop.py (Conceptual Snippets)
    
    class PregelLoop: # Base class for Sync/Async loops
        def __init__(self, ..., checkpointer: Optional[BaseCheckpointSaver], ...):
            self.checkpointer = checkpointer
            # ... other init ...
    
        def _put_checkpoint(self, metadata: CheckpointMetadata) -> None:
            # Called by the loop after a step or input processing
            if self.checkpointer:
                # 1. Create the Checkpoint object from current channels/state
                checkpoint_data = create_checkpoint(self.checkpoint, self.channels, ...)
    
                # 2. Call the checkpointer's put method (sync or async)
                #    (Uses self.submit to potentially run in background)
                self.submit(self.checkpointer.put, self.checkpoint_config, checkpoint_data, metadata)
    
                # 3. Update internal config with the new checkpoint ID
                self.checkpoint_config = {"configurable": {"thread_id": ..., "checkpoint_id": checkpoint_data["id"]}}
    
        def __enter__(self): # Or __aenter__ for async
            # Called when the loop starts
            if self.checkpointer:
                # 1. Try to load an existing checkpoint tuple
                saved = self.checkpointer.get_tuple(self.checkpoint_config)
            else:
                saved = None
    
            if saved:
                # 2. Restore state from the loaded checkpoint
                self.checkpoint = saved.checkpoint
                self.checkpoint_config = saved.config
                # ... restore channels from saved.checkpoint['channel_values'] ...
            else:
                # Initialize with an empty checkpoint
                self.checkpoint = empty_checkpoint()
    
            # ... setup channels based on restored or empty checkpoint ...
            return self
    

    The PregelLoop uses the checkpointer’s get_tuple method when it starts (in __enter__ or __aenter__) to load any existing state. It uses the put method (inside _put_checkpoint) during execution to save progress.
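To see how little a saver needs in order to satisfy the simplified get_tuple/put contract from this section, here is a hedged sketch of a file-backed saver. It is written against a stand-in interface (SimpleSaver and SimpleCheckpointTuple are hypothetical names that mirror the simplified classes above), not against the real BaseCheckpointSaver, whose methods take more parameters. JsonFileSaver is likewise invented for illustration and keeps only the latest checkpoint per thread_id.

```python
import json
import os
import tempfile
from abc import ABC, abstractmethod
from typing import NamedTuple, Optional

class SimpleCheckpointTuple(NamedTuple):
    config: dict      # the config used (includes thread_id)
    checkpoint: dict  # saved channel values and related data
    metadata: dict

class SimpleSaver(ABC):
    """Stand-in for the simplified interface in this chapter (sync methods only)."""

    @abstractmethod
    def get_tuple(self, config: dict) -> Optional[SimpleCheckpointTuple]: ...

    @abstractmethod
    def put(self, config: dict, checkpoint: dict, metadata: dict) -> dict: ...

class JsonFileSaver(SimpleSaver):
    """Hypothetical saver: one JSON file per thread_id, latest checkpoint only."""

    def __init__(self, directory: str) -> None:
        self.directory = directory

    def _path(self, config: dict) -> str:
        # Assumes thread_id is safe to use as a file name (fine for a demo).
        thread_id = config["configurable"]["thread_id"]
        return os.path.join(self.directory, f"{thread_id}.json")

    def get_tuple(self, config: dict) -> Optional[SimpleCheckpointTuple]:
        path = self._path(config)
        if not os.path.exists(path):
            return None  # nothing saved yet for this thread_id
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
        return SimpleCheckpointTuple(config, data["checkpoint"], data["metadata"])

    def put(self, config: dict, checkpoint: dict, metadata: dict) -> dict:
        # Overwrite the file so it always holds the newest checkpoint.
        with open(self._path(config), "w", encoding="utf-8") as f:
            json.dump({"checkpoint": checkpoint, "metadata": metadata}, f)
        return config

saver = JsonFileSaver(tempfile.mkdtemp())
config = {"configurable": {"thread_id": "demo-thread"}}

before = saver.get_tuple(config)  # what a loop would see on a first run
saver.put(config, {"channel_values": {"value": 6}}, {"step": 1})
saver.put(config, {"channel_values": {"value": 12}}, {"step": 2})
loaded = saver.get_tuple(config)  # what a loop would see when resuming

print(before)             # None
print(loaded.checkpoint)  # {'channel_values': {'value': 12}}
print(loaded.metadata)    # {'step': 2}
```

Overwriting the file keeps the sketch short, but it throws away history. A saver that appended each checkpoint instead would be able to list earlier states as well.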

Conclusion

You’ve learned about Checkpointers (BaseCheckpointSaver), the mechanism that gives your LangGraph applications memory and resilience.

  • Checkpointers save the state of your graph’s Channels periodically.
  • They load saved states to resume execution.
  • This is crucial for long-running graphs, human-in-the-loop workflows (using Interrupt), and recovering from failures.
  • You enable checkpointing by passing a checkpointer instance (like MemorySaver or SqliteSaver) to graph.compile().
  • You manage different execution histories using a unique thread_id in the config.
  • MemorySaver is simple and handy for testing, but its checkpoints are lost when the script ends; use database savers (like SqliteSaver) for true persistence.

This chapter concludes our tour of the core concepts in LangGraph! You now understand the fundamental building blocks: the blueprint (StateGraph), the workers (Nodes), the communication system (Channels), the traffic signals (Control Flow Primitives), the engine room (Pregel Execution Engine), and the save system (Checkpointer).

With these concepts, you’re well-equipped to start building your own sophisticated, stateful applications with LangGraph! Explore the documentation for more examples, advanced patterns, and different checkpointer implementations. Happy building!

