This document describes the current stable version of Kombu (4.0). For development docs, see the Kombu development documentation.

Source code for librabbitmq

from __future__ import absolute_import

import itertools

import _librabbitmq

from amqp.protocol import queue_declare_ok_t
from array import array

# Package metadata re-exported from the ``_librabbitmq`` module.
__version__ = _librabbitmq.__version__
__author__ = _librabbitmq.__author__
__contact__ = _librabbitmq.__contact__
__homepage__ = _librabbitmq.__homepage__
__docformat__ = 'restructuredtext'

# Expose the ``_librabbitmq`` exception classes under this module's namespace
# so callers (and the classes below) can refer to them directly.
ConnectionError = _librabbitmq.ConnectionError
ChannelError = _librabbitmq.ChannelError


# NOTE(review): this rebinds ``__version__`` and therefore overrides the value
# copied from ``_librabbitmq.__version__`` above -- confirm the two are meant
# to agree, or drop one of the assignments.
__version__ = '1.6.1'
# Public names exported by a star-import of this module.
__all__ = ['Connection', 'Message', 'ConnectionError', 'ChannelError']


class Message(object):
    """A message bound to the channel that is used to ack/reject it.

    :param channel: object providing ``basic_ack`` and ``basic_reject``;
        used by :meth:`ack` and :meth:`reject`.
    :param properties: message properties, stored unchanged.
    :keyword delivery_info: delivery metadata; :meth:`ack` and
        :meth:`reject` read its ``'delivery_tag'`` key.  When omitted
        (or ``None``) every instance gets its own new empty dict.
    :keyword body: message payload, stored unchanged.

    """

    def __init__(self, channel, properties, delivery_info=None, body=''):
        self.channel = channel
        self.properties = properties
        # Build a fresh dict per instance: a ``{}`` default in the signature
        # is created once and would be shared (and mutated) by every message
        # constructed without an explicit ``delivery_info``.
        self.delivery_info = {} if delivery_info is None else delivery_info
        self.body = body

    def ack(self):
        """Acknowledge this message on its channel.

        :returns: whatever ``channel.basic_ack`` returns.
        :raises KeyError: if ``delivery_info`` has no ``'delivery_tag'``.

        """
        return self.channel.basic_ack(self.delivery_info['delivery_tag'])

    def reject(self):
        """Reject this message on its channel.

        :returns: whatever ``channel.basic_reject`` returns.
        :raises KeyError: if ``delivery_info`` has no ``'delivery_tag'``.

        """
        return self.channel.basic_reject(self.delivery_info['delivery_tag'])


class Channel(object):
    """A single channel on a connection.

    Every method forwards to the matching underscore-prefixed method of
    ``self.connection``, passing ``self.channel_id`` as the first argument.

    :param connection: the owning connection object.
    :param channel_id: identifier of this channel on that connection.

    """
    # Message class used to wrap frames returned by :meth:`basic_get`.
    Message = Message
    # Class-level default; ``Connection.channel`` / ``_remove_channel`` set
    # the instance attribute when the channel is opened / removed.
    is_open = False

    def __init__(self, connection, channel_id):
        self.connection = connection
        self.channel_id = channel_id
        # ``itertools.count(1).next`` only exists on Python 2; the ``next()``
        # builtin works on Python 2.6+ and Python 3 alike.  The attribute is
        # still a zero-argument callable yielding 1, 2, 3, ...
        tag_counter = itertools.count(1)
        self.next_consumer_tag = lambda: next(tag_counter)
        # Consumer tags that were registered with ``no_ack=True``.
        self.no_ack_consumers = set()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Close the channel on leaving the ``with`` block, error or not.
        self.close()

    def basic_qos(self, prefetch_size=0, prefetch_count=0, _global=False):
        """Forward the QoS settings to ``connection._basic_qos``."""
        return self.connection._basic_qos(
            self.channel_id, prefetch_size, prefetch_count, _global,
        )

    def flow(self, active):
        """Forward ``active`` to ``connection._flow`` for this channel."""
        return self.connection._flow(self.channel_id, active)

    def basic_recover(self, requeue=True):
        """Forward ``requeue`` to ``connection._basic_recover``."""
        return self.connection._basic_recover(self.channel_id, requeue)

    def basic_get(self, queue='', no_ack=False):
        """Fetch one message from ``queue``.

        :returns: a :attr:`Message` built from the frame's
            ``'properties'``, ``'delivery_info'`` and ``'body'`` entries,
            or ``None`` when ``connection._basic_get`` returns ``None``.

        """
        frame = self.connection._basic_get(self.channel_id, queue, no_ack)
        if frame is not None:
            return self.Message(
                self, frame['properties'],
                frame['delivery_info'], frame['body'],
            )
        return None

    def basic_consume(self, queue='', consumer_tag=None, no_local=False,
                      no_ack=False, exclusive=False, callback=None,
                      arguments=None, nowait=False):
        """Register ``callback`` as a consumer of ``queue``.

        :keyword consumer_tag: tag to request; when ``None`` the next value
            of the channel's counter is used.  It is passed on as ``str``.
        :keyword callback: stored in ``connection.callbacks`` under this
            channel id and the tag returned by the connection.
        :keyword arguments: optional dict; ``None`` is sent as ``{}``.
        :keyword nowait: accepted but not used by this method.
        :returns: the consumer tag returned by ``connection._basic_consume``.

        """
        if consumer_tag is None:
            consumer_tag = self.next_consumer_tag()
        consumer_tag = self.connection._basic_consume(
            self.channel_id, queue, str(consumer_tag),
            no_local, no_ack, exclusive, arguments or {},
        )
        self.connection.callbacks[self.channel_id][consumer_tag] = callback
        if no_ack:
            self.no_ack_consumers.add(consumer_tag)
        return consumer_tag

    def basic_ack(self, delivery_tag, multiple=False):
        """Forward the acknowledgement to ``connection._basic_ack``."""
        return self.connection._basic_ack(
            self.channel_id, delivery_tag, multiple,
        )

    def basic_reject(self, delivery_tag, requeue=True):
        """Forward the rejection to ``connection._basic_reject``."""
        return self.connection._basic_reject(
            self.channel_id, delivery_tag, requeue,
        )

    def basic_cancel(self, consumer_tag, **kwargs):
        """Unregister the consumer identified by ``consumer_tag``.

        The tag is dropped from :attr:`no_ack_consumers` and, if a
        connection is set, from the connection's callback table for this
        channel before ``connection._basic_cancel`` is called.  Extra
        keyword arguments are accepted but not used.

        """
        self.no_ack_consumers.discard(consumer_tag)
        if self.connection:
            try:
                callbacks = self.connection.callbacks[self.channel_id]
            except KeyError:
                # No callback table for this channel: nothing to forget.
                pass
            else:
                callbacks.pop(consumer_tag, None)
            self.connection._basic_cancel(self.channel_id, consumer_tag)

    def basic_publish(self, body, exchange='', routing_key='',
                      mandatory=False, immediate=False, **properties):
        """Publish a message.

        :param body: the payload, a ``(body, properties)`` tuple, or a
            :attr:`Message` instance.  For the tuple and message forms the
            properties they carry replace the ``**properties`` keywords.
        :keyword mandatory: falsy values (including ``None``) are sent
            as ``False``.
        :keyword immediate: falsy values (including ``None``) are sent
            as ``False``.
        :returns: whatever ``connection._basic_publish`` returns.

        """
        if isinstance(body, tuple):
            body, properties = body
        elif isinstance(body, self.Message):
            body, properties = body.body, body.properties
        return self.connection._basic_publish(
            self.channel_id, body, exchange, routing_key, properties,
            mandatory or False, immediate or False,
        )

    def queue_purge(self, queue, nowait=False):
        """Forward ``queue`` and ``nowait`` to ``connection._queue_purge``."""
        return self.connection._queue_purge(self.channel_id, queue, nowait)

    def exchange_declare(self, exchange='', type='direct',
                         passive=False, durable=False, auto_delete=False,
                         arguments=None, nowait=False):
        """Declare exchange.

        :keyword auto_delete: Not recommended and so it is ignored.
            NOTE(review): this method forwards the value unchanged to
            ``connection._exchange_declare``, so any ignoring presumably
            happens there -- TODO confirm.
        :keyword arguments: optional dict; ``None`` is sent as ``{}``.
        :keyword nowait: accepted but not used by this method.

        """
        return self.connection._exchange_declare(
            self.channel_id, exchange, type, passive, durable,
            auto_delete, arguments or {},
        )

    def exchange_delete(self, exchange='', if_unused=False, nowait=False):
        """Delete ``exchange``; ``nowait`` is accepted but not used."""
        return self.connection._exchange_delete(
            self.channel_id, exchange, if_unused,
        )

    def queue_declare(self, queue='', passive=False, durable=False,
                      exclusive=False, auto_delete=False, arguments=None,
                      nowait=False):
        """Declare a queue.

        :keyword arguments: optional dict; ``None`` is sent as ``{}``.
        :keyword nowait: accepted but not used by this method.
        :returns: a ``queue_declare_ok_t`` built by unpacking the result
            of ``connection._queue_declare``.

        """
        return queue_declare_ok_t(
            *self.connection._queue_declare(
                self.channel_id, queue, passive, durable,
                exclusive, auto_delete, arguments or {},
            )
        )

    def queue_bind(self, queue='', exchange='', routing_key='',
                   arguments=None, nowait=False):
        """Bind ``queue`` to ``exchange``; ``nowait`` is not used."""
        return self.connection._queue_bind(
            self.channel_id, queue, exchange, routing_key, arguments or {},
        )

    def queue_unbind(self, queue='', exchange='', routing_key='',
                     arguments=None, nowait=False):
        """Unbind ``queue`` from ``exchange``; ``nowait`` is not used."""
        return self.connection._queue_unbind(
            self.channel_id, queue, exchange, routing_key, arguments or {},
        )

    def queue_delete(self, queue='',
                     if_unused=False, if_empty=False, nowait=False):
        """nowait argument is not supported."""
        return self.connection._queue_delete(
            self.channel_id, queue, if_unused, if_empty,
        )

    def close(self):
        """Ask the owning connection (if any) to remove this channel."""
        if self.connection:
            self.connection._remove_channel(self)


class Connection(_librabbitmq.Connection):
    """Create a connection to the specified host, which should be
    a ``'host[:port]'`` string, such as ``'localhost'``, or ``'1.2.3.4:5672'``

    Host defaults to ``'localhost'``, if a port is not specified
    then 5672 is used.

    :keyword lazy: when false (the default) :meth:`connect` is called
        from the constructor.
    :keyword \\*\\*kwargs: accepted but not used by this constructor.

    """
    Message = Message
    Channel = Channel

    def __init__(self, host='localhost', userid='guest', password='guest',
                 virtual_host='/', port=5672, channel_max=0xffff,
                 frame_max=131072, heartbeat=0, lazy=False, **kwargs):
        # A port embedded in ``host`` takes precedence over ``port``.
        if ':' in host:
            host, port = host.split(':')
        super(Connection, self).__init__(
            hostname=host, port=int(port), userid=userid, password=password,
            virtual_host=virtual_host, channel_max=channel_max,
            frame_max=frame_max, heartbeat=heartbeat,
        )
        # Open channels, keyed by channel id.
        self.channels = {}
        # Pool of free channel ids stored in descending order, so ``pop()``
        # hands out the lowest id first.  ``range`` is used instead of
        # ``xrange`` because the latter does not exist on Python 3.
        self._avail_channel_ids = array('H', range(self.channel_max, 0, -1))
        if not lazy:
            self.connect()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Close the connection on leaving the ``with`` block, error or not.
        self.close()

    def reconnect(self):
        """Close the connection and connect again."""
        self.close()
        self.connect()

    def drain_events(self, timeout=None):
        """Call ``_basic_recv`` with a translated ``timeout``.

        ``0.0`` is passed on as ``-1``, ``None`` as ``0.0`` and any other
        value as ``float(timeout)``.  NOTE(review): presumably ``-1`` means
        "do not block" and ``0.0`` "block indefinitely" to ``_basic_recv``
        -- confirm against the extension.

        """
        # we rewrite to socket.timeout here, as this is what kombu-patched
        # amqplib uses.
        if timeout == 0.0:
            timeout = -1
        elif timeout is None:
            timeout = 0.0
        else:
            timeout = float(timeout)
        self._basic_recv(timeout)

    def channel(self, channel_id=None):
        """Return the channel with ``channel_id``, opening it if needed.

        :keyword channel_id: id to use; when ``None`` a free id is taken
            from the pool.  An id that is already open returns the
            existing channel object.
        :raises ConnectionError: if no free channel id is left.

        """
        if channel_id is None:
            channel_id = self._get_free_channel_id()
        elif channel_id in self.channels:
            return self.channels[channel_id]
        # NOTE(review): an explicitly requested id is not removed from the
        # free pool here, so it could later be handed out again -- confirm
        # whether callers ever mix explicit and automatic ids.
        self._channel_open(channel_id)
        self.callbacks[channel_id] = {}
        channel = self.Channel(self, channel_id)
        channel.is_open = True
        self.channels[channel_id] = channel
        return channel

    def _remove_channel(self, channel):
        """Close ``channel`` and forget everything registered for it."""
        channel.is_open = False
        try:
            self._channel_close(channel.channel_id)
        except ChannelError:
            # Best effort: a failing close must not prevent the cleanup.
            pass
        self.callbacks.pop(channel.channel_id, None)
        # Release the id only if the channel was still registered; otherwise
        # closing the same channel twice would put its id into the free pool
        # twice and two later channels would be given the same id.
        if self.channels.pop(channel.channel_id, None) is not None:
            self._avail_channel_ids.append(channel.channel_id)

    def _get_free_channel_id(self):
        """Take the next free channel id from the pool.

        :raises ConnectionError: if the pool is empty.

        """
        try:
            return self._avail_channel_ids.pop()
        except IndexError:
            raise ConnectionError(
                'No free channel ids, current=%d, channel_max=%d' % (
                    len(self.channels), self.channel_max))

    def close(self):
        """Close the connection, ignoring ``ConnectionError``."""
        try:
            self._close()
        except ConnectionError:
            # Deliberate best effort: closing a broken connection is a no-op.
            pass