.. _guide-consumers: =========== Consumers =========== .. _consumer-basics: Basics ====== The :class:`Consumer` takes a connection (or channel) and a list of queues to consume from. Several consumers can be mixed to consume from different channels, as they all bind to the same connection, and ``drain_events`` will drain events from all channels on that connection. .. note:: Kombu since 3.0 will only accept json/binary or text messages by default, to allow deserialization of other formats you have to specify them in the ``accept`` argument (in addition to setting the right content type for your messages): .. code-block:: python Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml']) Draining events from a single consumer: .. code-block:: python with Consumer(connection, queues, accept=['json']): connection.drain_events(timeout=1) Draining events from several consumers: .. code-block:: python from kombu.utils.compat import nested with connection.channel(), connection.channel() as (channel1, channel2): with nested(Consumer(channel1, queues1, accept=['json']), Consumer(channel2, queues2, accept=['json'])): connection.drain_events(timeout=1) Or using :class:`~kombu.mixins.ConsumerMixin`: .. code-block:: python from kombu.mixins import ConsumerMixin class C(ConsumerMixin): def __init__(self, connection): self.connection = connection def get_consumers(self, Consumer, channel): return [ Consumer(queues, callbacks=[self.on_message], accept=['json']), ] def on_message(self, body, message): print('RECEIVED MESSAGE: {0!r}'.format(body)) message.ack() C(connection).run() and with multiple channels again: .. code-block:: python from kombu import Consumer from kombu.mixins import ConsumerMixin class C(ConsumerMixin): channel2 = None def __init__(self, connection): self.connection = connection def get_consumers(self, _, default_channel): self.channel2 = default_channel.connection.channel() return [Consumer(default_channel, queues1, callbacks=[self.on_message], accept=['json']), Consumer(self.channel2, queues2, callbacks=[self.on_special_message], accept=['json'])] def on_consumer_end(self, connection, default_channel): if self.channel2: self.channel2.close() C(connection).run() There's also a :class:`~kombu.mixins.ConsumerProducerMixin` for consumers that need to also publish messages on a separate connection (e.g. sending rpc replies, streaming results): .. code-block:: python from kombu import Producer, Queue from kombu.mixins import ConsumerProducerMixin rpc_queue = Queue('rpc_queue') class Worker(ConsumerProducerMixin): def __init__(self, connection): self.connection = connection def get_consumers(self, Consumer, channel): return [Consumer( queues=[rpc_queue], on_message=self.on_request, accept={'application/json'}, prefetch_count=1, )] def on_request(self, message): n = message.payload['n'] print(' [.] fib({0})'.format(n)) result = fib(n) self.producer.publish( {'result': result}, exchange='', routing_key=message.properties['reply_to'], correlation_id=message.properties['correlation_id'], serializer='json', retry=True, ) message.ack() .. seealso:: :file:`examples/rpc-tut6/` in the Github repository. Advanced Topics =============== RabbitMQ -------- Consumer Priorities ~~~~~~~~~~~~~~~~~~~ RabbitMQ defines a consumer priority extension to the amqp protocol, that can be enabled by setting the ``x-priority`` argument to ``basic.consume``. In kombu you can specify this argument on the :class:`~kombu.Queue`, like this: .. code-block:: python queue = Queue('name', Exchange('exchange_name', type='direct'), consumer_arguments={'x-priority': 10}) Read more about consumer priorities here: https://www.rabbitmq.com/consumer-priority.html Reference ========= .. autoclass:: kombu.Consumer :noindex: :members: