This document describes Celery 2.4.

celery.contrib.batches

Collects messages and processes them as a list.

Example

A click counter that flushes the buffer after every 100 messages or every 10 seconds, whichever comes first.

from collections import Counter

from celery.task import task
from celery.contrib.batches import Batches

# Flush after 100 messages, or 10 seconds.
@task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    # Each request carries the keyword arguments of one queued call.
    clicks = Counter(request.kwargs["url"] for request in requests)
    for url, count in clicks.items():
        print(">>> Clicks: %s -> %s" % (url, count))

Registering the click is done as follows:

>>> count_click.delay(url="http://example.com")
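The counting step can be tried without a running worker. The following is a minimal sketch, not part of Celery: FakeRequest and tally_clicks are hypothetical names, and the stand-in models only the kwargs attribute that count_click reads.

```python
from collections import Counter, namedtuple

# Hypothetical stand-in for the request objects passed to the task;
# only the ``kwargs`` attribute used by count_click is modelled.
FakeRequest = namedtuple("FakeRequest", ["kwargs"])


def tally_clicks(requests):
    """Count clicks per URL, mirroring the body of count_click."""
    return Counter(request.kwargs["url"] for request in requests)


# Three buffered calls, as if delay() had been invoked three times.
batch = [
    FakeRequest(kwargs={"url": "http://example.com"}),
    FakeRequest(kwargs={"url": "http://example.com"}),
    FakeRequest(kwargs={"url": "http://example.org"}),
]
clicks = tally_clicks(batch)
for url, n in sorted(clicks.items()):
    print(">>> Clicks: %s -> %s" % (url, n))
```

Each call to delay() contributes one request to the list, so the task body sees the whole batch at once rather than one message at a time.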

Warning

For this to work you have to set CELERYD_PREFETCH_MULTIPLIER to zero, or to a value where the final multiplied value (the multiplier times the number of concurrent worker processes) is higher than flush_every.

In the future we hope to add the ability to direct batching tasks to a channel with different QoS requirements than the task channel.
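The condition in the warning can be checked with a little arithmetic. This sketch assumes the prefetch limit equals the multiplier times the worker concurrency and that a multiplier of zero disables the limit; prefetch_allows_batch is a hypothetical helper, not a Celery function.

```python
def prefetch_allows_batch(multiplier, concurrency, flush_every):
    """Return True if the worker may reserve enough messages to fill a batch.

    Assumes the prefetch limit is ``multiplier * concurrency`` and that a
    multiplier of zero disables the limit.
    """
    if multiplier == 0:
        return True
    return multiplier * concurrency > flush_every


# With flush_every=100 and 8 worker processes:
print(prefetch_allows_batch(4, 8, 100))   # limit of 32 is below 100
print(prefetch_allows_batch(0, 8, 100))   # no limit at all
print(prefetch_allows_batch(16, 8, 100))  # limit of 128 is above 100
```

In a configuration module the simplest choice would be CELERYD_PREFETCH_MULTIPLIER = 0.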

copyright: (c) 2009 - 2011 by Ask Solem.
license: BSD, see LICENSE for more details.

API

class celery.contrib.batches.Batches
apply_buffer(requests, args=(), kwargs={})
debug(msg)
execute(request, pool, loglevel, logfile)
flush(requests)
flush_every = 10

Maximum number of messages in the buffer.

flush_interval = 30

Timeout in seconds before the buffer is flushed anyway.
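How flush_every and flush_interval interact can be illustrated with a small stand-alone buffer. This is a sketch of the general technique only, not the implementation of Batches: BatchBuffer, add, tick and the injected clock are all hypothetical, and only the default values 10 and 30 are taken from the attributes above.

```python
import time


class BatchBuffer(object):
    """Hypothetical buffer that flushes on size or on elapsed time."""

    def __init__(self, flush, flush_every=10, flush_interval=30,
                 clock=time.time):
        self.flush = flush                    # callback receiving a list
        self.flush_every = flush_every        # size threshold
        self.flush_interval = flush_interval  # seconds between forced flushes
        self.clock = clock
        self.items = []
        self.last_flush = clock()

    def add(self, item):
        """Buffer one item and flush as soon as the buffer is full."""
        self.items.append(item)
        if len(self.items) >= self.flush_every:
            self._flush()

    def tick(self):
        """Call periodically; flushes a non-empty buffer once the
        interval has elapsed since the last flush."""
        elapsed = self.clock() - self.last_flush
        if self.items and elapsed >= self.flush_interval:
            self._flush()

    def _flush(self):
        batch, self.items = self.items, []
        self.last_flush = self.clock()
        self.flush(batch)


# Usage with a fake clock so the example runs instantly.
now = [0.0]
flushed = []
buf = BatchBuffer(flushed.append, flush_every=3, flush_interval=30,
                  clock=lambda: now[0])
for i in range(4):
    buf.add(i)      # the third add fills the buffer and triggers a flush
now[0] = 31.0
buf.tick()          # the interval has elapsed, so the remainder is flushed
print(flushed)
```

The size check runs on every add, while the time check needs a periodic caller; a real worker would drive that from a timer.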

logger
run(requests)
class celery.contrib.batches.SimpleRequest(id, name, args, kwargs, delivery_info, hostname)

Pickleable request.

args = ()

positional arguments

delivery_info = None

message delivery information.

classmethod from_request(request)
hostname = None

worker node name

id = None

task id

kwargs = {}

keyword arguments

name = None

task name
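The idea behind a pickleable request can be sketched with plain Python. RequestSketch below is a hypothetical stand-in that has the same six fields as SimpleRequest; WorkerRequestStub and its on_ack callback are invented for illustration, and the assumption is that from_request copies only the plain-data attributes.

```python
import pickle


class RequestSketch(object):
    """Hypothetical stand-in with the same fields as SimpleRequest."""

    def __init__(self, id=None, name=None, args=(), kwargs=None,
                 delivery_info=None, hostname=None):
        self.id = id
        self.name = name
        self.args = args
        self.kwargs = kwargs if kwargs is not None else {}
        self.delivery_info = delivery_info
        self.hostname = hostname

    @classmethod
    def from_request(cls, request):
        # Assumption: the source request exposes attributes of the
        # same names; anything else it carries is left behind.
        return cls(request.id, request.name, request.args, request.kwargs,
                   request.delivery_info, request.hostname)


class WorkerRequestStub(object):
    """Invented worker-side request that also holds a callback."""

    def __init__(self):
        self.id = "abc123"
        self.name = "tasks.count_click"
        self.args = ()
        self.kwargs = {"url": "http://example.com"}
        self.delivery_info = {"routing_key": "celery"}
        self.hostname = "worker1.example.com"
        self.on_ack = lambda: None  # not copied by from_request


simple = RequestSketch.from_request(WorkerRequestStub())
# Round-trip the plain fields through pickle and rebuild the object.
payload = pickle.dumps(vars(simple))
restored = RequestSketch(**pickle.loads(payload))
print(restored.id, restored.name, restored.kwargs["url"])
```

Because only basic types survive the copy, the resulting object can be handed to another process without dragging along worker-side state.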