This document describes the current stable version of Celery (4.0). For development docs, go here.

Source code for celery.concurrency.base

# -*- coding: utf-8 -*-
"""Base Execution Pool."""
from __future__ import absolute_import, unicode_literals

import logging
import os
import sys

from billiard.einfo import ExceptionInfo
from billiard.exceptions import WorkerLostError
from kombu.utils.encoding import safe_repr

from celery.exceptions import WorkerShutdown, WorkerTerminate
from celery.five import monotonic, reraise
from celery.utils import timer2
from celery.utils.text import truncate
from celery.utils.log import get_logger

# Names exported by ``from celery.concurrency.base import *``.
__all__ = ['BasePool', 'apply_target']

# Logger obtained for the 'celery.pool' name; BasePool uses it to decide
# whether debug logging is enabled and to emit per-task debug messages.
logger = get_logger('celery.pool')


def apply_target(target, args=(), kwargs=None, callback=None,
                 accept_callback=None, pid=None, getpid=os.getpid,
                 propagate=(), monotonic=monotonic, **_):
    """Apply function within pool context.

    Calls ``target(*args, **kwargs)`` and reports the outcome through
    ``callback``.

    Arguments:
        target (Callable): Function to execute.
        args (Sequence): Positional arguments passed to ``target``.
        kwargs (Mapping): Keyword arguments passed to ``target``.
            ``None`` (the default) means no keyword arguments.
        callback (Callable): Called with the return value of ``target``
            on success.  When ``target`` raises a non-:exc:`Exception`
            :exc:`BaseException` it is instead called with an
            ``ExceptionInfo()`` created while a ``WorkerLostError`` is
            being handled.  If ``None``, the result is discarded.
        accept_callback (Callable): If set, called as
            ``accept_callback(pid, timestamp)`` before ``target`` runs.
        pid (int): Process id given to ``accept_callback``; when falsy,
            ``getpid()`` is used instead.
        getpid (Callable): Returns the current process id.
        propagate (Tuple[type, ...]): Exception classes that are always
            re-raised unchanged, checked before any other handler.
        monotonic (Callable): Clock used for the accept timestamp.
        **_: Extra keyword arguments are accepted and ignored.

    Raises:
        Exception: Anything in ``propagate``, any :exc:`Exception`
            subclass, and ``WorkerShutdown`` / ``WorkerTerminate`` raised
            by ``target`` are re-raised unchanged.
        WorkerLostError: If ``target`` raised any other
            :exc:`BaseException` and no ``callback`` was given to
            receive the error.
    """
    # Avoid a shared mutable default; also tolerates an explicit ``None``.
    if kwargs is None:
        kwargs = {}
    if accept_callback:
        accept_callback(pid or getpid(), monotonic())
    try:
        ret = target(*args, **kwargs)
    except propagate:
        # Must stay first: ``propagate`` may list BaseException subclasses
        # that would otherwise be converted to WorkerLostError below.
        raise
    except Exception:
        raise
    except (WorkerShutdown, WorkerTerminate):
        raise
    except BaseException as exc:
        # Remaining BaseExceptions are converted to WorkerLostError while
        # keeping the original traceback, then handed to the callback.
        try:
            reraise(WorkerLostError, WorkerLostError(repr(exc)),
                    sys.exc_info()[2])
        except WorkerLostError:
            if callback is None:
                # Nobody to notify: surface the error instead of hiding it
                # behind a "'NoneType' object is not callable" TypeError.
                raise
            callback(ExceptionInfo())
    else:
        if callback is not None:
            callback(ret)
class BasePool(object):
    """Task pool.

    Base class providing the pool lifecycle (:meth:`start`, :meth:`close`,
    :meth:`stop`, :meth:`terminate`) and the :meth:`apply_async` entry
    point.  The lifecycle methods record the current state in ``_state``
    and delegate to ``on_*`` hooks, which do nothing in this class.

    Arguments:
        limit (int): Stored as ``self.limit``; reported as
            ``max-concurrency`` in :attr:`info` and as
            :attr:`num_processes`.
        putlocks (bool): Passed to :meth:`on_apply` as ``waitforslot``.
        forking_enable (bool): Stored as ``self.forking_enable``.
        callbacks_propagate (Tuple[type, ...]): Passed to
            :meth:`on_apply` as ``callbacks_propagate``.
        app: Stored as ``self.app``.
        **options: Stored as ``self.options``.
    """

    # Values stored in ``_state`` by the lifecycle methods below.
    RUN = 0x1
    CLOSE = 0x2
    TERMINATE = 0x3

    Timer = timer2.Timer

    #: set to true if the pool can be shutdown from within
    #: a signal handler.
    signal_safe = True

    #: set to true if pool uses greenlets.
    is_green = False

    _state = None
    _pool = None
    # Refreshed in start() from the logger's effective level.
    _does_debug = True

    #: only used by multiprocessing pool
    uses_semaphore = False

    task_join_will_block = True
    body_can_be_buffer = False

    def __init__(self, limit=None, putlocks=True, forking_enable=True,
                 callbacks_propagate=(), app=None, **options):
        self.limit = limit
        self.putlocks = putlocks
        self.options = options
        self.forking_enable = forking_enable
        self.callbacks_propagate = callbacks_propagate
        self.app = app

    # -- Hooks: no-ops here, called by the lifecycle methods below. --

    def on_start(self):
        """Hook called by :meth:`start` before the state becomes RUN."""
        pass

    def did_start_ok(self):
        """Return True; this base implementation always reports success."""
        return True

    def flush(self):
        """Do nothing in the base pool."""
        pass

    def on_stop(self):
        """Hook called by :meth:`stop` before the state becomes TERMINATE."""
        pass

    def register_with_event_loop(self, loop):
        """Do nothing in the base pool."""
        pass

    def on_apply(self, *args, **kwargs):
        """Hook called by :meth:`apply_async` to execute the target."""
        pass

    def on_terminate(self):
        """Hook called by :meth:`terminate` after the state is TERMINATE."""
        pass

    def on_soft_timeout(self, job):
        """Do nothing in the base pool."""
        pass

    def on_hard_timeout(self, job):
        """Do nothing in the base pool."""
        pass

    def maintain_pool(self, *args, **kwargs):
        """Do nothing in the base pool."""
        pass

    def terminate_job(self, pid, signal=None):
        """Not supported by the base pool.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            '{0} does not implement terminate_job'.format(type(self)))

    def restart(self):
        """Not supported by the base pool.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            '{0} does not implement restart'.format(type(self)))

    # -- Lifecycle. --

    def stop(self):
        """Run :meth:`on_stop`, then mark the pool as TERMINATE."""
        self.on_stop()
        self._state = self.TERMINATE

    def terminate(self):
        """Mark the pool as TERMINATE, then run :meth:`on_terminate`."""
        self._state = self.TERMINATE
        self.on_terminate()

    def start(self):
        """Run :meth:`on_start`, then mark the pool as RUN."""
        # Cache the debug check once so apply_async can skip building
        # log arguments when debug logging is off.
        self._does_debug = logger.isEnabledFor(logging.DEBUG)
        self.on_start()
        self._state = self.RUN

    def close(self):
        """Mark the pool as CLOSE, then run :meth:`on_close`."""
        self._state = self.CLOSE
        self.on_close()

    def on_close(self):
        """Hook called by :meth:`close` after the state is CLOSE."""
        pass

    def apply_async(self, target, args=None, kwargs=None, **options):
        """Equivalent of the :func:`apply` built-in function.

        Callbacks should optimally return as soon as possible since
        otherwise the thread which handles the result will get blocked.

        Arguments:
            target (Callable): Passed to :meth:`on_apply`.
            args (Sequence): Positional arguments for ``target``;
                ``None`` (the default) means an empty list.
            kwargs (Mapping): Keyword arguments for ``target``;
                ``None`` (the default) means an empty dict.
            **options: Forwarded to :meth:`on_apply`.

        Returns:
            Whatever :meth:`on_apply` returns.
        """
        # Build fresh containers per call: these objects are handed to
        # on_apply, so shared default instances could leak mutations
        # from one call into the next.
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        if self._does_debug:
            logger.debug('TaskPool: Apply %s (args:%s kwargs:%s)',
                         target, truncate(safe_repr(args), 1024),
                         truncate(safe_repr(kwargs), 1024))

        return self.on_apply(target, args, kwargs,
                             waitforslot=self.putlocks,
                             callbacks_propagate=self.callbacks_propagate,
                             **options)

    def _get_info(self):
        """Return a dict with the pool's ``max-concurrency`` (the limit)."""
        return {
            'max-concurrency': self.limit,
        }

    @property
    def info(self):
        """Pool information as returned by :meth:`_get_info`."""
        return self._get_info()

    @property
    def active(self):
        """True while the pool state is RUN."""
        return self._state == self.RUN

    @property
    def num_processes(self):
        """The configured ``limit``."""
        return self.limit