This document describes an older version of Celery (2.1).

Task Information and Utilities - celery.task

Working with tasks and task sets.

class celery.task.Task

A celery task.

All subclasses of Task must define the run() method, which is the actual method the celery daemon executes.

The run() method can make use of the default keyword arguments, as listed in the run() documentation.

The resulting class is callable; calling it will apply the run() method.
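
For illustration, a minimal task can be defined by subclassing Task and implementing run(). This is a sketch: the AddTask class is made up, and getting a result back assumes a worker is running and a result backend is configured.

>>> from celery.task import Task
>>> class AddTask(Task):
...
...     def run(self, x, y, **kwargs):
...         return x + y
>>> result = AddTask.delay(2, 2)   # send the task to a worker
>>> result.get()                   # wait for and return the result
4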

name

Name of the task.

abstract

If True the task is an abstract base class.

type

The type of task. Currently this can be regular or periodic; however, if you want a periodic task, you should subclass PeriodicTask instead.

queue

Select a destination queue for this task. The queue needs to exist in CELERY_QUEUES. The routing_key, exchange and exchange_type attributes will be ignored if this is set.

routing_key

Override the global default routing_key for this task.

exchange

Override the global default exchange for this task.

exchange_type

Override the global default exchange type for this task.

delivery_mode

Override the global default delivery mode for this task. By default this is set to 2 (persistent). You can change this to 1 to get non-persistent behavior, which means the messages are lost if the broker is restarted.
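
As a sketch of how these message options are used, they are simply set as class attributes on the task. The VideoEncodeTask class and the "video" queue below are made up; the queue would have to be defined in CELERY_QUEUES.

>>> from celery.task import Task
>>> class VideoEncodeTask(Task):
...     queue = "video"        # must exist in CELERY_QUEUES
...     delivery_mode = 1      # non-persistent messages
...
...     def run(self, video_id, **kwargs):
...         pass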

mandatory

Mandatory message routing. An exception will be raised if the task can’t be routed to a queue.

immediate

Request immediate delivery. An exception will be raised if the task can’t be routed to a worker immediately.

priority

The message priority. A number from 0 to 9, where 0 is the highest. Note that RabbitMQ doesn’t support priorities yet.

max_retries

Maximum number of retries before giving up. If set to None, it will never stop retrying.

default_retry_delay

Default time in seconds before a retry of the task should be executed. Default is a 3 minute delay.

rate_limit

Set the rate limit for this task type. Examples: None (no rate limit), "100/s" (a hundred tasks a second), "100/m" (a hundred tasks a minute), "100/h" (a hundred tasks an hour).

ignore_result

Don’t store the return value of this task.

store_errors_even_if_ignored

If true, errors will be stored even if the task is configured to ignore results.

send_error_emails

If true, an e-mail will be sent to the admins whenever a task of this type raises an exception.

error_whitelist

List of exception types to send error e-mails for.

serializer

The name of a serializer that has been registered with carrot.serialization.registry. Example: "json".

backend

The result store backend used for this task.

autoregister

If True the task is automatically registered in the task registry, which is the default behaviour.

track_started

If True the task will report its status as “started” when the task is executed by a worker. The default value is False, as the normal behaviour is not to report that level of granularity. Tasks are either pending, finished, or waiting to be retried. Having a “started” status can be useful when there are long-running tasks and there is a need to report which task is currently running.

The global default can be overridden by the CELERY_TRACK_STARTED setting.

acks_late

If set to True, messages for this task will be acknowledged after the task has been executed, instead of just before (which is the default behavior).

Note that this means the task may be executed twice if the worker crashes in the middle of execution, which may be acceptable for some applications.

The global default can be overridden by the CELERY_ACKS_LATE setting.

expires

Default task expiry time in seconds or a datetime.

classmethod AsyncResult(task_id)

Get AsyncResult instance for this kind of task.

Parameters: task_id – Task id to get result for.
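
For example, given a task id saved from an earlier apply_async() call, the result can be looked up again later (MyTask and task_id are placeholders):

>>> result = MyTask.AsyncResult(task_id)
>>> result.status      # e.g. "PENDING" or "SUCCESS"
>>> result.ready()
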
exception MaxRetriesExceededError

The task's max retry limit has been exceeded.

Task.acks_late = False
Task.after_return(status, retval, task_id, args, kwargs, einfo=None)

Handler called after the task returns.

Parameters:
  • status – Current task state.
  • retval – Task return value/exception.
  • task_id – Unique id of the task.
  • args – Original arguments for the task that failed.
  • kwargs – Original keyword arguments for the task that failed.
  • einfo – ExceptionInfo instance, containing the traceback (if any).

The return value of this handler is ignored.
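
A sketch of overriding this handler, for example to release a resource no matter how the task ended. The CleanupTask class and the process() and release_lock() helpers are hypothetical.

>>> from celery.task import Task
>>> class CleanupTask(Task):
...
...     def run(self, resource_id, **kwargs):
...         return process(resource_id)    # hypothetical helper
...
...     def after_return(self, status, retval, task_id, args,
...                      kwargs, einfo=None):
...         # Runs after every execution, regardless of outcome.
...         release_lock(args[0])          # hypothetical helper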

classmethod Task.apply(args=None, kwargs=None, **options)

Execute this task locally, by blocking until the task has finished executing.

Parameters:
  • args – positional arguments passed on to the task.
  • kwargs – keyword arguments passed on to the task.
  • throw – Re-raise task exceptions. Defaults to the CELERY_EAGER_PROPAGATES_EXCEPTIONS setting.

Returns a celery.result.EagerResult instance.

See celery.execute.apply().
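
E.g., this is handy in tests, where the task runs eagerly in the current process (reusing the AddTask sketch from above):

>>> eager = AddTask.apply(args=[4, 4])
>>> eager.get()
8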

classmethod Task.apply_async(args=None, kwargs=None, **options)

Delay this task for execution by the celery daemon(s).

Parameters:
  • args – positional arguments passed on to the task.
  • kwargs – keyword arguments passed on to the task.
  • **options – Any keyword arguments to pass on to celery.execute.apply_async().

See celery.execute.apply_async() for more information.

Returns a celery.result.AsyncResult instance.
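
For example, execution options such as countdown are given as extra keyword arguments (AddTask is the class from the earlier sketch):

>>> result = AddTask.apply_async(args=[2, 2], countdown=10)
>>> result.task_id      # uuid string identifying the task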

Task.autoregister = True
Task.backend = <celery.backends.amqp.AMQPBackend object>
Task.default_retry_delay = 180
classmethod Task.delay(*args, **kwargs)

Shortcut to apply_async() that takes star arguments, but doesn't support the extra execution options.

Parameters:
  • *args – positional arguments passed on to the task.
  • **kwargs – keyword arguments passed on to the task.

Returns a celery.result.AsyncResult instance.
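
That is, the two calls below send the same task, but only apply_async() accepts additional execution options (AddTask is the class from the earlier sketch):

>>> AddTask.delay(2, 2)
>>> AddTask.apply_async(args=(2, 2))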

Task.delivery_mode = 2
Task.disable_error_emails = False
Task.error_whitelist = ()
classmethod Task.establish_connection(connect_timeout=4)

Establish a connection to the message broker.

Task.exchange = None
Task.exchange_type = 'direct'
Task.execute(wrapper, pool, loglevel, logfile)

The method the worker calls to execute the task.

Parameters:
  • wrapper – A TaskRequest.
  • pool – A task pool.
  • loglevel – Current loglevel.
  • logfile – Name of the currently used logfile.
Task.expires = None
classmethod Task.get_consumer(connection=None, connect_timeout=4)

Get a celery task message consumer.

Returns a celery.messaging.TaskConsumer instance.

Please be sure to close the AMQP connection when you’re done with this object. i.e.:

>>> consumer = self.get_consumer()
>>> # do something with consumer
>>> consumer.connection.close()
classmethod Task.get_logger(loglevel=None, logfile=None, propagate=False, **kwargs)

Get task-aware logger object.

See celery.log.setup_task_logger().

classmethod Task.get_publisher(connection=None, exchange=None, connect_timeout=4, exchange_type=None)

Get a celery task message publisher.

Returns a celery.messaging.TaskPublisher instance.

Please be sure to close the AMQP connection when you’re done with this object, i.e.:

>>> publisher = self.get_publisher()
>>> # do something with publisher
>>> publisher.connection.close()
Task.ignore_result = False
Task.immediate = False
Task.mandatory = False
Task.max_retries = 5
Task.name = None
Task.on_failure(exc, task_id, args, kwargs, einfo=None)

Error handler.

This is run by the worker when the task fails.

Parameters:
  • exc – The exception raised by the task.
  • task_id – Unique id of the failed task.
  • args – Original arguments for the task that failed.
  • kwargs – Original keyword arguments for the task that failed.
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.
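
A sketch of a custom failure handler that logs the exception (the ReportingTask class is illustrative):

>>> from celery.task import Task
>>> class ReportingTask(Task):
...
...     def run(self, x, **kwargs):
...         return 100 / x
...
...     def on_failure(self, exc, task_id, args, kwargs, einfo=None):
...         logger = self.get_logger()
...         logger.error("Task %s failed: %r" % (task_id, exc))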

Task.on_retry(exc, task_id, args, kwargs, einfo=None)

Retry handler.

This is run by the worker when the task is to be retried.

Parameters:
  • exc – The exception sent to retry().
  • task_id – Unique id of the retried task.
  • args – Original arguments for the retried task.
  • kwargs – Original keyword arguments for the retried task.
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.

Task.on_success(retval, task_id, args, kwargs)

Success handler.

Run by the worker if the task executes successfully.

Parameters:
  • retval – The return value of the task.
  • task_id – Unique id of the executed task.
  • args – Original arguments for the executed task.
  • kwargs – Original keyword arguments for the executed task.

The return value of this handler is ignored.

Task.priority = None
Task.queue = None
Task.rate_limit = None
classmethod Task.retry(args=None, kwargs=None, exc=None, throw=True, **options)

Retry the task.

Parameters:
  • args – Positional arguments to retry with.
  • kwargs – Keyword arguments to retry with.
  • exc – Optional exception to raise instead of MaxRetriesExceededError when the max retry limit has been exceeded.
  • countdown – Time in seconds to delay the retry for.
  • eta – Explicit time and date to run the retry at (must be a datetime.datetime instance).
  • **options – Any extra options to pass on to apply_async(). See celery.execute.apply_async().
  • throw – If this is False, the RetryTaskError exception that tells the worker to mark the task as being retried will not be raised. Note that this means the task will be marked as failed if it raises an exception, or as successful if it returns.
Raises celery.exceptions.RetryTaskError: to tell the worker that the task has been re-sent for retry. This always happens, unless the throw keyword argument has been explicitly set to False, and is considered normal operation.

Example

>>> class TwitterPostStatusTask(Task):
...
...     def run(self, username, password, message, **kwargs):
...         twitter = Twitter(username, password)
...         try:
...             twitter.post_status(message)
...         except twitter.FailWhale, exc:
...             # Retry in 5 minutes.
...             self.retry([username, password, message], kwargs,
...                        countdown=60 * 5, exc=exc)
Task.routing_key = None
Task.run(*args, **kwargs)

The body of the task executed by the worker.

The following standard keyword arguments are reserved and are passed by the worker if the function/method supports them:

  • task_id
  • task_name
  • task_retries
  • task_is_eager
  • logfile
  • loglevel
  • delivery_info

Additional standard keyword arguments may be added in the future. To take these default arguments, the task can either list the ones it wants explicitly or just take an arbitrary list of keyword arguments (**kwargs).
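
For example, a run() method that accepts the default keyword arguments and logs its own task id (the LoggingTask class is illustrative):

>>> from celery.task import Task
>>> class LoggingTask(Task):
...
...     def run(self, x, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Running %s (retry %s)" % (
...             kwargs["task_id"], kwargs["task_retries"]))
...         return x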

Task.send_error_emails = False
Task.serializer = 'pickle'
Task.store_errors_even_if_ignored = False
classmethod Task.subtask(*args, **kwargs)

Returns a subtask object for this task that wraps arguments and execution options for a single task invocation.
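
E.g., a subtask is handy for passing an invocation around, for instance as a member of a TaskSet (see the TaskSet example below; AddTask is the class from the earlier sketch):

>>> s = AddTask.subtask(args=(2, 2))
>>> s.apply_async()     # applies the wrapped arguments and options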

Task.track_started = False
Task.type = 'regular'
Task.update_state(task_id, state, meta=None)

Update task state.

Parameters:
  • task_id – Id of the task to update.
  • state – New state (str).
  • meta – State metadata (dict).
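
A sketch of reporting custom progress from inside a task; the "PROGRESS" state name, the ProcessItemsTask class and the handle() helper are made up:

>>> from celery.task import Task
>>> class ProcessItemsTask(Task):
...
...     def run(self, items, **kwargs):
...         for i, item in enumerate(items):
...             handle(item)               # hypothetical helper
...             self.update_state(kwargs["task_id"], "PROGRESS",
...                               meta={"done": i + 1, "total": len(items)})
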
class celery.task.TaskSet(task=None, tasks=None)

A task containing several subtasks, making it possible to track how many, or when all, of the tasks have been completed.

Parameters: tasks – A list of subtask instances.
total

Total number of subtasks in this task set.

Example:

>>> from djangofeeds.tasks import RefreshFeedTask
>>> from celery.task.sets import TaskSet, subtask
>>> urls = ("http://cnn.com/rss",
...         "http://bbc.co.uk/rss",
...         "http://xkcd.com/rss")
>>> subtasks = [RefreshFeedTask.subtask(kwargs={"feed_url": url})
...                 for url in urls]
>>> taskset = TaskSet(tasks=subtasks)
>>> taskset_result = taskset.apply_async()
>>> list_of_return_values = taskset_result.join()
Publisher

alias of TaskPublisher

apply()

Applies the taskset locally.

apply_async(*args, **kwargs)

Run all tasks in the taskset.

Returns a celery.result.TaskSetResult instance.

Example

>>> ts = TaskSet(tasks=(
...         RefreshFeedTask.subtask(["http://foo.com/rss"]),
...         RefreshFeedTask.subtask(["http://bar.com/rss"]),
... ))
>>> result = ts.apply_async()
>>> result.taskset_id
"d2c9b261-8eff-4bfb-8459-1e1b72063514"
>>> result.subtask_ids
["b4996460-d959-49c8-aeb9-39c530dcde25",
"598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
>>> result.waiting()
True
>>> time.sleep(10)
>>> result.ready()
True
>>> result.successful()
True
>>> result.failed()
False
>>> result.join()
[True, True]
task
task_name
tasks
class celery.task.PeriodicTask

A periodic task is a task that behaves like a cron job.

Results of periodic tasks are not stored by default.

run_every

REQUIRED. Defines how often the task is run (its interval). It can be a timedelta object, a crontab object or an integer specifying the time in seconds.

relative

If set to True, run times are relative to the time when the server was started. This was the previous behaviour; periodic tasks are now scheduled by the clock.

Raises NotImplementedError: if the run_every attribute is not defined.

Example

>>> from celery.task import tasks, PeriodicTask
>>> from datetime import timedelta
>>> class EveryThirtySecondsTask(PeriodicTask):
...     run_every = timedelta(seconds=30)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every 30 seconds")
>>> from celery.task import PeriodicTask
>>> from celery.task.schedules import crontab
>>> class EveryMondayMorningTask(PeriodicTask):
...     run_every = crontab(hour=7, minute=30, day_of_week=1)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every Monday at 7:30AM.")
>>> class EveryMorningTask(PeriodicTask):
...     run_every = crontab(hour=7, minute=30)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every day at 7:30AM.")
>>> class EveryQuarterPastTheHourTask(PeriodicTask):
...     run_every = crontab(minute=15)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every 0:15 past the hour every day.")
ignore_result = True
is_due(last_run_at)

Returns a tuple of two items (is_due, next_time_to_run), where next_time_to_run is in seconds.

See celery.schedules.schedule.is_due() for more information.

relative = False
remaining_estimate(last_run_at)

Returns when the periodic task should run next as a timedelta.

timedelta_seconds(delta)

Convert timedelta to seconds.

Doesn’t account for negative timedeltas.

type = 'periodic'
celery.task.discard_all(*args, **kwargs)

Discard all waiting tasks.

This will ignore all tasks waiting for execution, and they will be deleted from the messaging server.

Returns: the number of tasks discarded.
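
For example (the return value is the count reported by the broker):

>>> from celery.task import discard_all
>>> discard_all()      # number of waiting messages discarded
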
celery.task.dmap(fun, args, timeout=None)

Distribute processing of the arguments and collect the results.

Example

>>> from celery.task import dmap
>>> import operator
>>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
[4, 8, 16]
celery.task.dmap_async(fun, args, timeout=None)

Distribute processing of the arguments and collect the results asynchronously.

Returns a celery.result.AsyncResult instance.

Example

>>> from celery.task import dmap_async
>>> import operator
>>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
>>> presult
<AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
>>> presult.status
'SUCCESS'
>>> presult.result
[4, 8, 16]
celery.task.execute_remote(fun, *args, **kwargs)

Execute arbitrary function/object remotely.

Parameters:
  • fun – A callable function or object.
  • *args – Positional arguments to apply to the function.
  • **kwargs – Keyword arguments to apply to the function.

The object must be picklable, so you can’t use lambdas or functions defined in the REPL (the objects must have an associated module).

Returns a celery.result.AsyncResult instance.
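
For example, following the dmap() example above, any picklable module-level callable can be sent off for remote execution:

>>> import operator
>>> from celery.task import execute_remote
>>> result = execute_remote(operator.add, 2, 2)
>>> result.get()
4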

celery.task.ping()

Test if the server is alive.

Example:

>>> from celery.task import ping
>>> ping()
'pong'
class celery.task.HttpDispatchTask

Task dispatching to a URL.

Parameters:
  • url – The URL location of the HTTP callback task.
  • method – Method to use when dispatching the callback. Usually GET or POST.
  • **kwargs – Keyword arguments to pass on to the HTTP callback.
url

If this is set, this is used as the default URL for requests. Default is to require the user of the task to supply the url as an argument, as this attribute is intended for subclasses.

method

If this is set, this is the default method used for requests. Default is to require the user of the task to supply the method as an argument, as this attribute is intended for subclasses.

method = None
name = 'celery.task.http.HttpDispatchTask'
run(url=None, method='GET', **kwargs)
url = None
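
For example, dispatching to an HTTP callback endpoint (the URL below is made up):

>>> from celery.task import HttpDispatchTask
>>> res = HttpDispatchTask.delay(
...     url="http://example.com/multiply",
...     method="GET", x=10, y=10)
>>> res.get()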
