This document describes the current stable version of Celery (4.1). For development docs, go here.
Source code for celery.utils.collections
# -*- coding: utf-8 -*-
"""Custom maps, sets, sequences, and other data structures."""
from __future__ import absolute_import, unicode_literals
import sys
import time
from collections import (
Callable, Mapping, MutableMapping, MutableSet, Sequence,
OrderedDict as _OrderedDict, deque,
)
from heapq import heapify, heappush, heappop
from itertools import chain, count
from celery.five import Empty, items, keys, python_2_unicode_compatible, values
from .functional import first, uniq
from .text import match_case
try:
# pypy: dicts are ordered in recent versions
from __pypy__ import reversed_dict as _dict_is_ordered
except ImportError:
_dict_is_ordered = None
try:
from django.utils.functional import LazyObject, LazySettings
except ImportError:
class LazyObject(object): # noqa
pass
LazySettings = LazyObject # noqa
__all__ = [
'AttributeDictMixin', 'AttributeDict', 'BufferMap', 'ChainMap',
'ConfigurationView', 'DictAttribute', 'Evictable',
'LimitedSet', 'Messagebuffer', 'OrderedDict',
'force_mapping', 'lpmerge',
]
PY3 = sys.version_info[0] >= 3
REPR_LIMITED_SET = """\
<{name}({size}): maxlen={0.maxlen}, expires={0.expires}, minlen={0.minlen}>\
"""
[docs]def force_mapping(m):
# type: (Any) -> Mapping
"""Wrap object into supporting the mapping interface if necessary."""
if isinstance(m, (LazyObject, LazySettings)):
m = m._wrapped
return DictAttribute(m) if not isinstance(m, Mapping) else m
[docs]def lpmerge(L, R):
# type: (Mapping, Mapping) -> Mapping
"""In place left precedent dictionary merge.
Keeps values from `L`, if the value in `R` is :const:`None`.
"""
setitem = L.__setitem__
[setitem(k, v) for k, v in items(R) if v is not None]
return L
[docs]class OrderedDict(_OrderedDict):
"""Dict where insertion order matters."""
if PY3: # pragma: no cover
def _LRUkey(self):
# type: () -> Any
# return value of od.keys does not support __next__,
# but this version will also not create a copy of the list.
return next(iter(keys(self)))
else:
if _dict_is_ordered: # pragma: no cover
def _LRUkey(self):
# type: () -> Any
# iterkeys is iterable.
return next(self.iterkeys())
else:
def _LRUkey(self):
# type: () -> Any
return self._OrderedDict__root[1][2]
if not hasattr(_OrderedDict, 'move_to_end'):
if _dict_is_ordered: # pragma: no cover
def move_to_end(self, key, last=True):
# type: (Any, bool) -> None
if not last:
# we don't use this argument, and the only way to
# implement this on PyPy seems to be O(n): creating a
# copy with the order changed, so we just raise.
raise NotImplementedError('no last=True on PyPy')
self[key] = self.pop(key)
else:
[docs] def move_to_end(self, key, last=True):
# type: (Any, bool) -> None
link = self._OrderedDict__map[key]
link_prev = link[0]
link_next = link[1]
link_prev[1] = link_next
link_next[0] = link_prev
root = self._OrderedDict__root
if last:
last = root[0]
link[0] = last
link[1] = root
last[1] = root[0] = link
else:
first_node = root[1]
link[0] = root
link[1] = first_node
root[1] = first_node[0] = link
[docs]class AttributeDictMixin(object):
"""Mixin for Mapping interface that adds attribute access.
I.e., `d.key -> d[key]`).
"""
def __getattr__(self, k):
# type: (str) -> Any
"""`d.key -> d[key]`."""
try:
return self[k]
except KeyError:
raise AttributeError(
'{0!r} object has no attribute {1!r}'.format(
type(self).__name__, k))
def __setattr__(self, key, value):
# type: (str, Any) -> None
"""`d[key] = value -> d.key = value`."""
self[key] = value
[docs]class DictAttribute(object):
"""Dict interface to attributes.
`obj[k] -> obj.k`
`obj[k] = val -> obj.k = val`
"""
obj = None
def __init__(self, obj):
# type: (Any) -> None
object.__setattr__(self, 'obj', obj)
def __getattr__(self, key):
# type: (Any) -> Any
return getattr(self.obj, key)
def __setattr__(self, key, value):
# type: (Any, Any) -> None
return setattr(self.obj, key, value)
[docs] def get(self, key, default=None):
# type: (Any, Any) -> Any
try:
return self[key]
except KeyError:
return default
[docs] def setdefault(self, key, default=None):
# type: (Any, Any) -> None
if key not in self:
self[key] = default
def __getitem__(self, key):
# type: (Any) -> Any
try:
return getattr(self.obj, key)
except AttributeError:
raise KeyError(key)
def __setitem__(self, key, value):
# type: (Any, Any) -> Any
setattr(self.obj, key, value)
def __contains__(self, key):
# type: (Any) -> bool
return hasattr(self.obj, key)
def _iterate_keys(self):
# type: () -> Iterable
return iter(dir(self.obj))
iterkeys = _iterate_keys
def __iter__(self):
# type: () -> Iterable
return self._iterate_keys()
def _iterate_items(self):
# type: () -> Iterable
for key in self._iterate_keys():
yield key, getattr(self.obj, key)
iteritems = _iterate_items
def _iterate_values(self):
# type: () -> Iterable
for key in self._iterate_keys():
yield getattr(self.obj, key)
itervalues = _iterate_values
if sys.version_info[0] == 3: # pragma: no cover
items = _iterate_items
keys = _iterate_keys
values = _iterate_values
else:
MutableMapping.register(DictAttribute) # noqa: E305
[docs]class ChainMap(MutableMapping):
"""Key lookup on a sequence of maps."""
key_t = None
changes = None
defaults = None
maps = None
def __init__(self, *maps, **kwargs):
# type: (*Mapping, **Any) -> None
maps = list(maps or [{}])
self.__dict__.update(
key_t=kwargs.get('key_t'),
maps=maps,
changes=maps[0],
defaults=maps[1:],
)
[docs] def add_defaults(self, d):
# type: (Mapping) -> None
d = force_mapping(d)
self.defaults.insert(0, d)
self.maps.insert(1, d)
[docs] def pop(self, key, *default):
# type: (Any, *Any) -> Any
try:
return self.maps[0].pop(key, *default)
except KeyError:
raise KeyError(
'Key not found in the first mapping: {!r}'.format(key))
def __missing__(self, key):
# type: (Any) -> Any
raise KeyError(key)
def _key(self, key):
# type: (Any) -> Any
return self.key_t(key) if self.key_t is not None else key
def __getitem__(self, key):
# type: (Any) -> Any
_key = self._key(key)
for mapping in self.maps:
try:
return mapping[_key]
except KeyError:
pass
return self.__missing__(key)
def __setitem__(self, key, value):
# type: (Any, Any) -> None
self.changes[self._key(key)] = value
def __delitem__(self, key):
# type: (Any) -> None
try:
del self.changes[self._key(key)]
except KeyError:
raise KeyError('Key not found in first mapping: {0!r}'.format(key))
[docs] def get(self, key, default=None):
# type: (Any, Any) -> Any
try:
return self[self._key(key)]
except KeyError:
return default
def __len__(self):
# type: () -> int
return len(set().union(*self.maps))
def __iter__(self):
return self._iterate_keys()
def __contains__(self, key):
# type: (Any) -> bool
key = self._key(key)
return any(key in m for m in self.maps)
def __bool__(self):
# type: () -> bool
return any(self.maps)
__nonzero__ = __bool__ # Py2
[docs] def setdefault(self, key, default=None):
# type: (Any, Any) -> None
key = self._key(key)
if key not in self:
self[key] = default
[docs] def update(self, *args, **kwargs):
# type: (*Any, **Any) -> Any
return self.changes.update(*args, **kwargs)
def __repr__(self):
# type: () -> str
return '{0.__class__.__name__}({1})'.format(
self, ', '.join(map(repr, self.maps)))
@classmethod
[docs] def fromkeys(cls, iterable, *args):
# type: (type, Iterable, *Any) -> 'ChainMap'
"""Create a ChainMap with a single dict created from the iterable."""
return cls(dict.fromkeys(iterable, *args))
[docs] def copy(self):
# type: () -> 'ChainMap'
return self.__class__(self.maps[0].copy(), *self.maps[1:])
__copy__ = copy # Py2
def _iter(self, op):
# type: (Callable) -> Iterable
# defaults must be first in the stream, so values in
# changes take precedence.
# pylint: disable=bad-reversed-sequence
# Someone should teach pylint about properties.
return chain(*[op(d) for d in reversed(self.maps)])
def _iterate_keys(self):
# type: () -> Iterable
return uniq(self._iter(lambda d: d.keys()))
iterkeys = _iterate_keys
def _iterate_items(self):
# type: () -> Iterable
return ((key, self[key]) for key in self)
iteritems = _iterate_items
def _iterate_values(self):
# type: () -> Iterable
return (self[key] for key in self)
itervalues = _iterate_values
if sys.version_info[0] == 3: # pragma: no cover
keys = _iterate_keys
items = _iterate_items
values = _iterate_values
else: # noqa
@python_2_unicode_compatible
[docs]class ConfigurationView(ChainMap, AttributeDictMixin):
"""A view over an applications configuration dictionaries.
Custom (but older) version of :class:`collections.ChainMap`.
If the key does not exist in ``changes``, the ``defaults``
dictionaries are consulted.
Arguments:
changes (Mapping): Map of configuration changes.
defaults (List[Mapping]): List of dictionaries containing
the default configuration.
"""
def __init__(self, changes, defaults=None, keys=None, prefix=None):
# type: (Mapping, Mapping, List[str], str) -> None
defaults = [] if defaults is None else defaults
super(ConfigurationView, self).__init__(changes, *defaults)
self.__dict__.update(
prefix=prefix.rstrip('_') + '_' if prefix else prefix,
_keys=keys,
)
def _to_keys(self, key):
# type: (str) -> Sequence[str]
prefix = self.prefix
if prefix:
pkey = prefix + key if not key.startswith(prefix) else key
return match_case(pkey, prefix), key
return key,
def __getitem__(self, key):
# type: (str) -> Any
keys = self._to_keys(key)
getitem = super(ConfigurationView, self).__getitem__
for k in keys + (
tuple(f(key) for f in self._keys) if self._keys else ()):
try:
return getitem(k)
except KeyError:
pass
try:
# support subclasses implementing __missing__
return self.__missing__(key)
except KeyError:
if len(keys) > 1:
raise KeyError(
'Key not found: {0!r} (with prefix: {0!r})'.format(*keys))
raise
def __setitem__(self, key, value):
# type: (str, Any) -> Any
self.changes[self._key(key)] = value
[docs] def first(self, *keys):
# type: (*str) -> Any
return first(None, (self.get(key) for key in keys))
[docs] def get(self, key, default=None):
# type: (str, Any) -> Any
try:
return self[key]
except KeyError:
return default
[docs] def clear(self):
# type: () -> None
"""Remove all changes, but keep defaults."""
self.changes.clear()
def __contains__(self, key):
# type: (str) -> bool
keys = self._to_keys(key)
return any(any(k in m for k in keys) for m in self.maps)
[docs] def swap_with(self, other):
# type: (ConfigurationView) -> None
changes = other.__dict__['changes']
defaults = other.__dict__['defaults']
self.__dict__.update(
changes=changes,
defaults=defaults,
key_t=other.__dict__['key_t'],
prefix=other.__dict__['prefix'],
maps=[changes] + defaults
)
@python_2_unicode_compatible
[docs]class LimitedSet(object):
"""Kind-of Set (or priority queue) with limitations.
Good for when you need to test for membership (`a in set`),
but the set should not grow unbounded.
``maxlen`` is enforced at all times, so if the limit is reached
we'll also remove non-expired items.
You can also configure ``minlen``: this is the minimal residual size
of the set.
All arguments are optional, and no limits are enabled by default.
Arguments:
maxlen (int): Optional max number of items.
Adding more items than ``maxlen`` will result in immediate
removal of items sorted by oldest insertion time.
expires (float): TTL for all items.
Expired items are purged as keys are inserted.
minlen (int): Minimal residual size of this set.
.. versionadded:: 4.0
Value must be less than ``maxlen`` if both are configured.
Older expired items will be deleted, only after the set
exceeds ``minlen`` number of items.
data (Sequence): Initial data to initialize set with.
Can be an iterable of ``(key, value)`` pairs,
a dict (``{key: insertion_time}``), or another instance
of :class:`LimitedSet`.
Example:
>>> s = LimitedSet(maxlen=50000, expires=3600, minlen=4000)
>>> for i in range(60000):
... s.add(i)
... s.add(str(i))
...
>>> 57000 in s # last 50k inserted values are kept
True
>>> '10' in s # '10' did expire and was purged from set.
False
>>> len(s) # maxlen is reached
50000
>>> s.purge(now=time.time() + 7200) # clock + 2 hours
>>> len(s) # now only minlen items are cached
4000
>>>> 57000 in s # even this item is gone now
False
"""
max_heap_percent_overload = 15
def __init__(self, maxlen=0, expires=0, data=None, minlen=0):
# type: (int, float, Mapping, int) -> None
self.maxlen = 0 if maxlen is None else maxlen
self.minlen = 0 if minlen is None else minlen
self.expires = 0 if expires is None else expires
self._data = {}
self._heap = []
if data:
# import items from data
self.update(data)
if not self.maxlen >= self.minlen >= 0:
raise ValueError(
'minlen must be a positive number, less or equal to maxlen.')
if self.expires < 0:
raise ValueError('expires cannot be negative!')
def _refresh_heap(self):
# type: () -> None
"""Time consuming recreating of heap. Don't run this too often."""
self._heap[:] = [entry for entry in values(self._data)]
heapify(self._heap)
def _maybe_refresh_heap(self):
# type: () -> None
if self._heap_overload >= self.max_heap_percent_overload:
self._refresh_heap()
[docs] def clear(self):
# type: () -> None
"""Clear all data, start from scratch again."""
self._data.clear()
self._heap[:] = []
[docs] def add(self, item, now=None):
# type: (Any, float) -> None
"""Add a new item, or reset the expiry time of an existing item."""
now = now or time.time()
if item in self._data:
self.discard(item)
entry = (now, item)
self._data[item] = entry
heappush(self._heap, entry)
if self.maxlen and len(self._data) >= self.maxlen:
self.purge()
[docs] def update(self, other):
# type: (Iterable) -> None
"""Update this set from other LimitedSet, dict or iterable."""
if not other:
return
if isinstance(other, LimitedSet):
self._data.update(other._data)
self._refresh_heap()
self.purge()
elif isinstance(other, dict):
# revokes are sent as a dict
for key, inserted in items(other):
if isinstance(inserted, (tuple, list)):
# in case someone uses ._data directly for sending update
inserted = inserted[0]
if not isinstance(inserted, float):
raise ValueError(
'Expecting float timestamp, got type '
'{0!r} with value: {1}'.format(
type(inserted), inserted))
self.add(key, inserted)
else:
# XXX AVOID THIS, it could keep old data if more parties
# exchange them all over and over again
for obj in other:
self.add(obj)
[docs] def discard(self, item):
# type: (Any) -> None
# mark an existing item as removed. If KeyError is not found, pass.
self._data.pop(item, None)
self._maybe_refresh_heap()
pop_value = discard
[docs] def purge(self, now=None):
# type: (float) -> None
"""Check oldest items and remove them if needed.
Arguments:
now (float): Time of purging -- by default right now.
This can be useful for unit testing.
"""
now = now or time.time()
now = now() if isinstance(now, Callable) else now
if self.maxlen:
while len(self._data) > self.maxlen:
self.pop()
# time based expiring:
if self.expires:
while len(self._data) > self.minlen >= 0:
inserted_time, _ = self._heap[0]
if inserted_time + self.expires > now:
break # oldest item hasn't expired yet
self.pop()
[docs] def pop(self, default=None):
# type: (Any) -> Any
"""Remove and return the oldest item, or :const:`None` when empty."""
while self._heap:
_, item = heappop(self._heap)
try:
self._data.pop(item)
except KeyError:
pass
else:
return item
return default
[docs] def as_dict(self):
# type: () -> Dict
"""Whole set as serializable dictionary.
Example:
>>> s = LimitedSet(maxlen=200)
>>> r = LimitedSet(maxlen=200)
>>> for i in range(500):
... s.add(i)
...
>>> r.update(s.as_dict())
>>> r == s
True
"""
return {key: inserted for inserted, key in values(self._data)}
def __eq__(self, other):
# type: (Any) -> bool
return self._data == other._data
def __ne__(self, other):
# type: (Any) -> bool
return not self.__eq__(other)
def __repr__(self):
# type: () -> str
return REPR_LIMITED_SET.format(
self, name=type(self).__name__, size=len(self),
)
def __iter__(self):
# type: () -> Iterable
return (i for _, i in sorted(values(self._data)))
def __len__(self):
# type: () -> int
return len(self._data)
def __contains__(self, key):
# type: (Any) -> bool
return key in self._data
def __reduce__(self):
# type: () -> Any
return self.__class__, (
self.maxlen, self.expires, self.as_dict(), self.minlen)
def __bool__(self):
# type: () -> bool
return bool(self._data)
__nonzero__ = __bool__ # Py2
@property
def _heap_overload(self):
# type: () -> float
"""Compute how much is heap bigger than data [percents]."""
return len(self._heap) * 100 / max(len(self._data), 1) - 100
MutableSet.register(LimitedSet) # noqa: E305
[docs]class Evictable(object):
"""Mixin for classes supporting the ``evict`` method."""
Empty = Empty
[docs] def evict(self):
# type: () -> None
"""Force evict until maxsize is enforced."""
self._evict(range=count)
def _evict(self, limit=100, range=range):
# type: (int) -> None
try:
[self._evict1() for _ in range(limit)]
except IndexError:
pass
def _evict1(self):
# type: () -> None
if self._evictcount <= self.maxsize:
raise IndexError()
try:
self._pop_to_evict()
except self.Empty:
raise IndexError()
@python_2_unicode_compatible
[docs]class Messagebuffer(Evictable):
"""A buffer of pending messages."""
Empty = Empty
def __init__(self, maxsize, iterable=None, deque=deque):
# type: (int, Iterable, Any) -> None
self.maxsize = maxsize
self.data = deque(iterable or [])
self._append = self.data.append
self._pop = self.data.popleft
self._len = self.data.__len__
self._extend = self.data.extend
[docs] def extend(self, it):
# type: (Iterable) -> None
self._extend(it)
self.maxsize and self._evict()
[docs] def take(self, *default):
# type: (*Any) -> Any
try:
return self._pop()
except IndexError:
if default:
return default[0]
raise self.Empty()
def _pop_to_evict(self):
# type: () -> None
return self.take()
def __repr__(self):
# type: () -> str
return '<{0}: {1}/{2}>'.format(
type(self).__name__, len(self), self.maxsize,
)
def __iter__(self):
# type: () -> Iterable
while 1:
try:
yield self._pop()
except IndexError:
break
def __len__(self):
# type: () -> int
return self._len()
def __contains__(self, item):
# type: () -> bool
return item in self.data
def __reversed__(self):
# type: () -> Iterable
return reversed(self.data)
def __getitem__(self, index):
# type: (Any) -> Any
return self.data[index]
@property
def _evictcount(self):
# type: () -> int
return len(self)
Sequence.register(Messagebuffer) # noqa: E305
@python_2_unicode_compatible
[docs]class BufferMap(OrderedDict, Evictable):
"""Map of buffers."""
Buffer = Messagebuffer
Empty = Empty
maxsize = None
total = 0
bufmaxsize = None
def __init__(self, maxsize, iterable=None, bufmaxsize=1000):
# type: (int, Iterable, int) -> None
super(BufferMap, self).__init__()
self.maxsize = maxsize
self.bufmaxsize = 1000
if iterable:
self.update(iterable)
self.total = sum(len(buf) for buf in items(self))
[docs] def put(self, key, item):
# type: (Any, Any) -> None
self._get_or_create_buffer(key).put(item)
self.total += 1
self.move_to_end(key) # least recently used.
self.maxsize and self._evict()
[docs] def extend(self, key, it):
# type: (Any, Iterable) -> None
self._get_or_create_buffer(key).extend(it)
self.total += len(it)
self.maxsize and self._evict()
[docs] def take(self, key, *default):
# type: (Any, *Any) -> Any
item, throw = None, False
try:
buf = self[key]
except KeyError:
throw = True
else:
try:
item = buf.take()
self.total -= 1
except self.Empty:
throw = True
else:
self.move_to_end(key) # mark as LRU
if throw:
if default:
return default[0]
raise self.Empty()
return item
def _get_or_create_buffer(self, key):
# type: (Any) -> Messagebuffer
try:
return self[key]
except KeyError:
buf = self[key] = self._new_buffer()
return buf
def _new_buffer(self):
# type: () -> Messagebuffer
return self.Buffer(maxsize=self.bufmaxsize)
def _LRUpop(self, *default):
# type: (*Any) -> Any
return self[self._LRUkey()].take(*default)
def _pop_to_evict(self):
# type: () -> None
for _ in range(100):
key = self._LRUkey()
buf = self[key]
try:
buf.take()
except (IndexError, self.Empty):
# buffer empty, remove it from mapping.
self.pop(key)
else:
# we removed one item
self.total -= 1
# if buffer is empty now, remove it from mapping.
if not len(buf):
self.pop(key)
else:
# move to least recently used.
self.move_to_end(key)
break
def __repr__(self):
# type: () -> str
return '<{0}: {1}/{2}>'.format(
type(self).__name__, self.total, self.maxsize,
)
@property
def _evictcount(self):
# type: () -> int
return self.total