This document describes the current stable version of Celery (3.1). For development docs, go here.

Source code for celery.security.serialization

# -*- coding: utf-8 -*-

"""Secure serializer."""

from __future__ import absolute_import

import base64

from kombu.serialization import registry, dumps, loads
from kombu.utils.encoding import bytes_to_str, str_to_bytes, ensure_bytes

from .certificate import Certificate, FSCertStore
from .key import PrivateKey
from .utils import reraise_errors

__all__ = ['SecureSerializer', 'register_auth']

def b64encode(s):
    """Return the Base64 encoding of ``s`` as a string.

    ``s`` is passed through ``str_to_bytes`` before being encoded, and the
    encoded result is passed through ``bytes_to_str`` before being returned.
    """
    raw = str_to_bytes(s)
    encoded = base64.b64encode(raw)
    return bytes_to_str(encoded)

def b64decode(s):
    """Decode the Base64 payload ``s`` and return the decoded data.

    ``s`` is passed through ``str_to_bytes`` before being decoded.
    """
    encoded = str_to_bytes(s)
    return base64.b64decode(encoded)

class SecureSerializer(object):
    """Serializer that signs what it serializes and verifies what it reads.

    :param key: object used to sign; must provide ``sign(body, digest)``.
    :param cert: object identifying the signer; must provide ``get_id()``.
    :param cert_store: mapping from signer id to a certificate object
        providing ``verify(body, signature, digest)``.
    :param digest: digest name handed to ``sign`` and ``verify``.
    :param serializer: name of the serializer handed to ``dumps``.
    """

    def __init__(self, key=None, cert=None, cert_store=None,
                 digest='sha1', serializer='json'):
        self._key = key
        self._cert = cert
        self._cert_store = cert_store
        self._digest = digest
        self._serializer = serializer

    def serialize(self, data):
        """Serialize ``data``, sign the result and return the packed string.

        Requires both ``key`` and ``cert`` to have been supplied.  Errors
        raised while serializing, signing or packing are routed through
        ``reraise_errors`` with the message ``'Unable to serialize'``.
        """
        assert self._key is not None
        assert self._cert is not None
        with reraise_errors('Unable to serialize: {0!r}', (Exception, )):
            content_type, content_encoding, body = dumps(
                bytes_to_str(data), serializer=self._serializer)
            # What we sign is the serialized body, not the body itself.
            # This way the receiver doesn't have to decode the contents
            # to verify the signature (and thus avoids potential flaws
            # in the decoding step).
            body = ensure_bytes(body)
            return self._pack(body, content_type, content_encoding,
                              signature=self._key.sign(body, self._digest),
                              signer=self._cert.get_id())

    def deserialize(self, data):
        """Unpack ``data``, verify its signature and return the decoded body.

        Requires ``cert_store`` to have been supplied.  The signature is
        verified with the signer's certificate before the body is handed
        to ``loads``.  Errors raised while unpacking, verifying or decoding
        are routed through ``reraise_errors`` with the message
        ``'Unable to deserialize'``.
        """
        assert self._cert_store is not None
        with reraise_errors('Unable to deserialize: {0!r}', (Exception, )):
            payload = self._unpack(data)
            signature, signer, body = (payload['signature'],
                                       payload['signer'],
                                       payload['body'])
            self._cert_store[signer].verify(body, signature, self._digest)
            return loads(bytes_to_str(body), payload['content_type'],
                         payload['content_encoding'], force=True)

    def _pack(self, body, content_type, content_encoding, signer, signature,
              sep=str_to_bytes('\x00\x01')):
        """Join the message fields with ``sep`` and Base64-encode the result.

        Field order is: signer, signature, content_type, content_encoding,
        body.  ``_unpack`` relies on this order.
        """
        fields = sep.join(
            ensure_bytes(s) for s in [signer, signature, content_type,
                                      content_encoding, body]
        )
        return b64encode(fields)

    def _unpack(self, payload, sep=str_to_bytes('\x00\x01')):
        """Split a packed payload back into its fields.

        :returns: dict with the keys ``signer``, ``signature``,
            ``content_type``, ``content_encoding`` and ``body``.
        :raises ValueError: if the decoded payload does not contain the
            expected separators/fields.
        """
        raw_payload = b64decode(ensure_bytes(payload))
        first_sep = raw_payload.find(sep)
        if first_sep < 0:
            # find() returns -1 when the separator is missing; slicing with
            # it would silently drop the last byte and use the rest as the
            # signer id.
            raise ValueError('Malformed payload: field separator not found')

        signer = raw_payload[:first_sep]
        signer_cert = self._cert_store[signer]

        # The signature is sliced by length rather than located by
        # separator (presumably because a raw signature may itself contain
        # the separator bytes).  Its length in bytes is taken to be the
        # signer's public key size in bits divided by 8.
        sep_len = len(sep)
        sig_len = signer_cert._cert.get_pubkey().bits() >> 3
        sig_start = first_sep + sep_len
        sig_end = sig_start + sig_len
        signature = raw_payload[sig_start:sig_end]

        # Split at most twice so that a body which happens to contain the
        # separator bytes is kept intact instead of being truncated.
        v = raw_payload[sig_end + sep_len:].split(sep, 2)
        if len(v) != 3:
            raise ValueError(
                'Malformed payload: expected content type, content '
                'encoding and body after the signature')

        return {
            'signer': signer,
            'signature': signature,
            'content_type': bytes_to_str(v[0]),
            'content_encoding': bytes_to_str(v[1]),
            'body': bytes_to_str(v[2]),
        }
def register_auth(key=None, cert=None, store=None, digest='sha1',
                  serializer='json'):
    """Build a SecureSerializer and register it as the ``auth`` serializer.

    Each of ``key``, ``cert`` and ``store`` is wrapped in ``PrivateKey``,
    ``Certificate`` and ``FSCertStore`` respectively when truthy; a falsy
    value is handed to the serializer unchanged.
    """
    private_key = PrivateKey(key) if key else key
    certificate = Certificate(cert) if cert else cert
    cert_store = FSCertStore(store) if store else store

    secure = SecureSerializer(private_key, certificate, cert_store,
                              digest=digest, serializer=serializer)
    registry.register('auth', secure.serialize, secure.deserialize,
                      content_type='application/data',
                      content_encoding='utf-8')