This document describes the current stable version of Celery (3.1). For development docs, go here.

Source code for celery.worker.autoreload

# -*- coding: utf-8 -*-

"""This module implements automatic module reloading."""
from __future__ import absolute_import

import hashlib
import os
import select
import sys
import time

from collections import defaultdict
from threading import Event

from kombu.utils import eventio
from kombu.utils.encoding import ensure_bytes

from celery import bootsteps
from celery.five import items
from celery.platforms import ignore_errno
from celery.utils.imports import module_file
from celery.utils.log import get_logger
from celery.utils.threads import bgThread

from .components import Pool

try:                        # pragma: no cover
    import pyinotify
    _ProcessEvent = pyinotify.ProcessEvent
except ImportError:         # pragma: no cover
    pyinotify = None        # noqa
    _ProcessEvent = object  # noqa

# Public names exported by ``from celery.worker.autoreload import *``.
__all__ = [
    'WorkerComponent', 'Autoreloader', 'Monitor', 'BaseMonitor',
    'StatMonitor', 'KQueueMonitor', 'InotifyMonitor', 'file_hash',
]

# Module-level logger, named after this module.
logger = get_logger(__name__)

class WorkerComponent(bootsteps.StartStopStep):
    """Worker bootstep that creates the worker's autoreloader.

    The step is declared ``conditional`` and requires the :class:`Pool`
    step; ``enabled`` mirrors the ``autoreload`` argument.
    """

    label = 'Autoreloader'
    conditional = True
    requires = (Pool, )

    def __init__(self, w, autoreload=None, **kwargs):
        """Record the autoreload setting on the step and on the worker.

        :param w: worker instance being set up.
        :param autoreload: stored as ``self.enabled`` and ``w.autoreload``.
        :param kwargs: accepted for signature compatibility; unused here.
        """
        self.enabled = w.autoreload = autoreload
        w.autoreloader = None

    def create(self, w):
        """Instantiate ``w.autoreloader_cls`` and attach it to the worker.

        :returns: the autoreloader, or ``None`` when the worker uses the
            event loop (the instance is still stored on ``w.autoreloader``
            and is hooked up by :meth:`register_with_event_loop`).
        """
        w.autoreloader = self.instantiate(w.autoreloader_cls, w)
        return w.autoreloader if not w.use_eventloop else None

    def register_with_event_loop(self, w, hub):
        """Register the autoreloader with ``hub`` and add its close hook."""
        w.autoreloader.register_with_event_loop(hub)
        hub.on_close.add(w.autoreloader.on_event_loop_close)
def file_hash(filename, algorithm='md5'):
    """Return the raw digest of the contents of ``filename``.

    The file is read in 1 MiB chunks so large files are never loaded
    into memory at once.

    :param filename: path of the file to hash (opened in binary mode).
    :param algorithm: any algorithm name accepted by :func:`hashlib.new`.
    :returns: the digest as a byte string (``hobj.digest()``).
    :raises IOError: (``OSError`` on Python 3) if the file can't be read.
    :raises ValueError: if ``algorithm`` is not known to :mod:`hashlib`.
    """
    hobj = hashlib.new(algorithm)
    with open(filename, 'rb') as f:
        # The sentinel must be a bytes literal: a binary-mode read returns
        # b'' at EOF, and on Python 3 ``b'' != ''`` so a text sentinel
        # would never terminate the loop.  Chunks are already bytes, so
        # no further encoding is needed before feeding the hash object.
        for chunk in iter(lambda: f.read(2 ** 20), b''):
            hobj.update(chunk)
    return hobj.digest()
class BaseMonitor(object):
    """Common state and hooks shared by the file monitors.

    Subclasses must implement :meth:`start`.
    """

    def __init__(self, files,
                 on_change=None, shutdown_event=None, interval=0.5):
        """Store the monitor configuration.

        :param files: the files to watch.
        :param on_change: optional callback invoked by :meth:`on_change`
            with the modified files.
        :param shutdown_event: event object exposing ``is_set()``; a new
            :class:`threading.Event` is created when omitted.
        :param interval: stored as ``self.interval`` for subclasses.
        """
        self.files = files
        self.interval = interval
        self._on_change = on_change
        # Unknown files default to 0.
        self.modify_times = defaultdict(int)
        self.shutdown_event = shutdown_event or Event()

    def start(self):
        """Run the monitor; must be overridden.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('Subclass responsibility')

    def stop(self):
        """Stop the monitor.  No-op in the base class."""
        pass

    def on_change(self, modified):
        """Forward ``modified`` to the callback, if one was given.

        :returns: the callback's return value, or ``None`` without one.
        """
        if self._on_change:
            return self._on_change(modified)

    def on_event_loop_close(self, hub):
        """Hook called with the hub on close.  No-op in the base class."""
        pass
class StatMonitor(BaseMonitor):
    """File change monitor based on the ``stat`` system call."""

    def _mtimes(self):
        """Yield ``(filename, mtime)`` pairs for every watched file."""
        return ((f, self._mtime(f)) for f in self.files)

    def _maybe_modified(self, f, mt):
        """Return true if ``mt`` is known and differs from the stored one."""
        return mt is not None and self.modify_times[f] != mt

    def register_with_event_loop(self, hub):
        """Schedule :meth:`find_changes` on ``hub`` every 2 seconds."""
        hub.call_repeatedly(2.0, self.find_changes)

    def find_changes(self):
        """Report files whose mtime changed since the previous scan.

        The callback receives a ``{filename: mtime}`` mapping; the stored
        mtimes are only updated after the callback has returned.
        """
        maybe_modified = self._maybe_modified
        modified = dict((f, mt) for f, mt in self._mtimes()
                        if maybe_modified(f, mt))
        if modified:
            self.on_change(modified)
            self.modify_times.update(modified)

    def start(self):
        """Poll for changes every ``self.interval`` seconds until shutdown."""
        while not self.shutdown_event.is_set():
            self.find_changes()
            time.sleep(self.interval)

    @staticmethod
    def _mtime(path):
        """Return the modification time of ``path``, or ``None`` on error."""
        try:
            return os.stat(path).st_mtime
        except Exception:
            # Deliberately best-effort: a file that can't be stat'ed is
            # reported as "mtime unknown" and skipped by _maybe_modified.
            return None
class KQueueMonitor(BaseMonitor):
    """File change monitor based on BSD kernel event notifications"""

    def __init__(self, *args, **kwargs):
        super(KQueueMonitor, self).__init__(*args, **kwargs)
        # filename -> open file descriptor (None until add_events runs).
        self.filemap = dict((f, None) for f in self.files)
        # file descriptor -> filename, used to translate events back.
        self.fdmap = {}
        # Set by register_with_event_loop() / start() respectively; kept
        # as None until then so the close/stop hooks can be called safely.
        self._kq = None
        self.poller = None

    def register_with_event_loop(self, hub):
        """Watch the files through a kqueue poller registered on ``hub``.

        Does nothing when ``eventio.kqueue`` is unavailable.
        """
        if eventio.kqueue is not None:
            self._kq = eventio._kqueue()
            self.add_events(self._kq)
            self._kq.on_file_change = self.handle_event
            hub.add_reader(self._kq._kqueue, self._kq.poll, 0)

    def on_event_loop_close(self, hub):
        """Release the watched descriptors when the event loop closes."""
        if self._kq is not None:
            self.close(self._kq)

    def add_events(self, poller):
        """Open every watched file read-only and watch its descriptor."""
        for f in self.filemap:
            self.filemap[f] = fd = os.open(f, os.O_RDONLY)
            self.fdmap[fd] = f
            poller.watch_file(fd)

    def handle_event(self, events):
        """Map event identifiers back to filenames and report them."""
        self.on_change([self.fdmap[e.ident] for e in events])

    def start(self):
        """Poll for file events until the shutdown event is set."""
        self.poller = eventio.poll()
        self.add_events(self.poller)
        self.poller.on_file_change = self.handle_event
        while not self.shutdown_event.is_set():
            self.poller.poll(1)

    def close(self, poller):
        """Unregister and close every open descriptor, then clear the maps."""
        for f, fd in items(self.filemap):
            if fd is not None:
                poller.unregister(fd)
                with ignore_errno('EBADF'):  # pragma: no cover
                    os.close(fd)
        self.filemap.clear()
        self.fdmap.clear()

    def stop(self):
        """Close the descriptors and the poller created by :meth:`start`."""
        if self.poller is not None:
            self.close(self.poller)
            self.poller.close()
class InotifyMonitor(_ProcessEvent):
    """File change monitor based on Linux kernel `inotify` subsystem"""

    def __init__(self, modules, on_change=None, **kwargs):
        """Store the paths to watch and the change callback.

        :param modules: iterable of paths passed to ``add_watch``.
        :param on_change: optional callback receiving a list of paths.
        :param kwargs: accepted for signature compatibility; unused here.
        :raises AssertionError: if :mod:`pyinotify` could not be imported.
        """
        assert pyinotify
        self._modules = modules
        self._on_change = on_change
        self._wm = None
        self._notifier = None

    def register_with_event_loop(self, hub):
        """Create the notifier and read from its fd through ``hub``."""
        self.create_notifier()
        hub.add_reader(self._wm.get_fd(), self.on_readable)

    def on_event_loop_close(self, hub):
        """Hook called with the hub on close.  Nothing to do here."""
        pass

    def on_readable(self):
        """Read and dispatch pending events."""
        self._notifier.read_events()
        self._notifier.process_events()

    def create_notifier(self):
        """Create the watch manager/notifier pair and watch every path."""
        self._wm = pyinotify.WatchManager()
        self._notifier = pyinotify.Notifier(self._wm, self)
        add_watch = self._wm.add_watch
        flags = pyinotify.IN_MODIFY | pyinotify.IN_ATTRIB
        for m in self._modules:
            add_watch(m, flags)

    def start(self):
        """Run the notifier loop, always releasing the watch manager."""
        try:
            self.create_notifier()
            self._notifier.loop()
        finally:
            if self._wm:
                self._wm.close()
                # Notifier.close is called at the end of Notifier.loop
                self._wm = self._notifier = None

    def stop(self):
        """Stop the monitor.  No-op for this implementation."""
        pass

    def process_(self, event):
        """Report the path of ``event`` as modified."""
        self.on_change([event.path])

    # NOTE(review): these aliases rely on pyinotify's ``process_<EVENT>``
    # handler naming for the two event types watched in create_notifier().
    process_IN_ATTRIB = process_IN_MODIFY = process_

    def on_change(self, modified):
        """Forward ``modified`` to the callback, if one was given."""
        if self._on_change:
            return self._on_change(modified)
def default_implementation():
    """Return the name of the preferred monitor for this platform.

    :returns: ``'kqueue'`` when both :mod:`select` and ``eventio`` expose
        kqueue, ``'inotify'`` on Linux when pyinotify was imported,
        otherwise ``'stat'``.
    """
    if hasattr(select, 'kqueue') and eventio.kqueue is not None:
        return 'kqueue'
    elif sys.platform.startswith('linux') and pyinotify:
        return 'inotify'
    else:
        return 'stat'


# Monitor name -> implementing class.
implementations = {'kqueue': KQueueMonitor,
                   'inotify': InotifyMonitor,
                   'stat': StatMonitor}

# The CELERYD_FSNOTIFY environment variable, when set and non-empty,
# overrides the platform default; an unknown name raises KeyError here.
Monitor = implementations[
    os.environ.get('CELERYD_FSNOTIFY') or default_implementation()]
class Autoreloader(bgThread):
    """Tracks changes in modules and fires reload commands"""
    Monitor = Monitor

    def __init__(self, controller, modules=None, monitor_cls=None, **options):
        """Store the configuration; the monitor is created in :meth:`on_init`.

        :param controller: object providing ``app`` and ``reload()``.
        :param modules: module names to track; defaults to the app
            loader's ``task_modules``.
        :param monitor_cls: accepted but not used by this class.
        :param options: extra keyword arguments passed to the monitor.
        """
        super(Autoreloader, self).__init__()
        self.controller = controller
        app = self.controller.app
        self.modules = app.loader.task_modules if modules is None else modules
        self.options = options
        self._monitor = None
        self._hashes = None
        # filename -> module name, filled in by on_init().
        self.file_to_module = {}

    def on_init(self):
        """Resolve module files, create the monitor and hash every file."""
        files = self.file_to_module
        files.update(dict(
            (module_file(sys.modules[m]), m) for m in self.modules))

        self._monitor = self.Monitor(
            files, self.on_change,
            shutdown_event=self._is_shutdown, **self.options)
        # Baseline digests that _maybe_modified() compares against.
        self._hashes = dict((f, file_hash(f)) for f in files)

    def register_with_event_loop(self, hub):
        """Initialise on first use, then register the monitor with ``hub``."""
        if self._monitor is None:
            self.on_init()
        self._monitor.register_with_event_loop(hub)

    def on_event_loop_close(self, hub):
        """Forward the close notification to the monitor, if created."""
        if self._monitor is not None:
            self._monitor.on_event_loop_close(hub)

    def body(self):
        """Initialise and run the monitor, ignoring EINTR/EAGAIN errors."""
        self.on_init()
        with ignore_errno('EINTR', 'EAGAIN'):
            self._monitor.start()

    def _maybe_modified(self, f):
        """Return ``True`` if ``f`` exists and its content digest changed.

        The stored digest is updated when a change is detected.
        """
        if os.path.exists(f):
            digest = file_hash(f)
            if digest != self._hashes[f]:
                self._hashes[f] = digest
                return True
        return False

    def on_change(self, files):
        """Reload the modules whose files really changed on disk.

        :param files: iterable of filenames reported by the monitor.
        """
        modified = [f for f in files if self._maybe_modified(f)]
        if modified:
            names = [self.file_to_module[module] for module in modified]
            logger.info('Detected modified modules: %r', names)
            self._reload(names)

    def _reload(self, modules):
        """Ask the controller to reload ``modules``."""
        self.controller.reload(modules, reload=True)

    def stop(self):
        """Stop the monitor, if one has been created."""
        if self._monitor:
            self._monitor.stop()