kombu.connection

Broker connection and pools.

copyright: (c) 2009 - 2012 by Ask Solem.
license: BSD, see LICENSE for more details.

Connection

class kombu.connection.BrokerConnection(hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, **kwargs)

A connection to the broker.

Parameters:
  • URL – Connection URL.
  • hostname – Default host name/address if not provided in the URL.
  • userid – Default user name if not provided in the URL.
  • password – Default password if not provided in the URL.
  • virtual_host – Default virtual host if not provided in the URL.
  • port – Default port if not provided in the URL.
  • ssl – Use SSL to connect to the server. Default is False. May not be supported by the specified transport.
  • transport – Default transport if not specified in the URL.
  • connect_timeout – Timeout in seconds for connecting to the server. May not be supported by the specified transport.
  • transport_options – A dict of additional connection arguments to pass to alternate kombu channel implementations. Consult the transport documentation for available options.
  • insist – Deprecated.

Note

The connection is established lazily when needed. If you need the connection to be established, then force it to do so using connect():

>>> conn.connect()

Remember to always close the connection:

>>> conn.release()
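
The lazy behaviour described in the note can be pictured with a small stand-in class. This is only a sketch: LazyConnection and its _establish() helper are hypothetical names, not part of kombu, and they merely mimic the connect()/release() life cycle.

```python
class LazyConnection:
    """Hypothetical stand-in that mimics lazy connection establishment."""

    def __init__(self, hostname="localhost"):
        self.hostname = hostname
        self._connection = None  # nothing is opened at construction time

    def _establish(self):
        # Placeholder for the transport-specific handshake (assumption).
        return "connected to %s" % self.hostname

    @property
    def connected(self):
        return self._connection is not None

    def connect(self):
        """Force the connection to be established now (idempotent)."""
        if self._connection is None:
            self._connection = self._establish()
        return self._connection

    def release(self):
        """Close the connection if it is open."""
        self._connection = None


conn = LazyConnection()
assert not conn.connected      # creating the object opens nothing
try:
    conn.connect()             # establish explicitly
    assert conn.connected
finally:
    conn.release()             # always release, even on error
```

Wrapping the work in try/finally is one way to make sure release() runs even when an exception is raised.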

Attributes

connection_errors

List of exceptions that may be raised by the connection.

channel_errors

List of exceptions that may be raised by the channel.

transport
host

The host as a host name/port pair separated by colon.

connection

The underlying connection object.

Warning

This instance is transport specific, so do not depend on the interface of this object.

Methods

connect()

Establish connection to server immediately.

channel()

Request a new channel.

drain_events(**kwargs)

Wait for a single event from the server.

Parameters:
  • timeout – Timeout in seconds before we give up. Raises socket.timeout if the timeout is exceeded.

Usually used from an event loop.
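
A minimal sketch of such an event loop follows. FakeConnection, run_loop and the max_idle parameter are hypothetical and exist only so the example runs without a broker; the only behaviour taken from the description above is that drain_events() accepts a timeout and raises socket.timeout when it is exceeded.

```python
import socket


class FakeConnection:
    """Hypothetical stand-in for a connection; not part of kombu."""

    def __init__(self, pending, on_event):
        self._pending = list(pending)
        self._on_event = on_event

    def drain_events(self, timeout=None):
        # Deliver one pending event, or time out when nothing is waiting.
        if not self._pending:
            raise socket.timeout("timed out")
        self._on_event(self._pending.pop(0))


def run_loop(conn, timeout=1, max_idle=2):
    """Drain events one at a time until max_idle consecutive timeouts occur."""
    idle = 0
    while idle < max_idle:
        try:
            conn.drain_events(timeout=timeout)
            idle = 0
        except socket.timeout:
            idle += 1  # nothing arrived in time; a real loop might do housekeeping


received = []
run_loop(FakeConnection(["a", "b"], received.append))
```

Catching socket.timeout inside the loop keeps a quiet broker from terminating the consumer; when to stop is an application decision.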

release()

Close the connection (if open).

ensure_connection(errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30, callback=None)

Ensure we have a connection to the server.

If not, retry establishing the connection with the settings specified.

Parameters:
  • errback – Optional callback called each time the connection can’t be established. Arguments provided are the exception raised and the interval that will be slept (exc, interval).
  • max_retries – Maximum number of times to retry. If this limit is exceeded the connection error will be re-raised.
  • interval_start – The number of seconds we start sleeping for.
  • interval_step – Number of seconds added to the interval for each retry.
  • interval_max – Maximum number of seconds to sleep between each retry.
  • callback – Optional callback that is called for every internal iteration (1 s).
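
The interplay of the interval parameters can be sketched as follows. This is an illustration under stated assumptions, not kombu's implementation: retry_intervals, connect_with_retry and the injectable sleep argument are hypothetical, ConnectionError stands in for the transport's connection errors, and the per-second callback is not modelled. The exact schedule kombu uses (for example, whether the first wait equals interval_start) may differ.

```python
import itertools


def retry_intervals(interval_start=2, interval_step=2, interval_max=30):
    """Yield the seconds to sleep before each successive retry."""
    interval = interval_start
    while True:
        yield min(interval, interval_max)  # never sleep longer than interval_max
        interval += interval_step


def connect_with_retry(connect, errback=None, max_retries=None,
                       sleep=lambda seconds: None, **interval_opts):
    """Call connect() until it succeeds or max_retries is exceeded.

    `sleep` is injectable so the sketch can run without really waiting.
    """
    for retries, interval in enumerate(retry_intervals(**interval_opts)):
        try:
            return connect()
        except ConnectionError as exc:
            if max_retries is not None and retries >= max_retries:
                raise  # limit exceeded: re-raise the connection error
            if errback is not None:
                errback(exc, interval)
            sleep(interval)


first_sleeps = list(itertools.islice(retry_intervals(), 4))
```

With the defaults from the signature, the sketch sleeps 2, 4, 6, 8, ... seconds and never more than 30.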
ensure(obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None)

Ensure operation completes, regardless of any channel/connection errors occurring.

Will retry by establishing the connection, and reapplying the function.

Parameters:
  • fun – Method to apply.
  • errback – Optional callback called each time the connection can’t be established. Arguments provided are the exception raised and the interval that will be slept (exc, interval).
  • max_retries – Maximum number of times to retry. If this limit is exceeded the connection error will be re-raised.
  • interval_start – The number of seconds we start sleeping for.
  • interval_step – Number of seconds added to the interval for each retry.
  • interval_max – Maximum number of seconds to sleep between each retry.

Example

This is an example ensuring a publish operation:

>>> def errback(exc, interval):
...     print("Couldn't publish message: %r. Retry in %ds" % (
...             exc, interval))
>>> publish = conn.ensure(producer, producer.publish,
...                       errback=errback, max_retries=3)
>>> publish(message, routing_key)
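
What the returned callable does can be approximated with a plain wrapper. The sketch below is not kombu's code: make_ensured, its recoverable and revive arguments, and the injectable sleep are hypothetical, and ConnectionError stands in for the connection and channel errors. It uses a fixed interval of 1 second, matching the interval defaults in the signature above.

```python
import functools


def make_ensured(fun, errback=None, max_retries=None, interval=1,
                 recoverable=(ConnectionError,), revive=lambda: None,
                 sleep=lambda seconds: None):
    """Return a wrapper that re-applies fun after recoverable errors."""

    @functools.wraps(fun)
    def ensured(*args, **kwargs):
        retries = 0
        while True:
            try:
                return fun(*args, **kwargs)
            except recoverable as exc:
                if max_retries is not None and retries >= max_retries:
                    raise  # give up and surface the original error
                if errback is not None:
                    errback(exc, interval)
                sleep(interval)
                revive()  # stand-in for re-establishing the connection
                retries += 1

    return ensured


failures = []
outcomes = iter([ConnectionError("broken pipe"), "delivered"])


def publish(message):
    # Fails once, then succeeds, to exercise the retry path.
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return "%s: %s" % (message, outcome)


safe_publish = make_ensured(
    publish, errback=lambda exc, interval: failures.append(interval),
    max_retries=3)
result = safe_publish("hello")
```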
create_transport()
get_transport_cls()

Get the currently used transport class.

clone(**kwargs)

Create a copy of the connection with the same connection settings.

info()

Get connection info.

Pool(limit=None, preload=None)

Pool of connections.

See ConnectionPool.

Parameters:
  • limit – Maximum number of active connections. Default is no limit.
  • preload – Number of connections to preload when the pool is created. Default is 0.

Example usage:

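>>> pool = connection.Pool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()  # raises ConnectionLimitExceeded: both connections are in use
>>> c1.release()         # return c1 to the pool
>>> c3 = pool.acquire()  # succeeds now that a connection is free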
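
Because a limited pool only hands out a resource again after it has been released, it helps to guarantee the release. The sketch below shows one way to do that with a context manager. TinyPool, its LimitExceeded exception and the acquired() helper are hypothetical stand-ins so the example runs without a broker; only the acquire()/release(resource) shape is taken from this page.

```python
from contextlib import contextmanager


class LimitExceeded(Exception):
    """Raised by the stand-in pool when no more resources may be handed out."""


class TinyPool:
    """Hypothetical non-blocking pool with a hard limit; not kombu's code."""

    def __init__(self, limit):
        self.limit = limit
        self.in_use = 0

    def acquire(self):
        if self.in_use >= self.limit:
            raise LimitExceeded(self.limit)
        self.in_use += 1
        return object()  # placeholder for a connection

    def release(self, resource):
        self.in_use -= 1


@contextmanager
def acquired(pool):
    """Acquire a resource and guarantee it is returned to the pool."""
    resource = pool.acquire()
    try:
        yield resource
    finally:
        pool.release(resource)


pool = TinyPool(limit=2)
with acquired(pool), acquired(pool):
    try:
        pool.acquire()        # third acquire: the limit of 2 is reached
        limit_hit = False
    except LimitExceeded:
        limit_hit = True
```

After the with block exits, both resources are back in the pool, so a later acquire() succeeds.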
ChannelPool(limit=None, preload=None)

Pool of channels.

See ChannelPool.

Parameters:
  • limit – Maximum number of active channels. Default is no limit.
  • preload – Number of channels to preload when the pool is created. Default is 0.

Example usage:

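>>> pool = connection.ChannelPool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()  # raises ChannelLimitExceeded: both channels are in use
>>> c1.release()         # return c1 to the pool
>>> c3 = pool.acquire()  # succeeds now that a channel is free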
SimpleQueue(name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs)

Create new SimpleQueue, using a channel from this connection.

If name is a string, a queue and an exchange are created automatically with that name. The same name is also used as the default routing key.

Parameters:
  • name – Name of the queue, or a Queue instance.
  • no_ack – Disable acknowledgements. Default is false.
  • queue_opts – Additional keyword arguments passed to the constructor of the automatically created Queue.
  • exchange_opts – Additional keyword arguments passed to the constructor of the automatically created Exchange.
  • channel – Channel to use. If not specified a new channel from the current connection will be used. Remember to call close() when done with the object.
SimpleBuffer(name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs)

Create new SimpleQueue using a channel from this connection.

Same as SimpleQueue(), but configured with buffering semantics: the resulting queue and exchange are not durable, and auto-delete is enabled. Messages are transient (not persistent), and acknowledgements are disabled (no_ack).
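
The naming rule for SimpleQueue() and the buffering semantics of SimpleBuffer() can be summarised in plain data. The function name and the dictionary keys below are illustrative only and should not be read as kombu's actual keyword arguments.

```python
def simple_queue_names(name):
    """Names derived from a plain string `name`, per the description above."""
    return {"queue": name, "exchange": name, "routing_key": name}


# Settings the text attributes to SimpleBuffer(); key names are illustrative.
BUFFER_SEMANTICS = {
    "durable": False,      # queue and exchange are not durable
    "auto_delete": True,   # auto delete is enabled
    "persistent": False,   # messages are transient
    "no_ack": True,        # acknowledgements are disabled
}

names = simple_queue_names("logs")
```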

Pools

See also

The shortcut methods BrokerConnection.Pool() and BrokerConnection.ChannelPool() are the recommended way to instantiate these classes.

class kombu.connection.ConnectionPool(connection, limit=None, preload=None)
LimitExceeded = <class 'kombu.exceptions.ConnectionLimitExceeded'>
acquire(block=False, timeout=None)

Acquire resource.

Parameters:
  • block – If the limit is exceeded, block until there is an available item.
  • timeout – Timeout to wait if block is true. Default is None (forever).
Raises LimitExceeded: if block is false and the limit has been exceeded.
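
The block and timeout semantics can be illustrated with a pool built on the standard library's queue.Queue. BlockingPool is a hypothetical stand-in, not kombu's implementation. Raising LimitExceeded when a blocking acquire times out is an assumption of this sketch; the text above only specifies the non-blocking case.

```python
import queue


class LimitExceeded(Exception):
    """Raised when no resource becomes available."""


class BlockingPool:
    """Hypothetical pool built on queue.Queue; not kombu's implementation."""

    def __init__(self, resources):
        self._free = queue.Queue()
        for resource in resources:
            self._free.put(resource)

    def acquire(self, block=False, timeout=None):
        try:
            # block=False returns immediately; block=True waits up to
            # `timeout` seconds (forever when timeout is None).
            return self._free.get(block=block, timeout=timeout)
        except queue.Empty:
            raise LimitExceeded("no free resource") from None

    def release(self, resource):
        self._free.put(resource)


pool = BlockingPool(["r1"])
held = pool.acquire()
try:
    pool.acquire(block=True, timeout=0.05)  # waits 50 ms, then gives up
    timed_out = False
except LimitExceeded:
    timed_out = True
pool.release(held)
```

With block=True and timeout=None the call would wait until another thread releases a resource, so a finite timeout is the safer choice in single-threaded code.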

release(resource)
class kombu.connection.ChannelPool(connection, limit=None, preload=None)
LimitExceeded = <class 'kombu.exceptions.ChannelLimitExceeded'>
acquire(block=False, timeout=None)

Acquire resource.

Parameters:
  • block – If the limit is exceeded, block until there is an available item.
  • timeout – Timeout to wait if block is true. Default is None (forever).
Raises LimitExceeded: if block is false and the limit has been exceeded.

release(resource)
