Chapter 3: Task - The Job Description
In Chapter 1: The Celery App, we set up our Celery headquarters, and in Chapter 2: Configuration, we learned how to give it instructions. Now, we need to define the actual work we want Celery to do. This is where Tasks come in.
What Problem Does a Task Solve?
Imagine you have a specific job that needs doing, like “Resize this image to thumbnail size” or “Send a welcome email to this new user.” In Celery, each of these specific jobs is represented by a Task.
A Task is like a job description or a recipe. It contains the exact steps (the code) needed to complete a specific piece of work. You write this recipe once as a Python function, and then you can tell Celery to follow that recipe whenever you need that job done, potentially many times with different inputs (like resizing different images or sending emails to different users).
The key benefit is that you don’t run the recipe immediately yourself. You hand the recipe (the Task) and the ingredients (the arguments, like the image file or the user’s email) over to Celery. Celery then finds an available helper (a Worker) who knows how to follow that specific recipe and lets them do the work in the background. This keeps your main application free to do other things.
Defining Your First Task
Defining a task in Celery is surprisingly simple. You take a regular Python function and “decorate” it using @app.task. Remember our app object from Chapter 1? We use its task decorator.
Let’s create a file, perhaps named tasks.py, to hold our task definitions:
# tasks.py
import time

from celery_app import app  # Import the app instance we created


@app.task
def add(x, y):
    """A simple task that adds two numbers."""
    print(f"Task 'add' starting with ({x}, {y})")
    # Simulate some work taking time
    time.sleep(5)
    result = x + y
    print(f"Task 'add' finished with result: {result}")
    return result


@app.task
def send_welcome_email(user_id):
    """A task simulating sending a welcome email."""
    print(f"Task 'send_welcome_email' starting for user {user_id}")
    # Simulate email sending process
    time.sleep(3)
    print(f"Welcome email supposedly sent to user {user_id}")
    return f"Email sent to {user_id}"

# You can have many tasks in one file!
Explanation:
- from celery_app import app: We import the Celery app instance we configured earlier. This instance holds the knowledge about our broker and backend.
- @app.task: This is the magic decorator! When Celery sees this above a function (add or send_welcome_email), it says, “Ah! This isn’t just a regular function; it’s a job description that my workers need to know about.”
- The Function (add, send_welcome_email): This is the actual Python code that performs the work. It’s the core of the task: the steps in the recipe. It can take arguments (like x, y, or user_id) and can return a value.
- Registration: The @app.task decorator automatically registers this function with our Celery app. Now, app knows about a task named tasks.add and another named tasks.send_welcome_email (Celery creates the name from module_name.function_name). Workers connected to this app will be able to find and execute this code when requested.
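To make the registration step concrete, here is a minimal sketch of a decorator that records functions in a dictionary under a module_name.function_name key. ToyApp and its tasks dictionary are hypothetical names invented for this illustration; Celery's real decorator does considerably more (see the Code Dive later in this chapter).

```python
# Toy model of task registration; NOT Celery's real implementation.
class ToyApp:
    """Hypothetical stand-in for a Celery app that only tracks task names."""

    def __init__(self):
        self.tasks = {}  # maps "module_name.function_name" -> function

    def task(self, fun):
        # Build the name the way the chapter describes it.
        name = f"{fun.__module__}.{fun.__name__}"
        self.tasks[name] = fun
        fun.name = name  # remember the registered name on the function
        return fun


toy_app = ToyApp()


@toy_app.task
def add(x, y):
    return x + y


registered_name = add.name
# The prefix depends on where the function is defined,
# e.g. "tasks.add" when this code lives in tasks.py.
print(registered_name)
print(toy_app.tasks[registered_name](5, 7))
```

The point of the sketch is only that a name, not the function object itself, is what later travels in the message; the worker needs the same name in its own registry to find the code.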
Self-Host Note: If you are running this code, make sure you have a celery_app.py file containing your Celery app instance as shown in previous chapters, and that the tasks.py file can import app from it.
Sending a Task for Execution
Okay, we’ve written our recipes (add and send_welcome_email). How do we tell Celery, “Please run the add recipe with the numbers 5 and 7”?
We don’t call the function directly like add(5, 7). If we did that, it would just run immediately in our current program, which defeats the purpose of using Celery!
Instead, we use special methods on the task object itself, most commonly .delay() or .apply_async().
Let’s try this in a separate Python script or an interactive Python session:
# run_tasks.py
from tasks import add, send_welcome_email
print("Let's send some tasks!")
# --- Using .delay() ---
# Tell Celery to run add(5, 7) in the background
result_promise_add = add.delay(5, 7)
print(f"Sent task add(5, 7). Task ID: {result_promise_add.id}")
# Tell Celery to run send_welcome_email(123) in the background
result_promise_email = send_welcome_email.delay(123)
print(f"Sent task send_welcome_email(123). Task ID: {result_promise_email.id}")
# --- Using .apply_async() ---
# Does the same thing as .delay() but allows more options
result_promise_add_later = add.apply_async(args=(10, 20), countdown=10) # Run after 10s
print(f"Sent task add(10, 20) to run in 10s. Task ID: {result_promise_add_later.id}")
print("Tasks have been sent to the broker!")
print("A Celery worker needs to be running to pick them up.")
Explanation:
- from tasks import add, send_welcome_email: We import our task functions. Because they were decorated with @app.task, they are now special Celery Task objects.
- add.delay(5, 7): This is the simplest way to send a task.
  - It doesn’t run add(5, 7) right now.
  - It takes the arguments (5, 7).
  - It packages them up into a message along with the task’s name (tasks.add).
  - It sends this message to the message broker (like Redis or RabbitMQ) that we configured in our celery_app.py. Think of it like dropping a request slip into a mailbox.
- send_welcome_email.delay(123): Same idea, but for our email task. A message with tasks.send_welcome_email and the argument 123 is sent to the broker.
- add.apply_async(args=(10, 20), countdown=10): This is a more powerful way to send tasks.
  - It does the same fundamental thing: sends a message to the broker.
  - It allows for more options, like args (positional arguments as a tuple), kwargs (keyword arguments as a dict), countdown (delay execution by seconds), eta (run at a specific future time), and many others.
  - .delay(*args, **kwargs) is just a convenient shortcut for .apply_async(args=args, kwargs=kwargs).
- result_promise_... = ...: Both .delay() and .apply_async() return an AsyncResult object immediately. This is not the actual result of the task (like 12 for add(5, 7)). It’s more like a receipt or a tracking number (notice the .id attribute). You can use this object later to check if the task finished and what its result was, but only if you’ve set up a Result Backend (Chapter 6).
- The Worker: Sending the task only puts the message on the queue. A separate process, the Celery Worker (Chapter 5), needs to be running. The worker constantly watches the queue, picks up messages, finds the corresponding task function (using the name like tasks.add), and executes it with the provided arguments.
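The send path described above can be sketched with the standard library alone. This is a toy model, not Celery's API: ToyTask, toy_broker, and the message field names are assumptions made up for the illustration, and the receipt here is a plain ID string where Celery would return an AsyncResult.

```python
import uuid
from collections import deque
from datetime import datetime, timedelta, timezone

# Toy in-memory stand-in for the broker queue (hypothetical, not a Celery API).
toy_broker = deque()


class ToyTask:
    """Hypothetical task wrapper that mimics the send path described above."""

    def __init__(self, name):
        self.name = name

    def delay(self, *args, **kwargs):
        # Shortcut: forwards positional and keyword arguments unchanged.
        return self.apply_async(args=args, kwargs=kwargs)

    def apply_async(self, args=(), kwargs=None, countdown=None):
        task_id = str(uuid.uuid4())
        eta = None
        if countdown is not None:
            # In this toy, a countdown becomes an absolute "not before" time.
            eta = datetime.now(timezone.utc) + timedelta(seconds=countdown)
        message = {
            "id": task_id,
            "task": self.name,
            "args": tuple(args),
            "kwargs": dict(kwargs or {}),
            "eta": eta,
        }
        toy_broker.append(message)  # "drop the request slip in the mailbox"
        return task_id  # the receipt; nothing has been executed yet


add = ToyTask("tasks.add")
receipt = add.delay(5, 7)
later = add.apply_async(args=(10, 20), countdown=10)
print(toy_broker[0]["task"], toy_broker[0]["args"])
```

Note that neither call computes a sum: both only enqueue a description of the work and hand back an identifier.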
How It Works Internally (Simplified)
Let’s trace the journey of defining and sending our add task:
- Definition (@app.task in tasks.py):
  - Python defines the add function.
  - The @app.task decorator sees this function.
  - It tells the Celery instance (app) about this function, registering it under the name tasks.add in an internal dictionary (app.tasks). The app instance knows the broker/backend settings.
- Sending (add.delay(5, 7) in run_tasks.py):
  - You call .delay() on the add task object.
  - .delay() (or .apply_async()) internally uses the app the task is bound to.
  - It asks the app for the configured broker URL.
  - It creates a message containing:
    - Task Name: tasks.add
    - Arguments: (5, 7)
    - Other options (like a unique Task ID).
  - It connects to the Broker (e.g., Redis) using the broker URL.
  - It sends the message to a specific queue (usually named ‘celery’ by default) on the broker.
  - It returns an AsyncResult object referencing the Task ID.
- Waiting: The message sits in the queue on the broker, waiting.
- Execution (by a Worker):
  - A separate Celery Worker process is running, connected to the same broker and app.
  - The Worker fetches the message from the queue.
  - It reads the task name: tasks.add.
  - It looks up tasks.add in its copy of the app.tasks registry to find the actual add function code.
  - It calls the add function with the arguments from the message: add(5, 7).
  - The function runs (prints logs, sleeps, calculates 12).
  - If a Result Backend is configured, the Worker takes the return value (12) and stores it in the backend, associated with the Task ID.
  - The Worker acknowledges the message to the broker, removing it from the queue. (This trace is simplified: by default Celery sends the acknowledgement just before the task starts running, and only sends it after the task finishes when the task_acks_late setting is enabled.)
sequenceDiagram
participant Client as Your Code (run_tasks.py)
participant TaskDef as @app.task def add()
participant App as Celery App Instance
participant Broker as Message Broker (e.g., Redis)
participant Worker as Celery Worker (separate process)
Note over TaskDef, App: 1. @app.task registers 'add' function with App's task registry
Client->>TaskDef: 2. Call add.delay(5, 7)
TaskDef->>App: 3. Get broker config
App-->>TaskDef: Broker URL
TaskDef->>Broker: 4. Send message ('tasks.add', (5, 7), task_id, ...)
Broker-->>TaskDef: Ack (Message Queued)
TaskDef-->>Client: 5. Return AsyncResult(task_id)
Worker->>Broker: 6. Fetch next message
Broker-->>Worker: Message ('tasks.add', (5, 7), task_id)
Worker->>App: 7. Lookup 'tasks.add' in registry
App-->>Worker: add function code
Worker->>Worker: 8. Execute add(5, 7) -> returns 12
Note over Worker: (Optionally store result in Backend)
Worker->>Broker: 9. Acknowledge message completion
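The whole round trip traced above can be imitated in a few lines of standard-library Python. This is a sketch under loud assumptions: registry, broker, backend, send, and worker_loop are hypothetical names, the "broker" is an in-process queue.Queue, and the "worker" is a thread rather than a separate process.

```python
import queue
import threading
import uuid

registry = {}           # task name -> function (plays the role of app.tasks)
broker = queue.Queue()  # in-memory stand-in for the broker queue
backend = {}            # task id -> return value (plays the result backend)


def task(fun):
    """Register a function under a 'tasks.<name>' key (toy naming)."""
    registry[f"tasks.{fun.__name__}"] = fun
    return fun


@task
def add(x, y):
    return x + y


def send(name, *args):
    """Package a message and put it on the queue; return the task id."""
    task_id = str(uuid.uuid4())
    broker.put({"id": task_id, "task": name, "args": args})
    return task_id


def worker_loop():
    while True:
        message = broker.get()            # fetch the next message
        if message is None:               # sentinel: shut the worker down
            broker.task_done()
            break
        fun = registry[message["task"]]   # look the task up by name
        backend[message["id"]] = fun(*message["args"])  # execute and store
        broker.task_done()                # acknowledge the message


worker = threading.Thread(target=worker_loop)
worker.start()
task_id = send("tasks.add", 5, 7)
broker.join()     # block until the worker has acknowledged the message
broker.put(None)  # ask the worker to stop
worker.join()
result = backend[task_id]
print(result)  # 12
```

The sender never touches the add function directly; it only knows the name and the ID, which is the same decoupling the diagram shows between your code and the worker.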
Code Dive: Task Creation and Sending
- Task Definition (@app.task): This decorator is defined in celery/app/base.py within the Celery class method task. It ultimately calls _task_from_fun.

# Simplified from celery/app/base.py
class Celery:
    # ...
    def task(self, *args, **opts):
        # ... handles decorator arguments ...
        def _create_task_cls(fun):
            # Returns a Task instance or a Proxy that creates one later
            ret = self._task_from_fun(fun, **opts)
            return ret
        return _create_task_cls

    def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
        # Generate name like 'tasks.add' if not given
        name = name or self.gen_task_name(fun.__name__, fun.__module__)
        base = base or self.Task  # The base Task class (from celery.app.task)
        if name not in self._tasks:  # If not already registered...
            # Dynamically create a Task class wrapping the function
            task = type(fun.__name__, (base,), {
                'app': self,  # Link task back to this app instance!
                'name': name,
                'run': staticmethod(fun),  # The actual function to run
                '__doc__': fun.__doc__,
                '__module__': fun.__module__,
                # ... other options ...
            })()  # Instantiate the new Task class
            self._tasks[task.name] = task  # Add to app's registry!
            task.bind(self)  # Perform binding steps
        else:
            task = self._tasks[name]  # Task already exists
        return task

This shows how the decorator essentially creates a specialized object (an instance of a class derived from celery.app.task.Task) that wraps your original function and registers it with the app under a specific name.

- Task Sending (.delay): The .delay() method is defined on the Task class itself in celery/app/task.py. It’s a simple shortcut.

# Simplified from celery/app/task.py
class Task:
    # ...
    def delay(self, *args, **kwargs):
        """Shortcut for apply_async(args, kwargs)"""
        return self.apply_async(args, kwargs)

    def apply_async(self, args=None, kwargs=None, ..., **options):
        # ... argument checking, option processing ...
        # Get the app associated with this task instance
        app = self._get_app()
        # If always_eager is set, run locally instead of sending
        if app.conf.task_always_eager:
            return self.apply(args, kwargs, ...)  # Runs inline
        # The main path: tell the app to send the task message
        return app.send_task(
            self.name, args, kwargs, task_type=self,
            **options  # Includes things like countdown, eta, queue etc.
        )

You can see how .delay just calls .apply_async, which then (usually) delegates the actual message sending to the app.send_task method we saw briefly in Chapter 1. The app uses its configuration to know how and where to send the message.
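The dynamic class creation in the excerpt above can be tried in isolation. The sketch below mirrors only the type(...) call and the eager shortcut; ToyBaseTask, task_from_fun, and the always_eager attribute are hypothetical names for this illustration, and the eager path here returns the raw value rather than a result object as the real library does.

```python
class ToyBaseTask:
    """Hypothetical base class; plays the role of celery.app.task.Task."""

    name = None
    always_eager = True  # toy flag standing in for the task_always_eager setting

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        # Calling the task object directly runs the wrapped function.
        return self.run(*args, **kwargs)

    def apply_async(self, args=(), kwargs=None):
        if self.always_eager:
            # Eager mode: run inline instead of sending a message.
            return self.run(*args, **(kwargs or {}))
        raise RuntimeError("toy model has no broker to send to")

    def delay(self, *args, **kwargs):
        return self.apply_async(args, kwargs)


def task_from_fun(fun, name=None):
    """Build a one-off subclass whose run() is the given function."""
    name = name or f"tasks.{fun.__name__}"
    task_cls = type(fun.__name__, (ToyBaseTask,), {
        "name": name,
        "run": staticmethod(fun),  # staticmethod: no implicit self is passed
        "__doc__": fun.__doc__,
    })
    return task_cls()  # instantiate the new class, as the excerpt does


def add(x, y):
    """Add two numbers."""
    return x + y


add_task = task_from_fun(add)
print(type(add_task).__name__, add_task.name)  # add tasks.add
print(add_task(2, 3), add_task.delay(5, 7))    # 5 12
```

Wrapping the function with staticmethod is what lets run(2, 3) reach the original two-argument function without an extra self argument.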
Conclusion
You’ve learned the core concept of a Celery Task:
- It represents a single, well-defined unit of work or job description.
- You define a task by decorating a normal Python function with @app.task. This registers the task with your Celery application.
- You send a task request (rather than running it directly) using .delay() or .apply_async().
. - Sending a task puts a message onto a queue managed by a message broker.
- A separate Worker process picks up the message and executes the corresponding task function.
Tasks are the fundamental building blocks of work in Celery. Now that you know how to define a task and request its execution, let’s look more closely at the crucial component that handles passing these requests around: the message broker.
Next: Chapter 4: Broker Connection (AMQP)
Generated by AI Codebase Knowledge Builder