This document describes the current stable version of Celery (4.4). For development docs, go here.

Source code for celery.apps.beat

# -*- coding: utf-8 -*-
"""Beat command-line program.

This module is the 'program-version' of :mod:`celery.beat`.

It does everything necessary to run that module
as an actual application, like installing signal handlers
and so on.
from __future__ import absolute_import, print_function, unicode_literals

import numbers
import socket
import sys
from datetime import datetime

from celery import VERSION_BANNER, beat, platforms
from celery.five import text_t
from celery.utils.imports import qualname
from celery.utils.log import LOG_LEVELS, get_logger
from celery.utils.time import humanize_seconds

__all__ = ('Beat',)

LocalTime -> {timestamp}
Configuration ->
    . broker -> {conninfo}
    . loader -> {loader}
    . scheduler -> {scheduler}
    . logfile -> {logfile}@%{loglevel}
    . maxinterval -> {hmax_interval} ({max_interval}s)

logger = get_logger('celery.beat')

[docs]class Beat(object): """Beat as a service.""" Service = beat.Service app = None def __init__(self, max_interval=None, app=None, socket_timeout=30, pidfile=None, no_color=None, loglevel='WARN', logfile=None, schedule=None, scheduler=None, scheduler_cls=None, # XXX use scheduler redirect_stdouts=None, redirect_stdouts_level=None, **kwargs): = app = app or either = self.loglevel = loglevel self.logfile = logfile self.schedule = either('beat_schedule_filename', schedule) self.scheduler_cls = either( 'beat_scheduler', scheduler, scheduler_cls) self.redirect_stdouts = either( 'worker_redirect_stdouts', redirect_stdouts) self.redirect_stdouts_level = either( 'worker_redirect_stdouts_level', redirect_stdouts_level) self.max_interval = max_interval self.socket_timeout = socket_timeout self.no_color = no_color self.colored = app.log.colored( self.logfile, enabled=not no_color if no_color is not None else no_color, ) self.pidfile = pidfile if not isinstance(self.loglevel, numbers.Integral): self.loglevel = LOG_LEVELS[self.loglevel.upper()]
[docs] def run(self): print(str(self.colored.cyan( 'celery beat v{0} is starting.'.format(VERSION_BANNER)))) self.init_loader() self.set_process_title() self.start_scheduler()
[docs] def setup_logging(self, colorize=None): if colorize is None and self.no_color is not None: colorize = not self.no_color, self.logfile, self.redirect_stdouts, self.redirect_stdouts_level, colorize=colorize)
[docs] def start_scheduler(self): if self.pidfile: platforms.create_pidlock(self.pidfile) service = self.Service(, max_interval=self.max_interval, scheduler_cls=self.scheduler_cls, schedule_filename=self.schedule, ) print(self.banner(service)) self.setup_logging() if self.socket_timeout: logger.debug('Setting default socket timeout to %r', self.socket_timeout) socket.setdefaulttimeout(self.socket_timeout) try: self.install_sync_handler(service) service.start() except Exception as exc: logger.critical('beat raised exception %s: %r', exc.__class__, exc, exc_info=True) raise
[docs] def banner(self, service): c = self.colored return text_t( # flake8: noqa'__ ', c.magenta('-'),' ... __ '), c.magenta('-'),' _\n'), c.reset(self.startup_info(service))), )
[docs] def init_loader(self): # Run the worker init handler. # (Usually imports task modules and such.)
[docs] def startup_info(self, service): scheduler = service.get_scheduler(lazy=True) return STARTUP_INFO_FMT.format(,, logfile=self.logfile or '[stderr]', loglevel=LOG_LEVELS[self.loglevel], loader=qualname(, scheduler=qualname(scheduler),, hmax_interval=humanize_seconds(scheduler.max_interval), max_interval=scheduler.max_interval, )
[docs] def set_process_title(self): arg_start = 'manage' in sys.argv[0] and 2 or 1 platforms.set_process_title( 'celery beat', info=' '.join(sys.argv[arg_start:]), )
[docs] def install_sync_handler(self, service): """Install a `SIGTERM` + `SIGINT` handler saving the schedule.""" def _sync(signum, frame): service.sync() raise SystemExit() platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)