This document describes the current stable version of Celery (4.0). For development docs, go here.

Source code for celery.bin.migrate

"""The ``celery migrate`` command, used to filter and move messages."""
from __future__ import absolute_import, unicode_literals
from celery.bin.base import Command

# Template for the one-line progress message printed per migrated task.
# Rendered via ``str.format(state=..., body=...)``: it reads ``state.count``
# and ``state.strtotal`` plus the ``task`` and ``id`` keys of ``body``.
# The rendered text carries no trailing newline.
MIGRATE_PROGRESS_FMT = (
    'Migrating task {state.count}/{state.strtotal}: '
    '{body[task]}[{body[id]}]'
)


class migrate(Command):
    """Migrate tasks from one broker to another.

    Warning:
        This command is experimental, make sure you have a backup of
        the tasks before you continue.

    Example:
        .. code-block:: console

            $ celery migrate amqp://A.example.com amqp://guest@B.example.com//
            $ celery migrate redis://localhost amqp://guest@localhost//
    """

    args = '<source_url> <dest_url>'
    # Format string used by :meth:`on_migrate_task`; override in a subclass
    # to change the per-task progress output.
    progress_fmt = MIGRATE_PROGRESS_FMT

    def add_arguments(self, parser):
        """Register the migration command-line options on ``parser``.

        All options are added to a dedicated ``'Migration Options'``
        argument group.

        Arguments:
            parser: Object exposing ``add_argument_group()``
                (presumably an :class:`argparse.ArgumentParser` --
                confirm against the ``Command`` base class).
        """
        group = parser.add_argument_group('Migration Options')
        group.add_argument(
            '--limit', '-n', type=int,
            help='Number of tasks to consume (int)',
        )
        group.add_argument(
            '--timeout', '-t', type=float, default=1.0,
            help='Timeout in seconds (float) waiting for tasks',
        )
        group.add_argument(
            '--ack-messages', '-a', action='store_true', default=False,
            help='Ack messages from source broker.',
        )
        group.add_argument(
            '--tasks', '-T',
            help='List of task names to filter on.',
        )
        group.add_argument(
            '--queues', '-Q',
            help='List of queues to migrate.',
        )
        group.add_argument(
            '--forever', '-F', action='store_true', default=False,
            help='Continually migrate tasks until killed.',
        )

    def on_migrate_task(self, state, body, message):
        """Print a progress line for a task being migrated.

        Passed to ``migrate_tasks`` as its ``callback`` by :meth:`run`.

        Arguments:
            state: Object providing the ``count`` and ``strtotal``
                attributes read by :attr:`progress_fmt`.
            body: Mapping providing the ``task`` and ``id`` keys read
                by :attr:`progress_fmt`.
            message: Accepted to match the callback signature; unused.
        """
        self.out(self.progress_fmt.format(state=state, body=body))

    def run(self, source, destination, **kwargs):
        """Migrate tasks from the ``source`` broker to ``destination``.

        Arguments:
            source: URL of the broker to move tasks from.
            destination: URL of the broker to move tasks to.
            **kwargs: Forwarded unchanged to
                :func:`celery.contrib.migrate.migrate_tasks`
                (presumably the parsed options registered in
                :meth:`add_arguments` -- confirm against the caller).
        """
        # Imported lazily, as in the original implementation, so that
        # importing this module does not pull these packages in.
        from kombu import Connection
        from celery.contrib.migrate import migrate_tasks

        # Use the connections as context managers so both are released
        # when the migration finishes or raises, rather than being left
        # open until interpreter shutdown.  Nested ``with`` blocks keep
        # this valid on every Python version the file supports.
        with Connection(source) as source_conn:
            with Connection(destination) as dest_conn:
                migrate_tasks(
                    source_conn,
                    dest_conn,
                    callback=self.on_migrate_task,
                    **kwargs
                )