This document describes the current stable version of Kombu (4.0). For development docs, go here.
Source code for kombu.async.aws.sqs.connection
# -*- coding: utf-8 -*-
"""Amazon SQS Connection."""
from __future__ import absolute_import, unicode_literals
from vine import transform
from kombu.async.aws.connection import AsyncAWSQueryConnection
from kombu.async.aws.ext import RegionInfo
from .ext import boto, Attributes, BatchResults, SQSConnection
from .message import AsyncMessage
from .queue import AsyncQueue
__all__ = ['AsyncSQSConnection']
class AsyncSQSConnection(AsyncAWSQueryConnection, SQSConnection):
    """Async SQS Connection.

    Every request method builds the query parameters for a single SQS
    action and delegates to ``get_object``, ``get_list`` or
    ``get_status``, forwarding the optional ``callback`` and returning
    whatever that helper returns.
    """

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, debug=0,
                 https_connection_factory=None, region=None, *args, **kwargs):
        """Set up the region and initialise the underlying query connection.

        When ``region`` is falsy a :class:`RegionInfo` is built from
        ``DefaultRegionName``/``DefaultRegionEndpoint``.  The region's
        endpoint is used as the connection host.

        :raises ImportError: if ``boto`` could not be imported.
        """
        if boto is None:
            raise ImportError('boto is not installed')
        self.region = region or RegionInfo(
            self, self.DefaultRegionName, self.DefaultRegionEndpoint,
            connection_cls=type(self),
        )
        # NOTE: extra positional ``*args`` are accepted for signature
        # compatibility but are not forwarded; every base-class argument
        # is passed by keyword below.
        AsyncAWSQueryConnection.__init__(
            self,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            is_secure=is_secure, port=port,
            proxy=proxy, proxy_port=proxy_port,
            proxy_user=proxy_user, proxy_pass=proxy_pass,
            host=self.region.endpoint, debug=debug,
            https_connection_factory=https_connection_factory, **kwargs
        )

    def create_queue(self, queue_name,
                     visibility_timeout=None, callback=None):
        """Issue a ``CreateQueue`` request for ``queue_name``.

        If ``visibility_timeout`` is given it is sent as
        ``DefaultVisibilityTimeout``, formatted as a decimal integer.
        The response is parsed into an :class:`AsyncQueue`.
        """
        params = {'QueueName': queue_name}
        # Test against None (not truthiness) so that an explicit timeout
        # of 0 is sent to the service instead of being silently dropped.
        if visibility_timeout is not None:
            params['DefaultVisibilityTimeout'] = format(
                visibility_timeout, 'd',
            )
        return self.get_object('CreateQueue', params, AsyncQueue,
                               callback=callback)

    def delete_queue(self, queue, force_deletion=False, callback=None):
        """Issue a ``DeleteQueue`` request against ``queue.id``.

        ``force_deletion`` is accepted but not used by this method.
        """
        return self.get_status('DeleteQueue', None, queue.id,
                               callback=callback)

    def get_queue_attributes(self, queue, attribute='All', callback=None):
        """Issue ``GetQueueAttributes`` for ``attribute`` on ``queue.id``.

        The response is parsed into an :class:`Attributes` object.
        """
        return self.get_object(
            'GetQueueAttributes', {'AttributeName': attribute},
            Attributes, queue.id, callback=callback,
        )

    def set_queue_attribute(self, queue, attribute, value, callback=None):
        """Issue ``SetQueueAttribute`` setting ``attribute`` to ``value``."""
        return self.get_status(
            'SetQueueAttribute',
            {'Attribute.Name': attribute, 'Attribute.Value': value},
            queue.id, callback=callback,
        )

    def receive_message(self, queue,
                        number_messages=1, visibility_timeout=None,
                        attributes=None, wait_time_seconds=None,
                        callback=None):
        """Issue a ``ReceiveMessage`` request against ``queue.id``.

        ``number_messages`` is always sent as ``MaxNumberOfMessages``;
        ``visibility_timeout`` and ``wait_time_seconds`` are only sent
        when they are not ``None``; ``attributes`` is only sent when
        non-empty.  Each ``Message`` element of the response is parsed
        with ``queue.message_class``.
        """
        params = {'MaxNumberOfMessages': number_messages}
        # Same None test as wait_time_seconds below: an explicit 0 must
        # reach the service rather than be treated as "not given".
        if visibility_timeout is not None:
            params['VisibilityTimeout'] = visibility_timeout
        if attributes:
            self.build_list_params(params, attributes, 'AttributeName')
        if wait_time_seconds is not None:
            params['WaitTimeSeconds'] = wait_time_seconds
        return self.get_list(
            'ReceiveMessage', params, [('Message', queue.message_class)],
            queue.id, callback=callback,
        )

    def delete_message(self, queue, message, callback=None):
        """Delete ``message`` from ``queue`` using its receipt handle."""
        return self.delete_message_from_handle(
            queue, message.receipt_handle, callback,
        )

    def delete_message_batch(self, queue, messages, callback=None):
        """Issue ``DeleteMessageBatch`` (POST) for ``messages``.

        Every message contributes a 1-based
        ``DeleteMessageBatchRequestEntry.N`` entry carrying its ``id``
        and ``receipt_handle``.  The response is parsed into
        :class:`BatchResults`.
        """
        params = {}
        for index, message in enumerate(messages, 1):
            prefix = 'DeleteMessageBatchRequestEntry.{0}'.format(index)
            params['{0}.Id'.format(prefix)] = message.id
            params['{0}.ReceiptHandle'.format(prefix)] = (
                message.receipt_handle
            )
        return self.get_object(
            'DeleteMessageBatch', params, BatchResults, queue.id,
            verb='POST', callback=callback,
        )

    def delete_message_from_handle(self, queue, receipt_handle,
                                   callback=None):
        """Issue ``DeleteMessage`` for ``receipt_handle`` on ``queue.id``."""
        return self.get_status(
            'DeleteMessage', {'ReceiptHandle': receipt_handle},
            queue.id, callback=callback,
        )

    def send_message(self, queue, message_content,
                     delay_seconds=None, callback=None):
        """Issue ``SendMessage`` (POST) with ``message_content`` as body.

        ``DelaySeconds`` is only included when ``delay_seconds`` is
        truthy, so ``None`` and ``0`` both leave the parameter out.
        The response is parsed into an :class:`AsyncMessage`.
        """
        params = {'MessageBody': message_content}
        if delay_seconds:
            params['DelaySeconds'] = int(delay_seconds)
        return self.get_object(
            'SendMessage', params, AsyncMessage, queue.id,
            verb='POST', callback=callback,
        )

    def send_message_batch(self, queue, messages, callback=None):
        """Issue ``SendMessageBatch`` (POST) for ``messages``.

        Each item of ``messages`` is indexed as
        ``(id, body, delay_seconds)`` and becomes a 1-based
        ``SendMessageBatchRequestEntry.N`` entry.  The response is
        parsed into :class:`BatchResults`.
        """
        params = {}
        for index, entry in enumerate(messages, 1):
            prefix = 'SendMessageBatchRequestEntry.{0}'.format(index)
            params['{0}.Id'.format(prefix)] = entry[0]
            params['{0}.MessageBody'.format(prefix)] = entry[1]
            params['{0}.DelaySeconds'.format(prefix)] = entry[2]
        return self.get_object(
            'SendMessageBatch', params, BatchResults, queue.id,
            verb='POST', callback=callback,
        )

    def change_message_visibility(self, queue, receipt_handle,
                                  visibility_timeout, callback=None):
        """Issue ``ChangeMessageVisibility`` for ``receipt_handle``."""
        return self.get_status(
            'ChangeMessageVisibility',
            {'ReceiptHandle': receipt_handle,
             'VisibilityTimeout': visibility_timeout},
            queue.id, callback=callback,
        )

    def change_message_visibility_batch(self, queue, messages, callback=None):
        """Issue ``ChangeMessageVisibilityBatch`` (POST) for ``messages``.

        Each item of ``messages`` is indexed as
        ``(message, visibility_timeout)``; the message supplies ``id``
        and ``receipt_handle``.  Entries are numbered from 1.  The
        response is parsed into :class:`BatchResults`.
        """
        params = {}
        for index, entry in enumerate(messages, 1):
            prefix = 'ChangeMessageVisibilityBatchRequestEntry.{0}'.format(
                index,
            )
            params['{0}.Id'.format(prefix)] = entry[0].id
            params['{0}.ReceiptHandle'.format(prefix)] = (
                entry[0].receipt_handle
            )
            params['{0}.VisibilityTimeout'.format(prefix)] = entry[1]
        return self.get_object(
            'ChangeMessageVisibilityBatch', params, BatchResults, queue.id,
            verb='POST', callback=callback,
        )

    def get_all_queues(self, prefix='', callback=None):
        """Issue ``ListQueues``, optionally limited by ``QueueNamePrefix``.

        Each ``QueueUrl`` element of the response is parsed into an
        :class:`AsyncQueue`.
        """
        params = {}
        if prefix:
            params['QueueNamePrefix'] = prefix
        return self.get_list(
            'ListQueues', params, [('QueueUrl', AsyncQueue)],
            callback=callback,
        )

    def get_queue(self, queue_name, callback=None):
        """Look up a single queue by name.

        Lists the queues whose name starts with ``queue_name`` and
        routes the listing through :meth:`_on_queue_ready` (using
        ``transform``) before it is handed on to ``callback``.
        """
        # TODO Does not support owner_acct_id argument
        return self.get_all_queues(
            queue_name,
            transform(self._on_queue_ready, callback, queue_name),
        )
    lookup = get_queue

    def _on_queue_ready(self, name, queues):
        """Return the first queue in ``queues`` named ``name``, else None."""
        # Anchor the match on the path separator: a bare
        # ``url.endswith(name)`` would also accept queues whose name
        # merely ends with ``name`` (looking up "foo" matching "foofoo").
        suffix = '/' + name
        return next(
            (q for q in queues if q.url.endswith(suffix)), None,
        )

    def get_dead_letter_source_queues(self, queue, callback=None):
        """Issue ``ListDeadLetterSourceQueues`` for ``queue.url``.

        Each ``QueueUrl`` element of the response is parsed into an
        :class:`AsyncQueue`.
        """
        return self.get_list(
            'ListDeadLetterSourceQueues', {'QueueUrl': queue.url},
            [('QueueUrl', AsyncQueue)],
            callback=callback,
        )

    def add_permission(self, queue, label, aws_account_id, action_name,
                       callback=None):
        """Issue ``AddPermission`` on ``queue.id``.

        Sends ``label``, ``aws_account_id`` and ``action_name`` as
        ``Label``, ``AWSAccountId`` and ``ActionName``.
        """
        return self.get_status(
            'AddPermission',
            {'Label': label,
             'AWSAccountId': aws_account_id,
             'ActionName': action_name},
            queue.id, callback=callback,
        )

    def remove_permission(self, queue, label, callback=None):
        """Issue ``RemovePermission`` for ``label`` on ``queue.id``."""
        return self.get_status(
            'RemovePermission', {'Label': label}, queue.id, callback=callback,
        )