This document describes the current stable version of Kombu (4.0). For development docs, go here.

Source code for kombu.serialization

"""Serialization utilities."""
from __future__ import absolute_import, unicode_literals

import codecs
import os
import sys

import pickle as pypickle
try:
    import cPickle as cpickle
except ImportError:  # pragma: no cover
    cpickle = None  # noqa

from collections import namedtuple
from contextlib import contextmanager
from io import BytesIO

from .exceptions import (
    ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled
)
from .five import reraise, text_t
from .utils.compat import entrypoints
from .utils.encoding import bytes_to_str, str_to_bytes, bytes_t

__all__ = ['pickle', 'loads', 'dumps', 'register', 'unregister']
SKIP_DECODE = frozenset(['binary', 'ascii-8bit'])
TRUSTED_CONTENT = frozenset(['application/data', 'application/text'])

if sys.platform.startswith('java'):  # pragma: no cover

    def _decode(t, coding):
        return codecs.getdecoder(coding)(t)[0]
else:
    _decode = codecs.decode

pickle = cpickle or pypickle
pickle_load = pickle.load

#: Kombu requires Python 2.5 or later so we use protocol 2 by default.
#: There's a new protocol (3) but this is only supported by Python 3.
pickle_protocol = int(os.environ.get('PICKLE_PROTOCOL', 2))

codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))


@contextmanager
def _reraise_errors(wrapper,
                    include=(Exception,), exclude=(SerializerNotInstalled,)):
    try:
        yield
    except exclude:
        raise
    except include as exc:
        reraise(wrapper, wrapper(exc), sys.exc_info()[2])


def pickle_loads(s, load=pickle_load):
    # used to support buffer objects
    return load(BytesIO(s))


def parenthesize_alias(first, second):
    return '%s (%s)' % (first, second) if first else second


class SerializerRegistry(object):
    """The registry keeps track of serialization methods."""

    def __init__(self):
        self._encoders = {}
        self._decoders = {}
        self._default_encode = None
        self._default_content_type = None
        self._default_content_encoding = None
        self._disabled_content_types = set()
        self.type_to_name = {}
        self.name_to_type = {}

    def register(self, name, encoder, decoder, content_type,
                 content_encoding='utf-8'):
        """Register a new encoder/decoder.

        Arguments:
            name (str): A convenience name for the serialization method.

            encoder (callable): A method that will be passed a python data
                structure and should return a string representing the
                serialized data.  If :const:`None`, then only a decoder
                will be registered. Encoding will not be possible.

            decoder (Callable): A method that will be passed a string
                representing serialized data and should return a python
                data structure.  If :const:`None`, then only an encoder
                will be registered.  Decoding will not be possible.

            content_type (str): The mime-type describing the serialized
                structure.

            content_encoding (str): The content encoding (character set) that
                the `decoder` method will be returning. Will usually be
                `utf-8`, `us-ascii`, or `binary`.
        """
        if encoder:
            self._encoders[name] = codec(
                content_type, content_encoding, encoder,
            )
        if decoder:
            self._decoders[content_type] = decoder
        self.type_to_name[content_type] = name
        self.name_to_type[name] = content_type

    def enable(self, name):
        if '/' not in name:
            name = self.name_to_type[name]
        self._disabled_content_types.discard(name)

    def disable(self, name):
        if '/' not in name:
            name = self.name_to_type[name]
        self._disabled_content_types.add(name)

    def unregister(self, name):
        """Unregister registered encoder/decoder.

        Arguments:
            name (str): Registered serialization method name.

        Raises:
            SerializerNotInstalled: If a serializer by that name
                cannot be found.
        """
        try:
            content_type = self.name_to_type[name]
            self._decoders.pop(content_type, None)
            self._encoders.pop(name, None)
            self.type_to_name.pop(content_type, None)
            self.name_to_type.pop(name, None)
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder/decoder installed for {0}'.format(name))

    def _set_default_serializer(self, name):
        """Set the default serialization method used by this library.

        Arguments:
            name (str): The name of the registered serialization method.
                For example, `json` (default), `pickle`, `yaml`, `msgpack`,
                or any custom methods registered using :meth:`register`.

        Raises:
            SerializerNotInstalled: If the serialization method
                requested is not available.
        """
        try:
            (self._default_content_type, self._default_content_encoding,
             self._default_encode) = self._encoders[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(name))

    def dumps(self, data, serializer=None):
        """Encode data.

        Serialize a data structure into a string suitable for sending
        as an AMQP message body.

        Arguments:
            data (List, Dict, str): The message data to send.

            serializer (str): An optional string representing
                the serialization method you want the data marshalled
                into. (For example, `json`, `raw`, or `pickle`).

                If :const:`None` (default), then json will be used, unless
                `data` is a :class:`str` or :class:`unicode` object. In this
                latter case, no serialization occurs as it would be
                unnecessary.

                Note that if `serializer` is specified, then that
                serialization method will be used even if a :class:`str`
                or :class:`unicode` object is passed in.

        Returns:
            Tuple[str, str, str]: A three-item tuple containing the
            content type (e.g., `application/json`), content encoding, (e.g.,
            `utf-8`) and a string containing the serialized data.

        Raises:
            SerializerNotInstalled: If the serialization method
                requested is not available.
        """
        if serializer == 'raw':
            return raw_encode(data)
        if serializer and not self._encoders.get(serializer):
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(serializer))

        # If a raw string was sent, assume binary encoding
        # (it's likely either ASCII or a raw binary file, and a character
        # set of 'binary' will encompass both, even if not ideal.
        if not serializer and isinstance(data, bytes_t):
            # In Python 3+, this would be "bytes"; allow binary data to be
            # sent as a message without getting encoder errors
            return 'application/data', 'binary', data

        # For Unicode objects, force it into a string
        if not serializer and isinstance(data, text_t):
            with _reraise_errors(EncodeError, exclude=()):
                payload = data.encode('utf-8')
            return 'text/plain', 'utf-8', payload

        if serializer:
            content_type, content_encoding, encoder = \
                self._encoders[serializer]
        else:
            encoder = self._default_encode
            content_type = self._default_content_type
            content_encoding = self._default_content_encoding

        with _reraise_errors(EncodeError):
            payload = encoder(data)
        return content_type, content_encoding, payload

    def loads(self, data, content_type, content_encoding,
              accept=None, force=False, _trusted_content=TRUSTED_CONTENT):
        """Decode serialized data.

        Deserialize a data stream as serialized using `dumps`
        based on `content_type`.

        Arguments:
            data (bytes, buffer, str): The message data to deserialize.

            content_type (str): The content-type of the data.
                (e.g., `application/json`).

            content_encoding (str): The content-encoding of the data.
                (e.g., `utf-8`, `binary`, or `us-ascii`).

            accept (Set): List of content-types to accept.

        Raises:
            ContentDisallowed: If the content-type is not accepted.

        Returns:
            Any: The unserialized data.
        """
        content_type = (bytes_to_str(content_type) if content_type
                        else 'application/data')
        if accept is not None:
            if content_type not in _trusted_content \
                    and content_type not in accept:
                raise self._for_untrusted_content(content_type, 'untrusted')
        else:
            if content_type in self._disabled_content_types and not force:
                raise self._for_untrusted_content(content_type, 'disabled')
        content_encoding = (content_encoding or 'utf-8').lower()

        if data:
            decode = self._decoders.get(content_type)
            if decode:
                with _reraise_errors(DecodeError):
                    return decode(data)
            if content_encoding not in SKIP_DECODE and \
                    not isinstance(data, text_t):
                with _reraise_errors(DecodeError):
                    return _decode(data, content_encoding)
        return data

    def _for_untrusted_content(self, ctype, why):
        return ContentDisallowed(
            'Refusing to deserialize {0} content of type {1}'.format(
                why,
                parenthesize_alias(self.type_to_name.get(ctype, ctype), ctype),
            ),
        )


#: Global registry of serializers/deserializers.
registry = SerializerRegistry()
dumps = registry.dumps
loads = registry.loads
register = registry.register
unregister = registry.unregister


[docs]def raw_encode(data): """Special case serializer.""" content_type = 'application/data' payload = data if isinstance(payload, text_t): content_encoding = 'utf-8' with _reraise_errors(EncodeError, exclude=()): payload = payload.encode(content_encoding) else: content_encoding = 'binary' return content_type, content_encoding, payload
def register_json(): """Register a encoder/decoder for JSON serialization.""" from kombu.utils import json as _json registry.register('json', _json.dumps, _json.loads, content_type='application/json', content_encoding='utf-8') def register_yaml(): """Register a encoder/decoder for YAML serialization. It is slower than JSON, but allows for more data types to be serialized. Useful if you need to send data such as dates """ try: import yaml registry.register('yaml', yaml.safe_dump, yaml.safe_load, content_type='application/x-yaml', content_encoding='utf-8') except ImportError: def not_available(*args, **kwargs): """Raise SerializerNotInstalled. Used in case a client receives a yaml message, but yaml isn't installed. """ raise SerializerNotInstalled( 'No decoder installed for YAML. Install the PyYAML library') registry.register('yaml', None, not_available, 'application/x-yaml') if sys.version_info[0] == 3: # pragma: no cover def unpickle(s): return pickle_loads(str_to_bytes(s)) else: unpickle = pickle_loads # noqa def register_pickle(): """Register pickle serializer. The fastest serialization method, but restricts you to python clients. """ def pickle_dumps(obj, dumper=pickle.dumps): return dumper(obj, protocol=pickle_protocol) registry.register('pickle', pickle_dumps, unpickle, content_type='application/x-python-serialize', content_encoding='binary') def register_msgpack(): """Register msgpack serializer. See Also: http://msgpack.sourceforge.net/. """ pack = unpack = None try: import msgpack if msgpack.version >= (0, 4): from msgpack import packb, unpackb def pack(s): return packb(s, use_bin_type=True) def unpack(s): return unpackb(s, encoding='utf-8') else: def version_mismatch(*args, **kwargs): raise SerializerNotInstalled( 'msgpack requires msgpack-python >= 0.4.0') pack = unpack = version_mismatch except (ImportError, ValueError): def not_available(*args, **kwargs): raise SerializerNotInstalled( 'No decoder installed for msgpack. ' 'Please install the msgpack-python library') pack = unpack = not_available registry.register( 'msgpack', pack, unpack, content_type='application/x-msgpack', content_encoding='binary', ) # Register the base serialization methods. register_json() register_pickle() register_yaml() register_msgpack() # Default serializer is 'json' registry._set_default_serializer('json') _setupfuns = { 'json': register_json, 'pickle': register_pickle, 'yaml': register_yaml, 'msgpack': register_msgpack, 'application/json': register_json, 'application/x-yaml': register_yaml, 'application/x-python-serialize': register_pickle, 'application/x-msgpack': register_msgpack, }
[docs]def enable_insecure_serializers(choices=['pickle', 'yaml', 'msgpack']): """Enable serializers that are considered to be unsafe. Note: Will enable ``pickle``, ``yaml`` and ``msgpack`` by default, but you can also specify a list of serializers (by name or content type) to enable. """ for choice in choices: try: registry.enable(choice) except KeyError: pass
[docs]def disable_insecure_serializers(allowed=['json']): """Disable untrusted serializers. Will disable all serializers except ``json`` or you can specify a list of deserializers to allow. Note: Producers will still be able to serialize data in these formats, but consumers will not accept incoming data using the untrusted content types. """ for name in registry._decoders: registry.disable(name) if allowed is not None: for name in allowed: registry.enable(name)
# Insecure serializers are disabled by default since v3.0 disable_insecure_serializers() # Load entrypoints from installed extensions for ep, args in entrypoints('kombu.serializers'): # pragma: no cover register(ep.name, *args) def prepare_accept_content(l, name_to_type=registry.name_to_type): if l is not None: return {n if '/' in n else name_to_type[n] for n in l} return l