Examples#

Below are some simple examples of using Batches tasks. Note that the examples do not fully configure the Celery instance; how to do that depends on your setup (e.g. which broker/backend you’re planning to use).
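
For instance, a minimal configuration using Redis as both the broker and the result backend might look like the following (the URLs are assumptions for a local Redis instance; adjust them for your setup):

from celery import Celery

# A minimal sketch: a local Redis instance serves as both the message
# broker and the result backend.
app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)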

Simple Example#

A click counter that flushes the buffer every 100 messages, and every 10 seconds. It does not do anything with the data, but can easily be modified to store the counts in a database.

from collections import Counter

from celery_batches import Batches

from celery import Celery

app = Celery("counter")


# Flush after 100 messages, or 10 seconds.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    """Count the number of times each URL is requested."""
    counts = Counter(request.kwargs["url"] for request in requests)
    for url, count in counts.items():
        print(f">>> Clicks: {url} -> {count}")

Then you can ask for a click to be counted by doing:

>>> count_click.delay(url='http://example.com')
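
Note that the task body only runs once the buffer flushes, i.e. after flush_every messages have accumulated or flush_interval seconds have passed. To trigger a flush from the message count alone, a full batch can be queued (a hypothetical loop for illustration):

>>> for _ in range(100):
...     count_click.delay(url='http://example.com')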

Database example#

It can be useful to batch tasks together to reduce the number of database updates (in situations where missing an update is not important), e.g. when updating the last seen time of a user:

from celery_batches import Batches

from celery import Celery

from my_app import User

app = Celery("last_seen")


@app.task(base=Batches, flush_every=100, flush_interval=10)
def last_seen(requests):
    """De-duplicate incoming arguments to only do a task once per input."""
    # Generate a map of user ID -> most recent timestamp.
    latest = {}
    for request in requests:
        user_id, when = request.args
        if user_id not in latest or latest[user_id] < when:
            latest[user_id] = when

    # Update the datastore once per user.
    for user_id, when in latest.items():
        User.objects.filter(id=user_id).update(last_logged_in=when)
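
The task expects the user ID and timestamp as positional arguments, matching the request.args unpacking above. A call might look like this (the user ID 123 is a hypothetical value):

>>> from datetime import datetime, timezone
>>> last_seen.delay(123, datetime.now(timezone.utc))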

Bulk inserting/updating data#

It can also be useful to bulk insert data as quickly as possible, even when the discrete pieces of data come from separate tasks.

from celery_batches import Batches

from celery import Celery

from my_app import MyModel

app = Celery("bulk_insert")


@app.task(base=Batches, flush_every=100, flush_interval=10)
def bulk_insert(requests):
    """Insert many rows into a database at once instead of individually."""
    # Build one model instance per request.
    data = [MyModel(**request.kwargs) for request in requests]

    # Create all the new rows at once.
    MyModel.objects.bulk_create(data)
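
Each call supplies the model fields as keyword arguments, which are passed straight through to the MyModel constructor. For example (the field names here are hypothetical):

>>> bulk_insert.delay(name='widget', quantity=5)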

Example returning results#

An interface to the GitHub API that avoids making a separate request to the API endpoint for each task. It flushes the buffer every 100 messages, and every 10 seconds.

import json
from urllib.request import urlopen

from celery_batches import Batches

from celery import Celery

app = Celery("github_api")

emoji_endpoint = "https://api.github.com/emojis"


@app.task(base=Batches, flush_every=100, flush_interval=10)
def check_emoji(requests):
    """Check if the requested emoji are supported by GitHub."""
    supported_emoji = get_supported_emoji()
    # Use mark_as_done to manually store a result for each request.
    for request in requests:
        response = request.args[0] in supported_emoji
        app.backend.mark_as_done(request.id, response, request=request)


def get_supported_emoji():
    """Fetch the supported GitHub emojis."""
    response = urlopen(emoji_endpoint)
    # The response is a map of emoji name to image.
    return set(json.load(response))

The task is then used as follows:

>>> result = check_emoji.delay('celery')
>>> assert result.get() is False
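
Since results are only stored when the buffer flushes, result.get() blocks until flush_every messages accumulate or flush_interval seconds elapse. Several pending results can be collected together (a sketch; 'smile' is assumed to be in GitHub's supported set):

>>> results = [check_emoji.delay(name) for name in ('celery', 'smile')]
>>> [result.get() for result in results]  # e.g. [False, True]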