This document describes an older version of Celery (2.1). For the latest stable version, see the current Celery documentation.

Defining Tasks - celery.task.base

class celery.task.base.Task

A celery task.

All subclasses of Task must define the run() method, which is the actual method the celery daemon executes.

The run() method can make use of the default keyword arguments, as listed in the run() documentation.

The resulting class is callable; calling it applies the run() method.
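As a rough illustration of the callable behaviour, the following sketch uses a plain Python stand-in rather than Celery itself. SketchTask and AddTask are hypothetical names, and the real Task class does considerably more than this.

```python
class SketchTask:
    """Hypothetical stand-in for a task base class (not celery.task.base.Task)."""

    def run(self, *args, **kwargs):
        # Subclasses must provide the task body.
        raise NotImplementedError("Tasks must define the run method.")

    def __call__(self, *args, **kwargs):
        # Calling the task simply applies run() with the same arguments.
        return self.run(*args, **kwargs)


class AddTask(SketchTask):
    def run(self, x, y, **kwargs):
        return x + y


result = AddTask()(2, 3)
```

Here result holds the value returned by AddTask.run(2, 3).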

name

Name of the task.

abstract

If True the task is an abstract base class.

type

The type of task. Currently this can be regular or periodic; however, if you want a periodic task, you should subclass PeriodicTask instead.

queue

Select a destination queue for this task. The queue needs to exist in CELERY_QUEUES. The routing_key, exchange and exchange_type attributes will be ignored if this is set.

routing_key

Override the global default routing_key for this task.

exchange

Override the global default exchange for this task.

exchange_type

Override the global default exchange type for this task.

delivery_mode

Override the global default delivery mode for this task. By default this is set to 2 (persistent). You can change this to 1 to get non-persistent behavior, which means the messages are lost if the broker is restarted.

mandatory

Mandatory message routing. An exception will be raised if the task can’t be routed to a queue.

immediate

Request immediate delivery. An exception will be raised if the task can’t be routed to a worker immediately.

priority

The message priority. A number from 0 to 9, where 0 is the highest. Note that RabbitMQ doesn’t support priorities yet.

max_retries

Maximum number of retries before giving up. If set to None, it will never stop retrying.

default_retry_delay

Default time in seconds before a retry of the task should be executed. Default is a 3 minute delay.
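How max_retries and default_retry_delay interact can be sketched in plain Python. This is only an illustration under stated assumptions: next_retry_delay and MaxRetriesExceeded are hypothetical names, and the exact comparison the worker performs is assumed rather than taken from the Celery source.

```python
class MaxRetriesExceeded(Exception):
    """Hypothetical stand-in for Task.MaxRetriesExceededError."""


def next_retry_delay(retries_so_far, max_retries, default_retry_delay=3 * 60):
    """Return the delay in seconds before the next retry.

    Raises MaxRetriesExceeded once the limit is reached.
    max_retries=None means the task never stops retrying.
    """
    if max_retries is not None and retries_so_far >= max_retries:
        raise MaxRetriesExceeded("gave up after %d retries" % retries_so_far)
    # Assumed: every retry waits the same default delay (3 minutes).
    return default_retry_delay
```

With max_retries=None the function always returns the delay, which mirrors the "never stop retrying" behaviour described above.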

rate_limit

Set the rate limit for this task type. Examples: None (no rate limit), "100/s" (a hundred tasks a second), "100/m" (a hundred tasks a minute), "100/h" (a hundred tasks an hour).
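To make the rate limit notation concrete, here is a minimal sketch that converts such a string to tasks per second. tasks_per_second is a hypothetical helper, not part of Celery, and it handles only the three suffixes shown in the examples.

```python
# Seconds per unit for the suffixes used in the rate limit examples.
SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 60 * 60}


def tasks_per_second(rate_limit):
    """Convert a rate limit such as "100/m" to tasks per second (hypothetical helper)."""
    if rate_limit is None:
        return None  # no rate limit
    count, _, unit = rate_limit.partition("/")
    return float(count) / SECONDS_PER_UNIT[unit]
```

For instance, "100/m" works out to 100 divided by 60 tasks per second.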

ignore_result

Don’t store the return value of this task.

store_errors_even_if_ignored

If true, errors will be stored even if the task is configured to ignore results.

send_error_emails

If true, an e-mail will be sent to the admins whenever a task of this type raises an exception.

error_whitelist

List of exception types to send error e-mails for.

serializer

The name of a serializer that has been registered with carrot.serialization.registry. Example: "json".

backend

The result store backend used for this task.

autoregister

If True the task is automatically registered in the task registry, which is the default behaviour.

track_started

If True the task will report its status as “started” when the task is executed by a worker. The default value is False as the normal behaviour is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried. Having a “started” status can be useful for when there are long running tasks and there is a need to report which task is currently running.

The global default can be overridden by the CELERY_TRACK_STARTED setting.

acks_late

If set to True, messages for this task will be acknowledged after the task has been executed, rather than just before execution (the default behavior).

Note that this means the task may be executed twice if the worker crashes in the middle of execution, which may be acceptable for some applications.

The global default can be overridden by the CELERY_ACKS_LATE setting.

expires

Default task expiry time in seconds or a datetime.

classmethod AsyncResult(task_id)

Get AsyncResult instance for this kind of task.

Parameters: task_id – Task id to get the result for.

exception MaxRetriesExceededError

The task’s maximum number of retries has been exceeded.

Task.after_return(status, retval, task_id, args, kwargs, einfo=None)

Handler called after the task returns.

Parameters:
  • status – Current task state.
  • retval – Task return value/exception.
  • task_id – Unique id of the task.
  • args – Original arguments for the task.
  • kwargs – Original keyword arguments for the task.
  • einfo – ExceptionInfo instance, containing the traceback (if any).

The return value of this handler is ignored.

classmethod Task.apply(args=None, kwargs=None, **options)

Execute this task locally, blocking until the task has finished executing.

Parameters:
  • args – positional arguments passed on to the task.
  • kwargs – keyword arguments passed on to the task.
  • throw – Re-raise task exceptions. Defaults to the CELERY_EAGER_PROPAGATES_EXCEPTIONS setting.

Return type: celery.result.EagerResult

See celery.execute.apply().

classmethod Task.apply_async(args=None, kwargs=None, **options)

Delay this task for execution by the celery daemon(s).

Parameters:
  • args – positional arguments passed on to the task.
  • kwargs – keyword arguments passed on to the task.
  • **options – Any keyword arguments to pass on to celery.execute.apply_async().

See celery.execute.apply_async() for more information.

Returns: celery.result.AsyncResult

classmethod Task.delay(*args, **kwargs)

Shortcut to apply_async() that takes star arguments but doesn’t support the extra options.

Parameters:
  • *args – positional arguments passed on to the task.
  • **kwargs – keyword arguments passed on to the task.

Returns: celery.result.AsyncResult
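The relationship between delay() and apply_async() can be sketched with a plain Python stand-in. SketchTask is a hypothetical class that records calls instead of publishing messages, and its integer return value merely stands in for an AsyncResult.

```python
class SketchTask:
    """Hypothetical stand-in that records calls instead of publishing messages."""

    sent = []

    @classmethod
    def apply_async(cls, args=None, kwargs=None, **options):
        # A real implementation would publish a message to the broker here.
        cls.sent.append((tuple(args or ()), dict(kwargs or {}), options))
        return len(cls.sent) - 1  # stand-in for an AsyncResult

    @classmethod
    def delay(cls, *args, **kwargs):
        # Star arguments are packed and forwarded; no execution options.
        return cls.apply_async(args, kwargs)


SketchTask.delay(2, 3, verbose=True)
SketchTask.apply_async(args=[2, 3], kwargs={"verbose": True}, countdown=10)
```

Both calls carry the same task arguments; only the apply_async() call can attach an execution option such as countdown.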

classmethod Task.establish_connection(connect_timeout=4)

Establish a connection to the message broker.

Task.execute(wrapper, pool, loglevel, logfile)

The method the worker calls to execute the task.

Parameters:
  • wrapper – A TaskRequest.
  • pool – A task pool.
  • loglevel – Current loglevel.
  • logfile – Name of the currently used logfile.

classmethod Task.get_consumer(connection=None, connect_timeout=4)

Get a celery task message consumer.

Return type: celery.messaging.TaskConsumer

Please be sure to close the AMQP connection when you’re done with this object, for example:

>>> consumer = self.get_consumer()
>>> # do something with consumer
>>> consumer.connection.close()

classmethod Task.get_logger(loglevel=None, logfile=None, propagate=False, **kwargs)

Get task-aware logger object.

See celery.log.setup_task_logger().

classmethod Task.get_publisher(connection=None, exchange=None, connect_timeout=4, exchange_type=None)

Get a celery task message publisher.

Return type: celery.messaging.TaskPublisher

Please be sure to close the AMQP connection when you’re done with this object, for example:

>>> publisher = self.get_publisher()
>>> # do something with publisher
>>> publisher.connection.close()

Task.on_failure(exc, task_id, args, kwargs, einfo=None)

Error handler.

This is run by the worker when the task fails.

Parameters:
  • exc – The exception raised by the task.
  • task_id – Unique id of the failed task.
  • args – Original arguments for the task that failed.
  • kwargs – Original keyword arguments for the task that failed.
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.

Task.on_retry(exc, task_id, args, kwargs, einfo=None)

Retry handler.

This is run by the worker when the task is to be retried.

Parameters:
  • exc – The exception sent to retry().
  • task_id – Unique id of the retried task.
  • args – Original arguments for the retried task.
  • kwargs – Original keyword arguments for the retried task.
  • einfo – ExceptionInfo instance, containing the traceback.

The return value of this handler is ignored.

Task.on_success(retval, task_id, args, kwargs)

Success handler.

Run by the worker if the task executes successfully.

Parameters:
  • retval – The return value of the task.
  • task_id – Unique id of the executed task.
  • args – Original arguments for the executed task.
  • kwargs – Original keyword arguments for the executed task.

The return value of this handler is ignored.

classmethod Task.retry(args=None, kwargs=None, exc=None, throw=True, **options)

Retry the task.

Parameters:
  • args – Positional arguments to retry with.
  • kwargs – Keyword arguments to retry with.
  • exc – Optional exception to raise instead of MaxRetriesExceededError when the maximum number of retries has been exceeded.
  • countdown – Time in seconds to delay the retry for.
  • eta – Explicit time and date to run the retry at (must be a datetime.datetime instance).
  • **options – Any extra options to pass on to apply_async(). See celery.execute.apply_async().
  • throw – If this is False, do not raise the RetryTaskError exception that tells the worker to mark the task as being retried. Note that this means the task will be marked as failed if the task raises an exception, or successful if it returns.

Raises celery.exceptions.RetryTaskError: To tell the worker that the task has been re-sent for retry. This always happens, unless the throw keyword argument has been explicitly set to False, and is considered normal operation.

Example

>>> class TwitterPostStatusTask(Task):
...
...     def run(self, username, password, message, **kwargs):
...         twitter = Twitter(username, password)
...         try:
...             twitter.post_status(message)
...         except twitter.FailWhale, exc:
...             # Retry in 5 minutes.
...             self.retry([username, password, message], kwargs,
...                        countdown=60 * 5, exc=exc)

Task.run(*args, **kwargs)

The body of the task executed by the worker.

The following standard keyword arguments are reserved and are passed by the worker if the function/method supports them:

  • task_id
  • task_name
  • task_retries
  • task_is_eager
  • logfile
  • loglevel
  • delivery_info

Additional standard keyword arguments may be added in the future. To take these default arguments, the task can either list the ones it wants explicitly or just take an arbitrary list of keyword arguments (**kwargs).
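One way to picture "passed by the worker if the function/method supports them" is a filter over the function signature. The sketch below is an assumption about the general idea, not the worker's actual code; supported_kwargs and the sample values in DEFAULT_KWARGS are hypothetical.

```python
import inspect

# Sample values for the reserved keyword arguments (made up for illustration).
DEFAULT_KWARGS = {
    "task_id": "hypothetical-id",
    "task_name": "tasks.add",
    "task_retries": 0,
    "task_is_eager": False,
    "logfile": None,
    "loglevel": None,
    "delivery_info": {},
}


def supported_kwargs(fun, available):
    """Return the subset of `available` that `fun` can accept by keyword."""
    params = list(inspect.signature(fun).parameters.values())
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return dict(available)  # a **kwargs parameter accepts everything
    names = {p.name for p in params}
    return {k: v for k, v in available.items() if k in names}


def add(x, y, task_id=None):
    # Lists only the default argument it wants.
    return x + y


def add_all(x, y, **kwargs):
    # Takes every default argument, including future ones.
    return x + y
```

Taking **kwargs, as add_all does, keeps a task compatible if more standard keyword arguments are added later.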

classmethod Task.subtask(*args, **kwargs)

Returns a subtask object for this task that wraps arguments and execution options for a single task invocation.

Task.update_state(task_id, state, meta=None)

Update task state.

Parameters:
  • task_id – Id of the task to update.
  • state – New state (str).
  • meta – State metadata (dict).

class celery.task.base.PeriodicTask

A periodic task is a task that behaves like a cron job.

Results of periodic tasks are not stored by default.

run_every

REQUIRED. Defines how often the task is run (its interval). It can be a timedelta object, a crontab object, or an integer specifying the time in seconds.

relative

If set to True, run times are relative to the time when the server was started. This was the previous behaviour; periodic tasks are now scheduled by the clock.

Raises NotImplementedError: if the run_every attribute is not defined.

Example

>>> from celery.task import tasks, PeriodicTask
>>> from datetime import timedelta
>>> class EveryThirtySecondsTask(PeriodicTask):
...     run_every = timedelta(seconds=30)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every 30 seconds")
>>> from celery.task import PeriodicTask
>>> from celery.task.schedules import crontab
>>> class EveryMondayMorningTask(PeriodicTask):
...     run_every = crontab(hour=7, minute=30, day_of_week=1)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every Monday at 7:30AM.")
>>> class EveryMorningTask(PeriodicTask):
...     run_every = crontab(hour=7, minute=30)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every day at 7:30AM.")
>>> class EveryQuarterPastTheHourTask(PeriodicTask):
...     run_every = crontab(minute=15)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Execute every 0:15 past the hour every day.")

is_due(last_run_at)

Returns a tuple of two items (is_due, next_time_to_run), where next_time_to_run is in seconds.

See celery.schedules.schedule.is_due() for more information.
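For a fixed interval, the returned tuple might be computed along these lines. This is a sketch under assumptions, not the library's implementation: the standalone function takes run_every and now as explicit arguments so the result is deterministic, whereas the real method takes only last_run_at.

```python
from datetime import datetime, timedelta


def is_due(last_run_at, run_every, now):
    """Return (is_due, next_time_to_run), with next_time_to_run in seconds."""
    remaining = (last_run_at + run_every) - now
    if remaining <= timedelta(0):
        # Due now; the next check is assumed to be one full interval away.
        return True, run_every.total_seconds()
    # Not due yet; check again when the remaining time has elapsed.
    return False, remaining.total_seconds()


last_run = datetime(2010, 1, 1, 12, 0, 0)
not_yet = is_due(last_run, timedelta(seconds=30), last_run + timedelta(seconds=10))
overdue = is_due(last_run, timedelta(seconds=30), last_run + timedelta(seconds=45))
```

In this sketch not_yet reports the seconds left in the current interval, while overdue reports the full interval length.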

remaining_estimate(last_run_at)

Returns when the periodic task should run next as a timedelta.

timedelta_seconds(delta)

Convert timedelta to seconds.

Doesn’t account for negative timedeltas.
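A conversion of this kind could look like the sketch below. Clamping negative values to zero is an assumption about what "doesn’t account for negative timedeltas" means in practice; the function is a standalone illustration, not the method's source.

```python
from datetime import timedelta


def timedelta_seconds(delta):
    """Convert a timedelta to seconds, treating negative deltas as zero."""
    if delta.days < 0:
        # Negative timedeltas normalize to a negative days field.
        return 0
    return delta.days * 86400 + delta.seconds + delta.microseconds / 1e6
```

A negative delta such as timedelta(seconds=-5) is stored by Python with days=-1, which is why checking the days field is enough to detect it.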

class celery.task.base.TaskType

Metaclass for tasks.

Automatically registers the task in the task registry, except if the abstract attribute is set.

If no name attribute is provided, the name is automatically set to the name of the module it was defined in, and the class name.
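The registration and naming rules can be illustrated with a small metaclass. Everything here is hypothetical (SketchTaskType, registry, BaseTask and the example classes), the dotted "module.ClassName" format is an assumption, and so is the choice that abstract is not inherited by subclasses.

```python
registry = {}  # hypothetical stand-in for the task registry


class SketchTaskType(type):
    """Hypothetical metaclass: names and registers non-abstract classes."""

    def __new__(mcs, name, bases, attrs):
        # Assumed: `abstract` is not inherited, a subclass must set it itself.
        attrs.setdefault("abstract", False)
        if not attrs.get("name"):
            # Default name: defining module plus class name.
            attrs["name"] = "%s.%s" % (attrs.get("__module__", "__main__"), name)
        cls = super().__new__(mcs, name, bases, attrs)
        if not cls.abstract:
            registry[cls.name] = cls
        return cls


class BaseTask(metaclass=SketchTaskType):
    abstract = True  # never registered


class RefreshFeed(BaseTask):
    pass  # name derived from module and class name


class NamedTask(BaseTask):
    name = "feeds.refresh_all"  # explicit name wins
```

After these definitions the registry holds RefreshFeed and NamedTask, while the abstract BaseTask is left out.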
