This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

Amazon SQS Transport - kombu.transport.SQS

Amazon SQS transport module for Kombu.

This package implements an AMQP-like interface on top of Amazon's SQS service, with the goal of being optimized for high performance and reliability.

The default settings for this module currently favor high performance in task-queue workloads where tasks are small, idempotent, and run very fast.

SQS Features supported by this transport

Long Polling

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html

Long polling is enabled by setting the wait_time_seconds transport option to a number greater than 0. Amazon supports up to 20 seconds. Long polling is enabled by default, with a wait of 10 seconds.
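As a minimal sketch, the long-polling wait could be prepared for transport_options as shown here. The helper name long_polling_options is hypothetical and not part of Kombu; it only builds the dictionary and clamps the value to the 0 to 20 second range described above.

```python
# Hypothetical helper (not part of Kombu): builds the transport_options
# entry for long polling and keeps the value inside the SQS limits.
SQS_MAX_WAIT_SECONDS = 20  # upper bound stated above


def long_polling_options(wait_time_seconds=10):
    """Return transport options with a clamped long-polling wait."""
    clamped = max(0, min(int(wait_time_seconds), SQS_MAX_WAIT_SECONDS))
    return {'wait_time_seconds': clamped}


# Ask for the longest wait that SQS allows.
transport_options = long_polling_options(20)
```

The resulting dictionary would then be supplied as the transport options of the connection.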

Batch API Actions

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-batch-api.html

The default behavior of the SQS Channel.drain_events() method is to request up to the ‘prefetch_count’ messages on every request to SQS. These messages are stored locally in a deque object and passed back to the Transport until the deque is empty, before triggering a new API call to Amazon.

This behavior dramatically increases the rate at which you can pull tasks from SQS when you have short-running tasks (or a large number of workers).

When a Celery worker has multiple queues to monitor, it will pull down up to ‘prefetch_count’ messages from queueA and work on them all before moving on to queueB. If queueB is empty, it will wait until ‘polling_interval’ expires before moving back and checking on queueA.
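The buffering described above can be illustrated with a toy model. This is not Kombu's implementation: BufferedQueueReader and fake_receive are hypothetical names, and fake_receive stands in for a single batched request to SQS.

```python
from collections import deque


class BufferedQueueReader:
    """Toy model of the local buffering (not Kombu's actual code)."""

    def __init__(self, fetch_batch, prefetch_count=10):
        # fetch_batch(max_messages) stands in for one batched API request.
        self._fetch_batch = fetch_batch
        self._prefetch_count = prefetch_count
        self._buffer = deque()
        self.api_calls = 0

    def get(self):
        """Return one message, or None when nothing is available."""
        if not self._buffer:
            # Only call the (simulated) API once the local buffer is empty.
            self.api_calls += 1
            self._buffer.extend(self._fetch_batch(self._prefetch_count))
        return self._buffer.popleft() if self._buffer else None


# Simulated remote queue holding 25 messages.
remote = deque(f'msg-{i}' for i in range(25))


def fake_receive(max_messages):
    """Hand out at most max_messages from the simulated queue."""
    count = min(max_messages, len(remote))
    return [remote.popleft() for _ in range(count)]


reader = BufferedQueueReader(fake_receive, prefetch_count=10)
received = []
message = reader.get()
while message is not None:
    received.append(message)
    message = reader.get()
```

In this run the 25 messages are delivered using four simulated requests (three that return messages and one that finds the queue empty) rather than one request per message.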

Other Features supported by this transport

Predefined Queues

The default behavior of this transport is to use a single AWS credential pair in order to manage all SQS queues (e.g. listing queues, creating queues, polling queues, deleting messages).

If it is preferable for your environment to use multiple AWS credentials, you can use the ‘predefined_queues’ setting inside the ‘transport_options’ map. This setting allows you to specify the SQS queue URL and AWS credentials for each of your queues. For example, if you have two queues (which both already exist in AWS), you can tell this transport about them as follows:

transport_options = {
  'predefined_queues': {
    'queue-1': {
      'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
      'access_key_id': 'a',
      'secret_access_key': 'b',
      'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional
      'backoff_tasks': ['svc.tasks.tasks.task1'] # optional
    },
    'queue-2': {
      'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb',
      'access_key_id': 'c',
      'secret_access_key': 'd',
      'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional
      'backoff_tasks': ['svc.tasks.tasks.task2'] # optional
    },
  },
  'sts_role_arn': 'arn:aws:iam::<xxx>:role/STSTest', # optional
  'sts_token_timeout': 900 # optional
}

backoff_policy and backoff_tasks are optional arguments. When set, they automatically change the message visibility timeout so that the listed tasks wait a different amount of time before each retry. The policy applies after a task fails.

AWS STS authentication is supported through the sts_role_arn and sts_token_timeout options. sts_role_arn is the ARN of the IAM role to assume. sts_token_timeout is the token lifetime in seconds; it defaults to 900, which is also the minimum. Once that period has elapsed, a new token is created.
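The refresh rule can be sketched as a small cache that replaces the token once its lifetime has elapsed. This is an illustration only, not the transport's code: ExpiringToken, fake_create_token and the injected clock are hypothetical, and rejecting timeouts below 900 seconds is a choice made for this sketch.

```python
import time

MIN_STS_TOKEN_TIMEOUT = 900  # seconds; default and minimum stated above


class ExpiringToken:
    """Cache a token and replace it once its lifetime has elapsed."""

    def __init__(self, create_token, timeout=MIN_STS_TOKEN_TIMEOUT,
                 clock=time.monotonic):
        if timeout < MIN_STS_TOKEN_TIMEOUT:
            # Sketch-only validation of the documented minimum.
            raise ValueError('timeout must be at least 900 seconds')
        self._create_token = create_token
        self._timeout = timeout
        self._clock = clock
        self._token = None
        self._expires_at = None

    def get(self):
        """Return the cached token, creating a new one when expired."""
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            self._token = self._create_token()
            self._expires_at = now + self._timeout
        return self._token


# Demonstration with a fake clock and a fake token factory.
fake_now = [0.0]
issued = []


def fake_create_token():
    issued.append(f'token-{len(issued) + 1}')
    return issued[-1]


cache = ExpiringToken(fake_create_token, timeout=900,
                      clock=lambda: fake_now[0])
first = cache.get()    # no token yet, so one is created
fake_now[0] = 899.0
second = cache.get()   # still within the lifetime, same token
fake_now[0] = 900.0
third = cache.get()    # lifetime elapsed, a new token is created
```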

If you authenticate using Okta (e.g. calling gimme-aws-creds), you can also specify a ‘session_token’ to connect to a queue. Note that those tokens have a limited lifetime and are therefore only suited for short-lived tests.
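A minimal sketch of a queue entry that carries a session token follows. It assumes that session_token sits next to access_key_id and secret_access_key in the queue entry, and that the temporary credentials are available in the standard AWS environment variables; predefined_queue_from_env is a hypothetical helper, not part of Kombu.

```python
import os


def predefined_queue_from_env(url, env=os.environ):
    """Build one predefined_queues entry from AWS environment variables."""
    entry = {
        'url': url,
        'access_key_id': env['AWS_ACCESS_KEY_ID'],
        'secret_access_key': env['AWS_SECRET_ACCESS_KEY'],
    }
    token = env.get('AWS_SESSION_TOKEN')
    if token:
        # Temporary credentials: include the short-lived session token.
        entry['session_token'] = token
    return entry


# Placeholder values standing in for a real environment.
example_env = {
    'AWS_ACCESS_KEY_ID': 'a',
    'AWS_SECRET_ACCESS_KEY': 'b',
    'AWS_SESSION_TOKEN': 'temporary-token',
}
transport_options = {
    'predefined_queues': {
        'queue-1': predefined_queue_from_env(
            'https://sqs.us-east-1.amazonaws.com/xxx/aaa', env=example_env),
    },
}
```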

Client config

In some cases you may need to override the botocore config. You can do it as follows:

transport_options = {
  'client-config': {
    'connect_timeout': 5,
  },
}

For a complete list of settings you can adjust using this option see https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
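As a further sketch, several botocore settings could be combined in the same map. This assumes the dictionary is forwarded as keyword arguments to botocore's Config; the values are illustrative, not recommendations.

```python
transport_options = {
    'client-config': {
        'connect_timeout': 5,   # seconds to wait when opening a connection
        'read_timeout': 10,     # seconds to wait for a response
        # Retry behavior, using botocore's retries setting.
        'retries': {'max_attempts': 3, 'mode': 'standard'},
    },
}
```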

Features

  • Type: Virtual

  • Supports Direct: Yes

  • Supports Topic: Yes

  • Supports Fanout: Yes

  • Supports Priority: No

  • Supports TTL: No

Transport

class kombu.transport.SQS.Transport(client, **kwargs)

SQS Transport.

Additional queue attributes can be supplied to SQS during queue creation by passing an sqs-creation-attributes key in transport_options. sqs-creation-attributes must be a dict whose key-value pairs correspond with Attributes in the CreateQueue SQS API.

For example, to have SQS queues created with server-side encryption enabled using the default Amazon Managed Customer Master Key, you can set the KmsMasterKeyId attribute. When the queue is initially created by Kombu, encryption will be enabled.

from kombu.transport.SQS import Transport

transport = Transport(
    ...,
    transport_options={
        'sqs-creation-attributes': {
            'KmsMasterKeyId': 'alias/aws/sqs',
        },
    }
)
class Channel(*args, **kwargs)

SQS Channel.

class QoS(channel, prefetch_count=0)

Quality of Service guarantees implementation for SQS.

apply_backoff_policy(routing_key, delivery_tag, backoff_policy, backoff_tasks)
static extract_task_name_and_number_of_retries(message)
reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

asynsqs(queue=None)
basic_ack(delivery_tag, multiple=False)

Acknowledge message.

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_consume(queue, no_ack, *args, **kwargs)

Consume from queue.

canonical_queue_name(queue_name)
close()

Close channel.

Cancel all consumers, and requeue unacked messages.

property conninfo
default_region = 'us-east-1'
default_visibility_timeout = 1800
default_wait_time_seconds = 10
domain_format = 'kombu%(vhost)s'
drain_events(timeout=None, callback=None, **kwargs)

Return a single payload message from one of our queues.

Raises

Queue.Empty – if no messages available.

endpoint_url
entity_name(name, table={33: 95, 34: 95, 35: 95, 36: 95, 37: 95, 38: 95, 39: 95, 40: 95, 41: 95, 42: 95, 43: 95, 44: 95, 46: 45, 47: 95, 58: 95, 59: 95, 60: 95, 61: 95, 62: 95, 63: 95, 64: 95, 91: 95, 92: 95, 93: 95, 94: 95, 96: 95, 123: 95, 124: 95, 125: 95, 126: 95})

Format AMQP queue name into a legal SQS queue name.

generate_sts_session_token(role_arn, token_expiry_seconds)
is_secure
new_sqs_client(region, access_key_id, secret_access_key, session_token=None)
port
predefined_queues

Map of queue_name to predefined queue settings.

queue_name_prefix
region
regioninfo
sqs(queue=None)
supports_fanout

Flag set if the channel supports fanout exchanges.

property transport_options
visibility_timeout
wait_time_seconds
channel_errors = (<class 'amqp.exceptions.ChannelError'>, <class 'botocore.exceptions.BotoCoreError'>)

Tuple of errors that can happen due to channel/method failure.

connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'botocore.exceptions.BotoCoreError'>, <class 'OSError'>)

Tuple of errors that can happen due to connection failure.

property default_connection_params
default_port = None

Port number used when no port is specified.

driver_name = 'sqs'

Name of driver library (e.g. ‘py-amqp’, ‘redis’).

driver_type = 'sqs'

Type of driver, can be used to separate transports using the AMQP protocol (driver_type: ‘amqp’), Redis (driver_type: ‘redis’), etc…

implements = {'asynchronous': True, 'exchange_type': frozenset({'direct'}), 'heartbeats': False}
polling_interval = 1

Time to sleep between unsuccessful polls.

wait_time_seconds = 0

Channel

class kombu.transport.SQS.Channel(*args, **kwargs)

SQS Channel.

class QoS(channel, prefetch_count=0)

Quality of Service guarantees implementation for SQS.

apply_backoff_policy(routing_key, delivery_tag, backoff_policy, backoff_tasks)
static extract_task_name_and_number_of_retries(message)
reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

asynsqs(queue=None)
basic_ack(delivery_tag, multiple=False)

Acknowledge message.

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_consume(queue, no_ack, *args, **kwargs)

Consume from queue.

canonical_queue_name(queue_name)
close()

Close channel.

Cancel all consumers, and requeue unacked messages.

property conninfo
default_region = 'us-east-1'
default_visibility_timeout = 1800
default_wait_time_seconds = 10
domain_format = 'kombu%(vhost)s'
drain_events(timeout=None, callback=None, **kwargs)

Return a single payload message from one of our queues.

Raises

Queue.Empty – if no messages available.

endpoint_url
entity_name(name, table={33: 95, 34: 95, 35: 95, 36: 95, 37: 95, 38: 95, 39: 95, 40: 95, 41: 95, 42: 95, 43: 95, 44: 95, 46: 45, 47: 95, 58: 95, 59: 95, 60: 95, 61: 95, 62: 95, 63: 95, 64: 95, 91: 95, 92: 95, 93: 95, 94: 95, 96: 95, 123: 95, 124: 95, 125: 95, 126: 95})

Format AMQP queue name into a legal SQS queue name.
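The default table maps every ASCII punctuation character other than '-' and '_' to '_' (code point 95), except '.', which becomes '-' (code point 45). A minimal sketch of the effect follows, assuming the table is applied with str.translate; sketch_entity_name is a hypothetical stand-in, and the real method may handle special cases that this sketch ignores.

```python
import string

# Rebuild the table shown in the signature: punctuation other than
# '-', '_' and '.' maps to '_' (95), and '.' maps to '-' (45).
CHARS_REPLACE_TABLE = {
    ord(c): ord('_') for c in string.punctuation if c not in '-_.'
}
CHARS_REPLACE_TABLE[ord('.')] = ord('-')


def sketch_entity_name(name, table=CHARS_REPLACE_TABLE):
    """Translate a queue name character by character using the table."""
    return str(name).translate(table)


legal = sketch_entity_name('celery@worker1.example.com')
# legal == 'celery_worker1-example-com'
```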

generate_sts_session_token(role_arn, token_expiry_seconds)
is_secure
new_sqs_client(region, access_key_id, secret_access_key, session_token=None)
port
predefined_queues

Map of queue_name to predefined queue settings.

queue_name_prefix
region
regioninfo
sqs(queue=None)
supports_fanout

Flag set if the channel supports fanout exchanges.

property transport_options
visibility_timeout
wait_time_seconds

Back-off policy

The back-off policy uses the SQS visibility timeout mechanism to alter the time between task retries. It changes the visibility timeout of an individual message from the queue's default visibility timeout to the timeout configured in the policy. The number of retries is tracked by SQS (specifically by the ApproximateReceiveCount message attribute), so no further action is required from the user.

Configuring the queues and backoff policy:

broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
            'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640},
            'backoff_tasks': ['svc.tasks.tasks.task1']
        }
    }
}

backoff_policy: dictionary where each key is the number of retries and each value is the delay in seconds between retries (i.e. the SQS visibility timeout).

backoff_tasks: list of task names to which the above policy applies.

The above policy:

Attempt        Delay
2nd attempt    20 seconds
3rd attempt    40 seconds
4th attempt    80 seconds
5th attempt    320 seconds
6th attempt    640 seconds
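
The lookup behind such a policy can be sketched as follows. This is an illustration rather than the transport's code: backoff_visibility_timeout is a hypothetical name, and receive_count stands for the value of the ApproximateReceiveCount attribute. In the transport itself, the resulting value would be applied as the message's visibility timeout.

```python
def backoff_visibility_timeout(task_name, receive_count,
                               backoff_policy, backoff_tasks):
    """Return the timeout in seconds, or None to keep the queue default."""
    if task_name not in backoff_tasks:
        # Tasks that are not listed keep the default visibility timeout.
        return None
    # Retry counts missing from the policy also fall back to the default.
    return backoff_policy.get(receive_count)


policy = {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}
tasks = ['svc.tasks.tasks.task1']

# Timeouts for receive counts 1 to 7 of a task covered by the policy.
delays = [
    backoff_visibility_timeout('svc.tasks.tasks.task1', n, policy, tasks)
    for n in range(1, 8)
]
# A task that is not listed in backoff_tasks is left alone.
other = backoff_visibility_timeout('svc.tasks.tasks.task2', 2, policy, tasks)
```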