celery_batches

Experimental task class that buffers messages and processes them as a list. Task requests are buffered in memory (on a worker) until either the flush count or flush interval is reached. Once the requests are flushed, they are sent to the task as a list of SimpleRequest instances.

It is possible to return a result for each task request by calling mark_as_done on your results backend. Returning a value from the Batches task itself is only used to provide values to signals; it is not stored in the results backend.

Warning

For this to work you have to set worker_prefetch_multiplier to zero, or some value where the final multiplied value is higher than flush_every. Note that Celery will attempt to continually pull data into memory if this is set to zero. This can cause excessive resource consumption on both Celery workers and the broker when used with a deep queue.
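
A sketch of the relevant configuration, assuming flush_every=100 and eight worker processes (both values are illustrative):

# With 8 worker processes and a prefetch multiplier of 16, up to 128
# messages can be prefetched at once, which is above flush_every=100.
app.conf.worker_prefetch_multiplier = 16

# Alternatively, disable the prefetch limit entirely (see the warning
# above about memory usage with deep queues).
# app.conf.worker_prefetch_multiplier = 0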

In the future we hope to add the ability to direct batching tasks to a channel with different QoS requirements than the task channel.

Simple Example

A click counter that flushes the buffer every 100 messages or every 10 seconds, whichever comes first. It does not do anything with the data, but can easily be modified to store it in a database.

from collections import Counter

from celery_batches import Batches

# Flush after 100 messages, or 10 seconds.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    counts = Counter(request.kwargs['url'] for request in requests)
    for url, count in counts.items():
        print('>>> Clicks: {0} -> {1}'.format(url, count))

Then you can ask for a click to be counted by doing:

>>> count_click.delay(url='http://example.com')
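
As a rough sketch, issuing many calls lets the worker collect them into batches (the exact grouping depends on flush_every, flush_interval, and worker timing):

# Each call sends an ordinary task message; the worker buffers them and
# invokes count_click with up to 100 requests at a time.
for _ in range(250):
    count_click.delay(url='http://example.com')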

Example returning results

An interface to the Web of Trust API that flushes the buffer every 100 messages or every 10 seconds, whichever comes first.

import requests
from urllib.parse import urlparse

from celery_batches import Batches

wot_api_target = 'https://api.mywot.com/0.4/public_link_json'

@app.task(base=Batches, flush_every=100, flush_interval=10)
def wot_api(requests):
    sig = lambda url: url
    responses = wot_api_real(
        (sig(*request.args, **request.kwargs) for request in requests)
    )
    # use mark_as_done to manually store a result for each request
    for response, request in zip(responses, requests):
        app.backend.mark_as_done(request.id, response)


def wot_api_real(urls):
    domains = [urlparse(url).netloc for url in urls]
    response = requests.get(
        wot_api_target,
        params={'hosts': ('/').join(set(domains)) + '/'}
    )
    return [response.json()[domain] for domain in domains]

Using the API is done as follows:

>>> wot_api.delay('http://example.com')
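
Since each request's result is stored via mark_as_done, it can be retrieved through the usual AsyncResult interface. A sketch, assuming a result backend is configured:

result = wot_api.delay('http://example.com')
# Blocks until the batch containing this request has been flushed and
# its response stored by mark_as_done.
print(result.get(timeout=30))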

Note

If you don’t have an app instance then use the current app proxy instead:

from celery import current_app
current_app.backend.mark_as_done(request.id, response)

API

class celery_batches.Batches
Strategy(task, app, consumer)

abstract = True
apply(args=None, kwargs=None, *_args, **_kwargs)

Execute this task locally as a batch of size 1, by blocking until the task returns.

Arguments:

args (Tuple): positional arguments passed on to the task.

Returns:

celery.result.EagerResult: pre-evaluated result.
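
For example, a sketch of calling the count_click task from the simple example eagerly (useful in tests):

# Runs in the current process as a batch containing a single
# SimpleRequest built from the given arguments.
result = count_click.apply(kwargs={'url': 'http://example.com'})
assert result.successful()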

apply_buffer(requests, args=(), kwargs={})
flush(requests)
flush_every = 10

Maximum number of messages in the buffer.

flush_interval = 30

Timeout in seconds before the buffer is flushed even if flush_every has not been reached.

ignore_result = False
priority = None
rate_limit = None
reject_on_worker_lost = None
request_stack = <celery.utils.threads._LocalStack object>
run(requests)

The body of the task executed by workers.

serializer = 'json'
store_errors_even_if_ignored = False
track_started = False
typing = True
class celery_batches.SimpleRequest(id, name, args, kwargs, delivery_info, hostname)

Pickleable request.

args = ()

positional arguments

delivery_info = None

message delivery information.

classmethod from_request(request)
hostname = None

worker node name

id = None

task id

kwargs = {}

keyword arguments

name = None

task name
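
A short sketch of reading these fields inside a batch task (log_requests is a hypothetical task name):

@app.task(base=Batches, flush_every=100, flush_interval=10)
def log_requests(requests):
    for request in requests:
        # Each item is a SimpleRequest, not a full worker Request object.
        print(request.id, request.name, request.args, request.kwargs,
              request.hostname, request.delivery_info)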