This document describes the current stable version of Celery (4.0). For development docs, go here.

celery.app.control

Worker Remote Control Client.

Client for worker remote control commands. Server implementation is in celery.worker.control.

class celery.app.control.Inspect(destination=None, timeout=1.0, callback=None, connection=None, app=None, limit=None)[source]

API for app.control.inspect.

active(safe=None)[source]
active_queues()[source]
app = None
clock()[source]
conf(with_defaults=False)[source]
hello(from_node, revoked=None)[source]
memdump(samples=10)[source]
memsample()[source]
objgraph(type=u'Request', n=200, max_depth=10)[source]
ping(destination=None)[source]
query_task(*ids)[source]
registered(*taskinfoitems)[source]
registered_tasks(*taskinfoitems)
report()[source]
reserved(safe=None)[source]
revoked()[source]
scheduled(safe=None)[source]
stats()[source]
class celery.app.control.Control(app=None)[source]

Worker remote control client.

class Mailbox(namespace, type=u'direct', connection=None, clock=None, accept=None, serializer=None, producer_pool=None, queue_ttl=None, queue_expires=None, reply_queue_ttl=None, reply_queue_expires=10.0)

Process Mailbox.

Node(hostname=None, state=None, channel=None, handlers=None)
abcast(command, kwargs={})
accept = [u'json']
call(destination, command, kwargs={}, timeout=None, callback=None, channel=None)
cast(destination, command, kwargs={})
connection = None
exchange = None
exchange_fmt = u'%s.pidbox'
get_queue(hostname)
get_reply_queue()
multi_call(command, kwargs={}, timeout=1, limit=None, callback=None, channel=None)
namespace = None
node_cls

alias of Node

oid
producer_or_acquire(*args, **kwds)
producer_pool
reply_exchange = None
reply_exchange_fmt = u'reply.%s.pidbox'
reply_queue
serializer = None
type = u'direct'
Control.add_consumer(queue, exchange=None, exchange_type=u'direct', routing_key=None, options=None, destination=None, **kwargs)[source]

Tell all (or specific) workers to start consuming from a new queue.

Only the queue name is required as if only the queue is specified then the exchange/routing key will be set to the same name ( like automatic queues do).

Note

This command does not respect the default queue/exchange options in the configuration.

Parameters:
  • queue (str) – Name of queue to start consuming from.
  • exchange (str) – Optional name of exchange.
  • exchange_type (str) – Type of exchange (defaults to ‘direct’) command to, when empty broadcast to all workers.
  • routing_key (str) – Optional routing key.
  • options (Dict) – Additional options as supported by kombu.entitiy.Queue.from_dict().

See also

broadcast() for supported keyword arguments.

Control.autoscale(max, min, destination=None, **kwargs)[source]

Change worker(s) autoscale setting.

See also

Supports the same arguments as broadcast().

Control.broadcast(command, arguments=None, destination=None, connection=None, reply=False, timeout=1.0, limit=None, callback=None, channel=None, **extra_kwargs)[source]

Broadcast a control command to the celery workers.

Parameters:
  • command (str) – Name of command to send.
  • arguments (Dict) – Keyword arguments for the command.
  • destination (List) – If set, a list of the hosts to send the command to, when empty broadcast to all workers.
  • connection (kombu.Connection) – Custom broker connection to use, if not set, a connection will be acquired from the pool.
  • reply (bool) – Wait for and return the reply.
  • timeout (float) – Timeout in seconds to wait for the reply.
  • limit (int) – Limit number of replies.
  • callback (Callable) – Callback called immediately for each reply received.
Control.cancel_consumer(queue, destination=None, **kwargs)[source]

Tell all (or specific) workers to stop consuming from queue.

See also

Supports the same arguments as broadcast().

Control.disable_events(destination=None, **kwargs)[source]

Tell all (or specific) workers to disable events.

See also

Supports the same arguments as broadcast().

Control.discard_all(connection=None)

Discard all waiting tasks.

This will ignore all tasks waiting for execution, and they will be deleted from the messaging server.

Parameters:connection (kombu.Connection) – Optional specific connection instance to use. If not provided a connection will be acquired from the connection pool.
Returns:the number of tasks discarded.
Return type:int
Control.election(id, topic, action=None, connection=None)[source]
Control.enable_events(destination=None, **kwargs)[source]

Tell all (or specific) workers to enable events.

See also

Supports the same arguments as broadcast().

Control.heartbeat(destination=None, **kwargs)[source]

Tell worker(s) to send a heartbeat immediately.

See also

Supports the same arguments as broadcast()

Control.inspect[source]
Control.ping(destination=None, timeout=1.0, **kwargs)[source]

Ping all (or specific) workers.

Returns:List of {'hostname': reply} dictionaries.
Return type:List[Dict]

See also

broadcast() for supported keyword arguments.

Control.pool_grow(n=1, destination=None, **kwargs)[source]

Tell all (or specific) workers to grow the pool by n.

See also

Supports the same arguments as broadcast().

Control.pool_restart(modules=None, reload=False, reloader=None, destination=None, **kwargs)[source]

Restart the execution pools of all or specific workers.

Keyword Arguments:
 
  • modules (Sequence[str]) – List of modules to reload.
  • reload (bool) – Flag to enable module reloading. Default is False.
  • reloader (Any) – Function to reload a module.
  • destination (Sequence[str]) – List of worker names to send this command to.

See also

Supports the same arguments as broadcast()

Control.pool_shrink(n=1, destination=None, **kwargs)[source]

Tell all (or specific) workers to shrink the pool by n.

See also

Supports the same arguments as broadcast().

Control.purge(connection=None)[source]

Discard all waiting tasks.

This will ignore all tasks waiting for execution, and they will be deleted from the messaging server.

Parameters:connection (kombu.Connection) – Optional specific connection instance to use. If not provided a connection will be acquired from the connection pool.
Returns:the number of tasks discarded.
Return type:int
Control.rate_limit(task_name, rate_limit, destination=None, **kwargs)[source]

Tell workers to set a new rate limit for task by type.

Parameters:
  • task_name (str) – Name of task to change rate limit for.
  • rate_limit (int, str) – The rate limit as tasks per second, or a rate limit string (‘100/m’, etc. see celery.task.base.Task.rate_limit for more information).

See also

broadcast() for supported keyword arguments.

Control.revoke(task_id, destination=None, terminate=False, signal='SIGTERM', **kwargs)[source]

Tell all (or specific) workers to revoke a task by id.

If a task is revoked, the workers will ignore the task and not execute it after all.

Parameters:
  • task_id (str) – Id of the task to revoke.
  • terminate (bool) – Also terminate the process currently working on the task (if any).
  • signal (str) – Name of signal to send to process if terminate. Default is TERM.

See also

broadcast() for supported keyword arguments.

Control.shutdown(destination=None, **kwargs)[source]

Shutdown worker(s).

See also

Supports the same arguments as broadcast()

Control.terminate(task_id, destination=None, signal='SIGTERM', **kwargs)[source]

Tell all (or specific) workers to terminate a task by id.

See also

This is just a shortcut to revoke() with the terminate argument enabled.

Control.time_limit(task_name, soft=None, hard=None, destination=None, **kwargs)[source]

Tell workers to set time limits for a task by type.

Parameters:
  • task_name (str) – Name of task to change time limits for.
  • soft (float) – New soft time limit (in seconds).
  • hard (float) – New hard time limit (in seconds).
  • **kwargs (Any) – arguments passed on to broadcast().
celery.app.control.flatten_reply(reply)[source]

Flatten node replies.

Convert from a list of replies in this format:

[{'a@example.com': reply},
 {'b@example.com': reply}]

into this format:

{'a@example.com': reply,
 'b@example.com': reply}