This document describes the current stable version of Celery (5.0). For development docs, go here.

Source code for celery.backends.elasticsearch

"""Elasticsearch result store backend."""
from datetime import datetime

from kombu.utils.encoding import bytes_to_str
from kombu.utils.url import _parse_url

from celery import states
from celery.exceptions import ImproperlyConfigured

from .base import KeyValueStoreBackend

try:
    import elasticsearch
except ImportError:  # pragma: no cover
    elasticsearch = None  # noqa

__all__ = ('ElasticsearchBackend',)

E_LIB_MISSING = """\
You need to install the elasticsearch library to use the Elasticsearch \
result backend.\
"""


[docs]class ElasticsearchBackend(KeyValueStoreBackend): """Elasticsearch Backend. Raises: celery.exceptions.ImproperlyConfigured: if module :pypi:`elasticsearch` is not available. """ index = 'celery' doc_type = 'backend' scheme = 'http' host = 'localhost' port = 9200 username = None password = None es_retry_on_timeout = False es_timeout = 10 es_max_retries = 3 def __init__(self, url=None, *args, **kwargs): super().__init__(*args, **kwargs) self.url = url _get = self.app.conf.get if elasticsearch is None: raise ImproperlyConfigured(E_LIB_MISSING) index = doc_type = scheme = host = port = username = password = None if url: scheme, host, port, username, password, path, _ = _parse_url(url) # noqa if scheme == 'elasticsearch': scheme = None if path: path = path.strip('/') index, _, doc_type = path.partition('/') self.index = index or self.index self.doc_type = doc_type or self.doc_type self.scheme = scheme or self.scheme self.host = host or self.host self.port = port or self.port self.username = username or self.username self.password = password or self.password self.es_retry_on_timeout = ( _get('elasticsearch_retry_on_timeout') or self.es_retry_on_timeout ) es_timeout = _get('elasticsearch_timeout') if es_timeout is not None: self.es_timeout = es_timeout es_max_retries = _get('elasticsearch_max_retries') if es_max_retries is not None: self.es_max_retries = es_max_retries self.es_save_meta_as_text = _get('elasticsearch_save_meta_as_text', True) self._server = None
[docs] def exception_safe_to_retry(self, exc): if isinstance(exc, (elasticsearch.exceptions.TransportError)): # 401: Unauthorized # 409: Conflict # 429: Too Many Requests # 500: Internal Server Error # 502: Bad Gateway # 503: Service Unavailable # 504: Gateway Timeout # N/A: Low level exception (i.e. socket exception) if exc.status_code in {401, 409, 429, 500, 502, 503, 504, 'N/A'}: return True return False
[docs] def get(self, key): try: res = self._get(key) try: if res['found']: return res['_source']['result'] except (TypeError, KeyError): pass except elasticsearch.exceptions.NotFoundError: pass
def _get(self, key): return self.server.get( index=self.index, doc_type=self.doc_type, id=key, ) def _set_with_state(self, key, value, state): body = { 'result': value, '@timestamp': '{}Z'.format( datetime.utcnow().isoformat()[:-3] ), } try: self._index( id=key, body=body, ) except elasticsearch.exceptions.ConflictError: # document already exists, update it self._update(key, body, state)
[docs] def set(self, key, value): return self._set_with_state(key, value, None)
def _index(self, id, body, **kwargs): body = {bytes_to_str(k): v for k, v in body.items()} return self.server.index( id=bytes_to_str(id), index=self.index, doc_type=self.doc_type, body=body, params={'op_type': 'create'}, **kwargs ) def _update(self, id, body, state, **kwargs): """Update state in a conflict free manner. If state is defined (not None), this will not update ES server if either: * existing state is success * existing state is a ready state and current state in not a ready state This way, a Retry state cannot override a Success or Failure, and chord_unlock will not retry indefinitely. """ body = {bytes_to_str(k): v for k, v in body.items()} try: res_get = self._get(key=id) if not res_get.get('found'): return self._index(id, body, **kwargs) # document disappeared between index and get calls. except elasticsearch.exceptions.NotFoundError: return self._index(id, body, **kwargs) try: meta_present_on_backend = self.decode_result(res_get['_source']['result']) except (TypeError, KeyError): pass else: if meta_present_on_backend['status'] == states.SUCCESS: # if stored state is already in success, do nothing return {'result': 'noop'} elif meta_present_on_backend['status'] in states.READY_STATES and state in states.UNREADY_STATES: # if stored state is in ready state and current not, do nothing return {'result': 'noop'} # get current sequence number and primary term # https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html seq_no = res_get.get('_seq_no', 1) prim_term = res_get.get('_primary_term', 1) # try to update document with current seq_no and primary_term res = self.server.update( id=bytes_to_str(id), index=self.index, doc_type=self.doc_type, body={'doc': body}, params={'if_primary_term': prim_term, 'if_seq_no': seq_no}, **kwargs ) # result is elastic search update query result # noop = query did not update any document # updated = at least one document got updated if res['result'] == 'noop': raise elasticsearch.exceptions.ConflictError(409, 'conflicting update occurred concurrently', {}) return res
[docs] def encode(self, data): if self.es_save_meta_as_text: return KeyValueStoreBackend.encode(self, data) else: if not isinstance(data, dict): return KeyValueStoreBackend.encode(self, data) if data.get("result"): data["result"] = self._encode(data["result"])[2] if data.get("traceback"): data["traceback"] = self._encode(data["traceback"])[2] return data
[docs] def decode(self, payload): if self.es_save_meta_as_text: return KeyValueStoreBackend.decode(self, payload) else: if not isinstance(payload, dict): return KeyValueStoreBackend.decode(self, payload) if payload.get("result"): payload["result"] = KeyValueStoreBackend.decode(self, payload["result"]) if payload.get("traceback"): payload["traceback"] = KeyValueStoreBackend.decode(self, payload["traceback"]) return payload
[docs] def mget(self, keys): return [self.get(key) for key in keys]
[docs] def delete(self, key): self.server.delete(index=self.index, doc_type=self.doc_type, id=key)
def _get_server(self): """Connect to the Elasticsearch server.""" http_auth = None if self.username and self.password: http_auth = (self.username, self.password) return elasticsearch.Elasticsearch( f'{self.host}:{self.port}', retry_on_timeout=self.es_retry_on_timeout, max_retries=self.es_max_retries, timeout=self.es_timeout, scheme=self.scheme, http_auth=http_auth, ) @property def server(self): if self._server is None: self._server = self._get_server() return self._server