This document describes the current stable version of Celery (5.0).

celery.events.state

In-memory representation of cluster state.

This module implements a data structure that keeps track of the state of a cluster of workers and the tasks they are working on (by consuming events).

The state is updated for every event consumed, so it always represents the cluster as it was at the time of the last event.

Snapshots (celery.events.snapshot) can be used to take “pictures” of this state at regular intervals, for example to store it in a database.
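The event-consuming loop described above can be sketched as follows. This is a minimal sketch modelled on the real-time monitor pattern from Celery's monitoring guide. It assumes `app` is a configured Celery application whose workers send events, and the names `make_failure_handler`, `report` and `run_monitor` are illustrative, not part of this module.

```python
def make_failure_handler(state, report=print):
    """Return a handler that updates `state` and reports failed tasks.

    `state` is expected to behave like celery.events.state.State:
    it needs an event() method and a `tasks` mapping keyed by uuid.
    """
    def on_task_failed(event):
        # Update the in-memory state first, so the lookup below sees
        # fields (such as the task name) recorded from earlier events.
        state.event(event)
        task = state.tasks.get(event['uuid'])
        if task is not None:
            report('TASK FAILED: {}[{}]'.format(task.name, task.uuid))
        return task
    return on_task_failed


def run_monitor(app):
    """Consume events until interrupted (assumes a reachable broker)."""
    state = app.events.State()
    with app.connection() as connection:
        receiver = app.events.Receiver(connection, handlers={
            'task-failed': make_failure_handler(state),
            '*': state.event,  # every other event only updates the state
        })
        receiver.capture(limit=None, timeout=None, wakeup=True)
```

Routing every event through `state.event` is what lets the failure handler print a task name, since the name normally arrives with an earlier event for the same task.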

class celery.events.state.State(callback=None, workers=None, tasks=None, taskheap=None, max_workers_in_memory=5000, max_tasks_in_memory=10000, on_node_join=None, on_node_leave=None, tasks_by_type=None, tasks_by_worker=None)[source]

Records the cluster's state.

class Task(uuid=None, cluster_state=None, children=None, **kwargs)

Task State.

args = None
as_dict()
client = None
clock = 0
eta = None
event(type_, timestamp=None, local_received=None, fields=None, precedence=<function precedence>, setattr=<built-in function setattr>, task_event_to_state=<built-in method get of dict object>, RETRY='RETRY')
exception = None
exchange = None
expires = None
failed = None
property id
info(fields=None, extra=None)

Information about this task suitable for on-screen display.

kwargs = None
merge_rules = {'RECEIVED': ('name', 'args', 'kwargs', 'parent_id', 'root_id', 'retries', 'eta', 'expires')}
name = None
property origin
parent
parent_id = None
property ready
received = None
rejected = None
result = None
retried = None
retries = None
revoked = None
root
root_id = None
routing_key = None
runtime = None
sent = None
started = None
state = 'PENDING'
succeeded = None
timestamp = None
traceback = None
worker = None
class Worker(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)

Worker State.

active
property alive
clock
event
expire_window = 200
freq
property heartbeat_expires
heartbeat_max = 4
heartbeats
hostname
property id
loadavg
pid
processed
property status_string
sw_ident
sw_sys
sw_ver
update(f, **kw)
alive_workers()[source]

Return a list of (seemingly) alive workers.

clear(ready=True)[source]
clear_tasks(ready=True)[source]
event(event)[source]
event_count = 0
freeze_while(fun, *args, **kwargs)[source]
get_or_create_task(uuid)[source]

Get or create task by uuid.

get_or_create_worker(hostname, **kwargs)[source]

Get or create worker by hostname.

Returns

A (worker, was_created) pair.

Return type

Tuple
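Because the method returns a pair, callers typically unpack it. The helper below is a hypothetical sketch (`ensure_worker` and its `log` parameter are not part of Celery). It only assumes that `state` provides get_or_create_worker() as documented above.

```python
def ensure_worker(state, hostname, log=print):
    """Look up a worker by hostname, logging the first time it is seen."""
    worker, was_created = state.get_or_create_worker(hostname)
    if was_created:
        log('new worker: {}'.format(hostname))
    return worker
```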

heap_multiplier = 4
itertasks(limit=None)[source]
rebuild_taskheap(timetuple=<class 'kombu.clocks.timetuple'>)[source]
task_count = 0
task_event(type_, fields)[source]

Deprecated, use event().

task_types()[source]

Return a list of all seen task types.

tasks_by_time(limit=None, reverse=True)[source]

Generator yielding tasks ordered by time.

Yields

Tuples of (uuid, Task).
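As a rough illustration of consuming this generator, the sketch below collects a few fields from each yielded pair and, separately, lists the hostnames returned by alive_workers(). The function names `recent_tasks` and `alive_hostnames` are made up for this example, and `state` is assumed to be a State instance that has already consumed some events.

```python
def recent_tasks(state, limit=5):
    """Return (uuid, name, state) tuples in the order the generator yields."""
    return [
        (uuid, task.name, task.state)
        for uuid, task in state.tasks_by_time(limit=limit)
    ]


def alive_hostnames(state):
    """Return the hostnames of workers that currently seem alive."""
    return [worker.hostname for worker in state.alive_workers()]
```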

tasks_by_timestamp(limit=None, reverse=True)

Generator yielding tasks ordered by time.

Yields

Tuples of (uuid, Task).

worker_event(type_, fields)[source]

Deprecated, use event().

class celery.events.state.Task(uuid=None, cluster_state=None, children=None, **kwargs)[source]

Task State.

args = None
as_dict()[source]
client = None
clock = 0
eta = None
event(type_, timestamp=None, local_received=None, fields=None, precedence=<function precedence>, setattr=<built-in function setattr>, task_event_to_state=<built-in method get of dict object>, RETRY='RETRY')[source]
exception = None
exchange = None
expires = None
failed = None
property id
info(fields=None, extra=None)[source]

Information about this task suitable for on-screen display.

kwargs = None
merge_rules = {'RECEIVED': ('name', 'args', 'kwargs', 'parent_id', 'root_id', 'retries', 'eta', 'expires')}

How to merge out-of-order events. Disorder is detected by logical ordering (e.g., task-received must have happened before a task-failed event).

A merge rule consists of a state and a list of fields to keep from that state. (RECEIVED, ('name', 'args')) means the name and args fields are always taken from the RECEIVED state, and any values for these fields received before or after are simply ignored.
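The effect of a merge rule can be modelled with plain dictionaries. This is a simplified sketch, not Celery's implementation: the `ORDER` table is a hypothetical stand-in for the logical ordering of states, `apply_event` is an invented name, and a task is represented as a dict rather than a Task object.

```python
# Hypothetical logical ordering of states, for illustration only.
ORDER = {'PENDING': 0, 'RECEIVED': 1, 'STARTED': 2, 'FAILURE': 3}

MERGE_RULES = {
    'RECEIVED': ('name', 'args', 'kwargs', 'parent_id', 'root_id',
                 'retries', 'eta', 'expires'),
}


def apply_event(task, state, fields):
    """Apply an event to `task` (a dict), merging if it arrives late."""
    fields = dict(fields)
    if ORDER[state] < ORDER[task['state']]:
        # Out of order: the event logically happened before the current
        # state, so keep only the fields its merge rule allows.
        keep = MERGE_RULES.get(state)
        if keep is not None:
            fields = {k: v for k, v in fields.items() if k in keep}
    else:
        # In order: the event also advances the task's state.
        fields['state'] = state
    task.update(fields)
    return task


task = {'state': 'PENDING'}
apply_event(task, 'FAILURE', {'exception': 'ValueError()'})
# A RECEIVED event arriving late contributes its name, nothing else.
apply_event(task, 'RECEIVED', {'name': 'proj.add', 'exception': 'stale'})
```

In this model the late RECEIVED event fills in the task name without overwriting the exception or moving the task back from FAILURE.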

name = None
property origin
parent[source]
parent_id = None
property ready
received = None
rejected = None
result = None
retried = None
retries = None
revoked = None
root[source]
root_id = None
routing_key = None
runtime = None
sent = None
started = None
state = 'PENDING'
succeeded = None
timestamp = None
traceback = None
worker = None
class celery.events.state.Worker(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)[source]

Worker State.

active
property alive
clock
event
expire_window = 200
freq
property heartbeat_expires
heartbeat_max = 4
heartbeats
hostname
property id
loadavg
pid
processed
property status_string
sw_ident
sw_sys
sw_ver
update(f, **kw)[source]
celery.events.state.heartbeat_expires(timestamp, freq=60, expire_window=200, Decimal=<class 'decimal.Decimal'>, float=<class 'float'>, isinstance=<built-in function isinstance>)[source]

Return time when heartbeat expires.
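The arithmetic behind the defaults can be sketched as below. This assumes that expire_window is interpreted as a percentage of freq, which is an assumption not stated on this page. `expiry_time` and `seems_alive` are illustrative names, not the library's functions.

```python
def expiry_time(timestamp, freq=60, expire_window=200):
    """Sketch: a heartbeat expires after expire_window percent of freq."""
    return timestamp + freq * (expire_window / 100.0)


def seems_alive(last_heartbeat, now, freq=60, expire_window=200):
    """A worker seems alive while `now` is before the expiry time."""
    return now < expiry_time(last_heartbeat, freq, expire_window)
```

Under that assumption, a worker with the default 60-second heartbeat frequency would be considered alive for 120 seconds after its last heartbeat.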