Chapter 8: Canvas (Signatures & Primitives) - Building Task Workflows

In the previous chapter, Chapter 7: Beat (Scheduler), we learned how to schedule tasks to run automatically at specific times using Celery Beat. This is great for recurring jobs. But what if you need to run a sequence of tasks, where one task depends on the result of another? Or run multiple tasks in parallel and then collect their results?

Imagine you’re building a feature where a user uploads an article, and you need to:

  1. Fetch the article content from a URL.
  2. Process the text to extract keywords.
  3. Process the text to detect the language.
  4. Once both processing steps are done, save the article and the extracted metadata to your database.

Simply running these tasks independently won’t work. Keyword extraction and language detection can happen at the same time, but only after the content is fetched. Saving can only happen after both processing steps are complete. How do you orchestrate this multi-step workflow?

This is where Celery Canvas comes in. It provides the building blocks to design complex task workflows.

What Problem Does Canvas Solve?

Canvas helps you connect individual Tasks together to form more sophisticated processes. It solves the problem of defining dependencies and flow control between tasks. Instead of just firing off tasks one by one and hoping they complete in the right order or manually checking results, Canvas lets you declare the desired workflow structure directly.

Think of it like having different types of Lego bricks:

  • Some bricks represent a single task.
  • Other bricks let you connect tasks end-to-end (run in sequence).
  • Some let you place bricks side-by-side (run in parallel).
  • Others let you build a structure where several parallel steps must finish before the next piece is added.

Canvas gives you these connecting bricks for your Celery tasks.

Key Concepts: Signatures and Primitives

The core ideas in Canvas are Signatures and Workflow Primitives.

  1. Signature (signature or .s()): The Basic Building Block
    • A Signature wraps up everything needed to call a single task: the task’s name, the arguments (args), the keyword arguments (kwargs), and any execution options (like countdown, eta, queue name).
    • Think of it as a pre-filled request form or a recipe card for a specific task execution. It doesn’t run the task immediately; it just holds the plan for running it.
    • The easiest way to create a signature is using the .s() shortcut on a task function.
    # tasks.py
    from celery_app import app # Assuming app is defined in celery_app.py
    
    @app.task
    def add(x, y):
        return x + y
    
    # Create a signature for add(2, 3)
    add_sig = add.s(2, 3)
    
    # add_sig now holds the 'plan' to run add(2, 3)
    print(f"Signature: {add_sig}")
    print(f"Task name: {add_sig.task}")
    print(f"Arguments: {add_sig.args}")
    
    # To actually run it, you call .delay() or .apply_async() ON the signature
    # result_promise = add_sig.delay()
    

    Output:

    Signature: tasks.add(2, 3)
    Task name: tasks.add
    Arguments: (2, 3)
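
    A signature can also be partial: you can bind some arguments now and supply the rest at call time, or attach execution options with .set(). A minimal sketch (the 'math' queue name is just an example):

    # A partial signature: only one argument is bound now
    partial = add.s(2)

    # Arguments supplied at call time are *prepended* to the bound ones,
    # so this would run add(5, 2)
    # result = partial.delay(5)

    # Execution options can be attached with .set()
    later = add.s(2, 3).set(countdown=10, queue='math')
    # later.apply_async() runs add(2, 3) after a 10-second delay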
    
  2. Primitives: Connecting the Blocks
    Canvas provides several functions (primitives) to combine signatures into workflows:

    • chain: Links tasks sequentially. The result of the first task is passed as the first argument to the second task, and so on.
      • Analogy: An assembly line where each station passes its output to the next.
      • Syntax: (sig1 | sig2 | sig3) or chain(sig1, sig2, sig3)
    • group: Runs a list of tasks in parallel. It returns a GroupResult object that tracks every task in the group.
      • Analogy: Hiring several workers to do similar jobs independently at the same time.
      • Syntax: group(sig1, sig2, sig3)
    • chord: Runs a group of tasks in parallel (the “header”), and then, once all tasks in the group have finished successfully, it runs a single callback task (the “body”) with the results of the header tasks.
      • Analogy: A team of researchers works on different parts of a project in parallel. Once everyone is done, a lead researcher collects all the findings to write the final report.
      • Syntax: chord(group(header_sigs), body_sig)

There are other primitives like chunks, xmap, and starmap, but chain, group, and chord are the most fundamental ones for building workflows.
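
Here is a minimal sketch of each of the three core primitives, reusing the add task from above. The tsum callback is a hypothetical task that sums a list of numbers:

# workflow_sketch.py
from celery import chain, group, chord
from tasks import add

# chain: add(2, 2) -> add(4, 4) -> add(8, 8); final result is 16
seq = chain(add.s(2, 2), add.s(4), add.s(8))
# equivalent pipe form: add.s(2, 2) | add.s(4) | add.s(8)

# group: four independent additions running in parallel
par = group(add.s(i, i) for i in range(4))

# chord: run the group, then pass the list of results to the callback.
# tsum would be defined as: @app.task def tsum(nums): return sum(nums)
# cb = chord(group(add.s(i, i) for i in range(4)), tsum.s())

# Nothing runs until you call .apply_async() or .delay() on a primitive:
# seq.apply_async()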

How to Use Canvas: Building the Article Processing Workflow

Let’s build the workflow we described earlier: Fetch -> (Process Keywords & Detect Language in parallel) -> Save.

1. Define the Tasks

First, we need our basic tasks. Let’s create dummy versions in tasks.py:

# tasks.py
from celery_app import app
import time
import random

@app.task
def fetch_data(url):
    print(f"Fetching data from {url}...")
    time.sleep(1)
    # Simulate fetching some data
    data = f"Content from {url} - {random.randint(1, 100)}"
    print(f"Fetched: {data}")
    return data

@app.task
def process_part_a(data):
    print(f"Processing Part A for: {data}")
    time.sleep(2)
    result_a = f"Keywords for '{data}'"
    print("Part A finished.")
    return result_a

@app.task
def process_part_b(data):
    print(f"Processing Part B for: {data}")
    time.sleep(3) # Simulate slightly longer processing
    result_b = f"Language for '{data}'"
    print("Part B finished.")
    return result_b

@app.task
def combine_results(results):
    # 'results' will be a list containing the return values
    # of process_part_a and process_part_b
    print(f"Combining results: {results}")
    time.sleep(1)
    final_output = f"Combined: {results[0]} | {results[1]}"
    print(f"Final Output: {final_output}")
    return final_output

2. Define the Workflow Using Canvas

Now, in a separate script or Python shell, let’s define the workflow using signatures and primitives.

# run_workflow.py
from celery import chain, group, chord
from tasks import fetch_data, process_part_a, process_part_b, combine_results

# The URL we want to process
article_url = "http://example.com/article1"

# Create the workflow structure
# 1. Fetch data. The result (data) is passed to the next step.
# 2. The next step is a chord:
#    - Header: A group running process_part_a and process_part_b in parallel.
#              Both tasks receive the 'data' from fetch_data.
#    - Body: combine_results receives a list of results from the group.

workflow = chain(
    fetch_data.s(article_url),              # Step 1: Fetch
    chord(                                  # Step 2: Chord
        group(process_part_a.s(), process_part_b.s()), # Header: Parallel processing
        combine_results.s()                            # Body: Combine results
    )
)

print(f"Workflow definition:\n{workflow}")

# Start the workflow
print("\nSending workflow to Celery...")
result_promise = workflow.apply_async()

print(f"Workflow sent! Final result ID: {result_promise.id}")
print("Run a Celery worker to execute the tasks.")
# You can optionally wait for the final result:
# final_result = result_promise.get()
# print(f"\nWorkflow finished! Final result: {final_result}")

Explanation:

  • We import chain, group, chord from celery.
  • We import our task functions.
  • fetch_data.s(article_url): Creates a signature for the first step.
  • process_part_a.s() and process_part_b.s(): Create signatures for the parallel tasks. Note that we don’t provide the data argument here. chain automatically passes the result of fetch_data to the next task in the sequence. Since the next task is a chord containing a group, Celery cleverly passes the data to each task within that group.
  • combine_results.s(): Creates the signature for the final step (the chord’s body). It doesn’t need arguments initially because the chord will automatically pass the list of results from the header group to it.
  • chain(...): Connects fetch_data to the chord.
  • chord(group(...), ...): Defines that the group must finish before combine_results is called.
  • group(...): Defines that process_part_a and process_part_b run in parallel.
  • workflow.apply_async(): This sends the first task (fetch_data) to the broker. The rest of the workflow is encoded in the task’s options (like link or chord information) so that Celery knows what to do next after each step completes.

If you run this script (and have a Worker running), you’ll see the tasks execute in the worker logs, respecting the defined dependencies and parallelism. fetch_data runs first, then process_part_a and process_part_b run concurrently, and finally combine_results runs after both A and B are done.
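
As an aside, the same workflow can be written with the pipe operator. When a group is chained into another task, Celery automatically upgrades that pair to a chord, so this sketch should behave identically:

# Equivalent pipe form: 'group | task' is auto-upgraded to a chord
from celery import group
from tasks import fetch_data, process_part_a, process_part_b, combine_results

workflow = (
    fetch_data.s("http://example.com/article1")
    | group(process_part_a.s(), process_part_b.s())
    | combine_results.s()
)
# workflow.apply_async() behaves like the explicit chain/chord version above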

How It Works Internally (Simplified Walkthrough)

Let’s trace a simpler workflow: my_chain = (add.s(2, 2) | add.s(4))

  1. Workflow Definition: When you create my_chain, Celery creates a chain object containing the signatures add.s(2, 2) and add.s(4).
  2. Sending (my_chain.apply_async()):
    • Celery looks at the first task in the chain: add.s(2, 2).
    • It prepares to send this task message to the Broker Connection (AMQP).
    • Crucially, it adds a special option to the message, often called link (or uses the chain field in newer protocols). This option contains the signature of the next task in the chain: add.s(4).
    • The message for add(2, 2) (with the link to add(4)) is sent to the broker.
  3. Worker 1 Executes First Task:
    • A Worker picks up the message for add(2, 2).
    • It runs the add function with arguments (2, 2). The result is 4.
    • The worker stores the result 4 in the Result Backend (if configured).
    • The worker notices the link option in the original message, pointing to add.s(4).
  4. Worker 1 Sends Second Task:
    • The worker takes the result of the first task (4).
    • It uses the linked signature add.s(4).
    • It prepends the result (4) to the arguments of the linked signature, so add.s(4) effectively becomes add(4, 4): the first 4 is the result of the previous task, and the second 4 came from the chain definition.
    • It sends a new message to the broker for add(4, 4).
  5. Worker 2 Executes Second Task:
    • Another (or the same) worker picks up the message for add(4, 4).
    • It runs add(4, 4). The result is 8.
    • It stores the result 8 in the backend.
    • There are no more links, so the chain is complete.
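
To see this end to end, here is a minimal sketch (assuming a worker is running and a result backend is configured):

from tasks import add

# apply_async sends only the first task; the worker drives the rest
result = (add.s(2, 2) | add.s(4)).apply_async()

# 'result' refers to the *last* task in the chain
print(result.get())         # 8
# .parent walks backwards through the chain
print(result.parent.get())  # 4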

group works by sending all task messages in the group concurrently. chord is more complex; it involves the workers coordinating via the Result Backend to count completed tasks in the header before the callback task is finally sent.

sequenceDiagram
    participant Client as Your Code
    participant Canvas as workflow = chain(...)
    participant Broker as Message Broker
    participant Worker as Celery Worker

    Client->>Canvas: workflow.apply_async()
    Note over Canvas: Prepare msg for add(2, 2) with link=add.s(4)
    Canvas->>Broker: Send Task 1 msg ('add', (2, 2), link=add.s(4), id=T1)
    Broker-->>Canvas: Ack
    Canvas-->>Client: Return AsyncResult(id=T2) # ID of the *last* task in chain

    Worker->>Broker: Fetch msg (T1)
    Broker-->>Worker: Deliver Task 1 msg
    Worker->>Worker: Execute add(2, 2) -> returns 4
    Note over Worker: Store result 4 for T1 in Backend
    Worker->>Worker: Check 'link' option -> add.s(4)
    Note over Worker: Prepare msg for add(4, 4) using result 4 + linked args
    Worker->>Broker: Send Task 2 msg ('add', (4, 4), id=T2)
    Broker-->>Worker: Ack
    Worker->>Broker: Ack Task 1 msg complete

    Worker->>Broker: Fetch msg (T2)
    Broker-->>Worker: Deliver Task 2 msg
    Worker->>Worker: Execute add(4, 4) -> returns 8
    Note over Worker: Store result 8 for T2 in Backend
    Worker->>Broker: Ack Task 2 msg complete

Code Dive: Canvas Implementation

The logic for signatures and primitives resides primarily in celery/canvas.py.

  • Signature Class:
    • Defined in celery/canvas.py. It’s essentially a dictionary subclass holding task, args, kwargs, options, etc.
    • The .s() method on a Task instance (in celery/app/task.py) is a shortcut to create a Signature.
    • apply_async: Prepares arguments/options by calling _merge and then delegates to self.type.apply_async (the task’s method) or app.send_task.
    • link, link_error: Methods that modify the options dictionary to add callbacks.
    • __or__: The pipe operator (|) overload. It checks the type of the right-hand operand (other) and constructs a _chain object accordingly.
    # Simplified from celery/canvas.py
    class Signature(dict):
        # ... methods like __init__, clone, set, apply_async ...
    
        def link(self, callback):
            # Appends callback signature to the 'link' list in options
            return self.append_to_list_option('link', callback)
    
        def link_error(self, errback):
            # Appends errback signature to the 'link_error' list in options
            return self.append_to_list_option('link_error', errback)
    
        def __or__(self, other):
            # Called when you use the pipe '|' operator
            if isinstance(other, Signature):
                # task | task -> chain
                return _chain(self, other, app=self._app)
            # ... other cases for group, chain ...
            return NotImplemented
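
    For example, callbacks can be attached to a plain signature before it is sent (log_error is a hypothetical errback task):

    # Attaching callbacks to a signature (sketch)
    from tasks import add

    sig = add.s(2, 2)
    sig.link(add.s(8))               # on success, run add(result, 8)
    # sig.link_error(log_error.s())  # hypothetical errback, run on failure
    # sig.apply_async()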
    
  • _chain Class:
    • Also in celery/canvas.py, inherits from Signature. Its task name is hardcoded to 'celery.chain'. The actual task signatures are stored in kwargs['tasks'].
    • apply_async / run: Contains the logic to handle sending the first task with the rest of the chain embedded in the options (either via link for protocol 1 or the chain message property for protocol 2).
    • prepare_steps: This complex method recursively unwraps nested primitives (like a chain within a chain, or a group that needs to become a chord) and sets up the linking between steps.
    # Simplified concept from celery/canvas.py (chain execution)
    # (ChainMap below is the standard library's collections.ChainMap)
    class _chain(Signature):
        # ... __init__, __or__ ...
    
        def apply_async(self, args=None, kwargs=None, **options):
            # ... handle always_eager ...
            return self.run(args, kwargs, app=self.app, **options)
    
        def run(self, args=None, kwargs=None, app=None, **options):
            # ... setup ...
            tasks, results = self.prepare_steps(...) # Unroll and freeze tasks
    
            if results: # If there are tasks to run
                first_task = tasks.pop() # Get the first task (list is reversed)
                remaining_chain = tasks if tasks else None
    
                # Determine how to pass the chain info (link vs. message field)
                use_link = self._use_link # ... logic to decide ...
    
                if use_link:
                    # Protocol 1: Link first task to the second task
                    if remaining_chain:
                         first_task.link(remaining_chain.pop())
                         # (Worker handles subsequent links)
                    options_to_apply = options # Pass original options
                else:
                    # Protocol 2: Embed the rest of the reversed chain in options
                    options_to_apply = ChainMap({'chain': remaining_chain}, options)
    
                # Send the *first* task only; the worker drives the rest
                first_task.apply_async(**options_to_apply)
                # Return AsyncResult of the *last* task in the original chain
                return results[0]
    
  • group Class:
    • In celery/canvas.py. Its task name is 'celery.group'.
    • apply_async: Iterates through its tasks, freezes each one (assigning a common group_id), sends their messages, and collects the AsyncResult objects into a GroupResult. It uses a barrier (from the vine library) to track completion.
  • chord Class:
    • In celery/canvas.py. Its task name is 'celery.chord'.
    • apply_async / run: Coordinates with the result backend (backend.apply_chord). It typically runs the header group first, configuring it to notify the backend upon completion. The backend then triggers the body task once the count is reached.
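
At the usage level, a group returns a GroupResult tracking each child task, and a chord fires its body only after every header task succeeds. A minimal sketch (tsum is again a hypothetical summing task):

from celery import chord, group
from tasks import add

# group: apply_async returns a GroupResult with one AsyncResult per child
job = group(add.s(i, i) for i in range(5))
group_result = job.apply_async()
print(group_result.id)            # the shared group id
print(len(group_result.results))  # 5 child AsyncResults
# group_result.get() blocks until all finish -> [0, 2, 4, 6, 8]

# chord: the backend counts header completions, then sends the body
# res = chord(group(add.s(i, i) for i in range(5)), tsum.s()).apply_async()
# res.get() -> 20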

Conclusion

Celery Canvas transforms simple tasks into powerful workflow components.

  • A Signature (task.s()) captures the details for a single task call without running it.
  • Primitives like chain (|), group, and chord combine signatures to define complex execution flows:
    • chain: Sequence (output of one to input of next).
    • group: Parallel execution.
    • chord: Parallel execution followed by a callback with all results.
  • You compose these primitives like building with Lego bricks to model your application’s logic.
  • Calling .apply_async() on a workflow primitive starts the process by sending the first task(s), embedding the rest of the workflow logic in the task options or using backend coordination.

Canvas allows you to move complex orchestration logic out of your application code and into Celery, making your tasks more modular and your overall system more robust.

Now that you can build and run complex workflows, how do you monitor what’s happening inside Celery? How do you know when tasks start, finish, or fail in real-time?

Next: Chapter 9: Events

