celery-batches

celery-batches provides a task class that buffers messages and processes them as a list. Task requests are buffered in memory (on a worker) until either the flush count (flush_every) or the flush interval (flush_interval) is reached. Once the requests are flushed, they are sent to the task as a list of SimpleRequest instances.
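A minimal sketch of a Batches task (count_clicks is a hypothetical task name; flush_every and flush_interval are the real buffering options):

from celery import Celery
from celery_batches import Batches

app = Celery("app")

# Flush after 100 buffered requests, or after 10 seconds, whichever comes first.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_clicks(requests):
    # requests is a list of SimpleRequest instances.
    for request in requests:
        print(request.args, request.kwargs)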

Some potential use-cases for batching of task calls include:

  • De-duplicating tasks.

  • Accumulating / only handling the latest task with similar arguments.

  • Bulk inserting / updating of data (see the sketch after this list).

  • Tasks with expensive setup that can run across a range of arguments.
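For example, a bulk insert can buffer individual rows and write them in a single statement. A hedged sketch, reusing the app from above and assuming a SQLite database with an existing events table, where each request carries an (id, payload) tuple as its first positional argument:

import sqlite3

@app.task(base=Batches, flush_every=500, flush_interval=30)
def insert_events(requests):
    # Each request's first positional argument is assumed to be an (id, payload) tuple.
    rows = [request.args[0] for request in requests]
    # The connection context manager commits the transaction on success.
    with sqlite3.connect("events.db") as conn:
        conn.executemany("INSERT INTO events (id, payload) VALUES (?, ?)", rows)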

For a Batches task to work properly you must configure worker_prefetch_multiplier to zero, or to some value where the resulting prefetch count (worker_prefetch_multiplier multiplied by the worker's concurrency) is higher than flush_every; otherwise the worker can never buffer flush_every requests at once, and batches will only ever be flushed by the flush_interval timeout.
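For example (a sketch; set this wherever you configure your app):

app.conf.worker_prefetch_multiplier = 0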

Warning

Celery will attempt to continually pull all data from a queue into memory if worker_prefetch_multiplier is set to zero. This can cause excessive resource consumption on both Celery workers and the broker when used with a deep queue.

In the future we hope to add the ability to direct batching tasks to a channel with different QoS requirements than the task channel.

Returning results

It is possible to return a result for each task request by calling mark_as_done on your results backend. Returning a value from the Batches task itself is only used to provide values to signals; it is not stored in the results backend.
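A hedged sketch that stores a result per buffered request (add_together is a hypothetical task; mark_as_done is Celery's standard backend method):

@app.task(base=Batches, flush_every=100, flush_interval=10)
def add_together(requests):
    for request in requests:
        # Compute a per-request result and store it individually.
        result = sum(request.args)
        app.backend.mark_as_done(request.id, result, request=request)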

Note

If you don’t have an app instance, use the current app proxy instead:

from celery import current_app
current_app.backend.mark_as_done(request.id, response, request=request)

Retrying tasks

To retry a failed task, it must be re-executed with the original task_id; see the example below:

from celery_batches import Batches

@app.task(base=Batches, flush_every=100, flush_interval=10)
def flaky_task(requests):
    for request in requests:
        # Do something that might fail.
        try:
            response = might_fail(*request.args, **request.kwargs)
        except TemporaryError:
            # Retry the task 10 seconds from now with the same arguments and task_id.
            flaky_task.apply_async(
                args=request.args,
                kwargs=request.kwargs,
                countdown=10,
                task_id=request.id,
            )
        else:
            app.backend.mark_as_done(request.id, response, request=request)

Note that the retried task is still bound by the flush rules of the Batches task: the countdown serves only as a lower bound, and the request will not run before the next flush. In the example above it will run between 10 and 20 seconds from now, assuming no other tasks are in the queue.