This document describes the current stable version of Celery (4.4). For development docs, see the development version of the Celery documentation.
Source code for celery.backends.riak
# -*- coding: utf-8 -*-
"""Riak result store backend."""
from __future__ import absolute_import, unicode_literals
import sys
import warnings
from kombu.utils.url import _parse_url
from celery.exceptions import CeleryWarning, ImproperlyConfigured
from .base import KeyValueStoreBackend
try:
import riak
from riak import RiakClient
from riak.resolver import last_written_resolver
except ImportError: # pragma: no cover
riak = RiakClient = last_written_resolver = None # noqa
__all__ = ('RiakBackend',)
#: Error message used when the configured bucket name is not pure ASCII;
#: ``{0!r}`` is filled with the offending name.
E_BUCKET_NAME = (
    'Riak bucket names must be composed of ASCII characters only, '
    'not: {0!r}'
)

#: Warning text naming the running interpreter's major.minor version,
#: formatted once when the module is imported.
W_UNSUPPORTED_PYTHON_VERSION = (
    'Python {}.{} is unsupported by the client library '
    'https://pypi.org/project/riak'
).format(*sys.version_info[:2])
if sys.version_info[0] == 3:
    # The warning text states that the riak client library is unsupported
    # on this interpreter; emit it once, at import time, on Python 3.7+.
    if sys.version_info.minor >= 7:
        warnings.warn(CeleryWarning(W_UNSUPPORTED_PYTHON_VERSION))

    def to_bytes(string):
        """Return ``string`` as bytes.

        A ``str`` is encoded with the default codec; any other value is
        returned unchanged.
        """
        return string.encode() if isinstance(string, str) else string

    def str_decode(string, encoding):
        """Decode ``string`` (text or bytes) using ``encoding``.

        Text is first converted to bytes with :func:`to_bytes` so that
        decoding fails for characters ``encoding`` cannot represent.
        """
        return to_bytes(string).decode(encoding)
else:
    def str_decode(string, encoding):
        """Decode ``string`` using ``encoding``."""
        # Honour the requested codec rather than always assuming ASCII, so
        # both interpreter branches implement the same contract.
        return string.decode(encoding)
def is_ascii(string):
    """Return True when ``string`` can be decoded as ASCII, else False.

    Arguments:
        string: the text or byte string to check; it is passed to
            ``str_decode`` with the ``'ascii'`` codec.

    Returns:
        bool: False if decoding raised a Unicode error, True otherwise.
    """
    try:
        str_decode(string, 'ascii')
    except UnicodeError:
        # UnicodeError is the common base of UnicodeDecodeError and
        # UnicodeEncodeError.  The latter is what Python 2 raises when a
        # non-ASCII unicode object is implicitly encoded before decoding,
        # and it must also count as "not ASCII" instead of escaping.
        return False
    return True
class RiakBackend(KeyValueStoreBackend):
"""Riak result backend.
Raises:
celery.exceptions.ImproperlyConfigured:
if module :pypi:`riak` is not available.
"""
# TODO: allow using other protocols than protobuf ?
#: default protocol used to connect to Riak, might be `http` or `pbc`
protocol = 'pbc'
#: default Riak bucket name (`celery`)
bucket_name = 'celery'
#: default Riak server hostname (`localhost`)
host = 'localhost'
#: default Riak server port (8087)
port = 8087
_bucket = None
def __init__(self, host=None, port=None, bucket_name=None, protocol=None,
             url=None, *args, **kwargs):
    """Configure the backend from a URL, arguments and app settings.

    ``host``, ``port`` and ``bucket_name`` are each resolved in this
    order: the value parsed from ``url``, the explicit argument, the
    ``riak_backend_settings`` dict of the app configuration, and finally
    the class-level default.  ``protocol`` is resolved from the explicit
    argument, then the settings dict, then the class-level default.

    Arguments:
        host (str): Riak server hostname.
        port (int): Riak server port.
        bucket_name (str): name of the bucket to use.
        protocol (str): protocol handed to the Riak client.
        url (str): backend URL; the host, port and bucket fields that
            ``_parse_url`` returns for it take precedence over the
            other sources.

    Raises:
        celery.exceptions.ImproperlyConfigured: if the ``riak`` module
            is not available, or if ``riak_backend_settings`` is set
            but is not a dict.
        ValueError: if the resolved bucket name is not ASCII-only.
    """
    super(RiakBackend, self).__init__(*args, **kwargs)
    self.url = url

    if not riak:
        raise ImproperlyConfigured(
            'You need to install the riak library to use the '
            'Riak backend.')

    uhost = uport = ubucket = None
    if url:
        # Only the host, port and bucket fields are used; the remaining
        # fields returned by _parse_url are discarded.
        _, uhost, uport, _, _, ubucket, _ = _parse_url(url)
        if ubucket:
            ubucket = ubucket.strip('/')

    config = self.app.conf.get('riak_backend_settings', None)
    if config is not None:
        if not isinstance(config, dict):
            raise ImproperlyConfigured(
                'Riak backend settings should be grouped in a dict')
    else:
        config = {}

    # URL values win, then explicit arguments (previously accepted but
    # silently ignored), then the settings dict, then class defaults.
    self.host = uhost or host or config.get('host', self.host)
    self.port = int(uport or port or config.get('port', self.port))
    self.bucket_name = (
        ubucket or bucket_name or config.get('bucket', self.bucket_name)
    )
    self.protocol = protocol or config.get('protocol', self.protocol)

    # The bucket name must consist of ASCII characters only.
    if not is_ascii(self.bucket_name):
        raise ValueError(E_BUCKET_NAME.format(self.bucket_name))

    # The client is created lazily on first access.
    self._client = None
def _get_client(self):
    """Return the cached client, building a new one if it is unusable.

    A new ``RiakClient`` is created when none is cached yet or when the
    cached one reports that it is not alive; the new client is set up
    with ``last_written_resolver`` as its resolver.
    """
    cached = self._client
    if cached is not None and cached.is_alive():
        return cached

    self._client = RiakClient(
        protocol=self.protocol,
        host=self.host,
        pb_port=self.port,
    )
    self._client.resolver = last_written_resolver
    return self._client
def _get_bucket(self):
    """Return the bucket handle, looking it up again when needed.

    The cached handle is reused only while a client exists, that client
    is alive, and a bucket has already been fetched; otherwise the bucket
    named ``self.bucket_name`` is requested from ``self.client``.
    """
    client_ok = self._client is not None and self._client.is_alive()
    if client_ok and self._bucket:
        return self._bucket

    self._bucket = self.client.bucket(self.bucket_name)
    return self._bucket
@property
def client(self):
    """Riak client connection, as returned by :meth:`_get_client`.

    Every access goes through ``_get_client``, which re-creates the
    client when none exists or the existing one is not alive.
    """
    return self._get_client()
@property
def bucket(self):
    """Bucket handle for ``bucket_name``, as returned by :meth:`_get_bucket`.

    Every access goes through ``_get_bucket``, which fetches the bucket
    again when no usable client or cached bucket is available.
    """
    return self._get_bucket()