This document describes an older version of Celery (2.2).

Sets of tasks, Subtasks and Callbacks

Subtasks

New in version 2.0.

The subtask type is used to wrap the arguments and execution options for a single task invocation:

subtask(task_name_or_cls, args, kwargs, options)

For convenience every task also has a shortcut to create subtasks:

task.subtask(args, kwargs, options)

subtask is actually a dict subclass, which means it can be serialized with JSON or other encodings that don't support complex Python objects.

It can also be regarded as a type, so the following usage works:

>>> s = subtask("tasks.add", args=(2, 2), kwargs={})

>>> subtask(dict(s))  # coerce dict into subtask

This makes it excellent as a means to pass callbacks around to tasks.
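Because a subtask is just a dict, it survives a round trip through JSON. The following is a minimal sketch, not Celery's actual class: `Signature` is a hypothetical stand-in that stores the four fields as dict keys, assuming the key names `task`, `args`, `kwargs` and `options`.

```python
import json


class Signature(dict):
    """Hypothetical stand-in for celery's subtask: a dict with four keys."""

    def __init__(self, task=None, args=None, kwargs=None, options=None):
        if isinstance(task, dict):
            # Coerce an existing dict (e.g. one decoded from JSON) back
            # into a Signature, mirroring ``subtask(dict(s))`` above.
            super().__init__(task)
        else:
            # args is stored as a list because JSON has no tuple type,
            # so equality still holds after a round trip.
            super().__init__(task=task,
                             args=list(args or ()),
                             kwargs=dict(kwargs or {}),
                             options=dict(options or {}))


s = Signature("tasks.add", args=(2, 2), kwargs={})
wire = json.dumps(s)                      # plain JSON, no pickling needed
restored = Signature(json.loads(wire))    # coerce the dict back
print(wire)  # {"task": "tasks.add", "args": [2, 2], "kwargs": {}, "options": {}}
print(restored == s, type(restored).__name__)  # True Signature
```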

Callbacks

Let’s improve our add task so it can accept a callback that takes the result as an argument:

from celery.task import task
from celery.task.sets import subtask

@task
def add(x, y, callback=None):
    result = x + y
    if callback is not None:
        subtask(callback).delay(result)
    return result

A subtask also knows how it should be applied: asynchronously with delay(), or eagerly with apply().

The best thing is that any arguments you add to subtask.delay will be prepended to the arguments specified by the subtask itself!

If you have the subtask:

>>> add.subtask(args=(10, ))

subtask.delay(result) becomes:

>>> add.apply_async(args=(result, 10))
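The prepending rule can be modelled in a few lines of plain Python. This is a hypothetical sketch: `PartialCall` and its `call()` method are illustrative names, not part of Celery, and the call runs immediately instead of being sent to a worker. An order-sensitive function (subtraction) is used so the argument order is visible.

```python
class PartialCall:
    """Hypothetical model of how a subtask combines its arguments."""

    def __init__(self, fun, args=()):
        self.fun = fun
        self.args = tuple(args)

    def call(self, *extra_args):
        # Arguments given at call time are placed *before* the stored
        # ones, so PartialCall(f, (10,)).call(result) runs f(result, 10).
        return self.fun(*(extra_args + self.args))


def sub(x, y):
    return x - y  # order-sensitive, unlike add


print(PartialCall(sub, args=(10,)).call(30))  # 20, i.e. sub(30, 10)
```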

...

Now let’s execute our new add task with a callback:

>>> add.delay(2, 2, callback=add.subtask((8, )))

As expected, this will first launch one task calculating 2 + 2, then another task calculating 4 + 8.
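To see the order of execution, here is an eager, in-process model of the new add task. It assumes a callback is any plain callable that receives the result; nothing is sent to a broker, and `log` and `add_8` are hypothetical names used only for illustration.

```python
log = []


def add(x, y, callback=None):
    result = x + y
    log.append("add(%d, %d) = %d" % (x, y, result))
    if callback is not None:
        # The callback receives the result as its first argument.
        callback(result)
    return result


def add_8(result):
    # Plays the role of add.subtask((8, )): the stored argument 8 comes
    # after the result that is passed in.
    return add(result, 8)


add(2, 2, callback=add_8)
print(log)  # ['add(2, 2) = 4', 'add(4, 8) = 12']
```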

Task Sets

The TaskSet enables easy invocation of several tasks at once, and is then able to join the results in the same order as the tasks were invoked.

A task set takes a list of subtasks:

>>> from celery.task.sets import TaskSet
>>> from tasks import add

>>> job = TaskSet(tasks=[
...             add.subtask((4, 4)),
...             add.subtask((8, 8)),
...             add.subtask((16, 16)),
...             add.subtask((32, 32)),
... ])

>>> result = job.apply_async()

>>> result.ready()  # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.join()
[8, 16, 32, 64]
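The guarantee that join() returns results in invocation order, not completion order, can be illustrated with the standard library alone. In this sketch a thread pool stands in for the workers, and `slow_add` is a hypothetical task whose `delay` parameter makes later calls finish first.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

PAIRS = [(4, 4), (8, 8), (16, 16), (32, 32)]


def slow_add(x, y, delay):
    time.sleep(delay)  # simulate work; later calls get shorter delays
    return x + y


with ThreadPoolExecutor(max_workers=len(PAIRS)) as pool:
    futures = [pool.submit(slow_add, x, y, 0.04 * (len(PAIRS) - i))
               for i, (x, y) in enumerate(PAIRS)]
    # Completion order: whichever call finishes first comes first.
    finished = [f.result() for f in as_completed(futures)]
    # Invocation order: what a join() over the set returns.
    joined = [f.result() for f in futures]

print(joined)    # [8, 16, 32, 64]
print(finished)  # typically [64, 32, 16, 8]
```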

Results

When a TaskSet is applied it returns a TaskSetResult object.

TaskSetResult takes a list of AsyncResult instances and operates on them as if it were a single task.

It supports the following operations:

  • successful()

    Returns True if all of the subtasks finished successfully (i.e. did not raise an exception).

  • failed()

    Returns True if any of the subtasks failed.

  • waiting()

    Returns True if any of the subtasks is not ready yet.

  • ready()

    Returns True if all of the subtasks are ready.

  • completed_count()

    Returns the number of completed subtasks.

  • revoke()

    Revokes all of the subtasks.

  • iterate()

    Iterates over the return values of the subtasks as they finish, one by one.

  • join()

    Gathers the results of all of the subtasks and returns them as a list, in the order in which the subtasks were called.
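The aggregate operations reduce to simple checks over the individual results. The sketch below is a hypothetical model, not Celery's TaskSetResult: each subtask result is a `FakeResult` holding a state string, assuming the state names PENDING, SUCCESS and FAILURE, plus its return value. It also assumes "completed" means "finished successfully".

```python
from dataclasses import dataclass
from typing import Any, List

READY_STATES = {"SUCCESS", "FAILURE"}


@dataclass
class FakeResult:
    """Hypothetical stand-in for one subtask's AsyncResult."""
    state: str
    value: Any = None


class SetResult:
    """Hypothetical model of the aggregate operations listed above."""

    def __init__(self, results: List[FakeResult]):
        self.results = results

    def successful(self) -> bool:
        return all(r.state == "SUCCESS" for r in self.results)

    def failed(self) -> bool:
        return any(r.state == "FAILURE" for r in self.results)

    def waiting(self) -> bool:
        return any(r.state not in READY_STATES for r in self.results)

    def ready(self) -> bool:
        return all(r.state in READY_STATES for r in self.results)

    def completed_count(self) -> int:
        # Assumption: "completed" means finished successfully.
        return sum(r.state == "SUCCESS" for r in self.results)

    def join(self) -> List[Any]:
        # Values in invocation order; a real join() would block instead.
        if not self.ready():
            raise RuntimeError("not all subtasks are ready")
        return [r.value for r in self.results]


rs = SetResult([FakeResult("SUCCESS", 8), FakeResult("PENDING")])
print(rs.ready(), rs.waiting(), rs.completed_count())  # False True 1
```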

Task set callbacks

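A simple approach is a task that polls the task set and retries itself until all of the subtasks are ready. It works, but it may take a long time before your callback is called: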

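from celery.task import task, subtask
from celery.result import TaskSetResult

@task
def join_taskset(setid, subtasks, callback, interval=15, max_retries=None):
    # Rebuild the task set result from its id and the subtask results.
    result = TaskSetResult(setid, subtasks)
    if result.ready():
        # Every subtask has finished: apply the callback to the results.
        return subtask(callback).delay(result.join())
    # Not ready yet: check again in `interval` seconds.
    join_taskset.retry(countdown=interval, max_retries=max_retries)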
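The delay comes from polling: the callback can only be applied at one of the checks, which happen every `interval` seconds. Assuming the first check runs at time 0 and every retry runs exactly on schedule (real countdowns are only approximate), the arithmetic can be sketched as follows; `callback_time` is a hypothetical helper.

```python
import math
from typing import List


def callback_time(finish_times: List[float], interval: float) -> float:
    """Time of the first poll at which every subtask has finished.

    Hypothetical model: polls happen at 0, interval, 2 * interval, ...
    and the callback is applied at the first poll where all are done.
    """
    last = max(finish_times)
    return math.ceil(last / interval) * interval


finish = [3.0, 7.0, 31.0]   # seconds at which each subtask finished
fired = callback_time(finish, interval=15)
print(fired, fired - max(finish))  # 45 14.0
```

Under these assumptions the extra wait is at most `interval` seconds after the last subtask finishes, plus any queueing delay; the Redis-based approach below avoids polling altogether.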

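An approach that avoids polling uses Redis and atomic counters: every subtask increments a counter when it returns, and the one that brings the counter up to the total number of subtasks applies the callback: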

from celery import current_app
from celery.task import Task, TaskSet, task, subtask
from celery.result import TaskSetResult
from celery.utils import gen_unique_id, cached_property
from redis import Redis

class supports_taskset_callback(Task):
    abstract = True
    accept_magic_kwargs = False

    def after_return(self, *args, **kwargs):
        if self.request.taskset:
            callback = self.request.kwargs.get("callback")
            if callback:
                setid = self.request.taskset
                # task set must be saved in advance, so the task doesn't
                # try to restore it before that happens.  This is why we
                # use the `apply_presaved_taskset` below.
                result = TaskSetResult.restore(setid)
                # INCR is atomic: exactly one task sees the final count
                # and applies the callback.
                current = self.redis.incr("taskset-" + setid)
                if current >= result.total:
                    subtask(callback).delay(result.join())

    @cached_property
    def redis(self):
        return Redis(host="localhost", port=6379)

@task(base=supports_taskset_callback)
def add(x, y, **kwargs):
    return x + y

@task
def sum_of(numbers):
    print("TASKSET READY: %r" % (sum(numbers), ))

def apply_presaved_taskset(tasks):
    r = []
    setid = gen_unique_id()
    for sig in tasks:
        # Give every subtask its id up front, so the TaskSetResult can be
        # saved before any of the subtasks start running.
        uuid = gen_unique_id()
        sig.options["task_id"] = uuid
        r.append((sig, current_app.AsyncResult(uuid)))
    ts = current_app.TaskSetResult(setid, [res for _, res in r])
    ts.save()
    return TaskSet(tasks=[sig for sig, _ in r]).apply_async(taskset_id=setid)


# sum of 100 add tasks
result = apply_presaved_taskset(
            add.subtask((i, i), {"callback": sum_of.subtask()})
                for i in xrange(100))
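The counter is what makes the callback fire exactly once: because the increment is atomic, exactly one subtask sees the value reach the total. The sketch below models this in-process; `AtomicCounter` is a hypothetical stand-in for Redis INCR built on a `threading.Lock`, `run_taskset` is a hypothetical helper, and threads stand in for workers.

```python
import threading
from typing import Callable, List


class AtomicCounter:
    """Hypothetical stand-in for Redis INCR on a single key."""

    def __init__(self) -> None:
        self._value = 0
        self._lock = threading.Lock()

    def incr(self) -> int:
        # Increment and read under one lock, like a single INCR call.
        with self._lock:
            self._value += 1
            return self._value


def run_taskset(pairs, callback: Callable[[List[int]], None]) -> None:
    total = len(pairs)
    results = [None] * total
    counter = AtomicCounter()

    def worker(i: int, x: int, y: int) -> None:
        results[i] = x + y  # the task body (add)
        # after_return: only the task that brings the count up to the
        # total applies the callback, and by then all results are stored.
        if counter.incr() >= total:
            callback(results)

    threads = [threading.Thread(target=worker, args=(i, x, y))
               for i, (x, y) in enumerate(pairs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


calls = []
run_taskset([(i, i) for i in range(100)], lambda rs: calls.append(sum(rs)))
print(calls)  # [9900]
```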
