This document describes the current stable version of Kombu (4.3). For development docs, go here.
Connection - kombu.connection
¶
Client (Connection).
Connection¶
-
class
kombu.connection.
Connection
(hostname=u'localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy=u'round-robin', alternates=None, **kwargs)[source]¶ A connection to the broker.
Example
>>> Connection('amqp://guest:guest@localhost:5672//') >>> Connection('amqp://foo;amqp://bar', ... failover_strategy='round-robin') >>> Connection('redis://', transport_options={ ... 'visibility_timeout': 3000, ... })
>>> import ssl >>> Connection('amqp://', login_method='EXTERNAL', ssl={ ... 'ca_certs': '/etc/pki/tls/certs/something.crt', ... 'keyfile': '/etc/something/system.key', ... 'certfile': '/etc/something/system.cert', ... 'cert_reqs': ssl.CERT_REQUIRED, ... })
Note
SSL currently only works with the py-amqp, and qpid transports. For other transports you can use stunnel.
Parameters: URL (str, Sequence) – Broker URL, or a list of URLs.
Keyword Arguments: - ssl (bool) – Use SSL to connect to the server. Default is
False
. May not be supported by the specified transport. - transport (Transport) – Default transport if not specified in the URL.
- connect_timeout (float) – Timeout in seconds for connecting to the server. May not be supported by the specified transport.
- transport_options (Dict) – A dict of additional connection arguments to pass to alternate kombu channel implementations. Consult the transport documentation for available options.
- heartbeat (float) – Heartbeat interval in int/float seconds.
Note that if heartbeats are enabled then the
heartbeat_check()
method must be called regularly, around once per second.
Note
The connection is established lazily when needed. If you need the connection to be established, then force it by calling
connect()
:>>> conn = Connection('amqp://') >>> conn.connect()
and always remember to close the connection:
>>> conn.release()
These options have been replaced by the URL argument, but are still supported for backwards compatibility:
Keyword Arguments: - hostname – Host name/address. NOTE: You cannot specify both the URL argument and use the hostname keyword argument at the same time.
- userid – Default user name if not provided in the URL.
- password – Default password if not provided in the URL.
- virtual_host – Default virtual host if not provided in the URL.
- port – Default port if not provided in the URL.
-
ChannelPool
(limit=None, **kwargs)[source]¶ Pool of channels.
See also
Parameters: limit (int) – Maximum number of active channels. Default is no limit. Example
>>> connection = Connection('amqp://') >>> pool = connection.ChannelPool(2) >>> c1 = pool.acquire() >>> c2 = pool.acquire() >>> c3 = pool.acquire() >>> c1.release() >>> c3 = pool.acquire() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "kombu/connection.py", line 354, in acquire raise ChannelLimitExceeded(self.limit) kombu.connection.ChannelLimitExceeded: 2
-
Consumer
(queues=None, channel=None, *args, **kwargs)[source]¶ Create new
kombu.Consumer
instance.
-
Pool
(limit=None, **kwargs)[source]¶ Pool of connections.
See also
Parameters: limit (int) – Maximum number of active connections. Default is no limit. Example
>>> connection = Connection('amqp://') >>> pool = connection.Pool(2) >>> c1 = pool.acquire() >>> c2 = pool.acquire() >>> c3 = pool.acquire() >>> c1.release() >>> c3 = pool.acquire() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "kombu/connection.py", line 354, in acquire raise ConnectionLimitExceeded(self.limit) kombu.exceptions.ConnectionLimitExceeded: 2
-
Producer
(channel=None, *args, **kwargs)[source]¶ Create new
kombu.Producer
instance.
-
SimpleBuffer
(name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs)[source]¶ Simple ephemeral queue API.
Create new
SimpleQueue
using a channel from this connection.See also
Same as
SimpleQueue()
, but configured with buffering semantics. The resulting queue and exchange will not be durable, also auto delete is enabled. Messages will be transient (not persistent), and acknowledgments are disabled (no_ack
).
-
SimpleQueue
(name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, channel=None, **kwargs)[source]¶ Simple persistent queue API.
Create new
SimpleQueue
, using a channel from this connection.If
name
is a string, a queue and exchange will be automatically created using that name as the name of the queue and exchange, also it will be used as the default routing key.Parameters: - name (str, kombu.Queue) – Name of the queue/or a queue.
- no_ack (bool) – Disable acknowledgments. Default is false.
- queue_opts (Dict) – Additional keyword arguments passed to the
constructor of the automatically created
Queue
. - queue_args (Dict) – Additional keyword arguments passed to the
constructor of the automatically created
Queue
for setting implementation extensions (e.g., in RabbitMQ). - exchange_opts (Dict) – Additional keyword arguments passed to the
constructor of the automatically created
Exchange
. - channel (ChannelT) – Custom channel to use. If not specified the connection default channel is used.
-
as_uri
(include_password=False, mask=u'**', getfields=<operator.itemgetter object>)[source]¶ Convert connection parameters to URL form.
-
autoretry
(fun, channel=None, **ensure_options)[source]¶ Decorator for functions supporting a
channel
keyword argument.The resulting callable will retry calling the function if it raises connection or channel related errors. The return value will be a tuple of
(retval, last_created_channel)
.If a
channel
is not provided, then one will be automatically acquired (remember to close it afterwards).See also
ensure()
for the full list of supported keyword arguments.Example
>>> channel = connection.channel() >>> try: ... ret, channel = connection.autoretry( ... publish_messages, channel) ... finally: ... channel.close()
-
close
()¶ Close the connection (if open).
-
connect_timeout
= 5¶
-
connected
¶ Return true if the connection has been established.
-
connection
¶ The underlying connection object.
Warning
This instance is transport specific, so do not depend on the interface of this object.
-
cycle
= None¶ Iterator returning the next broker URL to try in the event of connection failure (initialized by
failover_strategy
).
-
declared_entities
= None¶ The cache of declared entities is per connection, in case the server loses data.
-
default_channel
¶ Default channel.
Created upon access and closed when the connection is closed.
Note
Can be used for automatic channel handling when you only need one channel, and also it is the channel implicitly used if a connection is passed instead of a channel, to functions that require a channel.
-
drain_events
(**kwargs)[source]¶ Wait for a single event from the server.
Parameters: timeout (float) – Timeout in seconds before we give up. Raises: socket.timeout
– if the timeout is exceeded.
-
ensure
(obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None)[source]¶ Ensure operation completes.
Regardless of any channel/connection errors occurring.
Retries by establishing the connection, and reapplying the function.
Parameters: - obj – The object to ensure an action on.
- fun (Callable) – Method to apply.
- errback (Callable) – Optional callback called each time the
connection can’t be established. Arguments provided are
the exception raised and the interval that will
be slept
(exc, interval)
. - max_retries (int) – Maximum number of times to retry. If this limit is exceeded the connection error will be re-raised.
- interval_start (float) – The number of seconds we start sleeping for.
- interval_step (float) – How many seconds added to the interval for each retry.
- interval_max (float) – Maximum number of seconds to sleep between each retry.
- on_revive (Callable) – Optional callback called whenever revival completes successfully
Examples
>>> from kombu import Connection, Producer >>> conn = Connection('amqp://') >>> producer = Producer(conn)
>>> def errback(exc, interval): ... logger.error('Error: %r', exc, exc_info=1) ... logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish, ... errback=errback, max_retries=3) >>> publish({'hello': 'world'}, routing_key='dest')
-
ensure_connection
(errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30, callback=None, reraise_as_library_errors=True, timeout=None)[source]¶ Ensure we have a connection to the server.
If not retry establishing the connection with the settings specified.
Parameters: - errback (Callable) – Optional callback called each time the
connection can’t be established. Arguments provided are
the exception raised and the interval that will be
slept
(exc, interval)
. - max_retries (int) – Maximum number of times to retry. If this limit is exceeded the connection error will be re-raised.
- interval_start (float) – The number of seconds we start sleeping for.
- interval_step (float) – How many seconds added to the interval for each retry.
- interval_max (float) – Maximum number of seconds to sleep between each retry.
- callback (Callable) – Optional callback that is called for every internal iteration (1 s).
- timeout (int) – Maximum amount of time in seconds to spend waiting for connection
- errback (Callable) – Optional callback called each time the
connection can’t be established. Arguments provided are
the exception raised and the interval that will be
slept
-
failover_strategies
= {u'round-robin': <type 'itertools.cycle'>, u'shuffle': <function shufflecycle at 0x7f10df06d488>}¶
-
failover_strategy
= u'round-robin'¶ Strategy used to select new hosts when reconnecting after connection failure. One of “round-robin”, “shuffle” or any custom iterator constantly yielding new URLs to try.
-
heartbeat
= None¶ Heartbeat value, currently only supported by the py-amqp transport.
-
heartbeat_check
(rate=2)[source]¶ Check heartbeats.
Allow the transport to perform any periodic tasks required to make heartbeats work. This should be called approximately every second.
If the current transport does not support heartbeats then this is a noop operation.
Parameters: rate (int) – Rate is how often the tick is called compared to the actual heartbeat value. E.g. if the heartbeat is set to 3 seconds, and the tick is called every 3 / 2 seconds, then the rate is 2. This value is currently unused by any transports.
-
host
¶ The host as a host name/port pair separated by colon.
-
hostname
= None¶
-
is_evented
¶
-
login_method
= None¶
-
manager
[source]¶ AMQP Management API.
Experimental manager that can be used to manage/monitor the broker instance.
Not available for all transports.
-
maybe_close_channel
(channel)[source]¶ Close given channel, but ignore connection and channel errors.
-
password
= None¶
-
port
= None¶
-
qos_semantics_matches_spec
¶
-
recoverable_channel_errors
[source]¶ Recoverable channel errors.
List of channel related exceptions that can be automatically recovered from without re-establishing the connection.
-
recoverable_connection_errors
[source]¶ Recoverable connection errors.
List of connection related exceptions that can be recovered from, but where the connection must be closed and re-established first.
-
resolve_aliases
= {u'librabbitmq': u'amqp', u'pyamqp': u'amqp'}¶
-
ssl
= None¶
-
supports_heartbeats
¶
-
transport
¶
-
transport_options
= None¶ Additional transport specific options, passed on to the transport instance.
-
uri_prefix
= None¶
-
userid
= None¶
-
virtual_host
= u'/'¶
- ssl (bool) – Use SSL to connect to the server. Default is
Pools¶
See also
The shortcut methods Connection.Pool()
and
Connection.ChannelPool()
is the recommended way
to instantiate these classes.
-
class
kombu.connection.
ConnectionPool
(connection, limit=None, **kwargs)[source]¶ Pool of connections.
-
LimitExceeded
= <class 'kombu.exceptions.ConnectionLimitExceeded'>¶
-
acquire
(block=False, timeout=None)¶ Acquire resource.
Parameters: Raises: LimitExceeded
– if block is false and the limit has been exceeded.
-
release
(resource)¶
-
force_close_all
()¶ Close and remove all resources in the pool (also those in use).
Used to close resources from parent processes after fork (e.g. sockets/connections).
-
-
class
kombu.connection.
ChannelPool
(connection, limit=None, **kwargs)[source]¶ Pool of channels.
-
LimitExceeded
= <class 'kombu.exceptions.ChannelLimitExceeded'>¶
-
acquire
(block=False, timeout=None)¶ Acquire resource.
Parameters: Raises: LimitExceeded
– if block is false and the limit has been exceeded.
-
release
(resource)¶
-
force_close_all
()¶ Close and remove all resources in the pool (also those in use).
Used to close resources from parent processes after fork (e.g. sockets/connections).
-