This document describes the current stable version of Celery (4.4). For development docs, see the documentation for the Celery development branch.

Source code for celery.bin.control

"""The ``celery control``, ``. inspect`` and ``. status`` programs."""
from __future__ import absolute_import, unicode_literals

from kombu.utils.json import dumps
from kombu.utils.objects import cached_property

from celery.bin.base import Command
from celery.five import items, string_t
from celery.platforms import EX_UNAVAILABLE, EX_USAGE
from celery.utils import text

class _RemoteControl(Command):
    """Base class for commands that send a remote-control request to workers.

    Subclasses set :attr:`name` and :attr:`control_group` and implement
    :meth:`call`, which performs the actual request and returns the replies.
    """

    # Command name; interpolated into usage and error messages.
    name = None
    leaf = False
    # Only ``Panel.meta`` entries whose ``type`` equals this value are
    # offered as methods of the command (see ``_choices_by_group``).
    control_group = None

    def __init__(self, *args, **kwargs):
        """Pop the ``show_body``/``show_reply`` flags and defer to the base.

        Both flags default to ``True`` and are removed from ``kwargs`` before
        the remaining arguments are passed to the parent constructor.
        """
        self.show_body = kwargs.pop('show_body', True)
        self.show_reply = kwargs.pop('show_reply', True)
        super(_RemoteControl, self).__init__(*args, **kwargs)

    def add_arguments(self, parser):
        """Register the ``--timeout``, ``--destination`` and ``--json`` options.

        Arguments:
            parser: Argument parser to extend; the options are placed in a
                group titled "Remote Control Options".
        """
        group = parser.add_argument_group('Remote Control Options')
        group.add_argument(
            '--timeout', '-t', type=float,
            help='Timeout in seconds (float) waiting for reply',
        )
        group.add_argument(
            '--destination', '-d',
            help='Comma separated list of destination node names.')
        group.add_argument(
            '--json', '-j', action='store_true', default=False,
            help='Use json as output format.',
        )

    @classmethod
    def get_command_info(cls, command,
                         indent=0, prefix='', color=None,
                         help=False, app=None, choices=None):
        """Return the formatted signature (and optionally help) of a command.

        Arguments:
            command (str): Name of the remote-control method.
            indent (int): Indentation applied to the signature line; the
                help text is indented four columns further.
            prefix (str): Text placed directly before the command name.
            color (Callable): Applied to the command name before formatting.
                Defaults to the identity function.
            help (bool): Include the command's help text when true.
            app: App used to look up the choices when ``choices`` is None.
            choices (Mapping): Pre-computed name -> metadata mapping.

        Raises:
            KeyError: If ``command`` is not present in ``choices``.
        """
        if choices is None:
            choices = cls._choices_by_group(app)
        if color is None:
            # The declared default would otherwise be called as a function.
            color = lambda x: x  # noqa: E731
        meta = choices[command]
        if help:
            help = '|' + text.indent(meta.help, indent + 4)
        else:
            help = None
        return text.join([
            '|' + text.indent('{0}{1} {2}'.format(
                prefix, color(command), meta.signature or ''), indent),
            help,
        ])

    @classmethod
    def list_commands(cls, indent=0, prefix='',
                      color=None, help=False, app=None):
        """Return the info lines for every available command, sorted by name.

        A single space is appended to a non-empty ``prefix``, and ``color``
        falls back to the identity function.  The per-command entries are
        joined with newlines.
        """
        choices = cls._choices_by_group(app)
        color = color if color else lambda x: x
        prefix = prefix + ' ' if prefix else ''
        return '\n'.join(
            cls.get_command_info(c, indent, prefix, color, help,
                                 app=app, choices=choices)
            for c in sorted(choices))

    def usage(self, command):
        """Return the usage template for ``command``."""
        return '%(prog)s {0} [options] {1} <command> [arg1 .. argN]'.format(
            command, self.args)

    def call(self, *args, **kwargs):
        """Perform the remote request; must be implemented by subclasses."""
        raise NotImplementedError('call')

    def run(self, *args, **kwargs):
        """Dispatch the method named by the first positional argument.

        Raises:
            UsageError: If no method name was given.
        """
        if not args:
            raise self.UsageError(
                'Missing {0.name} method.  See --help'.format(self))
        return self.do_call_method(args, **kwargs)

    def _ensure_fanout_supported(self):
        """Raise ``self.Error`` if the transport lacks fanout exchanges."""
        with self.app.connection_for_write() as conn:
            if not conn.supports_exchange_type('fanout'):
                raise self.Error(
                    'Broadcast not supported by transport {0!r}'.format(
                        conn.info()['transport']))

    def do_call_method(self, args,
                       timeout=None, destination=None, json=False, **kwargs):
        """Validate the request, send it via :meth:`call`, return the replies.

        Arguments:
            args (Sequence[str]): Method name followed by its arguments.
            timeout (float): Seconds to wait for replies; falls back to the
                method's ``default_timeout`` when falsy.
            destination (str | Sequence[str]): Target node names; a string
                is split on commas and each part stripped.
            json (bool): Print the replies as JSON instead of passing a
                per-reply callback to :meth:`call`.

        Raises:
            Error: For the pseudo-method ``help``, when the transport does
                not support broadcast, or when no node replied.
            UsageError: If the method name is unknown.
        """
        method = args[0]
        if method == 'help':
            raise self.Error("Did you mean '{0.name} --help'?".format(self))
        try:
            meta = self.choices[method]
        except KeyError:
            raise self.UsageError(
                'Unknown {0.name} method {1}'.format(self, method))

        self._ensure_fanout_supported()

        timeout = timeout or meta.default_timeout
        if destination and isinstance(destination, string_t):
            destination = [dest.strip() for dest in destination.split(',')]

        replies = self.call(
            method,
            arguments=self.compile_arguments(meta, method, args[1:]),
            timeout=timeout,
            destination=destination,
            callback=None if json else self.say_remote_command_reply,
        )
        if not replies:
            raise self.Error('No nodes replied within time constraint.',
                             status=EX_UNAVAILABLE)
        if json:
            self.out(dumps(replies))
        return replies

    def compile_arguments(self, meta, method, args):
        """Turn positional command-line arguments into keyword arguments.

        Arguments declared in ``meta.args`` are converted first; whatever
        :meth:`_consume_args` leaves in the list is stored under the
        ``meta.variadic`` name when the command declares one.

        Raises:
            Error: If arguments were given but the command takes none.
        """
        args = list(args)  # private copy: _consume_args trims it in place
        kw = {}
        if meta.args:
            kw.update({
                k: v for k, v in self._consume_args(meta, method, args)
            })
        if meta.variadic:
            kw.update({meta.variadic: args})
        if not kw and args:
            raise self.Error(
                'Command {0!r} takes no arguments.'.format(method),
                status=EX_USAGE)
        return kw or {}

    def _consume_args(self, meta, method, args):
        """Yield ``(name, value)`` for each declared positional argument.

        Values are passed through the declared type callable when it is not
        None.  On exit ``args`` is trimmed in place to ``args[i:]``, where
        ``i`` is the index of the last argument visited.

        Raises:
            Error: If more arguments were given than declared and the
                command is not variadic.
        """
        # NOTE(review): when every argument is consumed without reaching the
        # IndexError branch, ``i`` is the index of the last argument, so that
        # argument is left in ``args``.  Confirm this is intended for
        # commands declaring both ``args`` and ``variadic``.
        i = 0
        try:
            for i, arg in enumerate(args):
                try:
                    name, typ = meta.args[i]
                except IndexError:
                    if meta.variadic:
                        break
                    raise self.Error(
                        'Command {0!r} takes arguments: {1}'.format(
                            method, meta.signature),
                        status=EX_USAGE)
                else:
                    yield name, typ(arg) if typ is not None else arg
        finally:
            args[:] = args[i:]

    @classmethod
    def _choices_by_group(cls, app):
        """Return the visible ``Panel.meta`` entries of this control group."""
        from celery.worker.control import Panel
        # need to import task modules for custom user-remote control commands.
        app.loader.import_default_modules()

        return {
            name: info for name, info in items(Panel.meta)
            if info.type == cls.control_group and info.visible
        }

    @cached_property
    def choices(self):
        """Mapping of method name to metadata for this command (cached)."""
        return self._choices_by_group(self.app)

    @property
    def epilog(self):
        """Help epilog listing every available command with its help text."""
        return '\n'.join([
            'available commands',
            '------------------',
            '',
            self.list_commands(indent=4, help=True, app=self.app),
        ])
class inspect(_RemoteControl):
    """Inspect the worker at runtime.

    Availability: RabbitMQ (AMQP) and Redis transports.

    Examples:
        .. code-block:: console

            $ celery inspect active --timeout=5
            $ celery inspect scheduled -d worker1@example.com
            $ celery inspect revoked -d w1@example.com,w2@example.com
    """

    name = 'inspect'
    control_group = 'inspect'

    def call(self, method, arguments, **options):
        """Send the inspect request ``method`` and return the replies.

        ``options`` configure the inspector; ``arguments`` are passed to the
        request as keyword arguments.
        """
        return self.app.control.inspect(**options)._request(
            method, **arguments)
class control(_RemoteControl):
    """Workers remote control.

    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.

    Examples:
        .. code-block:: console

            $ celery control enable_events --timeout=5
            $ celery control -d worker1@example.com enable_events
            $ celery control -d w1@example.com,w2@example.com enable_events

            $ celery control -d w1@example.com add_consumer queue_name
            $ celery control -d w1@example.com cancel_consumer queue_name

            $ celery control add_consumer queue exchange direct rkey
    """

    name = 'control'
    control_group = 'control'

    def call(self, method, arguments, **options):
        """Broadcast ``method`` with ``arguments`` and return the replies.

        The broadcast always requests replies (``reply=True``); remaining
        ``options`` are forwarded unchanged.
        """
        return self.app.control.broadcast(
            method, arguments=arguments, reply=True, **options)
class status(Command):
    """Show list of workers that are online."""

    option_list = inspect.option_list

    def run(self, *args, **kwargs):
        """Ping all workers and print how many replied.

        The ping is issued through a quiet ``inspect`` command that shares
        this command's app and output streams.

        Raises:
            Error: With status ``EX_UNAVAILABLE`` if no node replied.
        """
        inspector = inspect(
            app=self.app,
            no_color=kwargs.get('no_color', False),
            stdout=self.stdout, stderr=self.stderr,
            show_reply=False, show_body=False, quiet=True,
        )
        replies = inspector.run('ping', **kwargs)
        if not replies:
            raise self.Error('No nodes replied within time constraint',
                             status=EX_UNAVAILABLE)
        nodecount = len(replies)
        if not kwargs.get('quiet', False):
            self.out('\n{0} {1} online.'.format(
                nodecount, text.pluralize(nodecount, 'node')))