Chapter 5: Worker - The Task Doer
In Chapter 4: Broker Connection (AMQP), we learned how Celery uses a message broker, like a postal service, to send task messages. When you call add.delay(2, 2)
, a message asking to run the add
task with arguments (2, 2)
gets dropped into a mailbox (the broker queue).
But who actually checks that mailbox, picks up the message, and performs the addition? That’s the job of the Celery Worker.
What Problem Does the Worker Solve?
Imagine our workshop analogy again. You’ve defined the blueprint for a job (Task) and you’ve dropped the work order into the central inbox (Broker Connection (AMQP)). Now you need an actual employee or a machine to:
- Look in the inbox for new work orders.
- Pick up an order.
- Follow the instructions (run the task code).
- Maybe put the finished product (the result) somewhere specific.
- Mark the order as complete.
The Celery Worker is that employee or machine. It’s a separate program (process) that you run, whose sole purpose is to execute the tasks you send to the broker. Without a worker running, your task messages would just sit in the queue forever, waiting for someone to process them.
Starting Your First Worker
Running a worker is typically done from your command line or terminal. You need to tell the worker where to find your Celery App instance (which holds the configuration, including the broker address and the list of known tasks).
Assuming you have:
- A file
celery_app.py
containing yourapp = Celery(...)
instance. - A file
tasks.py
containing your task definitions (likeadd
andsend_welcome_email
) decorated with@app.task
. - Your message broker (e.g., Redis or RabbitMQ) running.
You can start a worker like this:
# In your terminal, in the same directory as celery_app.py and tasks.py
# Make sure your Python environment has celery and the broker driver installed
# (e.g., pip install celery redis)
celery -A celery_app worker --loglevel=info
Explanation:
celery
: This is the main Celery command-line program.-A celery_app
: The-A
flag (or--app
) tells Celery where to find yourCelery
app instance.celery_app
refers to thecelery_app.py
file (or module) and implies Celery should look for an instance namedapp
inside it.worker
: This specifies that you want to run the worker component.--loglevel=info
: This sets the logging level.info
is a good starting point, showing you when the worker connects, finds tasks, and executes them. Other levels includedebug
(more verbose),warning
,error
, andcritical
.
What You’ll See:
When the worker starts successfully, you’ll see a banner like this (details may vary):
-------------- celery@yourhostname v5.x.x (stars)
--- ***** -----
-- ******* ---- Linux-5.15.0...-generic-x86_64-with-... 2023-10-27 10:00:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f...
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
. tasks.send_welcome_email
[2023-10-27 10:00:01,000: INFO/MainProcess] Connected to redis://localhost:6379/0
[2023-10-27 10:00:01,050: INFO/MainProcess] mingle: searching for neighbors
[2023-10-27 10:00:02,100: INFO/MainProcess] mingle: all alone
[2023-10-27 10:00:02,150: INFO/MainProcess] celery@yourhostname ready.
Key Parts of the Banner:
celery@yourhostname
: The unique name of this worker instance.transport
: The broker URL it connected to (from your app config).results
: The result backend URL (if configured).concurrency
: How many tasks this worker can potentially run at once (defaults to the number of CPU cores) and the execution pool type (prefork
is common). We’ll touch on this later.queues
: The specific “mailboxes” (queues) the worker is listening to.celery
is the default queue name.[tasks]
: A list of all the tasks the worker discovered (like ourtasks.add
andtasks.send_welcome_email
). If your tasks don’t show up here, the worker won’t be able to run them!
The final celery@yourhostname ready.
message means the worker is connected and waiting for jobs!
What the Worker Does
Now that the worker is running, let’s trace what happens when you send a task (e.g., from run_tasks.py
in Chapter 3: Task):
- Waiting: The worker is connected to the broker, listening on the
celery
queue. - Message Arrival: Your
add.delay(5, 7)
call sends a message to thecelery
queue on the broker. The broker notifies the worker. - Receive & Decode: The worker receives the raw message. It decodes it to find the task name (
tasks.add
), the arguments ((5, 7)
), and other info (like a unique task ID). - Find Task Code: The worker looks up the name
tasks.add
in its internal registry (populated when it started) to find the actual Python functionadd
defined intasks.py
. - Execute: The worker executes the function:
add(5, 7)
.- You will see the
print
statements from your task function appear in the worker’s terminal output:[2023-10-27 10:05:00,100: INFO/ForkPoolWorker-1] Task tasks.add[some-task-id] received Task 'add' starting with (5, 7) Task 'add' finished with result: 12 [2023-10-27 10:05:05,150: INFO/ForkPoolWorker-1] Task tasks.add[some-task-id] succeeded in 5.05s: 12
- You will see the
- Store Result (Optional): If a Result Backend is configured, the worker takes the return value (
12
) and sends it to the backend, associating it with the task’s unique ID. - Acknowledge: The worker sends an “acknowledgement” (ack) back to the broker. This tells the broker, “I finished processing this message successfully, you can delete it from the queue.” This ensures tasks aren’t lost if a worker crashes mid-execution (the message would remain on the queue for another worker to pick up).
- Wait Again: The worker goes back to waiting for the next message.
Running Multiple Workers and Concurrency
- Multiple Workers: You can start multiple worker processes by running the
celery worker
command again, perhaps on different machines or in different terminals on the same machine. They will all connect to the same broker and pull tasks from the queue, allowing you to process tasks in parallel and scale your application. - Concurrency within a Worker: A single worker process can often handle more than one task concurrently. Celery achieves this using execution pools.
- Prefork (Default): The worker starts several child processes. Each child process handles one task at a time. The
-c
(or--concurrency
) flag controls the number of child processes (default is the number of CPU cores). This is good for CPU-bound tasks. - Eventlet/Gevent: Uses green threads (lightweight concurrency managed by libraries like eventlet or gevent). A single worker process can handle potentially hundreds or thousands of tasks concurrently, especially if the tasks are I/O-bound (e.g., waiting for network requests). You select these using the
-P
flag:celery -A celery_app worker -P eventlet -c 1000
. Requires installing the respective library (pip install eventlet
orpip install gevent
). - Solo: Executes tasks one after another in the main worker process. Useful for debugging.
-P solo
. - Threads: Uses regular OS threads.
-P threads
. Less common for Celery tasks due to Python’s Global Interpreter Lock (GIL) limitations for CPU-bound tasks, but can be useful for I/O-bound tasks.
- Prefork (Default): The worker starts several child processes. Each child process handles one task at a time. The
For beginners, sticking with the default prefork pool is usually fine. Just know that the worker can likely handle multiple tasks simultaneously.
How It Works Internally (Simplified)
Let’s visualize the worker’s main job: processing a single task.
- Startup: The
celery worker
command starts the main worker process. It loads theCelery App
, reads the configuration (broker_url
, tasks to import, etc.). - Connect & Listen: The worker establishes a connection to the message broker and tells it, “I’m ready to consume messages from the ‘celery’ queue.”
- Message Delivery: The broker sees a message for the ‘celery’ queue (sent by
add.delay(5, 7)
) and delivers it to the connected worker. - Consumer Receives: The worker’s internal “Consumer” component receives the message.
- Task Dispatch: The Consumer decodes the message, identifies the task (
tasks.add
), and finds the arguments ((5, 7)
). It then hands this off to the configured execution pool (e.g., prefork). - Pool Execution: The pool (e.g., a child process in the prefork pool) gets the task function and arguments and executes
add(5, 7)
. - Result Return: The pool process finishes execution and returns the result (
12
) back to the main worker process. - Result Handling (Optional): The main worker process, if a Result Backend is configured, sends the result (
12
) and task ID to the backend store. - Acknowledgement: The main worker process sends an “ack” message back to the broker, confirming the task message was successfully processed. The broker then deletes the message.
sequenceDiagram
participant CLI as Terminal (celery worker)
participant WorkerMain as Worker Main Process
participant App as Celery App Instance
participant Broker as Message Broker
participant Pool as Execution Pool (e.g., Prefork Child)
participant TaskCode as Your Task Function (add)
CLI->>WorkerMain: Start celery -A celery_app worker
WorkerMain->>App: Load App & Config (broker_url, tasks)
WorkerMain->>Broker: Connect & Listen on 'celery' queue
Broker-->>WorkerMain: Deliver Message ('tasks.add', (5, 7), task_id)
WorkerMain->>WorkerMain: Decode Message
WorkerMain->>Pool: Request Execute add(5, 7) with task_id
Pool->>TaskCode: Run add(5, 7)
TaskCode-->>Pool: Return 12
Pool-->>WorkerMain: Result=12 for task_id
Note over WorkerMain: (Optionally) Store 12 in Result Backend
WorkerMain->>Broker: Acknowledge task_id is complete
Code Dive: Where Worker Logic Lives
- Command Line Entry Point (
celery/bin/worker.py
): This script handles parsing the command-line arguments (-A
,-l
,-c
,-P
, etc.) when you runcelery worker ...
. It ultimately creates and starts aWorkController
instance. (Seeworker()
function in the file). - Main Worker Class (
celery/worker/worker.py
): TheWorkController
class is the heart of the worker. It manages all the different components (like the pool, consumer, timer, etc.) using a system called “bootsteps”. It handles the overall startup, shutdown, and coordination. (SeeWorkController
class). - Message Handling (
celery/worker/consumer/consumer.py
): TheConsumer
class (specifically itsBlueprint
and steps likeTasks
andEvloop
) is responsible for the core loop of fetching messages from the broker via the connection, decoding them, and dispatching them to the execution pool using task strategies. (SeeConsumer.create_task_handler
). - Execution Pools (
celery/concurrency/
): Modules likeprefork.py
,solo.py
,eventlet.py
,gevent.py
implement the different concurrency models (-P
flag). TheWorkController
selects and manages one of these pools.
A highly simplified conceptual view of the core message processing logic within the Consumer
:
# Conceptual loop inside the Consumer (highly simplified)
def message_handler(message):
try:
# 1. Decode message (task name, args, kwargs, id, etc.)
task_name, args, kwargs, task_id = decode_message(message.body, message.headers)
# 2. Find the registered task function
task_func = app.tasks[task_name]
# 3. Prepare execution request for the pool
request = TaskRequest(task_id, task_name, task_func, args, kwargs)
# 4. Send request to the pool for execution
# (Pool runs request.execute() which calls task_func(*args, **kwargs))
pool.apply_async(request.execute, accept_callback=task_succeeded, ...)
except Exception as e:
# Handle errors (e.g., unknown task, decoding error)
log_error(e)
message.reject() # Tell broker it failed
def task_succeeded(task_id, retval):
# Called by the pool when task finishes successfully
# 5. Store result (optional)
if app.backend:
app.backend.store_result(task_id, retval, status='SUCCESS')
# 6. Acknowledge message to broker
message.ack()
# --- Setup ---
# Worker connects to broker and registers message_handler
# for incoming messages on the subscribed queue(s)
connection.consume(queue_name, callback=message_handler)
# Start the event loop to wait for messages
connection.drain_events()
This illustrates the fundamental cycle: receive -> decode -> find task -> execute via pool -> handle result -> acknowledge. The actual code involves much more detail regarding error handling, state management, different protocols, rate limiting, etc., managed through the bootstep system.
Conclusion
You’ve now met the Celery Worker, the essential component that actually runs your tasks.
- It’s a separate process you start from the command line (
celery worker
). - It connects to the broker using the configuration from your Celery App.
- It listens for task messages on queues.
- It executes the corresponding task code when a message arrives.
- It handles concurrency using execution pools (like prefork, eventlet, gevent).
- It acknowledges messages to the broker upon successful completion.
Without workers, Celery tasks would never get done. But what happens when a task finishes? What if it returns a value, like our add
task returning 12
? How can your original application find out the result? That’s where the Result Backend comes in.
Next: Chapter 6: Result Backend
Generated by AI Codebase Knowledge Builder