This document describes the current stable version of Kombu (4.0). For development docs, go here.

librabbitmq AMQP transport - kombu.transport.librabbitmq

librabbitmq transport.

Transport

class kombu.transport.librabbitmq.Transport(client, **kwargs)[source]

AMQP Transport (librabbitmq).

class Connection(host='localhost', userid='guest', password='guest', virtual_host='/', port=5672, channel_max=65535, frame_max=131072, heartbeat=0, lazy=False, **kwargs)

AMQP Connection (librabbitmq).

class Channel(connection, channel_id)

AMQP Channel (librabbitmq).

class Message(channel, props, info, body)

AMQP Message (librabbitmq).

Transport.Connection.Channel.prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)

Encapsulate data into a AMQP message.

Transport.Connection.Channel.prepare_queue_arguments(arguments, **kwargs)
class Transport.Connection.Message(channel, props, info, body)

AMQP Message (librabbitmq).

Transport.channel_errors = (<class 'amqp.exceptions.ChannelError'>, <class '_librabbitmq.ChannelError'>)
Transport.close_connection(connection)[source]

Close the AMQP broker connection.

Transport.connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class '_librabbitmq.ConnectionError'>, <class 'socket.error'>, <type 'exceptions.IOError'>, <type 'exceptions.OSError'>)
Transport.create_channel(connection)[source]
Transport.default_connection_params
Transport.default_port = 5672
Transport.default_ssl_port = 5671
Transport.drain_events(connection, **kwargs)[source]
Transport.driver_name = u'librabbitmq'
Transport.driver_type = u'amqp'
Transport.driver_version()[source]
Transport.establish_connection()[source]

Establish connection to the AMQP broker.

Transport.get_manager(*args, **kwargs)[source]
Transport.implements = {'heartbeats': False, 'exchange_type': frozenset([u'topic', u'headers', u'fanout', u'direct']), 'async': True}
Transport.qos_semantics_matches_spec(connection)[source]
Transport.register_with_event_loop(connection, loop)[source]
Transport.verify_connection(connection)[source]

Connection

class kombu.transport.librabbitmq.Connection(host='localhost', userid='guest', password='guest', virtual_host='/', port=5672, channel_max=65535, frame_max=131072, heartbeat=0, lazy=False, **kwargs)[source]

AMQP Connection (librabbitmq).

class Channel(connection, channel_id)

AMQP Channel (librabbitmq).

Consumer(*args, **kwargs)
class Message(channel, props, info, body)

AMQP Message (librabbitmq).

exception MessageStateError

The message has already been acknowledged.

args
message
Connection.Channel.Message.accept
Connection.Channel.Message.ack(multiple=False)

Acknowledge this message as being processed.

This will remove the message from the queue.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
Connection.Channel.Message.ack_log_error(logger, errors, multiple=False)
Connection.Channel.Message.acknowledged

Set to true if the message has been acknowledged.

Connection.Channel.Message.body
Connection.Channel.Message.channel
Connection.Channel.Message.content_encoding
Connection.Channel.Message.content_type
Connection.Channel.Message.decode()

Deserialize the message body.

Returning the original python structure sent by the publisher.

Note

The return value is memoized, use _decode to force re-evaluation.

Connection.Channel.Message.delivery_info
Connection.Channel.Message.delivery_tag
Connection.Channel.Message.errors = None
Connection.Channel.Message.headers
Connection.Channel.Message.payload

The decoded message body.

Connection.Channel.Message.properties
Connection.Channel.Message.reject(requeue=False)

Reject this message.

The message will be discarded by the server.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
Connection.Channel.Message.reject_log_error(logger, errors, requeue=False)
Connection.Channel.Message.requeue()

Reject this message and put it back on the queue.

Warning

You must not use this method as a means of selecting messages to process.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
Connection.Channel.Producer(*args, **kwargs)
Connection.Channel.after_reply_message_received(queue)

Callback called after RPC reply received.

Notes

Reply queue semantics: can be used to delete the queue after transient reply message received.

Connection.Channel.basic_ack(delivery_tag, multiple=False)
Connection.Channel.basic_cancel(consumer_tag, **kwargs)
Connection.Channel.basic_consume(queue='', consumer_tag=None, no_local=False, no_ack=False, exclusive=False, callback=None, arguments=None, nowait=False)
Connection.Channel.basic_get(queue='', no_ack=False)
Connection.Channel.basic_publish(body, exchange='', routing_key='', mandatory=False, immediate=False, **properties)
Connection.Channel.basic_qos(prefetch_size=0, prefetch_count=0, _global=False)
Connection.Channel.basic_recover(requeue=True)
Connection.Channel.basic_reject(delivery_tag, requeue=True)
Connection.Channel.close()
Connection.Channel.exchange_declare(exchange='', type='direct', passive=False, durable=False, auto_delete=False, arguments=None, nowait=False)

Declare exchange.

Keyword Arguments:
 auto_delete – Not recommended and so it is ignored.
Connection.Channel.exchange_delete(exchange='', if_unused=False, nowait=False)
Connection.Channel.flow(active)
Connection.Channel.get_bindings()
Connection.Channel.is_open = False
Connection.Channel.no_ack_consumers = None
Connection.Channel.prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)

Encapsulate data into a AMQP message.

Connection.Channel.prepare_queue_arguments(arguments, **kwargs)
Connection.Channel.queue_bind(queue='', exchange='', routing_key='', arguments=None, nowait=False)
Connection.Channel.queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None, nowait=False)
Connection.Channel.queue_delete(queue='', if_unused=False, if_empty=False, nowait=False)

nowait argument is not supported.

Connection.Channel.queue_purge(queue, nowait=False)
Connection.Channel.queue_unbind(queue='', exchange='', routing_key='', arguments=None, nowait=False)
class Connection.Message(channel, props, info, body)

AMQP Message (librabbitmq).

exception MessageStateError

The message has already been acknowledged.

args
message
Connection.Message.accept
Connection.Message.ack(multiple=False)

Acknowledge this message as being processed.

This will remove the message from the queue.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
Connection.Message.ack_log_error(logger, errors, multiple=False)
Connection.Message.acknowledged

Set to true if the message has been acknowledged.

Connection.Message.body
Connection.Message.channel
Connection.Message.content_encoding
Connection.Message.content_type
Connection.Message.decode()

Deserialize the message body.

Returning the original python structure sent by the publisher.

Note

The return value is memoized, use _decode to force re-evaluation.

Connection.Message.delivery_info
Connection.Message.delivery_tag
Connection.Message.errors = None
Connection.Message.headers
Connection.Message.payload

The decoded message body.

Connection.Message.properties
Connection.Message.reject(requeue=False)

Reject this message.

The message will be discarded by the server.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
Connection.Message.reject_log_error(logger, errors, requeue=False)
Connection.Message.requeue()

Reject this message and put it back on the queue.

Warning

You must not use this method as a means of selecting messages to process.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
Connection.callbacks
Connection.channel(channel_id=None)[source]
Connection.channel_max
Connection.close()[source]
Connection.connect()

Establish connection to the broker.

Connection.connected
Connection.drain_events(timeout=None)[source]
Connection.fileno()

File descriptor number.

Connection.frame_max
Connection.heartbeat
Connection.hostname
Connection.password
Connection.port
Connection.reconnect()[source]
Connection.server_properties
Connection.userid
Connection.virtual_host

Channel

class kombu.transport.librabbitmq.Channel(connection, channel_id)[source]

AMQP Channel (librabbitmq).

class Message(channel, props, info, body)

AMQP Message (librabbitmq).

Channel.prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)[source]

Encapsulate data into a AMQP message.

Channel.prepare_queue_arguments(arguments, **kwargs)[source]

Message

class kombu.transport.librabbitmq.Message(channel, props, info, body)[source]

AMQP Message (librabbitmq).