This document describes the current stable version of Kombu (4.0). For development docs, go here.

Amazon SQS Transport - kombu.transport.SQS

Amazon SQS Transport.

Amazon SQS transport module for Kombu. This package implements an AMQP-like interface on top of Amazons SQS service, with the goal of being optimized for high performance and reliability.

The default settings for this module are focused now on high performance in task queue situations where tasks are small, idempotent and run very fast.

SQS Features supported by this transport:
Long Polling:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/
sqs-long-polling.html

Long polling is enabled by setting the wait_time_seconds transport option to a number > 1. Amazon supports up to 20 seconds. This is enabled with 10 seconds by default.

Batch API Actions:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/
sqs-batch-api.html

The default behavior of the SQS Channel.drain_events() method is to request up to the ‘prefetch_count’ messages on every request to SQS. These messages are stored locally in a deque object and passed back to the Transport until the deque is empty, before triggering a new API call to Amazon.

This behavior dramatically speeds up the rate that you can pull tasks from SQS when you have short-running tasks (or a large number of workers).

When a Celery worker has multiple queues to monitor, it will pull down up to ‘prefetch_count’ messages from queueA and work on them all before moving on to queueB. If queueB is empty, it will wait up until ‘polling_interval’ expires before moving back and checking on queueA.

Transport

class kombu.transport.SQS.Transport(client, **kwargs)[source]

SQS Transport.

class Channel(*args, **kwargs)

SQS Channel.

asynsqs
basic_ack(delivery_tag, multiple=False)
basic_cancel(consumer_tag)
basic_consume(queue, no_ack, *args, **kwargs)
close()
conninfo
default_region = u'us-east-1'
default_visibility_timeout = 1800
default_wait_time_seconds = 10
domain_format = u'kombu%(vhost)s'
drain_events(timeout=None)

Return a single payload message from one of our queues.

Raises:Queue.Empty – if no messages available.
entity_name(name, table={33: 95, 34: 95, 35: 95, 36: 95, 37: 95, 38: 95, 39: 95, 40: 95, 41: 95, 42: 95, 43: 95, 44: 95, 46: 45, 47: 95, 58: 95, 59: 95, 60: 95, 61: 95, 62: 95, 63: 95, 64: 95, 91: 95, 92: 95, 93: 95, 94: 95, 96: 95, 123: 95, 124: 95, 125: 95, 126: 95})

Format AMQP queue name into a legal SQS queue name.

is_secure
port
queue_name_prefix
region
regioninfo
sqs
supports_fanout
transport_options
visibility_timeout
wait_time_seconds
Transport.channel_errors = (<class 'amqp.exceptions.ChannelError'>, <class 'kombu.async.aws.ext.BotoError'>)
Transport.connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.async.aws.ext.BotoError'>, <class 'socket.error'>)
Transport.default_port = None
Transport.driver_name = u'sqs'
Transport.driver_type = u'sqs'
Transport.implements = {'async': True, 'exchange_type': frozenset([u'direct']), 'heartbeats': False}
Transport.polling_interval = 1
Transport.wait_time_seconds = 0

Channel

class kombu.transport.SQS.Channel(*args, **kwargs)[source]

SQS Channel.

asynsqs
basic_ack(delivery_tag, multiple=False)[source]
basic_cancel(consumer_tag)[source]
basic_consume(queue, no_ack, *args, **kwargs)[source]
close()[source]
conninfo
default_region = u'us-east-1'
default_visibility_timeout = 1800
default_wait_time_seconds = 10
domain_format = u'kombu%(vhost)s'
drain_events(timeout=None)[source]

Return a single payload message from one of our queues.

Raises:Queue.Empty – if no messages available.
entity_name(name, table={33: 95, 34: 95, 35: 95, 36: 95, 37: 95, 38: 95, 39: 95, 40: 95, 41: 95, 42: 95, 43: 95, 44: 95, 46: 45, 47: 95, 58: 95, 59: 95, 60: 95, 61: 95, 62: 95, 63: 95, 64: 95, 91: 95, 92: 95, 93: 95, 94: 95, 96: 95, 123: 95, 124: 95, 125: 95, 126: 95})[source]

Format AMQP queue name into a legal SQS queue name.

is_secure[source]
port[source]
queue_name_prefix[source]
region[source]
regioninfo[source]
sqs
supports_fanout[source]
transport_options
visibility_timeout[source]
wait_time_seconds[source]