Simple Interface

kombu.simple is a simple interface to AMQP queueing. It is only slightly different from the Queue class in the Python Standard Library, which makes it excellent for users with basic messaging needs.

Instead of defining exchanges and queues, the simple classes only requires two arguments, a connection channel and a name. The name is used as the queue, exchange and routing key. If the need arises, you can specify a Queue as the name argument instead.

In addition, the BrokerConnection comes with shortcuts to create simple queues using the current connection:

>>> queue = connection.SimpleQueue("myqueue")
>>> # ... do something with queue
>>> queue.close()

This is equivalent to:

>>> from kombu import SimpleQueue, SimpleBuffer

>>> channel = connection.channel()
>>> queue = SimpleBuffer(channel)
>>> # ... do something with queue
>>> channel.close()
>>> queue.close()

Sending and receiving messages

The simple interface defines two classes; SimpleQueue, and SimpleBuffer. The former is used for persistent messages, and the latter is used for transient, buffer-like queues. They both have the same interface, so you can use them interchangeably.

Here is an example using the SimpleQueue class to produce and consume logging messages:

from __future__ import with_statement

from socket import gethostname
from time import time

from kombu import BrokerConnection


class Logger(object):

    def __init__(self, connection, queue_name="log_queue",
            serializer="json", compression=None):
        self.queue = connection.SimpleQueue(self.queue_name)
        self.serializer = serializer
        self.compression = compression

    def log(self, message, level="INFO", context={}):
        self.queue.put({"message": message,
                        "level": level,
                        "context": context,
                        "hostname": socket.gethostname(),
                        "timestamp": time()},
                        serializer=self.serializer,
                        compression=self.compression)

    def process(self, callback, n=1, timeout=1):
        for i in xrange(n):
            log_message = self.queue.get(block=True, timeout=1)
            entry = log_message.payload # deserialized data.
            callback(entry)
            log_message.ack() # remove message from queue

    def close(self):
        self.queue.close()


if __name__ == "__main__":
    from contextlib import closing

    with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
        with closing(Logger(connection)) as logger:

            # Send message
            logger.log("Error happened while encoding video",
                        level="ERROR",
                        context={"filename": "cutekitten.mpg"})

            # Consume and process message

            # This is the callback called when a log message is
            # received.
            def dump_entry(entry):
                date = datetime.fromtimestamp(entry["timestamp"])
                print("[%s %s %s] %s %r" % (date,
                                            entry["hostname"],
                                            entry["level"],
                                            entry["message"],
                                            entry["context"]))

            # Process a single message using the callback above.
            logger.process(dump_entry, n=1)

Table Of Contents

Previous topic

Examples

Next topic

Connection and Producer Pools

This Page