This document describes the current stable version of Celery (3.1).

celery.events.state

This module implements a data structure used to keep track of the state of a cluster of workers and the tasks they are working on, by consuming events.

The state is updated for every event consumed, so it always represents the cluster as of the last event received.

Snapshots (celery.events.snapshot) can be used to take “pictures” of this state at regular intervals, e.g. to store it in a database.
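The event-driven update model can be illustrated with a minimal, self-contained sketch (plain Python dicts stand in for the real Worker and Task classes documented below):

```python
# Illustrative sketch only; the real State class lives in
# celery.events.state and does considerably more bookkeeping.
class ClusterState:
    def __init__(self):
        self.workers = {}  # hostname -> latest worker fields
        self.tasks = {}    # uuid -> latest task fields

    def event(self, event):
        # Every consumed event updates the state, so the state always
        # reflects the cluster as of the last event received.
        group, _, _ = event['type'].partition('-')
        if group == 'worker':
            self.workers.setdefault(event['hostname'], {}).update(event)
        elif group == 'task':
            self.tasks.setdefault(event['uuid'], {}).update(event)

state = ClusterState()
state.event({'type': 'worker-online', 'hostname': 'worker1@example.com'})
state.event({'type': 'task-received', 'uuid': 'abc123', 'name': 'tasks.add'})
```

The hostname, uuid, and task name above are made-up example values.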

class celery.events.state.Worker(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)[source]

Worker State.

active
alive
clock
event
expire_window = 200
freq
heartbeat_expires
heartbeat_max = 4
heartbeats
hostname
id
loadavg
on_heartbeat(*args, **kwargs)[source]
on_offline(*args, **kwargs)[source]
on_online(*args, **kwargs)[source]
pid
processed
status_string
sw_ident
sw_sys
sw_ver
update(f, **kw)[source]
update_heartbeat(*args, **kwargs)[source]
class celery.events.state.Task(uuid=None, **kwargs)[source]

Task State.

args = None
as_dict()[source]
client = None
clock = 0
eta = None
event(type_, timestamp=None, local_received=None, fields=None, precedence=<function precedence>, items=<function items>, dict=<type 'dict'>, PENDING='PENDING', RECEIVED='RECEIVED', STARTED='STARTED', FAILURE='FAILURE', RETRY='RETRY', SUCCESS='SUCCESS', REVOKED='REVOKED')[source]
exception = None
exchange = None
expires = None
failed = None
info(fields=None, extra=[])[source]

Information about this task suitable for on-screen display.

kwargs = None
merge(*args, **kwargs)[source]
merge_rules = {'RECEIVED': ('name', 'args', 'kwargs', 'retries', 'eta', 'expires')}

How to merge out of order events. Disorder is detected by logical ordering (e.g. task-received must have happened before a task-failed event).

A merge rule consists of a state and a list of fields to keep from that state. (RECEIVED, ('name', 'args')) means the name and args fields are always taken from the RECEIVED state, and any values for these fields received before or after are simply ignored.
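The rule can be sketched as follows (an illustrative reimplementation on plain dicts, not the actual merge() internals): a logically older event may only contribute the fields whitelisted for its state, and only where the task has no value yet.

```python
# Hypothetical sketch of merging an out-of-order event into a task.
MERGE_RULES = {
    'RECEIVED': ('name', 'args', 'kwargs', 'retries', 'eta', 'expires'),
}

def merge_out_of_order(task, event_state, fields):
    # Only fields whitelisted for the late event's state may be merged,
    # and only if the task does not already carry a value for them.
    for field in MERGE_RULES.get(event_state, ()):
        if task.get(field) is None and field in fields:
            task[field] = fields[field]
    return task
```

For example, a task-received event arriving after a task-failed event can fill in a missing name, but cannot overwrite args that were already recorded.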

name = None
on_failed(*args, **kwargs)[source]
on_received(*args, **kwargs)[source]
on_retried(*args, **kwargs)[source]
on_revoked(*args, **kwargs)[source]
on_sent(*args, **kwargs)[source]
on_started(*args, **kwargs)[source]
on_succeeded(*args, **kwargs)[source]
on_unknown_event(*args, **kwargs)[source]
origin
ready
received = None
result = None
retried = None
retries = None
revoked = None
routing_key = None
runtime = None
sent = None
started = None
state = 'PENDING'
succeeded = None
timestamp = None
traceback = None
update(*args, **kwargs)[source]
worker = None
class celery.events.state.State(callback=None, workers=None, tasks=None, taskheap=None, max_workers_in_memory=5000, max_tasks_in_memory=10000, on_node_join=None, on_node_leave=None)[source]

Records cluster state.

class Task(uuid=None, **kwargs)

Alias of celery.events.state.Task, documented above.

class Worker(hostname=None, pid=None, freq=60, heartbeats=None, clock=0, active=None, processed=None, loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None)

Alias of celery.events.state.Worker, documented above.
alive_workers()[source]

Return a list of (seemingly) alive workers.

clear(ready=True)[source]
clear_tasks(ready=True)[source]
event(event)[source]
event_count = 0
freeze_while(fun, *args, **kwargs)[source]
get_or_create_task(uuid)[source]

Get or create task by uuid.

get_or_create_worker(hostname, **kwargs)[source]

Get or create worker by hostname.

Return tuple of (worker, was_created).
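The (value, was_created) contract can be sketched with a generic helper (an illustrative pattern, not the actual implementation): look the key up first, and only build and record a new value on a miss.

```python
def get_or_create(mapping, key, factory):
    # Return (value, was_created): reuse an existing entry when
    # possible, otherwise create, store, and flag it as new.
    try:
        return mapping[key], False
    except KeyError:
        value = mapping[key] = factory()
        return value, True

workers = {}
worker, created = get_or_create(workers, 'worker1@example.com', dict)
```

The was_created flag lets callers fire "node joined" callbacks only for genuinely new workers.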

heap_multiplier = 4
itertasks(limit=None)[source]
rebuild_taskheap(timetuple=<class 'kombu.clocks.timetuple'>)[source]
task_count = 0
task_event(type_, fields)[source]

Deprecated, use event().

task_types()[source]

Return a list of all seen task types.

tasks_by_time(limit=None)[source]

Generator giving tasks ordered by time, in (uuid, Task) tuples.

tasks_by_timestamp(limit=None)

Alias of tasks_by_time().
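Ordering tasks newest-first by event timestamp can be sketched like this (plain dicts stand in for Task objects; the real method walks an internal task heap instead of sorting):

```python
from itertools import islice

def tasks_by_time(tasks, limit=None):
    # Return (uuid, task) pairs, most recent event first; tasks
    # without a timestamp sort last.
    ordered = sorted(tasks.items(),
                     key=lambda item: item[1].get('timestamp') or 0,
                     reverse=True)
    return list(islice(ordered, limit))
```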

tasks_by_type(name, limit=None)[source]

Get all tasks by type.

Return a list of (uuid, Task) tuples.

tasks_by_worker(hostname, limit=None)[source]

Get all tasks by worker.

worker_event(type_, fields)[source]

Deprecated, use event().

celery.events.state.heartbeat_expires(timestamp, freq=60, expire_window=200, Decimal=<class 'decimal.Decimal'>, float=<type 'float'>, isinstance=<built-in function isinstance>)[source]
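The expiry computation is simple arithmetic; a sketch consistent with the defaults above (freq=60, expire_window=200, i.e. 200% of the heartbeat frequency), ignoring the Decimal-coercion details hinted at in the real signature:

```python
def heartbeat_expires(timestamp, freq=60, expire_window=200):
    # The heartbeat expires once expire_window percent of the
    # heartbeat frequency has elapsed since it was received.
    return timestamp + freq * (expire_window / 100.0)

def is_alive(last_heartbeat, now, freq=60):
    # Hypothetical helper: a worker counts as alive while its
    # latest heartbeat has not yet expired.
    return heartbeat_expires(last_heartbeat, freq) > now
```

With the defaults, a worker heartbeating every 60 seconds is considered gone 120 seconds after its last heartbeat.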