Examples#
Below are some simple examples of using Batches tasks. Note that the examples do not fully configure the Celery app instance, since that depends on your setup (e.g. which broker/backend you’re planning to use).
Simple Example#
A click counter that flushes the buffer every 100 messages or every 10 seconds, whichever comes first. It does not do anything with the data, but can easily be modified to store it in a database.
from collections import Counter

from celery_batches import Batches

from celery import Celery

app = Celery("counter")

# Flush after 100 messages, or 10 seconds.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    """Count the number of times each URL is requested."""
    counts = Counter(request.kwargs["url"] for request in requests)
    for url, count in counts.items():
        print(f">>> Clicks: {url} -> {count}")
Then you can ask for a click to be counted by doing:
>>> count_click.delay(url='http://example.com')
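The aggregation inside count_click can be exercised without a broker. A minimal sketch, using SimpleNamespace objects as stand-ins for the request objects that Batches passes to the task (the sample URLs are illustrative):

```python
from collections import Counter
from types import SimpleNamespace

# Stand-ins for the request objects a Batches task receives.
requests = [
    SimpleNamespace(kwargs={"url": "http://example.com"}),
    SimpleNamespace(kwargs={"url": "http://example.com"}),
    SimpleNamespace(kwargs={"url": "http://example.org"}),
]

# The same aggregation count_click performs: one tally per request.
counts = Counter(request.kwargs["url"] for request in requests)
for url, count in counts.items():
    print(f">>> Clicks: {url} -> {count}")
```

Each flush hands the task every request buffered since the previous flush, so the tallies cover only that window.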
Database example#
It can be useful to batch together tasks to reduce database updates (in situations where a missed update is not important), e.g. updating the last seen time of a user:
from celery_batches import Batches

from celery import Celery

from my_app import User

app = Celery("last_seen")

@app.task(base=Batches, flush_every=100, flush_interval=10)
def last_seen(requests):
    """De-duplicate incoming arguments to only do a task once per input."""
    # Generate a map of user ID to the latest timestamp seen.
    last_seen = {}
    for request in requests:
        user_id, when = request.args
        if user_id not in last_seen or last_seen[user_id] < when:
            last_seen[user_id] = when

    # Update the datastore once per user.
    for user_id, when in last_seen.items():
        User.objects.filter(id=user_id).update(last_logged_in=when)
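The de-duplication step can be tested in isolation. A sketch using plain (user_id, timestamp) tuples in place of each request's args (the sample values are illustrative):

```python
# Each tuple stands in for one queued request's positional args.
args = [
    (1, 100),
    (2, 105),
    (1, 110),  # A later sighting of user 1 supersedes the first.
]

# Keep only the most recent timestamp per user, as last_seen does.
last_seen = {}
for user_id, when in args:
    if user_id not in last_seen or last_seen[user_id] < when:
        last_seen[user_id] = when
```

This collapses three buffered requests into two database updates; with out-of-order delivery the comparison ensures an older timestamp never overwrites a newer one.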
Bulk inserting/updating data#
It can also be useful to bulk insert data as quickly as possible, even when the discrete pieces of data arrive from separate tasks.
from celery_batches import Batches

from celery import Celery

from my_app import MyModel

app = Celery("bulk_insert")

@app.task(base=Batches, flush_every=100, flush_interval=10)
def bulk_insert(requests):
    """Insert many rows into a database at once instead of individually."""
    data = []
    for request in requests:
        data.append(MyModel(**request.kwargs))

    # Create all the new rows at once.
    MyModel.objects.bulk_create(data)
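The buffering pattern works with any ORM that offers a bulk-create call. A sketch with a dataclass standing in for MyModel (the model, its fields, and the sample kwargs are assumptions for illustration):

```python
from dataclasses import dataclass

@dataclass
class MyModel:
    # Hypothetical fields; a real model would match your schema.
    name: str
    value: int

# kwargs captured from three separate task invocations.
batched_kwargs = [
    {"name": "a", "value": 1},
    {"name": "b", "value": 2},
    {"name": "c", "value": 3},
]

# Build all instances first, then hand them to a single bulk insert.
rows = [MyModel(**kwargs) for kwargs in batched_kwargs]
```

Building the full list before inserting is what turns up to flush_every round trips into one.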
Example returning results#
An interface to the GitHub API that avoids requesting the API endpoint for each task. It flushes the buffer every 100 messages or every 10 seconds.
import json
from urllib.request import urlopen

from celery_batches import Batches

from celery import Celery

app = Celery("github_api")

emoji_endpoint = "https://api.github.com/emojis"

@app.task(base=Batches, flush_every=100, flush_interval=10)
def check_emoji(requests):
    """Check if the requested emoji are supported by GitHub."""
    supported_emoji = get_supported_emoji()
    # Use mark_as_done to manually store a result for each request.
    for request in requests:
        response = request.args[0] in supported_emoji
        app.backend.mark_as_done(request.id, response, request=request)

def get_supported_emoji():
    """Fetch the supported GitHub emoji."""
    response = urlopen(emoji_endpoint)
    # The response is a map of emoji name to image URL.
    return set(json.load(response))
Using the API is done as follows:
>>> result = check_emoji.delay('celery')
>>> assert result.get() is False
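The parsing in get_supported_emoji can be sketched without a network call. The payload below is an abbreviated, made-up example in the shape the endpoint returns (a JSON object mapping emoji names to image URLs); the URLs are placeholders, not real GitHub data:

```python
import json

# Abbreviated, illustrative payload: emoji name -> image URL.
payload = (
    '{"+1": "https://example.com/plus1.png",'
    ' "smile": "https://example.com/smile.png"}'
)

# get_supported_emoji keeps only the names, discarding the URLs.
supported_emoji = set(json.loads(payload))

# The membership check each buffered request then performs.
assert "+1" in supported_emoji
assert "celery" not in supported_emoji  # matches result.get() is False above
```

Fetching the emoji list once per flush, rather than once per task, is the point of batching here: one API request serves up to flush_every results.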