Source code for kombu.utils.limits

"""Token bucket implementation for rate limiting."""

from collections import deque
from time import monotonic

__all__ = ('TokenBucket',)

[docs]class TokenBucket: """Token Bucket Algorithm. See Also: Most of this code was stolen from an entry in the ASPN Python Cookbook: Warning: Thread Safety: This implementation is not thread safe. Access to a `TokenBucket` instance should occur within the critical section of any multithreaded code. """ #: The rate in tokens/second that the bucket will be refilled. fill_rate = None #: Maximum number of tokens in the bucket. capacity = 1 #: Timestamp of the last time a token was taken out of the bucket. timestamp = None def __init__(self, fill_rate, capacity=1): self.capacity = float(capacity) self._tokens = capacity self.fill_rate = float(fill_rate) self.timestamp = monotonic() self.contents = deque()
[docs] def add(self, item): self.contents.append(item)
[docs] def pop(self): return self.contents.popleft()
[docs] def clear_pending(self): self.contents.clear()
[docs] def can_consume(self, tokens=1): """Check if one or more tokens can be consumed. Returns: bool: true if the number of tokens can be consumed from the bucket. If they can be consumed, a call will also consume the requested number of tokens from the bucket. Calls will only consume `tokens` (the number requested) or zero tokens -- it will never consume a partial number of tokens. """ if tokens <= self._get_tokens(): self._tokens -= tokens return True return False
[docs] def expected_time(self, tokens=1): """Return estimated time of token availability. Returns: float: the time in seconds. """ _tokens = self._get_tokens() tokens = max(tokens, _tokens) return (tokens - _tokens) / self.fill_rate
def _get_tokens(self): if self._tokens < self.capacity: now = monotonic() delta = self.fill_rate * (now - self.timestamp) self._tokens = min(self.capacity, self._tokens + delta) self.timestamp = now return self._tokens