This document describes Celery 2.4.


Collects messages and processes them as a list.


A click counter that flushes the buffer every 100 messages or every 10 seconds, whichever comes first.

from collections import Counter

from celery.task import task
from celery.contrib.batches import Batches

# Flush after 100 messages, or 10 seconds.
@task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    # Each request carries the keyword arguments of one delay() call.
    counts = Counter(request.kwargs["url"] for request in requests)
    # Use distinct names so the loop variable does not shadow the Counter.
    for url, n in counts.items():
        print(">>> Clicks: %s -> %s" % (url, n))

Registering the click is done as follows:

>>> count_click.delay(url="")
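The counting step can be tried without a running worker. The sketch below is an illustration only: it uses `types.SimpleNamespace` objects as stand-ins for the buffered requests, the helper name `count_clicks` is made up for this example, and the URLs are hypothetical.

```python
from collections import Counter
from types import SimpleNamespace


def count_clicks(requests):
    """Return a Counter mapping each URL to its number of buffered clicks."""
    return Counter(request.kwargs["url"] for request in requests)


# Stand-in requests (hypothetical URLs). A real batch holds one request
# per count_click.delay(url=...) call collected since the last flush.
batch = [
    SimpleNamespace(kwargs={"url": "http://example.com/a"}),
    SimpleNamespace(kwargs={"url": "http://example.com/b"}),
    SimpleNamespace(kwargs={"url": "http://example.com/a"}),
]

for url, n in sorted(count_clicks(batch).items()):
    print(">>> Clicks: %s -> %s" % (url, n))
```

Because the task receives the whole batch at once, repeated clicks on the same URL are collapsed into a single count per flush.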


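For this to work, set CELERYD_PREFETCH_MULTIPLIER to zero, or to a value for which the final multiplied value (the multiplier times the number of concurrent worker processes) is higher than flush_every. Otherwise the worker cannot hold enough unacknowledged messages to fill the buffer.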
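The requirement comes down to simple arithmetic. The following sketch assumes that the worker's prefetch limit is the multiplier times the number of concurrent worker processes, and that a multiplier of zero disables the limit. The function name `prefetch_allows_batch` is hypothetical and not part of Celery.

```python
def prefetch_allows_batch(multiplier, concurrency, flush_every):
    """Return True if the worker can reserve enough messages to fill a batch.

    Assumes the prefetch limit is multiplier * concurrency and that a
    multiplier of zero means no limit.
    """
    if multiplier == 0:
        return True
    return multiplier * concurrency > flush_every


# Hypothetical configuration fragment for the click counter above:
CELERYD_PREFETCH_MULTIPLIER = 0  # no prefetch limit
```

For instance, with a multiplier of 4 and 8 worker processes the limit would be 32 messages, which is too low for flush_every=100.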

In the future we hope to add the ability to direct batching tasks to a channel with different QoS requirements than the task channel.

Copyright (c) 2009 - 2011 by Ask Solem.

BSD, see LICENSE for more details.


class celery.contrib.batches.Batches

apply_buffer(requests, args=(), kwargs={})

execute(request, pool, loglevel, logfile)

flush_every = 10

Maximum number of messages in the buffer.

flush_interval = 30

Timeout in seconds before the buffer is flushed anyway.
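How the two settings interact can be illustrated with a toy buffer. This is a minimal sketch, not the Batches implementation: the class `FlushBuffer`, its `tick` method, and the injectable `clock` parameter are all assumptions made for this example, and the sketch measures time since the last flush rather than reproducing the worker's internal scheduling.

```python
import time


class FlushBuffer:
    """Toy buffer that flushes on size or age (illustration only)."""

    def __init__(self, flush_every=10, flush_interval=30, clock=time.monotonic):
        self.flush_every = flush_every        # maximum number of buffered items
        self.flush_interval = flush_interval  # seconds before flushing anyway
        self._clock = clock
        self._items = []
        self._last_flush = clock()
        self.flushed = []  # batches handed off so far, kept for inspection

    def add(self, item):
        """Buffer one item and flush as soon as the size limit is reached."""
        self._items.append(item)
        if len(self._items) >= self.flush_every:
            self.flush()

    def tick(self):
        """Call periodically; flushes a non-empty buffer once the interval has passed."""
        elapsed = self._clock() - self._last_flush
        if self._items and elapsed >= self.flush_interval:
            self.flush()

    def flush(self):
        """Hand off the current batch and reset the buffer and timer."""
        self.flushed.append(self._items)
        self._items = []
        self._last_flush = self._clock()
```

Passing the clock in as a parameter keeps the sketch easy to test with a fake time source instead of real sleeps.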

class celery.contrib.batches.SimpleRequest(id, name, args, kwargs, delivery_info, hostname)

Pickleable request.

args = ()

Positional arguments.

delivery_info = None

Message delivery information.

classmethod from_request(request)

hostname = None

Worker node name.

id = None

Task id.

kwargs = {}

Keyword arguments.

name = None

Task name.
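The idea behind a pickleable request is to copy only plain data fields from a richer worker-side object. The sketch below is a stand-in, not the library class: `PlainRequest` is a made-up name, and the source attribute names `task_id` and `task_name` on the worker-side request are assumptions, as are all the sample values.

```python
from types import SimpleNamespace


class PlainRequest:
    """Stand-in carrying the six documented fields (illustration only)."""

    def __init__(self, id, name, args, kwargs, delivery_info, hostname):
        self.id = id                        # task id
        self.name = name                    # task name
        self.args = args                    # positional arguments
        self.kwargs = kwargs                # keyword arguments
        self.delivery_info = delivery_info  # message delivery information
        self.hostname = hostname            # worker node name

    @classmethod
    def from_request(cls, request):
        # The attribute names read from `request` here are assumptions.
        return cls(request.task_id, request.task_name, request.args,
                   request.kwargs, request.delivery_info, request.hostname)


# Hypothetical worker-side request with sample values.
worker_request = SimpleNamespace(
    task_id="hypothetical-id",
    task_name="tasks.count_click",
    args=(),
    kwargs={"url": "http://example.com/a"},
    delivery_info={"exchange": "celery"},
    hostname="worker1.example.com",
)

simple = PlainRequest.from_request(worker_request)
print(simple.name, simple.kwargs["url"])
```

Keeping only simple values such as strings, tuples and dicts is what makes an object of this kind straightforward to serialize.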