
Source code for celery.worker.autoscale

# -*- coding: utf-8 -*-
"""Pool Autoscaling.

This module implements the internal thread responsible
for growing and shrinking the pool according to the
current autoscale settings.

The autoscale thread is only enabled if
the :option:`celery worker --autoscale` option is used.
"""
from __future__ import absolute_import, unicode_literals

import os
import threading
from time import sleep

from kombu.asynchronous.semaphore import DummyLock

from celery import bootsteps
from celery.five import monotonic
from celery.utils.log import get_logger
from celery.utils.threads import bgThread

from . import state
from .components import Pool

__all__ = ('Autoscaler', 'WorkerComponent')

logger = get_logger(__name__)
debug, info, error = logger.debug, logger.info, logger.error

AUTOSCALE_KEEPALIVE = float(os.environ.get('AUTOSCALE_KEEPALIVE', 30))
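
# ``AUTOSCALE_KEEPALIVE`` is the minimum number of seconds that must pass
# after a scale up before ``scale_down()`` below will shrink the pool
# (default: 30).  Because it is read from the environment it can be tuned
# per deployment, e.g. (``proj`` is a placeholder)::
#
#     AUTOSCALE_KEEPALIVE=60 celery -A proj worker --autoscale=10,3
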


class WorkerComponent(bootsteps.StartStopStep):
    """Bootstep that starts the autoscaler thread/timer in the worker."""

    label = 'Autoscaler'
    conditional = True
    requires = (Pool,)

    def __init__(self, w, **kwargs):
        self.enabled = w.autoscale
        w.autoscaler = None

    def create(self, w):
        scaler = w.autoscaler = self.instantiate(
            w.autoscaler_cls,
            w.pool, w.max_concurrency, w.min_concurrency,
            worker=w, mutex=DummyLock() if w.use_eventloop else None,
        )
        return scaler if not w.use_eventloop else None

    def register_with_event_loop(self, w, hub):
        w.consumer.on_task_message.add(w.autoscaler.maybe_scale)
        hub.call_repeatedly(
            w.autoscaler.keepalive, w.autoscaler.maybe_scale,
        )
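
# ``create()`` returns the autoscaler instance only when the worker does
# not use an event loop; in that case it runs as the background thread
# implemented by ``Autoscaler`` below.  With an event loop, ``create()``
# returns None and ``register_with_event_loop()`` instead schedules
# ``maybe_scale()`` on the hub: once for every incoming task message and
# periodically every ``keepalive`` seconds.
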
class Autoscaler(bgThread):
    """Background thread to autoscale pool workers."""

    def __init__(self, pool, max_concurrency,
                 min_concurrency=0, worker=None,
                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
        super(Autoscaler, self).__init__()
        self.pool = pool
        self.mutex = mutex or threading.Lock()
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        self._last_scale_up = None
        self.worker = worker

        assert self.keepalive, 'cannot scale down too fast.'

    def body(self):
        with self.mutex:
            self.maybe_scale()
        sleep(1.0)

    def _maybe_scale(self, req=None):
        procs = self.processes
        cur = min(self.qty, self.max_concurrency)
        if cur > procs:
            self.scale_up(cur - procs)
            return True
        cur = max(self.qty, self.min_concurrency)
        if cur < procs:
            self.scale_down(procs - cur)
            return True
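
    # The scaling rule in ``_maybe_scale()``: the target pool size is the
    # number of reserved requests (``qty``), clamped to the configured
    # bounds.  For example, with ``--autoscale=10,3``: 7 reserved requests
    # and 3 processes scales up by 4, while 1 reserved request and 7
    # processes scales down by 4 (never below the minimum of 3).
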
    def maybe_scale(self, req=None):
        if self._maybe_scale(req):
            self.pool.maintain_pool()

    def update(self, max=None, min=None):
        with self.mutex:
            if max is not None:
                if max < self.processes:
                    self._shrink(self.processes - max)
                self.max_concurrency = max
            if min is not None:
                if min > self.processes:
                    self._grow(min - self.processes)
                self.min_concurrency = min
            return self.max_concurrency, self.min_concurrency
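
    # A hedged sketch: ``update()`` is also reachable at runtime via the
    # remote-control ``autoscale`` command, so the limits can be changed
    # without restarting the worker (``proj`` is a placeholder)::
    #
    #     celery -A proj control autoscale 10 3
    #
    # or equivalently from Python: ``app.control.autoscale(10, 3)``.
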
    def force_scale_up(self, n):
        with self.mutex:
            new = self.processes + n
            if new > self.max_concurrency:
                self.max_concurrency = new
            self._grow(n)

    def force_scale_down(self, n):
        with self.mutex:
            new = self.processes - n
            if new < self.min_concurrency:
                self.min_concurrency = max(new, 0)
            self._shrink(min(n, self.processes))
    def scale_up(self, n):
        self._last_scale_up = monotonic()
        return self._grow(n)

    def scale_down(self, n):
        if self._last_scale_up and (
                monotonic() - self._last_scale_up > self.keepalive):
            return self._shrink(n)

    def _grow(self, n):
        info('Scaling up %s processes.', n)
        self.pool.grow(n)
        self.worker.consumer._update_prefetch_count(n)

    def _shrink(self, n):
        info('Scaling down %s processes.', n)
        try:
            self.pool.shrink(n)
        except ValueError:
            debug("Autoscaler won't scale down: all processes busy.")
        except Exception as exc:
            error('Autoscaler: scale_down: %r', exc, exc_info=True)
        self.worker.consumer._update_prefetch_count(-n)
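
    # ``_grow()`` and ``_shrink()`` also adjust the consumer's prefetch
    # count so the broker delivers enough messages for the new pool size
    # (and fewer after a shrink).  Note that ``scale_down()`` is a no-op
    # until ``keepalive`` seconds have passed since the last scale up,
    # which keeps the pool from flapping under bursty load.
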
    def info(self):
        return {
            'max': self.max_concurrency,
            'min': self.min_concurrency,
            'current': self.processes,
            'qty': self.qty,
        }

    @property
    def qty(self):
        return len(state.reserved_requests)

    @property
    def processes(self):
        return self.pool.num_processes
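

# A minimal sketch of plugging in a custom scaling policy.  The worker
# resolves ``w.autoscaler_cls`` from the ``worker_autoscaler`` setting, so
# a subclass of ``Autoscaler`` can override ``_maybe_scale()``.  The module
# path ``proj.autoscalers`` and the class ``ConservativeAutoscaler`` are
# hypothetical names used only for illustration::
#
#     # proj/autoscalers.py
#     from celery.worker.autoscale import Autoscaler
#
#     class ConservativeAutoscaler(Autoscaler):
#         """Grow by at most one process per scaling decision."""
#
#         def _maybe_scale(self, req=None):
#             if self.qty > self.processes < self.max_concurrency:
#                 self.scale_up(1)
#                 return True
#             # otherwise fall back to the default (shrinking) behavior
#             return super(ConservativeAutoscaler, self)._maybe_scale(req)
#
#     # configuration
#     app.conf.worker_autoscaler = 'proj.autoscalers:ConservativeAutoscaler'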