This document describes the current stable version of Celery (4.0). For development docs, go here.
Source code for celery.bin.celery
# -*- coding: utf-8 -*-
"""The :program:`celery` umbrella command.
.. program:: celery
.. _preload-options:
Preload Options
---------------
These options are supported by all commands,
and usually parsed before command-specific arguments.
.. cmdoption:: -A, --app
app instance to use (e.g., ``module.attr_name``)
.. cmdoption:: -b, --broker
URL to broker. default is ``amqp://guest@localhost//``
.. cmdoption:: --loader
name of custom loader class to use.
.. cmdoption:: --config
Name of the configuration module
.. cmdoption:: -C, --no-color
Disable colors in output.
.. cmdoption:: -q, --quiet
Give less verbose output (behavior depends on the sub command).
.. cmdoption:: --help
Show help and exit.
.. _daemon-options:
Daemon Options
--------------
These options are supported by commands that can detach
into the background (daemon). They will be present
in any command that also has a `--detach` option.
.. cmdoption:: -f, --logfile
Path to log file. If no logfile is specified, `stderr` is used.
.. cmdoption:: --pidfile
Optional file used to store the process pid.
The program won't start if this file already exists
and the pid is still alive.
.. cmdoption:: --uid
User id, or user name of the user to run as after detaching.
.. cmdoption:: --gid
Group id, or group name of the main group to change to after
detaching.
.. cmdoption:: --umask
Effective umask (in octal) of the process after detaching. Inherits
the umask of the parent process by default.
.. cmdoption:: --workdir
Optional directory to change to after detaching.
.. cmdoption:: --executable
Executable to use for the detached process.
``celery inspect``
------------------
.. program:: celery inspect
.. cmdoption:: -t, --timeout
Timeout in seconds (float) waiting for reply
.. cmdoption:: -d, --destination
Comma separated list of destination node names.
.. cmdoption:: -j, --json
Use json as output format.
``celery control``
------------------
.. program:: celery control
.. cmdoption:: -t, --timeout
Timeout in seconds (float) waiting for reply
.. cmdoption:: -d, --destination
Comma separated list of destination node names.
.. cmdoption:: -j, --json
Use json as output format.
``celery migrate``
------------------
.. program:: celery migrate
.. cmdoption:: -n, --limit
Number of tasks to consume (int).
.. cmdoption:: -t, --timeout
Timeout in seconds (float) waiting for tasks.
.. cmdoption:: -a, --ack-messages
Ack messages from source broker.
.. cmdoption:: -T, --tasks
List of task names to filter on.
.. cmdoption:: -Q, --queues
List of queues to migrate.
.. cmdoption:: -F, --forever
Continually migrate tasks until killed.
``celery upgrade``
------------------
.. program:: celery upgrade
.. cmdoption:: --django
Upgrade a Django project.
.. cmdoption:: --compat
Maintain backwards compatibility.
.. cmdoption:: --no-backup
Don't backup original files.
``celery shell``
----------------
.. program:: celery shell
.. cmdoption:: -I, --ipython
Force :pypi:`iPython` implementation.
.. cmdoption:: -B, --bpython
Force :pypi:`bpython` implementation.
.. cmdoption:: -P, --python
Force default Python shell.
.. cmdoption:: -T, --without-tasks
Don't add tasks to locals.
.. cmdoption:: --eventlet
Use :pypi:`eventlet` monkey patches.
.. cmdoption:: --gevent
Use :pypi:`gevent` monkey patches.
``celery result``
-----------------
.. program:: celery result
.. cmdoption:: -t, --task
Name of task (if custom backend).
.. cmdoption:: --traceback
Show traceback if any.
``celery purge``
----------------
.. program:: celery purge
.. cmdoption:: -f, --force
Don't prompt for verification before deleting messages (DANGEROUS)
``celery call``
---------------
.. program:: celery call
.. cmdoption:: -a, --args
Positional arguments (json format).
.. cmdoption:: -k, --kwargs
Keyword arguments (json format).
.. cmdoption:: --eta
Scheduled time in ISO-8601 format.
.. cmdoption:: --countdown
ETA in seconds from now (float/int).
.. cmdoption:: --expires
Expiry time in float/int seconds, or an ISO-8601 date.
.. cmdoption:: --serializer
Specify serializer to use (default is json).
.. cmdoption:: --queue
Destination queue.
.. cmdoption:: --exchange
Destination exchange (defaults to the queue exchange).
.. cmdoption:: --routing-key
Destination routing key (defaults to the queue routing key).
"""
from __future__ import absolute_import, unicode_literals, print_function
import numbers
import sys
from functools import partial
from celery.platforms import EX_OK, EX_FAILURE, EX_USAGE
from celery.utils import term
from celery.utils import text
# Cannot use relative imports here due to a Windows issue (#1111).
from celery.bin.base import Command, Extensions
# Import commands from other modules
from celery.bin.amqp import amqp
from celery.bin.beat import beat
from celery.bin.call import call
from celery.bin.control import _RemoteControl # noqa
from celery.bin.control import control, inspect, status
from celery.bin.events import events
from celery.bin.graph import graph
from celery.bin.list import list_
from celery.bin.logtool import logtool
from celery.bin.migrate import migrate
from celery.bin.purge import purge
from celery.bin.result import result
from celery.bin.shell import shell
from celery.bin.worker import worker
from celery.bin.upgrade import upgrade
# Names exported by ``from celery.bin.celery import *``.
__all__ = ['CeleryCommand', 'main']
# Template for the command overview printed by the ``help`` command after
# the parser's own help; ``{commands}`` and ``{prog_name}`` are filled in
# with ``str.format``.
HELP = """
---- -- - - ---- Commands- -------------- --- ------------
{commands}
---- -- - - --------- -- - -------------- --- ------------
Type '{prog_name} <command> --help' for help using a specific command.
"""
# Sections of the command listing as ``(title, command names, color)``
# tuples.  A color of ``None`` leaves the command names uncolored.
# ``CeleryCommand.load_extension_commands`` may append an 'Extensions'
# section to this list at runtime.
command_classes = [
    ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
    ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
    ('Utils',
     ['purge', 'list', 'call', 'result', 'migrate', 'graph', 'upgrade'],
     None),
    ('Debugging', ['report', 'logtool'], 'red'),
]
def determine_exit_status(ret):
    """Translate a command's return value into a process exit status.

    Integral values are passed through unchanged.  Any other value is
    mapped by truthiness: truthy gives ``EX_OK``, falsy gives
    ``EX_FAILURE``.
    """
    if not isinstance(ret, numbers.Integral):
        return EX_OK if ret else EX_FAILURE
    return ret
def main(argv=None):
    """Run the :program:`celery` umbrella command.

    Arguments:
        argv (Sequence[str]): Command-line to execute; passed straight to
            :meth:`CeleryCommand.execute_from_commandline`.

    A :exc:`KeyboardInterrupt` raised at any point is swallowed so that
    interrupting the program doesn't print a traceback.
    """
    try:
        # Fix for setuptools generated scripts, so that it will
        # work with multiprocessing fork emulation.
        # (see multiprocessing.forking.get_preparation_data())
        if __name__ != '__main__':  # pragma: no cover
            sys.modules['__main__'] = sys.modules[__name__]
        command = CeleryCommand()
        command.maybe_patch_concurrency()
        # NOTE: billiard is imported only after maybe_patch_concurrency()
        # has run; keep this ordering.
        from billiard import freeze_support
        freeze_support()
        command.execute_from_commandline(argv)
    except KeyboardInterrupt:
        pass
class multi(Command):
    """Start multiple worker instances."""

    # ``celery multi`` does not use the ``--app`` option.
    respects_app_option = False

    def run_from_argv(self, prog_name, argv, command=None):
        """Delegate to ``MultiTool``, passing ``command`` as first argument."""
        # Imported lazily, only when the ``multi`` command is executed.
        from celery.bin.multi import MultiTool
        tool = MultiTool(quiet=self.quiet, no_color=self.no_color)
        full_argv = [command] + argv
        return tool.execute_from_commandline(full_argv)
class help(Command):
    """Show help screen and exit."""

    def usage(self, command):
        """Return the usage string shown at the top of the help screen."""
        template = '%(prog)s <command> [options] {0.args}'
        return template.format(self)

    def run(self, *args, **kwargs):
        """Print parser help followed by the list of available commands.

        Always returns ``EX_USAGE``.
        """
        self.parser.print_help()
        listing = CeleryCommand.list_commands(
            colored=self.colored, app=self.app,
        )
        self.out(HELP.format(prog_name=self.prog_name, commands=listing))
        return EX_USAGE
class report(Command):
    """Shows information useful to include in bug-reports."""

    def run(self, *args, **kwargs):
        """Write the app's bug report to the output and return ``EX_OK``."""
        bugreport = self.app.bugreport()
        self.out(bugreport)
        return EX_OK
class CeleryCommand(Command):
    """The :program:`celery` umbrella command.

    Selects a sub-command from :attr:`commands` based on the first
    non-option command-line argument and runs it.  A missing or unknown
    sub-command falls back to ``help``.
    """

    # Maps sub-command name to the class implementing it.  This dict lives
    # on the class, so :meth:`register_command` affects every instance.
    commands = {
        'amqp': amqp,
        'beat': beat,
        'call': call,
        'control': control,
        'events': events,
        'graph': graph,
        'help': help,
        'inspect': inspect,
        'list': list_,
        'logtool': logtool,
        'migrate': migrate,
        'multi': multi,
        'purge': purge,
        'report': report,
        'result': result,
        'shell': shell,
        'status': status,
        'upgrade': upgrade,
        'worker': worker,
    }
    # Formatted with ``self`` and handed to ``Extensions`` by
    # :meth:`load_extension_commands`.
    ext_fmt = '{self.namespace}.commands'
    enable_config_from_cmdline = True
    prog_name = 'celery'
    namespace = 'celery'

    @classmethod
    def register_command(cls, fun, name=None):
        """Register ``fun`` as a sub-command.

        Arguments:
            fun: The command to register.
            name (str): Name to register under; defaults to
                ``fun.__name__``.

        Returns:
            ``fun`` itself, so this can also be used as a decorator.
        """
        cls.commands[name or fun.__name__] = fun
        return fun

    def execute(self, command, argv=None):
        """Instantiate and run the sub-command named ``command``.

        Arguments:
            command (str): Name of the sub-command.  Unknown names run
                the ``help`` command instead.
            argv (List[str]): Arguments, with the command name itself as
                the first item.  Defaults to ``[command]``.

        Returns:
            The value returned by the sub-command's ``run_from_argv``, or
            ``exc.status`` if it raised ``UsageError``/``Error`` (after
            reporting the error).
        """
        if argv is None:
            # Without this, slicing ``argv`` below raises TypeError.
            argv = [command]
        try:
            cls = self.commands[command]
        except KeyError:
            cls, argv = self.commands['help'], ['help']
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

    def on_usage_error(self, exc, command=None):
        """Report a usage error and hint at the matching ``--help``.

        Arguments:
            exc (Exception): The error to report.
            command (str): Sub-command the error belongs to, if any; used
                to build the ``--help`` hint.
        """
        if command:
            helps = '{self.prog_name} {command} --help'
        else:
            helps = '{self.prog_name} --help'
        self.error(self.colored.magenta('Error: {0}'.format(exc)))
        self.error("""Please try '{0}'""".format(helps.format(
            self=self, command=command,
        )))

    def _relocate_args_from_start(self, argv, index=0):
        """Move options given before the command name to the end.

        For example ``['-l', 'debug', 'worker', '-c', '3']`` becomes
        ``['worker', '-c', '3', '-l', 'debug']``.

        Arguments:
            argv (List[str]): Arguments (without the program name).
            index (int): Position in ``argv`` to start scanning from.

        Returns:
            List[str]: The reordered arguments, or an empty list if
            ``argv`` is empty.
        """
        if not argv:
            return []
        rest = []
        while index < len(argv):
            value = argv[index]
            if value.startswith('--'):
                rest.append(value)
            elif value.startswith('-'):
                # we eat the next argument even though we don't know
                # if this option takes an argument or not.
                # instead we'll assume what's the command name in the
                # return statements below.
                nxt = argv[index + 1] if index + 1 < len(argv) else None
                if nxt is None or nxt.startswith('-'):
                    # followed by another option, or the very last
                    # argument: consume just the option itself (so it's
                    # never emitted twice).
                    rest.append(value)
                else:
                    # is (maybe) a value for this option
                    rest.extend([value, nxt])
                    index += 1
            else:
                break
            index += 1
        if argv[index:]:  # pragma: no cover
            # if there are more arguments left then divide and swap
            # we assume the first argument in argv[i:] is the command
            # name.
            return argv[index:] + rest
        if rest:
            # if there are no more arguments then the last arg in rest
            # must be the command.
            return [rest.pop()] + rest
        return []

    def prepare_prog_name(self, name):
        """Return the program name to display.

        When started as ``__main__.py`` the path of the ``__main__``
        module is used instead.
        """
        if name == '__main__.py':
            return sys.modules['__main__'].__file__
        return name

    def handle_argv(self, prog_name, argv, **kwargs):
        """Work out which sub-command ``argv`` asks for and execute it.

        Falls back to ``help`` when no arguments remain after option
        relocation.
        """
        self.prog_name = self.prepare_prog_name(prog_name)
        argv = self._relocate_args_from_start(argv)
        _, argv = self.prepare_args(None, argv)
        try:
            command = argv[0]
        except IndexError:
            command, argv = 'help', ['help']
        return self.execute(command, argv)

    def execute_from_commandline(self, argv=None):
        """Run the command-line and exit the process with its status.

        Arguments:
            argv (List[str]): Full command-line including the program
                name; defaults to :data:`sys.argv`.

        Raises:
            SystemExit: Always; the status is derived from the command's
                return value, or ``EX_FAILURE`` on
                :exc:`KeyboardInterrupt`.
        """
        argv = sys.argv if argv is None else argv
        if 'multi' in argv[1:3]:  # Issue 1008
            self.respects_app_option = False
        try:
            sys.exit(determine_exit_status(
                super(CeleryCommand, self).execute_from_commandline(argv)))
        except KeyboardInterrupt:
            sys.exit(EX_FAILURE)

    @classmethod
    def get_command_info(cls, command, indent=0,
                         color=None, colored=None, app=None):
        """Return the help-listing entry for a single sub-command.

        Leaf commands produce one line; other commands produce a
        ``--help`` line followed by their own command listing.
        """
        colored = term.colored() if colored is None else colored
        # From here on ``colored`` is a callable applied to the name.
        colored = colored.names[color] if color else lambda x: x
        obj = cls.commands[command]
        cmd = 'celery {0}'.format(colored(command))
        if obj.leaf:
            return '|' + text.indent(cmd, indent)
        return text.join([
            ' ',
            '|' + text.indent('{0} --help'.format(cmd), indent),
            obj.list_commands(indent, 'celery {0}'.format(command), colored,
                              app=app),
        ])

    @classmethod
    def list_commands(cls, indent=0, colored=None, app=None):
        """Return the command overview, grouped by ``command_classes``."""
        colored = term.colored() if colored is None else colored
        white = colored.white
        ret = []
        for command_cls, commands, color in command_classes:
            ret.extend([
                text.indent('+ {0}: '.format(white(command_cls)), indent),
                '\n'.join(
                    cls.get_command_info(
                        command, indent + 4, color, colored, app=app)
                    for command in commands),
                ''
            ])
        return '\n'.join(ret).strip()

    def with_pool_option(self, argv):
        """Return the pool option names if ``argv`` starts a worker.

        Returns:
            Tuple[List[str], List[str]]: ``(['-P'], ['--pool'])`` when
            ``'worker'`` is among the first three arguments, otherwise
            :const:`None`.
        """
        if len(argv) > 1 and 'worker' in argv[0:3]:
            # this command supports custom pools
            # that may have to be loaded as early as possible.
            return (['-P'], ['--pool'])

    def load_extension_commands(self):
        """Load extension commands and add them to the help listing.

        Loaded commands are registered through :meth:`register_command`;
        if any were found, an 'Extensions' section is appended to the
        module-level ``command_classes`` list.
        """
        names = Extensions(self.ext_fmt.format(self=self),
                           self.register_command).load()
        if names:
            command_classes.append(('Extensions', names, 'magenta'))
# Run the umbrella command when this module is executed as a script.
if __name__ == '__main__':  # pragma: no cover
    main()