This document describes the current stable version of Kombu (4.0). For development docs, go here.
Common Utilities - kombu.common
Common Utilities.

class kombu.common.Broadcast(name=None, queue=None, auto_delete=True, exchange=None, alias=None, **kwargs)
Broadcast queue.
Convenience class used to define broadcast queues.
Every queue instance will have a unique name, and both the queue and the exchange are configured with auto deletion.
attrs = ((u'name', None), (u'exchange', None), (u'routing_key', None), (u'queue_arguments', None), (u'binding_arguments', None), (u'consumer_arguments', None), (u'durable', <type 'bool'>), (u'exclusive', <type 'bool'>), (u'auto_delete', <type 'bool'>), (u'no_ack', None), (u'alias', None), (u'bindings', <type 'list'>), (u'no_declare', <type 'bool'>), (u'expires', <type 'float'>), (u'message_ttl', <type 'float'>), (u'max_length', <type 'int'>), (u'max_length_bytes', <type 'int'>), (u'max_priority', <type 'int'>), (u'queue', None))

kombu.common.maybe_declare(entity, channel=None, retry=False, **retry_policy)
Declare entity (cached).

kombu.common.uuid(_uuid=<function uuid4>)
Generate unique id in UUID4 format.
See also
For now this is provided by uuid.uuid4().
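As a rough illustration of the entry above, the following standard-library sketch mirrors the documented signature. The name make_id is hypothetical and not part of Kombu, and returning the id as a string is an assumption made for this example.

```python
from uuid import UUID, uuid4


def make_id(_uuid=uuid4):
    """Hypothetical stand-in that mirrors the documented signature.

    Assumption: the id is returned as a string in UUID4 format.
    """
    return str(_uuid())


message_id = make_id()

# Parsing the string back confirms that it is a well-formed version 4 UUID.
parsed = UUID(message_id)
print(message_id, parsed.version)
```

Passing a different callable as _uuid swaps the id generator, which can be handy for deterministic ids in tests.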

kombu.common.itermessages(conn, channel, queue, limit=1, timeout=None, callbacks=None, **kwargs)
Iterator over messages.

kombu.common.send_reply(exchange, req, msg, producer=None, retry=False, retry_policy=None, **props)
Send reply for request.
Parameters:
- exchange (kombu.Exchange, str) – Reply exchange.
- req (Message) – Original request, a message with a reply_to property.
- producer (kombu.Producer) – Producer instance.
- retry (bool) – If true, retry according to the retry_policy argument.
- retry_policy (Dict) – Retry settings.
- **props (Any) – Extra properties.

kombu.common.collect_replies(conn, channel, queue, *args, **kwargs)
Generator collecting replies from queue.

kombu.common.insured(pool, fun, args, kwargs, errback=None, on_revive=None, **opts)
Function wrapper to handle connection errors.
Ensures that a function performing broker commands completes despite intermittent connection failures.

kombu.common.drain_consumer(consumer, limit=1, timeout=None, callbacks=None)
Drain messages from consumer instance.

kombu.common.eventloop(conn, limit=None, timeout=None, ignore_timeouts=False)
Best practice generator wrapper around Connection.drain_events.
Able to drain events forever, with a limit, and optionally ignoring timeout errors (a timeout of 1 is often used in environments where the socket can get “stuck”, and is a best practice for Kombu consumers).
eventloop is a generator.
Examples
>>> from kombu.common import eventloop
>>> def run(conn):
...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
...     next(it)   # one event consumed, or timed out.
...
...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
...         pass  # loop forever.
It also takes an optional limit parameter, and timeout errors are propagated by default:
for _ in eventloop(connection, limit=1, timeout=1):
    pass
See also
itermessages(), which is an event loop bound to one or more consumers and yields any messages received.
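The limit and timeout behaviour described for eventloop can be tried without a broker using the simplified model below. It is a sketch under stated assumptions, not Kombu's source: FakeConnection and eventloop_sketch are hypothetical names, and the model assumes that a timeout surfaces as socket.timeout from drain_events.

```python
import socket
from itertools import count


class FakeConnection:
    """Hypothetical stand-in for a connection; not part of Kombu."""

    def __init__(self, outcomes):
        self._outcomes = iter(outcomes)

    def drain_events(self, timeout=None):
        outcome = next(self._outcomes)
        if outcome is socket.timeout:
            raise socket.timeout("no event within timeout")
        return outcome


def eventloop_sketch(conn, limit=None, timeout=None, ignore_timeouts=False):
    """Simplified model of the documented behaviour (an assumption)."""
    rounds = range(limit) if limit else count()
    for _ in rounds:
        try:
            yield conn.drain_events(timeout=timeout)
        except socket.timeout:
            # Timeouts propagate unless the caller asked to ignore them.
            if timeout and not ignore_timeouts:
                raise


# With ignore_timeouts=True a timed-out round yields nothing and the loop
# moves on to the next round.
conn = FakeConnection(["event-1", socket.timeout, "event-2"])
seen = list(eventloop_sketch(conn, limit=3, timeout=1, ignore_timeouts=True))

# By default the timeout error reaches the caller.
conn = FakeConnection([socket.timeout])
try:
    list(eventloop_sketch(conn, limit=1, timeout=1))
    propagated = False
except socket.timeout:
    propagated = True
```

In this model limit counts rounds rather than yielded events, so a swallowed timeout still uses up one round.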