This document describes the current stable version of Kombu (4.0).

Connection/Producer Pools - kombu.pools

Public resource pools.

class kombu.pools.ProducerPool(connections, *args, **kwargs)

Pool of kombu.Producer instances.

class Producer(channel, exchange=None, routing_key=None, serializer=None, auto_declare=None, compression=None, on_return=None)

Message Producer.

Parameters:
  • channel (kombu.Connection, ChannelT) – Connection or channel.
  • exchange (Exchange, str) – Optional default exchange.
  • routing_key (str) – Optional default routing key.
  • serializer (str) – Default serializer. Default is “json”.
  • compression (str) – Default compression method. Default is no compression.
  • auto_declare (bool) – Automatically declare the default exchange at instantiation. Default is True.
  • on_return (Callable) – Callback to call for undeliverable messages, when the mandatory or immediate arguments to publish() are used. This callback needs the following signature: (exception, exchange, routing_key, message). Note that the producer needs to drain events to use this feature.
auto_declare = True
channel
close()
compression = None
connection
declare()

Declare the exchange.

Note

This happens automatically at instantiation when the auto_declare flag is enabled.

exchange = None
maybe_declare(entity, retry=False, **retry_policy)

Declare exchange if not already declared during this session.

on_return = None
publish(body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, **properties)

Publish message to the specified exchange.

Parameters:
  • body (Any) – Message body.
  • routing_key (str) – Message routing key.
  • delivery_mode (enum) – See delivery_mode.
  • mandatory (bool) – Currently not supported.
  • immediate (bool) – Currently not supported.
  • priority (int) – Message priority. A number between 0 and 9.
  • content_type (str) – Content type. Default is auto-detect.
  • content_encoding (str) – Content encoding. Default is auto-detect.
  • serializer (str) – Serializer to use. Default is auto-detect.
  • compression (str) – Compression method to use. Default is none.
  • headers (Dict) – Mapping of arbitrary headers to pass along with the message body.
  • exchange (Exchange, str) – Override the exchange. Note that this exchange must have been declared.
  • declare (Sequence[EntityT]) – Optional list of required entities that must have been declared before publishing the message. The entities will be declared using maybe_declare().
  • retry (bool) – Retry publishing or declaring entities if the connection is lost.
  • retry_policy (Dict) – Retry configuration: the keyword arguments supported by ensure().
  • expiration (float) – A TTL in seconds can be specified per message. Default is no expiration.
  • **properties (Any) – Additional message properties, see AMQP spec.
release()
revive(channel)

Revive the producer after connection loss.

routing_key = u''
serializer = None
ProducerPool.close_after_fork = True
ProducerPool.close_resource(resource)
ProducerPool.create_producer()
ProducerPool.new()
ProducerPool.prepare(p)
ProducerPool.release(resource)
ProducerPool.setup()
class kombu.pools.PoolGroup(limit=None, close_after_fork=True)

Collection of resource pools.

create(resource, limit)
kombu.pools.register_group(group)

Register group (can be used as decorator).

kombu.pools.get_limit()

Get current connection pool limit.

kombu.pools.set_limit(limit, force=False, reset_after=False, ignore_errors=False)

Set new connection pool limit.

kombu.pools.reset(*args, **kwargs)

Reset all pools by closing open resources.