Examples

Task Queue Example

Very simple task queue using pickle, with primitive support for priorities using different queues.

queues.py:

from kombu import Exchange, Queue

# One direct exchange shared by all task queues.
task_exchange = Exchange("tasks", type="direct")

# One queue per priority level; each queue's name doubles as its routing key.
task_queues = [
    Queue(_queue_name, task_exchange, routing_key=_queue_name)
    for _queue_name in ("hipri", "midpri", "lopri")
]

worker.py:

from __future__ import with_statement

from kombu.mixins import ConsumerMixin
from kombu.utils import kwdict, reprcall

from queues import task_queues


class Worker(ConsumerMixin):
    """Consume task messages from ``task_queues`` and execute them.

    Each message body is expected to be a mapping with the keys ``"fun"``
    (a callable), ``"args"`` (positional arguments) and ``"kwargs"``
    (keyword arguments).
    """

    def __init__(self, connection):
        """Store the broker *connection* on the instance."""
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return a one-element list with a consumer for all task queues.

        :param Consumer: factory called with ``queues`` and ``callbacks``
            to build the consumer.
        :param channel: accepted for the interface; not used here.
        """
        return [Consumer(queues=task_queues,
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        """Run the task described by *body*, then acknowledge *message*.

        :param body: mapping with the keys ``"fun"``, ``"args"`` and
            ``"kwargs"``.
        :param message: the received message; ``ack()`` is called on it
            whether or not the task raised.
        :raises KeyError: if *body* lacks one of the expected keys (the
            message is then not acknowledged by this method).
        """
        # SECURITY: the callable comes straight from the message body and is
        # executed as-is. The accompanying client publishes with the pickle
        # serializer, so only consume from a broker you trust.
        fun = body["fun"]
        args = body["args"]
        kwargs = body["kwargs"]
        # NOTE(review): self.info / self.error are not defined in this class;
        # presumably they are provided by ConsumerMixin -- confirm.
        self.info("Got task: %s", reprcall(fun.__name__, args, kwargs))
        try:
            fun(*args, **kwdict(kwargs))
        except Exception as exc:
            # A failing task is logged and still acknowledged below, so it
            # is not retried by this worker.
            self.error("task raised exception: %r", exc)
        message.ack()

if __name__ == "__main__":
    from kombu import BrokerConnection
    from kombu.utils.debug import setup_logging

    # Configure logging at INFO level before the worker starts.
    setup_logging(loglevel="INFO")

    broker_url = "amqp://guest:guest@localhost:5672//"
    with BrokerConnection(broker_url) as conn:
        try:
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the worker.
            print("bye bye")

tasks.py:

def hello_task(who="world"):
    """Print the greeting ``Hello <who>`` to standard output."""
    # The one-element tuple keeps %-formatting correct when `who` is a tuple.
    greeting = "Hello %s" % (who, )
    print(greeting)

client.py:

from __future__ import with_statement

from kombu.common import maybe_declare
from kombu.pools import producers

from queues import task_exchange

# Maps the caller-facing priority name to the routing key used on publish.
priority_to_routing_key = dict(
    high="hipri",
    mid="midpri",
    low="lopri",
)


def send_as_task(connection, fun, args=(), kwargs=None, priority="mid"):
    """Publish a request to call ``fun(*args, **kwargs)`` as a task message.

    :param connection: broker connection used to look up a producer in
        the ``producers`` pool.
    :param fun: callable to run; it is placed in the payload, which is
        published with the pickle serializer.
    :param args: positional arguments for ``fun``.
    :param kwargs: keyword arguments for ``fun``; ``None`` (the default)
        means no keyword arguments.
    :param priority: a key of ``priority_to_routing_key``
        (``"high"``, ``"mid"`` or ``"low"``).
    :raises KeyError: if ``priority`` is not a key of
        ``priority_to_routing_key``.
    """
    # Use a fresh dict per call instead of a shared mutable default.
    if kwargs is None:
        kwargs = {}
    payload = {"fun": fun, "args": args, "kwargs": kwargs}
    routing_key = priority_to_routing_key[priority]

    # The producer is acquired from the pool (blocking until one is free)
    # and handed back when the ``with`` block exits.
    with producers[connection].acquire(block=True) as producer:
        maybe_declare(task_exchange, producer.channel)
        # NOTE(review): publish() is not given an exchange here, so routing
        # presumably relies on the producer's default exchange -- confirm
        # that messages are actually routed through task_exchange.
        producer.publish(payload, serializer="pickle",
                                  compression="bzip2",
                                  routing_key=routing_key)

if __name__ == "__main__":
    from kombu import BrokerConnection
    from tasks import hello_task

    # Send one high-priority hello_task("Kombu") request via the local broker.
    broker_url = "amqp://guest:guest@localhost:5672//"
    connection = BrokerConnection(broker_url)
    send_as_task(
        connection,
        fun=hello_task,
        args=("Kombu", ),
        kwargs={},
        priority="high",
    )

Table Of Contents

Previous topic

Consumers

Next topic

Simple Interface

This Page