This document describes the current stable version of Celery (4.2). For development docs, go here.

Source code for celery.worker.pidbox

"""Worker Pidbox (remote control)."""
from __future__ import absolute_import, unicode_literals

import socket
import threading

from kombu.common import ignore_errors
from kombu.utils.encoding import safe_str

from celery.utils.collections import AttributeDict
from celery.utils.functional import pass1
from celery.utils.log import get_logger

from . import control

# Names exported by a star-import of this module.
__all__ = ('Pidbox', 'gPidbox')

# Module logger, plus shortcuts to the bound logging methods used below.
logger = get_logger(__name__)
debug, error, info = logger.debug, logger.error, logger.info


class Pidbox(object):
    """Worker mailbox.

    Owns the mailbox node that receives remote control messages and
    dispatches them to the handlers in ``control.Panel.data``.

    Arguments:
        c: The consumer this mailbox is attached to.  The methods below
            read ``c.hostname``, ``c.app``, ``c.controller``,
            ``c.connection`` and ``c.on_decode_error`` from it.
    """

    # Set by :meth:`start`; reset to ``None`` by :meth:`stop`.
    consumer = None

    def __init__(self, c):
        self.c = c
        self.hostname = c.hostname
        self.node = c.app.control.mailbox.Node(
            safe_str(c.hostname),
            handlers=control.Panel.data,
            state=AttributeDict(
                app=c.app,
                hostname=c.hostname,
                consumer=c,
                # ``pass1`` when the worker runs on the event loop,
                # the builtin ``set`` type otherwise.
                tset=pass1 if c.controller.use_eventloop else set,
            ),
        )
        # Cache the bound method: it is called for every message.
        self._forward_clock = self.c.app.clock.forward

    def on_message(self, body, message):
        """Dispatch one incoming control message to the mailbox node.

        A ``KeyError`` from the node is logged as an unknown command.
        Any other exception is logged with its traceback and followed
        by :meth:`reset`.
        """
        # just increase clock as clients usually don't
        # have a valid clock to adjust with.
        self._forward_clock()
        try:
            self.node.handle_message(body, message)
        except KeyError as exc:
            error('No such control command: %s', exc)
        except Exception as exc:
            error('Control command error: %r', exc, exc_info=True)
            self.reset()

    def start(self, c):
        """Open a channel on ``c.connection`` and start listening."""
        self.node.channel = c.connection.channel()
        self.consumer = self.node.listen(callback=self.on_message)
        self.consumer.on_decode_error = c.on_decode_error

    def on_stop(self):
        """Hook called before the channel is closed; no-op here."""
        pass

    def stop(self, c):
        """Run the stop hook, close the channel and drop the consumer."""
        self.on_stop()
        # ``_close_channel`` has no return statement, so this assignment
        # clears ``self.consumer`` (sets it to ``None``).
        self.consumer = self._close_channel(c)

    def reset(self):
        """Stop and immediately start again using the stored consumer."""
        self.stop(self.c)
        self.start(self.c)

    def _close_channel(self, c):
        """Close the node's channel, if there is one, via ``ignore_errors``."""
        if self.node and self.node.channel:
            ignore_errors(c, self.node.channel.close)

    def shutdown(self, c):
        """Cancel the active consumer (if any) and then stop."""
        self.on_stop()
        if self.consumer:
            debug('Canceling broadcast consumer...')
            ignore_errors(c, self.consumer.cancel)
        self.stop(self.c)
class gPidbox(Pidbox):
    """Worker pidbox (greenlet).

    Instead of listening on the consumer's own connection, this variant
    runs :meth:`loop` through ``c.pool.spawn_n`` and drains events from
    a separate read connection.
    """

    # Events created by :meth:`loop`; both are ``None`` while no loop
    # has been started or after :meth:`on_stop` has completed.
    _node_shutdown = None
    _node_stopped = None
    # Number of resets requested so far; compared against by :meth:`loop`.
    _resets = 0

    def start(self, c):
        """Spawn :meth:`loop` in the consumer's pool."""
        c.pool.spawn_n(self.loop, c)

    def on_stop(self):
        """Ask a running loop to exit and wait until it has finished."""
        if self._node_stopped:
            self._node_shutdown.set()
            debug('Waiting for broadcast thread to shutdown...')
            self._node_stopped.wait()
            self._node_stopped = self._node_shutdown = None

    def reset(self):
        """Request a reset; the running loop performs it on its next pass."""
        self._resets += 1

    def _do_reset(self, c, connection):
        """Replace the node's channel and consumer using ``connection``."""
        self._close_channel(c)
        self.node.channel = connection.channel()
        self.consumer = self.node.listen(callback=self.on_message)
        self.consumer.consume()

    def loop(self, c):
        """Drain mailbox events until shutdown is requested.

        Runs while the shutdown event is unset and ``c.connection`` is
        truthy.  The stopped event is always set on exit, including
        when an exception propagates, so :meth:`on_stop` cannot block
        forever on it.
        """
        # Number of resets already applied by this loop.
        resets = [self._resets]
        shutdown = self._node_shutdown = threading.Event()
        stopped = self._node_stopped = threading.Event()
        try:
            with c.connection_for_read() as connection:
                info('pidbox: Connected to %s.', connection.as_uri())
                self._do_reset(c, connection)
                while not shutdown.is_set() and c.connection:
                    # Apply at most one pending reset per iteration.
                    if resets[0] < self._resets:
                        resets[0] += 1
                        self._do_reset(c, connection)
                    try:
                        # The timeout bounds how long we wait before
                        # re-checking the shutdown flag and reset counter.
                        connection.drain_events(timeout=1.0)
                    except socket.timeout:
                        pass
        finally:
            stopped.set()