kombu.pidbox

Generic process mailbox.

copyright: (c) 2009 - 2012 by Ask Solem.
license: BSD, see LICENSE for more details.

Introduction

Creating the application's Mailbox

>>> from kombu import pidbox
>>> mailbox = pidbox.Mailbox("celerybeat", type="direct")

>>> @mailbox.handler
... def reload_schedule(state, **kwargs):
...     state["beat"].reload_schedule()

>>> @mailbox.handler
... def connection_info(state, **kwargs):
...     return {"connection": state["connection"].info()}

Example Node

>>> import kombu
>>> connection = kombu.BrokerConnection()
>>> state = {"beat": beat,
...          "connection": connection}
>>> # Pass the state to the node so that handlers receive it.
>>> consumer = mailbox(connection).Node(hostname, state=state).listen()
>>> try:
...     while True:
...         connection.drain_events(timeout=1)
... finally:
...     consumer.cancel()

Example Client

>>> bound = mailbox(connection)   # bind the mailbox to a connection first
>>> bound.cast([hostname], "reload_schedule")   # cast is async.
>>> info = bound.call([hostname], "connection_info", timeout=1)

Mailbox

class kombu.pidbox.Mailbox(namespace, type='direct', connection=None)
namespace = None

Name of application.

connection = None

Connection (if bound).

type = 'direct'

Exchange type (usually direct, or fanout for broadcast).

exchange = None

Mailbox exchange (initialized by the constructor).

reply_exchange = None

Exchange to send replies to.

Node(hostname=None, state=None, channel=None, handlers=None)
call(destination, command, kwargs={}, timeout=None, callback=None, channel=None)
cast(destination, command, kwargs={})
abcast(command, kwargs={})
multi_call(command, kwargs={}, timeout=1, limit=None, callback=None, channel=None)
get_reply_queue(ticket)
get_queue(hostname)
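
The type attribute above selects how the mailbox exchange routes messages. As a rough illustration of the difference between direct and fanout in general AMQP terms, the following toy in-memory model may help. ToyExchange and its methods are hypothetical names invented for this sketch; they are not part of kombu and do not reflect how kombu implements exchanges.

```python
from collections import defaultdict


class ToyExchange:
    """Toy in-memory stand-in for an AMQP exchange (hypothetical, not kombu)."""

    def __init__(self, type="direct"):
        self.type = type
        # Maps a binding key to the list of queues bound with that key.
        # Queues are modelled as plain Python lists.
        self.bindings = defaultdict(list)

    def bind(self, queue, routing_key=""):
        self.bindings[routing_key].append(queue)

    def publish(self, message, routing_key=""):
        if self.type == "fanout":
            # fanout ignores the routing key: every bound queue gets a copy.
            targets = [q for queues in self.bindings.values() for q in queues]
        else:
            # direct: only queues bound with exactly this routing key.
            targets = self.bindings.get(routing_key, [])
        for queue in targets:
            queue.append(message)


node_a, node_b = [], []

# direct: the message reaches only the node named by the routing key.
direct = ToyExchange("direct")
direct.bind(node_a, "a.example.com")
direct.bind(node_b, "b.example.com")
direct.publish({"method": "ping"}, routing_key="a.example.com")
direct_result = (list(node_a), list(node_b))

node_a.clear()
node_b.clear()

# fanout: the message reaches every bound node (broadcast).
fanout = ToyExchange("fanout")
fanout.bind(node_a)
fanout.bind(node_b)
fanout.publish({"method": "ping"})
fanout_result = (list(node_a), list(node_b))
```

In this model, a direct exchange suits commands addressed to one node, while a fanout exchange suits commands meant for every node at once.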

Node

class kombu.pidbox.Node(hostname, state=None, channel=None, handlers=None, mailbox=None)
hostname = None

Hostname of the node.

mailbox = None

The Mailbox this is a node for.

handlers = None

Mapping of method names to handlers.

state = None

Current context (passed on to handlers).

channel = None

Current channel.

Consumer(channel=None, **options)
handler(fun)
listen(channel=None, callback=None)
dispatch(method, arguments=None, reply_to=None)
dispatch_from_message(message)
handle_call(method, arguments)
handle_cast(method, arguments)
handle(method, arguments={})
handle_message(body, message)
reply(data, exchange, routing_key, **kwargs)
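
To make the roles of handlers, state, and dispatch more concrete, here is a minimal pure-Python sketch of a node that registers handlers by name, passes its state to them, and records a reply only when a reply target is given. ToyNode, its replies list, and the reply shape keyed by hostname are assumptions of this sketch; it is not kombu's implementation and involves no broker.

```python
class ToyNode:
    """Toy model of a mailbox node (hypothetical, not kombu's implementation)."""

    def __init__(self, hostname, state=None):
        self.hostname = hostname
        self.state = state if state is not None else {}
        self.handlers = {}   # method name -> handler function
        self.replies = []    # stands in for messages sent to a reply exchange

    def handler(self, fun):
        # Register under the function's own name and return the function,
        # so this method can be used as a decorator.
        self.handlers[fun.__name__] = fun
        return fun

    def handle(self, method, arguments=None):
        # Handlers receive the node state first, then the call arguments.
        return self.handlers[method](self.state, **(arguments or {}))

    def dispatch(self, method, arguments=None, reply_to=None):
        result = self.handle(method, arguments)
        if reply_to is not None:
            # "call" style: the caller waits for a reply, so record one.
            self.replies.append((reply_to, {self.hostname: result}))
        # "cast" style (no reply_to): nothing is sent back to the caller.
        return result


node = ToyNode("worker1.example.com", state={"counter": 0})


@node.handler
def increment(state, by=1, **kwargs):
    state["counter"] += by
    return {"counter": state["counter"]}


node.dispatch("increment", {"by": 2})            # cast: no reply recorded
node.dispatch("increment", reply_to="ticket-1")  # call: reply recorded
```

The split between the two dispatch paths mirrors the cast and call distinction in the client example: both run the handler, but only a call produces a reply.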
