This document describes an older version of Celery (2.5). For the latest stable version please go here.
Collect messages and processes them as a list.
Example
A click counter that flushes the buffer every 100 messages, and every 10 seconds.
from celery.task import task
from celery.contrib.batches import Batches
# Flush after 100 messages, or 10 seconds.
@task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
from collections import Counter
count = Counter(request.kwargs["url"] for request in requests)
for url, count in count.items():
print(">>> Clicks: %s -> %s" % (url, count))
Registering the click is done as follows:
>>> count_click.delay(url="http://example.com")
Warning
For this to work you have to set CELERYD_PREFETCH_MULTIPLIER to zero, or some value where the final multiplied value is higher than flush_every.
In the future we hope to add the ability to direct batching tasks to a channel with different QoS requirements than the task channel.
copyright: |
|
---|---|
license: | BSD, see LICENSE for more details. |
API
Maximum number of message in buffer.
Timeout in seconds before buffer is flushed anyway.
Pickleable request.
positional arguments
message delivery information.
worker node name
task id
keyword arguments
task name