A very simple task queue that serializes tasks with pickle, with primitive support for priorities implemented as separate queues.
queues.py:
from kombu import Exchange, Queue
# Single direct-type exchange that every priority queue below is bound to.
task_exchange = Exchange("tasks", type="direct")

# One queue per priority level; each queue's name doubles as its routing key.
task_queues = [
    Queue(queue_name, task_exchange, routing_key=queue_name)
    for queue_name in ("hipri", "midpri", "lopri")
]
worker.py:
from __future__ import with_statement
from kombu.mixins import ConsumerMixin
from kombu.utils import kwdict, reprcall
from queues import task_queues
class Worker(ConsumerMixin):
    """Consumer that executes task messages taken from ``task_queues``.

    Each message body is expected to be a mapping with the keys ``"fun"``
    (a callable), ``"args"`` (positional arguments) and ``"kwargs"``
    (keyword arguments).
    """

    def __init__(self, connection):
        """Keep a reference to the broker *connection* on the instance."""
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return one consumer covering every queue in ``task_queues``.

        ``process_task`` is registered as its only message callback.
        *channel* is accepted but not used here.
        """
        return [Consumer(queues=task_queues,
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        """Run the task described by *body*, then acknowledge *message*.

        An exception raised by the task is logged and swallowed; it does
        not propagate to the caller.
        """
        # SECURITY: the callable comes straight out of the message body and
        # is executed as-is (client.py publishes with serializer="pickle").
        # Only consume from a broker whose publishers are trusted.
        fun = body["fun"]
        args = body["args"]
        kwargs = body["kwargs"]
        # self.info / self.error are not defined in this class; presumably
        # they are logging helpers supplied by ConsumerMixin -- confirm
        # against the installed kombu version.
        self.info("Got task: %s", reprcall(fun.__name__, args, kwargs))
        try:
            fun(*args, **kwdict(kwargs))
        except Exception as exc:  # "as" form is valid on Python 2.6+ and 3
            self.error("task raised exception: %r", exc)
        # Acknowledge whether or not the task raised: a failed task is only
        # logged, never retried by this code.
        message.ack()
if __name__ == "__main__":
    from kombu import BrokerConnection
    from kombu.utils.debug import setup_logging

    # Configure kombu's debug logging at INFO level before consuming.
    setup_logging(loglevel="INFO")

    broker_url = "amqp://guest:guest@localhost:5672//"
    with BrokerConnection(broker_url) as conn:
        try:
            Worker(conn).run()
        except KeyboardInterrupt:
            # Ctrl-C ends the worker with a farewell instead of a traceback.
            print("bye bye")
tasks.py:
def hello_task(who="world"):
    """Print ``Hello <who>`` to standard output; *who* defaults to "world"."""
    # Wrapping in a one-tuple keeps %-formatting correct even if *who* is
    # itself a tuple.
    greeting = "Hello %s" % (who, )
    print(greeting)
client.py:
from __future__ import with_statement
from kombu.common import maybe_declare
from kombu.pools import producers
from queues import task_exchange
# Maps the caller-facing priority name to the routing key used when
# publishing a task.
priority_to_routing_key = dict(
    high="hipri",
    mid="midpri",
    low="lopri",
)
def send_as_task(connection, fun, args=(), kwargs=None, priority="mid"):
    """Publish a call to *fun* as a task message for the given priority.

    Args:
        connection: Broker connection used to look up a producer pool.
        fun: Callable the worker should run; it is pickled into the message.
        args: Positional arguments for *fun*.
        kwargs: Keyword arguments for *fun*; ``None`` means none.
        priority: A key of ``priority_to_routing_key``
            ("high", "mid" or "low").

    Raises:
        KeyError: If *priority* is not a key of ``priority_to_routing_key``.
    """
    # Use a fresh dict per call rather than a shared mutable default.
    if kwargs is None:
        kwargs = {}
    payload = {"fun": fun, "args": args, "kwargs": kwargs}
    routing_key = priority_to_routing_key[priority]

    with producers[connection].acquire(block=True) as producer:
        maybe_declare(task_exchange, producer.channel)
        # SECURITY: pickle lets the consumer execute whatever callable is in
        # the payload; only use this with a trusted broker and trusted peers.
        # The exchange is named explicitly so the message is published to
        # the declared "tasks" exchange rather than whatever exchange the
        # pooled producer would otherwise use.
        producer.publish(payload,
                         serializer="pickle",
                         compression="bzip2",
                         exchange=task_exchange,
                         routing_key=routing_key)
if __name__ == "__main__":
    from kombu import BrokerConnection
    from tasks import hello_task

    # Demo: enqueue a single high-priority hello_task("Kombu") call.
    broker_url = "amqp://guest:guest@localhost:5672//"
    connection = BrokerConnection(broker_url)
    send_as_task(
        connection,
        fun=hello_task,
        args=("Kombu", ),
        kwargs={},
        priority="high",
    )