This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 4.1.

Redis Transport - kombu.transport.redis

Redis transport.

Transport

class kombu.transport.redis.Transport(*args, **kwargs)[source]

Redis Transport.

class Channel(*args, **kwargs)

Redis Channel.

class QoS(*args, **kwargs)

Redis Ack Emulation.

ack(delivery_tag)
append(message, delivery_tag)
pipe_or_acquire(**kwds)
reject(delivery_tag, requeue=False)
restore_at_shutdown = True
restore_by_tag(tag, client=None, leftmost=False)
restore_unacked(client=None)
restore_visible(start=0, num=10, interval=10)
unacked_index_key
unacked_key
unacked_mutex_expire
unacked_mutex_key
visibility_timeout
ack_emulation = True
active_queues

Set of queues being consumed from (excluding fanout queues).

async_pool
basic_cancel(consumer_tag)
basic_consume(queue, *args, **kwargs)
client

Client used to publish messages, BRPOP etc.

close()
conn_or_acquire(**kwds)
connection_class = None
fanout_patterns = True
fanout_prefix = True
from_transport_options = (u'body_encoding', u'deadletter_queue', u'ack_emulation', u'unacked_key', u'unacked_index_key', u'unacked_mutex_key', u'unacked_mutex_expire', u'visibility_timeout', u'unacked_restore_limit', u'fanout_prefix', u'fanout_patterns', u'socket_timeout', u'socket_connect_timeout', u'socket_keepalive', u'socket_keepalive_options', u'queue_order_strategy', u'max_connections', u'priority_steps')
get_table(exchange)
keyprefix_fanout = u'/{db}.'
keyprefix_queue = u'_kombu.binding.%s'
max_connections = 10
pool
priority(n)
priority_steps = [0, 3, 6, 9]
queue_order_strategy = u'round_robin'
sep = u'\x06\x16'
socket_connect_timeout = None
socket_keepalive = None
socket_keepalive_options = None
socket_timeout = None
subclient

Pub/Sub connection used to consume fanout queues.

supports_fanout = True
unacked_index_key = u'unacked_index'
unacked_key = u'unacked'
unacked_mutex_expire = 300
unacked_mutex_key = u'unacked_mutex'
unacked_restore_limit = None
visibility_timeout = 3600
default_port = 6379
driver_name = u'redis'
driver_type = u'redis'
driver_version()[source]
implements = {'asynchronous': True, 'exchange_type': frozenset([u'topic', u'fanout', u'direct']), 'heartbeats': False}
on_readable(fileno)[source]

Handle AIO event for one of our file descriptors.

polling_interval = None
register_with_event_loop(connection, loop)[source]

Channel

class kombu.transport.redis.Channel(*args, **kwargs)[source]

Redis Channel.

class QoS(*args, **kwargs)

Redis Ack Emulation.

ack(delivery_tag)
append(message, delivery_tag)
pipe_or_acquire(**kwds)
reject(delivery_tag, requeue=False)
restore_at_shutdown = True
restore_by_tag(tag, client=None, leftmost=False)
restore_unacked(client=None)
restore_visible(start=0, num=10, interval=10)
unacked_index_key
unacked_key
unacked_mutex_expire
unacked_mutex_key
visibility_timeout
ack_emulation = True
active_queues

Set of queues being consumed from (excluding fanout queues).

async_pool
basic_cancel(consumer_tag)[source]
basic_consume(queue, *args, **kwargs)[source]
client[source]

Client used to publish messages, BRPOP etc.

close()[source]
conn_or_acquire(**kwds)[source]
connection_class = None
fanout_patterns = True

If enabled the fanout exchange will support patterns in routing and binding keys (like a topic exchange but using PUB/SUB).

Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.

fanout_prefix = True

Transport option to disable fanout keyprefix. Can also be string, in which case it changes the default prefix (‘/{db}.’) into to something else. The prefix must include a leading slash and a trailing dot.

Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.

from_transport_options = (u'body_encoding', u'deadletter_queue', u'ack_emulation', u'unacked_key', u'unacked_index_key', u'unacked_mutex_key', u'unacked_mutex_expire', u'visibility_timeout', u'unacked_restore_limit', u'fanout_prefix', u'fanout_patterns', u'socket_timeout', u'socket_connect_timeout', u'socket_keepalive', u'socket_keepalive_options', u'queue_order_strategy', u'max_connections', u'priority_steps')
get_table(exchange)[source]
keyprefix_fanout = u'/{db}.'
keyprefix_queue = u'_kombu.binding.%s'
max_connections = 10
pool
priority(n)[source]
priority_steps = [0, 3, 6, 9]
queue_order_strategy = u'round_robin'

Order in which we consume from queues.

Can be either string alias, or a cycle strategy class

  • round_robin (round_robin_cycle).

    Make sure each queue has an equal opportunity to be consumed from.

  • sorted (sorted_cycle).

    Consume from queues in alphabetical order. If the first queue in the sorted list always contains messages, then the rest of the queues will never be consumed from.

  • priority (priority_cycle).

    Consume from queues in original order, so that if the first queue always contains messages, the rest of the queues in the list will never be consumed from.

The default is to consume from queues in round robin.

sep = u'\x06\x16'
socket_connect_timeout = None
socket_keepalive = None
socket_keepalive_options = None
socket_timeout = None
subclient[source]

Pub/Sub connection used to consume fanout queues.

supports_fanout = True
unacked_index_key = u'unacked_index'
unacked_key = u'unacked'
unacked_mutex_expire = 300
unacked_mutex_key = u'unacked_mutex'
unacked_restore_limit = None
visibility_timeout = 3600