This document describes the current stable version of Celery (5.0). For development docs, go here.

Source code for celery.events.snapshot

"""Periodically store events in a database.

Consuming the events as a stream isn't always suitable
so this module implements a system to take snapshots of the
state of a cluster at regular intervals.  There's a full
implementation of this writing the snapshots to a database
in :mod:`djcelery.snapshots` in the `django-celery` distribution.
"""
from kombu.utils.limits import TokenBucket

from celery import platforms
from celery.app import app_or_default
from celery.utils.dispatch import Signal
from celery.utils.imports import instantiate
from celery.utils.log import get_logger
from celery.utils.time import rate
from celery.utils.timer2 import Timer

# Names exported by ``from celery.events.snapshot import *``.
__all__ = ('Polaroid', 'evcam')

# Module-level logger registered under the 'celery.evcam' name; it is used
# for the debug messages emitted by Polaroid and is also stored on every
# Polaroid instance as ``self.logger``.
logger = get_logger('celery.evcam')


class Polaroid:
    """Record event snapshots.

    Once installed, the camera asks its timer to call :meth:`capture`
    every ``freq`` seconds and :meth:`cleanup` every ``cleanup_freq``
    seconds.  Subclasses customize behavior by overriding
    :meth:`on_shutter` and :meth:`on_cleanup`, or by connecting to
    :attr:`shutter_signal` and :attr:`cleanup_signal`.

    Instances can be used as context managers: entering calls
    :meth:`install` and leaving calls :meth:`cancel`.

    Arguments:
        state: Cluster state object.  It must provide ``freeze_while()``
            and is passed to :meth:`on_shutter` and used as the sender
            of both signals.
        freq (float): Interval handed to the timer for :meth:`capture`.
        maxrate: Optional limit on how often the shutter may fire.
            A truthy value is converted with ``rate()`` and wrapped in a
            ``TokenBucket``; any falsy value (``None``, ``0``, ``''``)
            disables the limit.
        cleanup_freq (float): Interval handed to the timer for
            :meth:`cleanup`.
        timer: Timer used for scheduling.  Falls back to the class-level
            :attr:`timer` and finally to a new ``Timer()``.
        app: App instance, resolved through ``app_or_default()``.
    """

    # Class-level default timer; used when no ``timer`` argument is given.
    timer = None
    # Sent (with the state as sender) every time the shutter fires.
    shutter_signal = Signal(name='shutter_signal', providing_args={'state'})
    # Sent (with the state as sender) every time cleanup runs.
    cleanup_signal = Signal(name='cleanup_signal')
    # Forwarded as ``clear_after`` to ``state.freeze_while()`` by capture().
    clear_after = False

    # Timer entries returned by ``call_repeatedly`` for the capture and
    # cleanup jobs; both stay ``None`` until install() has been called.
    _tref = None
    _ctref = None

    def __init__(self, state, freq=1.0, maxrate=None,
                 cleanup_freq=3600.0, timer=None, app=None):
        self.app = app_or_default(app)
        self.state = state
        self.freq = freq
        self.cleanup_freq = cleanup_freq
        self.timer = timer or self.timer or Timer()
        self.logger = logger
        # Normalize every falsy ``maxrate`` to None.  The previous
        # ``maxrate and TokenBucket(...)`` form stored values such as 0 or
        # '' as-is, which made shutter() fail with AttributeError when it
        # called ``can_consume()`` on them.
        self.maxrate = TokenBucket(rate(maxrate)) if maxrate else None

    def install(self):
        """Schedule the periodic capture and cleanup jobs on the timer."""
        self._tref = self.timer.call_repeatedly(self.freq, self.capture)
        self._ctref = self.timer.call_repeatedly(
            self.cleanup_freq, self.cleanup,
        )

    def on_shutter(self, state):
        """Handle a snapshot of ``state``; no-op hook for subclasses."""
        pass

    def on_cleanup(self):
        """Handle a cleanup run; no-op hook for subclasses."""
        pass

    def cleanup(self):
        """Send :attr:`cleanup_signal` and then call :meth:`on_cleanup`."""
        logger.debug('Cleanup: Running...')
        self.cleanup_signal.send(sender=self.state)
        self.on_cleanup()

    def shutter(self):
        """Take a snapshot unless the rate limit currently forbids it.

        When allowed, sends :attr:`shutter_signal` and then calls
        :meth:`on_shutter` with the state.
        """
        if self.maxrate is None or self.maxrate.can_consume():
            logger.debug('Shutter: %s', self.state)
            self.shutter_signal.send(sender=self.state)
            self.on_shutter(self.state)

    def capture(self):
        """Run :meth:`shutter` through ``state.freeze_while()``."""
        # NOTE(review): presumably freeze_while keeps the state from being
        # modified while the shutter runs -- confirm against State.
        self.state.freeze_while(self.shutter, clear_after=self.clear_after)

    def cancel(self):
        """Cancel the scheduled capture and cleanup jobs, if installed."""
        if self._tref:
            self._tref()  # flush all received events.
            self._tref.cancel()
        if self._ctref:
            self._ctref.cancel()

    def __enter__(self):
        self.install()
        return self

    def __exit__(self, *exc_info):
        self.cancel()
def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
          logfile=None, pidfile=None, timer=None, app=None):
    """Start snapshot recorder.

    Instantiates and installs the camera, then blocks while capturing
    events from the app's read connection, feeding every event to a new
    ``app.events.State()`` instance.  The camera is cancelled and the
    connection closed when capturing stops, for whatever reason.

    Arguments:
        camera: Camera to create; handed to ``instantiate()`` together
            with the state and the ``app``, ``freq``, ``maxrate`` and
            ``timer`` keyword arguments.
        freq (float): Snapshot frequency forwarded to the camera.
        maxrate: Shutter rate limit forwarded to the camera.
        loglevel: Forwarded to ``app.log.setup_logging_subsystem()``.
        logfile: Forwarded to ``app.log.setup_logging_subsystem()``.
        pidfile: If truthy, ``platforms.create_pidlock()`` is called
            with it before anything else is set up.
        timer: Timer forwarded to the camera.
        app: App instance, resolved through ``app_or_default()``.

    Raises:
        SystemExit: If capturing is interrupted by
            :exc:`KeyboardInterrupt`.
    """
    app = app_or_default(app)

    if pidfile:
        platforms.create_pidlock(pidfile)

    app.log.setup_logging_subsystem(loglevel, logfile)

    print(f'-> evcam: Taking snapshots with {camera} (every {freq} secs.)')
    state = app.events.State()
    cam = instantiate(camera, state, app=app, freq=freq,
                      maxrate=maxrate, timer=timer)
    cam.install()
    conn = None
    try:
        # Anything failing from here on must still cancel the installed
        # camera, so connection setup lives inside the try block.
        conn = app.connection_for_read()
        recv = app.events.Receiver(conn, handlers={'*': state.event})
        try:
            recv.capture(limit=None)
        except KeyboardInterrupt:
            raise SystemExit
    finally:
        # Same order as before (cancel, then close), but a failing
        # cancel() can no longer prevent the connection from being closed.
        try:
            cam.cancel()
        finally:
            if conn is not None:
                conn.close()