kombu.transport.virtual

Virtual transport implementation.

Emulates the AMQ API for non-AMQ transports.

copyright:
  1. 2009, 2012 by Ask Solem.
license:

BSD, see LICENSE for more details.

Transports

class kombu.transport.virtual.Transport(client, **kwargs)

Virtual transport.

Parameters:clientBrokerConnection instance
Channel = <class 'kombu.transport.virtual.Channel'>
Cycle = <class 'kombu.transport.virtual.scheduling.FairCycle'>
polling_interval = 1.0

Time to sleep between unsuccessful polls.

default_port = None

port number used when no port is specified.

state = <kombu.transport.virtual.BrokerState object at 0x4213a10>

BrokerState containing declared exchanges and bindings (set by constructor).

cycle = None

FairCycle instance used to fairly drain events from channels (set by constructor).

establish_connection()
close_connection(connection)
create_channel(connection)
close_channel(channel)
drain_events(connection, timeout=None)

Channel

class kombu.transport.virtual.AbstractChannel

This is an abstract class defining the channel methods you’d usually want to implement in a virtual channel.

Do not subclass directly, but rather inherit from Channel instead.

class kombu.transport.virtual.Channel(connection, **kwargs)

Virtual channel.

Parameters:connection – The transport instance this channel is part of.
Message = <class 'kombu.transport.virtual.Message'>

message class used.

state

Broker state containing exchanges and bindings.

qos

QoS manager for this channel.

do_restore = True

flag to restore unacked messages when channel goes out of scope.

exchange_types = {'topic': <class 'kombu.transport.virtual.exchange.TopicExchange'>, 'fanout': <class 'kombu.transport.virtual.exchange.FanoutExchange'>, 'direct': <class 'kombu.transport.virtual.exchange.DirectExchange'>}

mapping of exchange types and corresponding classes.

exchange_declare(exchange, type='direct', durable=False, auto_delete=False, arguments=None, nowait=False)

Declare exchange.

exchange_delete(exchange, if_unused=False, nowait=False)

Delete exchange and all its bindings.

queue_declare(queue, passive=False, **kwargs)

Declare queue.

queue_delete(queue, if_unusued=False, if_empty=False, **kwargs)

Delete queue.

queue_bind(queue, exchange, routing_key='', arguments=None, **kwargs)

Bind queue to exchange with routing key.

queue_purge(queue, **kwargs)

Remove all ready messages from queue.

basic_publish(message, exchange, routing_key, **kwargs)

Publish message.

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)

Consume from queue

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_get(queue, **kwargs)

Get message by direct access (synchronous).

basic_ack(delivery_tag)

Acknowledge message.

basic_recover(requeue=False)

Recover unacked messages.

basic_reject(delivery_tag, requeue=False)

Reject message.

basic_qos(prefetch_size=0, prefetch_count=0, apply_global=False)

Change QoS settings for this channel.

Only prefetch_count is supported.

get_table(exchange)

Get table of bindings for exchange.

typeof(exchange)

Get the exchange type instance for exchange.

drain_events(timeout=None)
prepare_message(message_data, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)

Prepare message data.

message_to_python(raw_message)

Convert raw message to Message instance.

flow(active=True)

Enable/disable message flow.

Raises NotImplementedError:
 as flow is not implemented by the base virtual implementation.
close()

Close channel, cancel all consumers, and requeue unacked messages.

Message

class kombu.transport.virtual.Message(channel, payload, **kwargs)
exception MessageStateError

The message has already been acknowledged.

args
message
Message.ack()

Acknowledge this message as being processed., This will remove the message from the queue.

Raises MessageStateError:
 If the message has already been acknowledged/requeued/rejected.
Message.ack_log_error(logger, errors)
Message.acknowledged

Set to true if the message has been acknowledged.

Message.body
Message.channel
Message.content_encoding
Message.content_type
Message.decode()

Deserialize the message body, returning the original python structure sent by the publisher.

Message.delivery_info
Message.delivery_tag
Message.headers
Message.payload

The decoded message body.

Message.properties
Message.reject()

Reject this message.

The message will be discarded by the server.

Raises MessageStateError:
 If the message has already been acknowledged/requeued/rejected.
Message.reject_log_error(logger, errors)
Message.requeue()

Reject this message and put it back on the queue.

You must not use this method as a means of selecting messages to process.

Raises MessageStateError:
 If the message has already been acknowledged/requeued/rejected.
Message.serializable()

Quality Of Service

class kombu.transport.virtual.QoS(channel, prefetch_count=0)

Quality of Service guarantees.

Only supports prefetch_count at this point.

Parameters:
  • channel – AMQ Channel.
  • prefetch_count – Initial prefetch count (defaults to 0).
ack(delivery_tag)

Acknowledge message and remove from transactional state.

append(message, delivery_tag)

Append message to transactional state.

can_consume()

Returns true if the channel can be consumed from.

Used to ensure the client adhers to currently active prefetch limits.

get(delivery_tag)
prefetch_count = 0

current prefetch count value

reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

restore_at_shutdown = True

If disabled, unacked messages won’t be restored at shutdown.

restore_unacked()

Restore all unacknowledged messages.

restore_unacked_once()

Restores all unacknowledged message at shutdown/gc collect.

Will only be done once for each instance.

In-memory State

class kombu.transport.virtual.BrokerState(exchanges=None, bindings=None)
bindings = None

active bindings.

clear()
exchanges = None

exchange declarations.

Table Of Contents

Previous topic

kombu.transport.base

Next topic

kombu.transport.virtual.exchange

This Page