This document describes the current stable version of Celery (4.4). For development docs, go here.

Source code for celery.security.utils

# -*- coding: utf-8 -*-
"""Utilities used by the message signing serializer."""
from __future__ import absolute_import, unicode_literals

import sys
from contextlib import contextmanager

import cryptography.exceptions
from cryptography.hazmat.primitives import hashes

from celery.exceptions import SecurityError
from celery.five import reraise

__all__ = ('get_digest_algorithm', 'reraise_errors',)

def get_digest_algorithm(digest='sha256'):
    """Convert string to hash object of cryptography library.

    Arguments:
        digest (str): Name of the digest algorithm.  It is upper-cased
            and looked up as an attribute of
            :mod:`cryptography.hazmat.primitives.hashes`.

    Returns:
        The result of calling the matching ``hashes`` attribute with no
        arguments.

    Raises:
        TypeError: If ``digest`` is :const:`None`.
        AttributeError: If ``hashes`` has no attribute with that name.
    """
    # Explicit check instead of ``assert``: assertions are stripped when
    # Python runs with ``-O``, which would let ``None`` reach ``.upper()``.
    if digest is None:
        raise TypeError('digest must be a string, not None')
    return getattr(hashes, digest.upper())()
def _default_crypto_errors():
    """Return the exception classes exposed by ``cryptography.exceptions``."""
    return tuple(
        obj for obj in vars(cryptography.exceptions).values()
        if isinstance(obj, type) and issubclass(obj, BaseException)
    )


@contextmanager
def reraise_errors(msg='{0!r}', errors=None):
    """Context reraising crypto errors as :exc:`SecurityError`.

    Arguments:
        msg (str): Format string for the :exc:`SecurityError` message;
            it is formatted with the caught exception as ``{0}``.
        errors (Tuple[type, ...]): Exception classes to convert.  When
            :const:`None`, every exception class found in
            :mod:`cryptography.exceptions` is used.

    Raises:
        SecurityError: If the managed block raises one of ``errors``.
            The traceback of the original exception is kept.
    """
    if errors is None:
        # The module object itself is not an exception class, so it cannot
        # be used in an ``except`` clause; collect its exception classes.
        errors = _default_crypto_errors()
    try:
        yield
    except errors as exc:
        reraise(
            SecurityError,
            SecurityError(msg.format(exc)),
            sys.exc_info()[2],
        )