This document describes the current stable version of Kombu (4.0).

Virtual Transport Base Class - kombu.transport.virtual

Transports

class kombu.transport.virtual.Transport(client, **kwargs)

Virtual transport.

Parameters: client (kombu.Connection) – The client this is a transport for.
Channel = <class 'kombu.transport.virtual.base.Channel'>
Cycle = <class 'kombu.utils.scheduling.FairCycle'>
polling_interval = 1.0
default_port = None
state = <kombu.transport.virtual.base.BrokerState object>
cycle = None
establish_connection()
close_connection(connection)
create_channel(connection)
close_channel(channel)
drain_events(connection, timeout=None)

Channel

class kombu.transport.virtual.AbstractChannel

Abstract channel interface.

This is an abstract class defining the channel methods you’d usually want to implement in a virtual channel.

Note

Do not subclass directly, but rather inherit from Channel.

class kombu.transport.virtual.Channel(connection, **kwargs)

Virtual channel.

Parameters: connection (ConnectionT) – The transport instance this channel is part of.
Message = <class 'kombu.transport.virtual.base.Message'>
state

Broker state containing exchanges and bindings.

qos

QoS manager for this channel.

do_restore = True
exchange_types = {u'topic': <class 'kombu.transport.virtual.exchange.TopicExchange'>, u'fanout': <class 'kombu.transport.virtual.exchange.FanoutExchange'>, u'direct': <class 'kombu.transport.virtual.exchange.DirectExchange'>}
exchange_declare(exchange=None, type=u'direct', durable=False, auto_delete=False, arguments=None, nowait=False, passive=False)

Declare exchange.

exchange_delete(exchange, if_unused=False, nowait=False)

Delete exchange and all its bindings.

queue_declare(queue=None, passive=False, **kwargs)

Declare queue.

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)

Delete queue.

queue_bind(queue, exchange=None, routing_key=u'', arguments=None, **kwargs)

Bind queue to exchange with routing key.

queue_purge(queue, **kwargs)

Remove all ready messages from queue.

basic_publish(message, exchange, routing_key, **kwargs)

Publish message.

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)

Consume from queue.

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_get(queue, no_ack=False, **kwargs)

Get message by direct access (synchronous).

basic_ack(delivery_tag, multiple=False)

Acknowledge message.

basic_recover(requeue=False)

Recover unacked messages.

basic_reject(delivery_tag, requeue=False)

Reject message.

basic_qos(prefetch_size=0, prefetch_count=0, apply_global=False)

Change QoS settings for this channel.

Note

Only prefetch_count is supported.

get_table(exchange)

Get table of bindings for exchange.

typeof(exchange, default=u'direct')

Get the exchange type instance for exchange.
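
As a rough illustration of what the three built-in exchange types do, the following is a minimal sketch in plain Python. It is not Kombu's implementation: the helper names (direct_match, fanout_match, topic_match, matcher_for, route) and the data layout are hypothetical, and the topic matcher follows the usual AMQP convention in which `*` stands for exactly one word and `#` for zero or more words. The lookup falls back to the direct type when an exchange has no declared type, mirroring the default argument of typeof.

```python
def direct_match(binding_key, routing_key):
    # Direct exchanges deliver only on an exact key match.
    return binding_key == routing_key


def fanout_match(binding_key, routing_key):
    # Fanout exchanges ignore the routing key entirely.
    return True


def _match_words(pattern, words):
    # Recursive word-by-word matcher for topic patterns.
    if not pattern:
        return not words
    head, rest = pattern[0], pattern[1:]
    if head == '#':
        # '#' may absorb zero or more of the remaining words.
        return any(_match_words(rest, words[i:])
                   for i in range(len(words) + 1))
    if not words:
        return False
    if head == '*' or head == words[0]:
        return _match_words(rest, words[1:])
    return False


def topic_match(binding_key, routing_key):
    return _match_words(binding_key.split('.'), routing_key.split('.'))


# Hypothetical registry, loosely analogous to Channel.exchange_types.
MATCHERS = {
    'direct': direct_match,
    'fanout': fanout_match,
    'topic': topic_match,
}


def matcher_for(exchange, declared_types, default='direct'):
    # Look up the matcher for an exchange, defaulting to 'direct'.
    return MATCHERS[declared_types.get(exchange, default)]


def route(exchange, routing_key, bindings, declared_types):
    # Return the sorted names of all queues whose binding matches.
    match = matcher_for(exchange, declared_types)
    return sorted({queue
                   for binding_key, queue in bindings.get(exchange, ())
                   if match(binding_key, routing_key)})


declared_types = {'events': 'topic', 'broadcast': 'fanout'}
bindings = {
    'events': [('logs.*', 'errors'), ('logs.#', 'audit')],
    'broadcast': [('', 'a'), ('', 'b')],
    'tasks': [('email', 'mail_q')],  # undeclared type: treated as direct
}
print(route('events', 'logs.error', bindings, declared_types))
```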

drain_events(timeout=None, callback=None)
prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)

Prepare message data.

message_to_python(raw_message)

Convert raw message to Message instance.

flow(active=True)

Enable/disable message flow.

Raises: NotImplementedError – as flow is not implemented by the base virtual implementation.
close()

Close channel.

Cancel all consumers, and requeue unacked messages.

Message

class kombu.transport.virtual.Message(payload, channel=None, **kwargs)

Message object.

exception MessageStateError

The message has already been acknowledged.

args
message
Message.accept
Message.ack(multiple=False)

Acknowledge this message as being processed.

This will remove the message from the queue.

Raises: MessageStateError – If the message has already been acknowledged/requeued/rejected.
Message.ack_log_error(logger, errors, multiple=False)
Message.acknowledged

Set to true if the message has been acknowledged.

Message.body
Message.channel
Message.content_encoding
Message.content_type
Message.decode()

Deserialize the message body, returning the original Python structure sent by the publisher.

Note

The return value is memoized; use _decode to force re-evaluation.
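
The memoization described in this note can be pictured with a small stand-alone sketch. The class below is a hypothetical stand-in, not Kombu's Message: it assumes a JSON body and adds a decode_calls counter purely so the caching is observable. Only the decode/_decode pairing is taken from the note above.

```python
import json


class CachedBodyMessage:
    """Hypothetical stand-in illustrating a memoized decode()."""

    def __init__(self, body):
        self.body = body
        self._decoded_cache = None
        self.decode_calls = 0  # illustration only: counts real decodes

    def _decode(self):
        # Always performs the deserialization work.
        self.decode_calls += 1
        return json.loads(self.body)

    def decode(self):
        # Deserialize on first use, then reuse the cached result.
        if self._decoded_cache is None:
            self._decoded_cache = self._decode()
        return self._decoded_cache


message = CachedBodyMessage('{"task": "resize", "width": 640}')
first = message.decode()
second = message.decode()   # served from the cache
fresh = message._decode()   # bypasses the cache
```

In this sketch the second call to decode returns the same object as the first, while _decode builds a new one each time it is called.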

Message.delivery_info
Message.delivery_tag
Message.errors = None
Message.headers
Message.payload

The decoded message body.

Message.properties
Message.reject(requeue=False)

Reject this message.

The message will be discarded by the server.

Raises: MessageStateError – If the message has already been acknowledged/requeued/rejected.
Message.reject_log_error(logger, errors, requeue=False)
Message.requeue()

Reject this message and put it back on the queue.

Warning

You must not use this method as a means of selecting messages to process.

Raises: MessageStateError – If the message has already been acknowledged/requeued/rejected.
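
The rule shared by ack, reject and requeue (each may succeed only while the message is still unsettled) can be sketched as a tiny state machine. This is a simplified model under stated assumptions, not Kombu's code: ToyMessage, its state names and the _settle helper are hypothetical, and the local MessageStateError merely stands in for the exception documented above.

```python
class MessageStateError(Exception):
    """Local stand-in for the exception documented above."""


class ToyMessage:
    """Hypothetical model of a message that can be settled only once."""

    SETTLED_STATES = frozenset({'ACK', 'REJECTED', 'REQUEUED'})

    def __init__(self, body):
        self.body = body
        self._state = 'RECEIVED'

    @property
    def acknowledged(self):
        # True once the message has been acked, rejected or requeued.
        return self._state in self.SETTLED_STATES

    def _settle(self, new_state):
        if self.acknowledged:
            raise MessageStateError(
                'Message already settled with state: ' + self._state)
        self._state = new_state

    def ack(self):
        self._settle('ACK')

    def reject(self):
        self._settle('REJECTED')

    def requeue(self):
        self._settle('REQUEUED')


message = ToyMessage(b'payload')
message.ack()
try:
    message.requeue()  # second settlement attempt is refused
    second_attempt_failed = False
except MessageStateError:
    second_attempt_failed = True
```
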
Message.serializable()

Quality Of Service

class kombu.transport.virtual.QoS(channel, prefetch_count=0)

Quality of Service guarantees.

Only supports prefetch_count at this point.

Parameters:
  • channel (ChannelT) – Connection channel.
  • prefetch_count (int) – Initial prefetch count (defaults to 0).
ack(delivery_tag)

Acknowledge message and remove from transactional state.

append(message, delivery_tag)

Append message to transactional state.

can_consume()

Return true if the channel can be consumed from.

Used to ensure the client adheres to currently active prefetch limits.
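
How append, ack and can_consume interact under a prefetch limit can be sketched with a small model. ToyQoS below is hypothetical and far simpler than the real class; in particular, it assumes that a prefetch_count of 0 means "no limit", as in AMQP, and it tracks only the set of delivered but unacknowledged messages.

```python
from collections import OrderedDict


class ToyQoS:
    """Hypothetical model of a prefetch window."""

    def __init__(self, prefetch_count=0):
        self.prefetch_count = prefetch_count
        self._delivered = OrderedDict()  # delivery_tag -> message

    def append(self, message, delivery_tag):
        # Record a delivered message that still awaits acknowledgement.
        self._delivered[delivery_tag] = message

    def ack(self, delivery_tag):
        # Acknowledging frees one slot in the window.
        self._delivered.pop(delivery_tag, None)

    def can_consume(self):
        # Assumption: 0 disables the limit; otherwise the number of
        # unacknowledged messages must stay below prefetch_count.
        if not self.prefetch_count:
            return True
        return len(self._delivered) < self.prefetch_count


qos = ToyQoS(prefetch_count=2)
qos.append('first', delivery_tag=1)
qos.append('second', delivery_tag=2)
window_full = not qos.can_consume()
qos.ack(1)
window_open_again = qos.can_consume()
```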

can_consume_max_estimate()

Return the maximum number of messages allowed to be returned.

Returns an estimated number of messages that a consumer may be allowed to consume at once from the broker. This is used for services where bulk ‘get message’ calls are preferred to many individual ‘get message’ calls, such as SQS.

Returns: greater than zero.
Return type: int
get(delivery_tag)
prefetch_count = 0
reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

restore_at_shutdown = True
restore_unacked()

Restore all unacknowledged messages.

restore_unacked_once(stderr=None)

Restore all unacknowledged messages at shutdown/gc collect.

Note

Can only be called once for each instance; subsequent calls will be ignored.
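
The "once per instance" behaviour can be illustrated with a simple guard flag. The class below is a hypothetical sketch, not Kombu's implementation: the attribute names, the restore order (oldest first) and the boolean return value are all assumptions made for the example.

```python
class ToyUnackedStore:
    """Hypothetical model of restoring unacknowledged messages once."""

    def __init__(self):
        self.unacked = {}   # delivery_tag -> message
        self.queue = []     # messages waiting to be delivered again
        self._restored_once = False

    def restore_unacked(self):
        # Move every unacknowledged message back onto the queue,
        # oldest delivery first (an assumption of this sketch).
        while self.unacked:
            tag = next(iter(self.unacked))
            self.queue.append(self.unacked.pop(tag))

    def restore_unacked_once(self):
        # Only the first call does any work; later calls are ignored.
        if self._restored_once:
            return False
        self._restored_once = True
        self.restore_unacked()
        return True


store = ToyUnackedStore()
store.unacked = {1: 'first', 2: 'second'}
did_restore = store.restore_unacked_once()

store.unacked[3] = 'third'
ignored = store.restore_unacked_once()  # no effect the second time
```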

restore_visible(*args, **kwargs)

Restore any pending unacknowledged messages.

To be filled in for visibility_timeout style implementations.

Note

Implementing this method is optional; it is currently only used by the Redis transport.

In-memory State

class kombu.transport.virtual.BrokerState(exchanges=None)

Broker state holds exchanges, queues and bindings.

binding_declare(queue, exchange, routing_key, arguments)
binding_delete(queue, exchange, routing_key)
bindings = None
clear()
exchanges = None
has_binding(queue, exchange, routing_key)
queue_bindings(queue)
queue_bindings_delete(queue)
queue_index = None
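
To make the roles of bindings and queue_index more concrete, here is a minimal in-memory sketch that reuses the method names listed above. It is a simplified model, not the actual BrokerState: the storage layout (a dict keyed by (queue, exchange, routing_key) tuples plus a per-queue index of those keys) and the sorted return value of queue_bindings are assumptions of this example.

```python
from collections import defaultdict


class ToyBrokerState:
    """Hypothetical in-memory binding table."""

    def __init__(self):
        # (queue, exchange, routing_key) -> binding arguments
        self.bindings = {}
        # queue name -> set of binding keys that mention the queue
        self.queue_index = defaultdict(set)

    def binding_declare(self, queue, exchange, routing_key, arguments=None):
        key = (queue, exchange, routing_key)
        self.bindings.setdefault(key, arguments)
        self.queue_index[queue].add(key)

    def has_binding(self, queue, exchange, routing_key):
        return (queue, exchange, routing_key) in self.bindings

    def binding_delete(self, queue, exchange, routing_key):
        key = (queue, exchange, routing_key)
        self.bindings.pop(key, None)
        self.queue_index[queue].discard(key)

    def queue_bindings(self, queue):
        # The index avoids scanning every binding in the table.
        return sorted(self.queue_index.get(queue, ()))

    def queue_bindings_delete(self, queue):
        # Drop all bindings of a queue, for example when it is deleted.
        for key in self.queue_index.pop(queue, set()):
            self.bindings.pop(key, None)


state = ToyBrokerState()
state.binding_declare('mail_q', 'tasks', 'email')
state.binding_declare('mail_q', 'events', 'user.signup')
state.binding_declare('audit_q', 'events', 'user.#')
state.binding_delete('mail_q', 'tasks', 'email')
remaining = state.queue_bindings('mail_q')
state.queue_bindings_delete('audit_q')
```

Keeping a per-queue index next to the flat binding table is a common design choice when queues are deleted often, since removing a queue then touches only its own bindings.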