This document describes the current stable version of Celery (4.4). For development docs, go here.

Source code for celery.worker.pidbox

"""Worker Pidbox (remote control)."""
from __future__ import absolute_import, unicode_literals

import socket
import threading

from kombu.common import ignore_errors
from kombu.utils.encoding import safe_str

from celery.utils.collections import AttributeDict
from celery.utils.functional import pass1
from celery.utils.log import get_logger

from . import control

__all__ = ('Pidbox', 'gPidbox')

logger = get_logger(__name__)
# Module-level shortcuts for the three log levels used in this file.
# The right-hand side must supply one callable per name: with only two
# items the unpacking raises ValueError when the module is imported.
debug, error, info = logger.debug, logger.error, logger.info

class Pidbox(object):
    """Worker mailbox.

    Owns the remote-control node of a worker consumer: builds the node,
    listens for control messages and hands each one to the node.

    NOTE(review): this listing had lost several expressions (the node
    constructor and its ``handlers``/``app`` arguments, the clock
    forwarder, and every ``node.channel`` access).  They are restored
    below as best understood -- confirm against the installed release.
    """

    #: Active control-message consumer; ``None`` while not listening.
    consumer = None

    def __init__(self, c):
        """Build the mailbox node for consumer *c*.

        Arguments:
            c: The worker consumer.  Read here: ``hostname``, ``app`` and
                ``controller.use_eventloop``.
        """
        self.c = c
        self.hostname = c.hostname
        # NOTE(review): constructor path and ``handlers=`` restored -- confirm.
        self.node = c.app.control.mailbox.Node(
            safe_str(c.hostname),
            handlers=control.Panel.data,
            state=AttributeDict(
                app=c.app,
                hostname=c.hostname,
                consumer=c,
                # presumably ``pass1`` hands its argument back unchanged, so
                # no set is built when the event loop is used -- TODO confirm.
                tset=pass1 if c.controller.use_eventloop else set,
            ),
        )
        # NOTE(review): right-hand side restored -- confirm.
        self._forward_clock = self.c.app.clock.forward

    def on_message(self, body, message):
        """Handle one control message.

        An unknown command (``KeyError``) is only logged.  Any other
        failure is logged with its traceback and the mailbox is reset.
        """
        # just increase clock as clients usually don't
        # have a valid clock to adjust with.
        self._forward_clock()
        try:
            self.node.handle_message(body, message)
        except KeyError as exc:
            error('No such control command: %s', exc)
        except Exception as exc:
            error('Control command error: %r', exc, exc_info=True)
            self.reset()

    def start(self, c):
        """Open a channel on *c*'s connection and start listening on it."""
        # NOTE(review): channel assignment restored -- confirm.
        self.node.channel = c.connection.channel()
        self.consumer = self.node.listen(callback=self.on_message)
        self.consumer.on_decode_error = c.on_decode_error

    def on_stop(self):
        """Hook called before the channel is closed; no-op here."""
        pass

    def stop(self, c):
        """Run the stop hook, close the channel and drop the consumer."""
        self.on_stop()
        # ``_close_channel`` returns None, so this also clears the consumer.
        self.consumer = self._close_channel(c)

    def reset(self):
        """Restart the mailbox on the consumer given at construction."""
        self.stop(self.c)
        self.start(self.c)

    def _close_channel(self, c):
        """Close the node's channel, if there is one, ignoring errors."""
        # NOTE(review): both ``self.node.channel`` uses restored -- confirm.
        if self.node and self.node.channel:
            ignore_errors(c, self.node.channel.close)

    def shutdown(self, c):
        """Cancel the consumer (if any) and stop the mailbox."""
        self.on_stop()
        if self.consumer:
            debug('Canceling broadcast consumer...')
            ignore_errors(c, self.consumer.cancel)
        self.stop(self.c)
class gPidbox(Pidbox):
    """Worker pidbox (greenlet).

    Instead of listening on the consumer's own connection, this variant
    spawns :meth:`loop` on the worker pool; the loop opens a separate
    read connection and drains events from it until asked to stop.
    """

    #: Event set by :meth:`on_stop` to ask the running loop to exit.
    _node_shutdown = None
    #: Event set by :meth:`loop` once it has exited.
    _node_stopped = None
    #: Number of resets requested so far; compared by :meth:`loop`.
    _resets = 0

    def start(self, c):
        """Spawn the listening loop on *c*'s pool."""
        c.pool.spawn_n(self.loop, c)

    def on_stop(self):
        """Signal the loop to exit and wait until it has done so."""
        if self._node_stopped:
            self._node_shutdown.set()
            debug('Waiting for broadcast thread to shutdown...')
            self._node_stopped.wait()
            self._node_stopped = self._node_shutdown = None

    def reset(self):
        """Request a reset; the running loop performs it on its next pass."""
        self._resets += 1

    def _do_reset(self, c, connection):
        """Replace the node's channel with a new one from *connection*."""
        self._close_channel(c)
        # NOTE(review): channel assignment restored from a garbled
        # listing (only "= self.consumer = ..." survived) -- confirm.
        self.node.channel = connection.channel()
        self.consumer = self.node.listen(callback=self.on_message)
        self.consumer.consume()

    def loop(self, c):
        """Listen for control messages until shut down.

        Runs until the shutdown event is set or ``c.connection`` becomes
        falsy.  The stopped event is always set on exit, so a waiting
        :meth:`on_stop` is released even if the loop fails.
        """
        resets = [self._resets]
        shutdown = self._node_shutdown = threading.Event()
        stopped = self._node_stopped = threading.Event()
        try:
            with c.connection_for_read() as connection:
                info('pidbox: Connected to %s.', connection.as_uri())
                self._do_reset(c, connection)
                while not shutdown.is_set() and c.connection:
                    # Catch up on one pending reset request per pass.
                    if resets[0] < self._resets:
                        resets[0] += 1
                        self._do_reset(c, connection)
                    try:
                        connection.drain_events(timeout=1.0)
                    except socket.timeout:
                        # No message within the timeout: go round again
                        # so the shutdown/reset flags are re-checked.
                        pass
        finally:
            stopped.set()