# This document describes the current stable version of Celery (5.0).
# For development docs, see the Celery development documentation.
#
# Source code for celery.concurrency.eventlet

"""Eventlet execution pool."""
import sys
from time import monotonic

from kombu.asynchronous import timer as _timer  # noqa

from celery import signals  # noqa

from . import base  # noqa

__all__ = ('TaskPool',)

#: Warning template; ``%s`` is replaced with the name of the thread/socket
#: module that was found to be imported already.
W_RACE = """\
Celery module with %s imported before eventlet patched\
"""
#: Dotted-name prefixes of the packages inspected by the check below.
RACE_MODS = ('billiard.', 'celery.', 'kombu.')

#: Warn if we couldn't patch early enough,
#: and thread/socket depending celery modules have already been loaded.
# Iterate over a snapshot of the module names: ``sys.modules`` can change
# size while we loop (the ``import warnings`` below, or imports running in
# other threads), and iterating the live dict would then raise RuntimeError.
# NOTE(review): ``mod`` is a module *name* (a ``str`` key of ``sys.modules``),
# so ``getattr(mod, side, None)`` inspects the string rather than the module
# object and is always None -- the warning cannot fire as written.  Left
# as-is because looking at ``sys.modules[mod]`` instead would change which
# warnings this module emits at import time; confirm the intended check.
for mod in [name for name in tuple(sys.modules)
            if name.startswith(RACE_MODS)]:
    for side in ('thread', 'threading', 'socket'):  # pragma: no cover
        if getattr(mod, side, None):
            import warnings
            warnings.warn(RuntimeWarning(W_RACE % side))

def apply_target(target, args=(), kwargs=None, callback=None,
                 accept_callback=None, getpid=None):
    """Run ``target`` through ``base.apply_target`` with a caller-chosen pid.

    Arguments:
        target: Callable forwarded to ``base.apply_target``.
        args: Positional arguments forwarded unchanged.
        kwargs: Keyword arguments; any falsy value is replaced by a new
            empty dict.
        callback: Forwarded unchanged to ``base.apply_target``.
        accept_callback: Forwarded unchanged to ``base.apply_target``.
        getpid: Optional zero-argument callable; its return value is passed
            on as the ``pid`` keyword argument.

    Returns:
        Whatever ``base.apply_target`` returns.
    """
    kwargs = {} if not kwargs else kwargs
    # ``getpid`` defaults to None, so only call it when one was supplied.
    # NOTE(review): with no ``getpid`` we pass ``pid=None`` and assume
    # ``base.apply_target`` then picks its own pid -- confirm.
    pid = getpid() if getpid is not None else None
    return base.apply_target(target, args, kwargs, callback, accept_callback,
                             pid=pid)

class Timer(_timer.Timer):
    """Eventlet Timer.

    Every scheduled entry is run in its own greenthread created with
    ``eventlet.greenthread.spawn_after``; the greenthreads that have not
    finished yet are tracked in the ``_queue`` set.
    """

    def __init__(self, *args, **kwargs):
        # Imported here rather than at module level, so eventlet and
        # greenlet are only imported once a Timer is actually created.
        from eventlet.greenthread import spawn_after
        from greenlet import GreenletExit
        super().__init__(*args, **kwargs)

        self.GreenletExit = GreenletExit
        self._spawn_after = spawn_after
        self._queue = set()

    def _enter(self, eta, priority, entry, **kwargs):
        """Schedule ``entry`` to run at ``eta`` and return its greenthread.

        ``eta`` is compared against ``time.monotonic()``; an ``eta`` that
        has already passed is scheduled with a delay of zero.
        """
        secs = max(eta - monotonic(), 0)
        g = self._spawn_after(secs, entry)
        self._queue.add(g)
        # Have eventlet call ``_entry_exit(g, entry)`` once ``g`` is done,
        # so the greenthread is removed from ``_queue`` again.
        g.link(self._entry_exit, entry)
        g.entry = entry
        g.eta = eta
        g.priority = priority
        g.canceled = False
        return g

    def _entry_exit(self, g, entry):
        """Link callback for ``g``: record cancellation and untrack it."""
        try:
            try:
                g.wait()
            except self.GreenletExit:
                # The greenthread was killed before/while running the entry.
                entry.cancel()
                g.canceled = True
        finally:
            self._queue.discard(g)

    def clear(self):
        """Cancel every greenthread that is still tracked."""
        queue = self._queue
        while queue:
            try:
                queue.pop().cancel()
            except (KeyError, self.GreenletExit):
                # Best effort: the set was emptied under us (KeyError from
                # ``pop``) or the greenthread is already exiting.
                pass

    def cancel(self, tref):
        """Cancel the greenthread ``tref`` returned by ``_enter``."""
        try:
            tref.cancel()
        except self.GreenletExit:
            # Already exiting; nothing left to cancel.
            pass

    # NOTE(review): restored as a property on the assumption that the base
    # ``kombu`` Timer exposes ``queue`` as a property and the decorator was
    # lost together with the other dropped lines -- confirm.
    @property
    def queue(self):
        """Return the set of greenthreads that have not finished yet."""
        return self._queue

class TaskPool(base.BasePool):
    """Eventlet Task Pool."""

    Timer = Timer

    signal_safe = False
    is_green = True
    task_join_will_block = False
    _pool = None
    _quick_put = None

    def __init__(self, *args, **kwargs):
        # Imported here rather than at module level, so eventlet is only
        # imported once a pool is actually created.
        from eventlet import greenthread
        from eventlet.greenpool import GreenPool
        self.Pool = GreenPool
        self.getcurrent = greenthread.getcurrent
        # The "pid" reported for a task is the id() of the greenthread
        # that is current when ``getpid`` is called.
        self.getpid = lambda: id(greenthread.getcurrent())
        self.spawn_n = greenthread.spawn_n

        super().__init__(*args, **kwargs)

    def on_start(self):
        """Create the green pool and send ``eventlet_pool_started``."""
        self._pool = self.Pool(self.limit)
        signals.eventlet_pool_started.send(sender=self)
        # Cache the bound callables used by every ``on_apply`` call.
        self._quick_put = self._pool.spawn_n
        self._quick_apply_sig = signals.eventlet_pool_apply.send

    def on_stop(self):
        """Wait for the pool's greenthreads, sending shutdown signals.

        ``eventlet_pool_preshutdown`` is sent first and
        ``eventlet_pool_postshutdown`` last; the wait is skipped when the
        pool was never started.
        """
        signals.eventlet_pool_preshutdown.send(sender=self)
        if self._pool is not None:
            self._pool.waitall()
        signals.eventlet_pool_postshutdown.send(sender=self)

    def on_apply(self, target, args=None, kwargs=None, callback=None,
                 accept_callback=None, **_):
        """Send ``eventlet_pool_apply`` and spawn ``target`` in the pool.

        The work is handed to the pool as a call to the module-level
        ``apply_target``, with ``self.getpid`` supplied as its ``getpid``
        argument.  Any additional keyword arguments are ignored.
        """
        self._quick_apply_sig(
            sender=self, target=target, args=args, kwargs=kwargs,
        )
        self._quick_put(apply_target, target, args, kwargs,
                        callback, accept_callback,
                        self.getpid)

    def grow(self, n=1):
        """Raise the pool limit by ``n`` and resize the green pool."""
        limit = self.limit + n
        self._pool.resize(limit)
        self.limit = limit

    def shrink(self, n=1):
        """Lower the pool limit by ``n`` and resize the green pool."""
        limit = self.limit - n
        self._pool.resize(limit)
        self.limit = limit

    def _get_info(self):
        """Return the base pool info extended with green pool counters."""
        info = super()._get_info()
        info.update({
            'max-concurrency': self.limit,
            'free-threads': self._pool.free(),
            'running-threads': self._pool.running(),
        })
        return info