This document describes Celery 2.4.
celery.contrib.batches
Collect messages and process them as a list.
Example
A click counter that flushes the buffer every 100 messages, and every 10 seconds.
    from celery.task import task
    from celery.contrib.batches import Batches

    # Flush after 100 messages, or 10 seconds.
    @task(base=Batches, flush_every=100, flush_interval=10)
    def count_click(requests):
        from collections import Counter
        counts = Counter(request.kwargs["url"] for request in requests)
        for url, count in counts.items():
            print(">>> Clicks: %s -> %s" % (url, count))
Registering the click is done as follows:
>>> count_click.delay(url="http://example.com")
Warning

For this to work you have to set CELERYD_PREFETCH_MULTIPLIER to zero, or to a value where the final multiplied value (the multiplier times the worker's concurrency) is higher than flush_every.
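The prefetch limit is the multiplier times the number of worker processes, so whether flush_every is reachable can be checked with simple arithmetic. The concurrency and multiplier values below are hypothetical numbers for illustration; only CELERYD_PREFETCH_MULTIPLIER and flush_every come from this document:

    # Hypothetical worker configuration, assumed for illustration.
    concurrency = 8            # number of worker processes (assumed)
    prefetch_multiplier = 16   # the CELERYD_PREFETCH_MULTIPLIER setting
    flush_every = 100

    # The worker reserves at most concurrency * multiplier messages at a time.
    prefetch_count = concurrency * prefetch_multiplier

    # The buffer can only hold as many messages as are prefetched, so
    # flush_every is reachable only if prefetch_count exceeds it.
    print(prefetch_count > flush_every)  # prints True (128 > 100)

With a multiplier of zero the prefetch limit is removed entirely, which is why zero is the other safe choice.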
In the future we hope to add the ability to direct batching tasks to a channel with different QoS requirements than the task channel.
license: BSD, see LICENSE for more details.
API

class celery.contrib.batches.Batches

    apply_buffer(requests, args=(), kwargs={})

    debug(msg)

    execute(request, pool, loglevel, logfile)

    flush(requests)

    flush_every = 10

        Maximum number of messages in the buffer.

    flush_interval = 30

        Timeout in seconds before the buffer is flushed anyway.

    logger

    run(requests)
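The flush_every / flush_interval pair can be pictured with a toy buffer that flushes when the count threshold is reached, or when enough time has passed since the last flush. This is an illustrative sketch, not Celery's actual implementation; all names in it are hypothetical:

    import time

    class ToyBatchBuffer:
        # Minimal sketch of flush-on-count-or-timeout behavior.
        def __init__(self, flush_callback, flush_every=10, flush_interval=30):
            self.flush_callback = flush_callback
            self.flush_every = flush_every
            self.flush_interval = flush_interval
            self._buffer = []
            self._last_flush = time.monotonic()

        def add(self, request):
            self._buffer.append(request)
            if len(self._buffer) >= self.flush_every:
                self.flush()

        def maybe_flush(self):
            # Called periodically (e.g. by a timer) to enforce flush_interval.
            if self._buffer and time.monotonic() - self._last_flush >= self.flush_interval:
                self.flush()

        def flush(self):
            requests, self._buffer = self._buffer, []
            self._last_flush = time.monotonic()
            self.flush_callback(requests)

    # Usage: flush every 3 messages.
    batches = []
    buf = ToyBatchBuffer(batches.append, flush_every=3, flush_interval=60)
    for i in range(7):
        buf.add(i)
    print(batches)  # [[0, 1, 2], [3, 4, 5]]; message 6 stays buffered

In the real worker the timeout path runs off the event timer rather than an explicit maybe_flush() call, but the two triggers (count and elapsed time) behave as above.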
class celery.contrib.batches.SimpleRequest(id, name, args, kwargs, delivery_info, hostname)

    Pickleable request.

    args = ()

        positional arguments

    delivery_info = None

        message delivery information

    classmethod from_request(request)

    hostname = None

        worker node name

    id = None

        task id

    kwargs = {}

        keyword arguments

    name = None

        task name
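To see what a batch task receives, the requests can be mimicked with a stand-in exposing the same attributes as SimpleRequest. The stand-in class below is hypothetical; only the attribute names come from the API above:

    from collections import Counter, namedtuple

    # Hypothetical stand-in with SimpleRequest's attributes.
    FakeRequest = namedtuple(
        "FakeRequest", "id name args kwargs delivery_info hostname")

    def count_click(requests):
        # Same counting logic as the example task above.
        counts = Counter(request.kwargs["url"] for request in requests)
        for url, count in counts.items():
            print(">>> Clicks: %s -> %s" % (url, count))
        return counts

    requests = [
        FakeRequest(str(i), "count_click", (), {"url": "http://example.com"},
                    None, "worker1")
        for i in range(3)
    ]
    counts = count_click(requests)
    # prints: >>> Clicks: http://example.com -> 3

Inside a real worker the list passed to the task is built by SimpleRequest.from_request() for each buffered message.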