This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 3.0.
Source code for kombu.async.aws.connection
# -*- coding: utf-8 -*-
"""Amazon AWS Connection."""
from __future__ import absolute_import, unicode_literals
from io import BytesIO
from vine import promise, transform
from kombu.async.http import Headers, Request, get_client
from kombu.five import items, python_2_unicode_compatible
from .ext import (
boto, AWSAuthConnection, AWSQueryConnection, XmlHandler, ResultSet,
)
try:
from urllib.parse import urlunsplit
except ImportError:
from urlparse import urlunsplit # noqa
from xml.sax import parseString as sax_parse # noqa
try: # pragma: no cover
from email import message_from_file
from email.mime.message import MIMEMessage
except ImportError: # pragma: no cover
from mimetools import Message as MIMEMessage # noqa
def message_from_file(m): # noqa
return m
__all__ = [
'AsyncHTTPConnection', 'AsyncHTTPSConnection',
'AsyncHTTPResponse', 'AsyncConnection',
'AsyncAWSAuthConnection', 'AsyncAWSQueryConnection',
]
@python_2_unicode_compatible
[docs]class AsyncHTTPResponse(object):
"""Async HTTP Response."""
def __init__(self, response):
self.response = response
self._msg = None
self.version = 10
@property
def msg(self):
if self._msg is None:
self._msg = MIMEMessage(message_from_file(
BytesIO(b'\r\n'.join(
b'{0}: {1}'.format(*h) for h in self.getheaders())
)
))
return self._msg
@property
def status(self):
return self.response.code
@property
def reason(self):
if self.response.error:
return self.response.error.message
return ''
def __repr__(self):
return repr(self.response)
@python_2_unicode_compatible
[docs]class AsyncHTTPConnection(object):
"""Async HTTP Connection."""
Request = Request
Response = AsyncHTTPResponse
method = 'GET'
path = '/'
body = None
scheme = 'http'
default_ports = {'http': 80, 'https': 443}
def __init__(self, host, port=None,
strict=None, timeout=20.0, http_client=None, **kwargs):
self.host = host
self.port = port
self.headers = []
self.timeout = timeout
self.strict = strict
self.http_client = http_client or get_client()
[docs] def request(self, method, path, body=None, headers=None):
self.path = path
self.method = method
if body is not None:
try:
read = body.read
except AttributeError:
self.body = body
else:
self.body = read()
if headers is not None:
self.headers.extend(list(items(headers)))
[docs] def getrequest(self, scheme=None):
scheme = scheme if scheme else self.scheme
host = self.host
if self.port and self.port != self.default_ports[scheme]:
host = '{0}:{1}'.format(host, self.port)
url = urlunsplit((scheme, host, self.path, '', ''))
headers = Headers(self.headers)
return self.Request(url, method=self.method, headers=headers,
body=self.body, connect_timeout=self.timeout,
request_timeout=self.timeout, validate_cert=False)
[docs] def getresponse(self, callback=None):
request = self.getrequest()
request.then(transform(self.Response, callback))
return self.http_client.add_request(request)
def __repr__(self):
return '<AsyncHTTPConnection: {0!r}>'.format(self.getrequest())
[docs]class AsyncHTTPSConnection(AsyncHTTPConnection):
"""Async HTTPS Connection."""
scheme = 'https'
[docs]class AsyncConnection(object):
"""Async AWS Connection."""
def __init__(self, http_client=None, **kwargs):
if boto is None:
raise ImportError('boto is not installed')
self._httpclient = http_client or get_client()
[docs] def get_http_connection(self, host, port, is_secure):
return (AsyncHTTPSConnection if is_secure else AsyncHTTPConnection)(
host, port, http_client=self._httpclient,
)
def _mexe(self, request, sender=None, callback=None):
callback = callback or promise()
boto.log.debug(
'HTTP %s/%s headers=%s body=%s',
request.host, request.path,
request.headers, request.body,
)
conn = self.get_http_connection(
request.host, request.port, self.is_secure,
)
request.authorize(connection=self)
if callable(sender):
sender(conn, request.method, request.path, request.body,
request.headers, callback)
else:
conn.request(request.method, request.path,
request.body, request.headers)
conn.getresponse(callback=callback)
return callback
[docs]class AsyncAWSAuthConnection(AsyncConnection, AWSAuthConnection):
"""Async AWS Authn Connection."""
def __init__(self, host,
http_client=None, http_client_params={}, **kwargs):
AsyncConnection.__init__(self, http_client, **http_client_params)
AWSAuthConnection.__init__(self, host, **kwargs)
[docs] def make_request(self, method, path, headers=None, data='', host=None,
auth_path=None, sender=None, callback=None, **kwargs):
req = self.build_base_http_request(
method, path, auth_path, {}, headers, data, host,
)
return self._mexe(req, sender=sender, callback=callback)
[docs]class AsyncAWSQueryConnection(AsyncConnection, AWSQueryConnection):
"""Async AWS Query Connection."""
def __init__(self, host,
http_client=None, http_client_params={}, **kwargs):
AsyncConnection.__init__(self, http_client, **http_client_params)
AWSAuthConnection.__init__(self, host, **kwargs)
[docs] def make_request(self, action, params, path, verb, callback=None):
request = self.build_base_http_request(
verb, path, None, params, {}, '', self.server_name())
if action:
request.params['Action'] = action
request.params['Version'] = self.APIVersion
return self._mexe(request, callback=callback)
[docs] def get_list(self, action, params, markers,
path='/', parent=None, verb='GET', callback=None):
return self.make_request(
action, params, path, verb,
callback=transform(
self._on_list_ready, callback, parent or self, markers,
),
)
[docs] def get_object(self, action, params, cls,
path='/', parent=None, verb='GET', callback=None):
return self.make_request(
action, params, path, verb,
callback=transform(
self._on_obj_ready, callback, parent or self, cls,
),
)
[docs] def get_status(self, action, params,
path='/', parent=None, verb='GET', callback=None):
return self.make_request(
action, params, path, verb,
callback=transform(
self._on_status_ready, callback, parent or self,
),
)
def _on_list_ready(self, parent, markers, response):
body = response.read()
if response.status == 200 and body:
rs = ResultSet(markers)
h = XmlHandler(rs, parent)
sax_parse(body, h)
return rs
else:
raise self._for_status(response, body)
def _on_obj_ready(self, parent, cls, response):
body = response.read()
if response.status == 200 and body:
obj = cls(parent)
h = XmlHandler(obj, parent)
sax_parse(body, h)
return obj
else:
raise self._for_status(response, body)
def _on_status_ready(self, parent, response):
body = response.read()
if response.status == 200 and body:
rs = ResultSet()
h = XmlHandler(rs, parent)
sax_parse(body, h)
return rs.status
else:
raise self._for_status(response, body)
def _for_status(self, response, body):
context = 'Empty body' if not body else 'HTTP Error'
exc = self.ResponseError(response.status, response.reason, body)
boto.log.error('{0}: %r'.format(context), exc)
return exc