This document describes the current stable version of Celery (3.1).

celery.events

Events are a stream of messages sent when certain actions occur in the worker (and in clients, if CELERY_SEND_TASK_SENT_EVENT is enabled). They are used for monitoring purposes.

class celery.events.Events(app=None)[source]
Dispatcher[source]
Receiver[source]
State[source]
default_dispatcher(*args, **kwds)[source]
celery.events.Event(type, _fields=None, __dict__=<type 'dict'>, __now__=<built-in function time>, **fields)[source]

Create an event.

An event is a dictionary; the only required field is type. A timestamp field is set to the current time if one is not provided.
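The behaviour described above can be sketched in plain Python. This is a minimal stand-in written for illustration, not Celery's implementation; the name `make_event` and the sample field values are hypothetical.

```python
import time


def make_event(type, **fields):
    """Build an event dictionary (illustrative stand-in, not Celery's code)."""
    event = dict(fields)
    event["type"] = type
    # Only fill in the timestamp when the caller did not supply one.
    event.setdefault("timestamp", time.time())
    return event


# No timestamp given: the current time is used.
auto = make_event("worker-online", hostname="worker1@example.com")

# Timestamp given: the caller's value is kept.
explicit = make_event("task-sent", timestamp=1400000000.0, uuid="abc123")

print(sorted(auto))
print(explicit["timestamp"])
```

The sketch uses `setdefault` so that an explicit timestamp always wins over the default.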

class celery.events.EventDispatcher(connection=None, hostname=None, enabled=True, channel=None, buffer_while_offline=True, app=None, serializer=None, groups=None)[source]

Dispatches event messages.

Parameters:
  • connection – Connection to the broker.
  • hostname – Hostname used to identify this dispatcher. Defaults to the hostname returned by anon_nodename().
  • groups – List of groups to send events for. send() ignores events whose group is not in this list. If this is None, all events are sent. Example groups include "task" and "worker".
  • enabled – Set to False to not actually publish any events, making send() a no-op.
  • channel – Can be used instead of connection to specify an exact channel to use when sending events.
  • buffer_while_offline – If enabled, events are buffered while the connection is down. flush() must be called as soon as the connection is re-established.

You need to close() this after use.
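The effect of the groups and enabled parameters can be illustrated with a toy class. This is a sketch under stated assumptions, not the EventDispatcher code: `SketchDispatcher` and `group_of` are hypothetical names, and the `sent` list stands in for a broker connection.

```python
def group_of(event_type):
    """Return the group part of an event type such as 'task-sent'.

    Hypothetical helper: the group is taken to be the text before the
    first dash.
    """
    return event_type.split("-", 1)[0]


class SketchDispatcher:
    """Toy dispatcher showing only the `groups` and `enabled` checks."""

    def __init__(self, groups=None, enabled=True):
        self.groups = set(groups or [])
        self.enabled = enabled
        self.sent = []  # stands in for messages published to the broker

    def send(self, type, **fields):
        if not self.enabled:
            return  # disabled: send() does nothing
        if self.groups and group_of(type) not in self.groups:
            return  # group not in the list: the request is ignored
        self.sent.append(dict(fields, type=type))


dispatcher = SketchDispatcher(groups=["task"])
dispatcher.send("task-sent", uuid="abc123")
dispatcher.send("worker-heartbeat", hostname="worker1@example.com")
print([event["type"] for event in dispatcher.sent])
```

Only the `task-sent` event is recorded, because `worker` is not in the configured groups.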

DISABLED_TRANSPORTS = set(['sql'])
app = None
close()[source]

Close the event dispatcher.

disable()[source]
enable()[source]
extend_buffer(other)[source]

Copies the outbound buffer of another instance.

flush()[source]

Flushes the outbound buffer.
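The interplay of buffer_while_offline, flush() and extend_buffer() might look roughly like the following toy model. It is an illustration only: `BufferingSketch`, its `online` flag and its `delivered` list are hypothetical, and dropping events when buffering is off is an assumption of this sketch, not documented behaviour.

```python
from collections import deque


class BufferingSketch:
    """Toy model of buffering events while the connection is down."""

    def __init__(self, buffer_while_offline=True):
        self.buffer_while_offline = buffer_while_offline
        self.online = True            # hypothetical connection state flag
        self.delivered = []           # stands in for the broker
        self.outbound_buffer = deque()

    def send(self, type, **fields):
        event = dict(fields, type=type)
        if self.online:
            self.delivered.append(event)
        elif self.buffer_while_offline:
            self.outbound_buffer.append(event)
        # Otherwise the event is dropped (assumption made for this sketch).

    def flush(self):
        # Deliver buffered events in the order they were queued.
        while self.outbound_buffer:
            self.delivered.append(self.outbound_buffer.popleft())

    def extend_buffer(self, other):
        # Copy the pending events of another instance; `other` keeps its own.
        self.outbound_buffer.extend(other.outbound_buffer)


d = BufferingSketch()
d.online = False                      # connection lost
d.send("task-started", uuid="abc123")
d.send("task-succeeded", uuid="abc123")
d.online = True                       # connection re-established
d.flush()
print([event["type"] for event in d.delivered])
```

A deque keeps the buffered events in first-in, first-out order, so they reach the broker in the order they were generated.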

on_disabled = None
on_enabled = None
publish(type, fields, producer, retry=False, retry_policy=None, blind=False, utcoffset=<function utcoffset>, Event=<function Event>)[source]

Publish event using a custom Producer instance.

Parameters:
  • type – Event type name, with group separated by dash (-).
  • fields – Dictionary of event fields, must be JSON-serializable.
  • producer – Producer instance to use, only the publish method will be called.
  • retry – Retry in the event of connection failure.
  • retry_policy – Dict of custom retry policy, see ensure().
  • blind – Don’t set logical clock value (also do not forward the internal logical clock).
  • Event – Event type used to create event, defaults to Event().
  • utcoffset – Function returning the current utcoffset in hours.
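To make the blind parameter more concrete, here is a minimal sketch of a logical clock and of what skipping it means. The Lamport-style counter, the `stamp` helper and the `clock` field name are assumptions chosen for illustration; nothing here is taken from the dispatcher's source.

```python
import threading


class LogicalClock:
    """Minimal Lamport-style counter (illustrative only)."""

    def __init__(self, initial=0):
        self.value = initial
        self._lock = threading.Lock()

    def forward(self):
        # Advance the clock for a locally generated event.
        with self._lock:
            self.value += 1
            return self.value

    def adjust(self, other):
        # Merge a clock value received from another node.
        with self._lock:
            self.value = max(self.value, other) + 1
            return self.value


def stamp(event, clock, blind=False):
    """Attach a clock value unless `blind` is set (hypothetical helper)."""
    if blind:
        return event  # the clock is neither advanced nor attached
    return dict(event, clock=clock.forward())


clock = LogicalClock()
first = stamp({"type": "task-sent"}, clock)
skipped = stamp({"type": "task-sent"}, clock, blind=True)
second = stamp({"type": "task-received"}, clock)
print(first["clock"], "clock" in skipped, second["clock"])
```

In this sketch a blind event leaves the counter untouched, so the next stamped event continues the sequence without a gap.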
publisher
send(type, blind=False, **fields)[source]

Send event.

Parameters:
  • type – Event type name, with group separated by dash (-).
  • retry – Retry in the event of connection failure.
  • retry_policy – Dict of custom retry policy, see ensure().
  • blind – Don’t set logical clock value (also do not forward the internal logical clock).
  • Event – Event type used to create event, defaults to Event().
  • utcoffset – Function returning the current utcoffset in hours.
  • **fields – Event fields, must be JSON-serializable.
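A function "returning the current utcoffset in hours" could be written with the standard library as shown below. This is a sketch, not the default function referenced in the signature: the name `utcoffset_hours` is hypothetical, and the sign convention (hours west of UTC, as used by the `time` module) is an assumption.

```python
import time


def utcoffset_hours():
    """Return the local UTC offset in whole hours.

    Uses the `time` module convention: positive values are west of UTC.
    Floor division rounds half-hour zones towards negative infinity.
    """
    # time.altzone applies only while daylight saving time is in effect.
    if time.daylight and time.localtime().tm_isdst > 0:
        return time.altzone // 3600
    return time.timezone // 3600


offset = utcoffset_hours()
print(offset)
```

A callable like this could be passed wherever a custom utcoffset function is accepted, provided the receiving side expects the same sign convention.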
warn_if_yajl()[source]
class celery.events.EventReceiver(channel, handlers=None, routing_key='#', node_id=None, app=None, queue_prefix='celeryev', accept=None)[source]

Capture events.

Parameters:
  • connection – Connection to the broker.
  • handlers – Event handlers.

handlers is a dict mapping event types to their handlers. The special handler “*” captures all events that don’t have a handler of their own.
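The lookup rule for handlers, including the “*” fallback, can be sketched as follows. The `dispatch` function and the sample events are hypothetical and only model the rule described above; they do not connect to a broker.

```python
def dispatch(handlers, type, event):
    """Call the handler for `type`, else the "*" handler, else do nothing."""
    handler = handlers.get(type) or handlers.get("*")
    if handler is not None:
        handler(event)


seen = []
handlers = {
    # Specific handler for one event type.
    "task-failed": lambda event: seen.append(("failed", event["uuid"])),
    # Catch-all for every type without its own handler.
    "*": lambda event: seen.append(("other", event["type"])),
}

dispatch(handlers, "task-failed", {"type": "task-failed", "uuid": "abc123"})
dispatch(handlers, "worker-heartbeat", {"type": "worker-heartbeat"})
print(seen)
```

A dict in this shape is what would be passed as the handlers argument; events whose type has no entry and for which no “*” handler exists are simply ignored in this sketch.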

app = None
capture(limit=None, timeout=None, wakeup=True)[source]

Open up a consumer capturing events.

This has to run in the main process, and it will never stop unless EventDispatcher.should_stop is set to True, or forced via KeyboardInterrupt or SystemExit.

connection
event_from_message(body, localize=True, now=<built-in function time>, tzfields=<operator.itemgetter object>, adjust_timestamp=<function adjust_timestamp>, CLIENT_CLOCK_SKEW=-1)[source]
get_consumers(Consumer, channel)[source]
itercapture(limit=None, timeout=None, wakeup=True)[source]
on_consume_ready(connection, channel, consumers, wakeup=True, **kwargs)[source]
process(type, event)[source]

Process the received event by dispatching it to the appropriate handler.

wakeup_workers(channel=None)[source]