celery_batches¶
Experimental task class that buffers messages and processes them as a list. Task requests are buffered in memory (on a worker) until either the flush count or the flush interval is reached. Once the requests are flushed, they are sent to the task as a list of SimpleRequest instances.
It is possible to return a result for each task request by calling mark_as_done on your results backend. Returning a value from the Batch task call is only used to provide values to signals; it does not populate the results backend.
Warning
For this to work you have to set worker_prefetch_multiplier to zero, or to some value where the final multiplied value is higher than flush_every. Note that Celery will attempt to continually pull data into memory if this is set to zero. This can cause excessive resource consumption on both Celery workers and the broker when used with a deep queue.
In the future we hope to add the ability to direct batching tasks to a channel with different QoS requirements than the task channel.
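For example, a minimal configuration satisfying the constraint above might look like the following (a sketch only; the broker URL and the multiplier of 25 are assumptions based on the flush_every=100 used in the examples below, with four worker processes):

from celery import Celery

app = Celery('tasks', broker='amqp://localhost')

# Either disable prefetch limits entirely (beware deep queues, see the warning above)...
app.conf.worker_prefetch_multiplier = 0

# ...or pick a multiplier so that concurrency * multiplier >= flush_every,
# e.g. 4 worker processes * 25 = 100 messages prefetched.
# app.conf.worker_prefetch_multiplier = 25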
Simple Example
A click counter that flushes the buffer every 100 messages, or every 10 seconds. It does not do anything with the data, but can easily be modified to store it in a database.
from collections import Counter

from celery_batches import Batches

# Flush after 100 messages, or 10 seconds.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    counts = Counter(request.kwargs['url'] for request in requests)
    for url, count in counts.items():
        print('>>> Clicks: {0} -> {1}'.format(url, count))
Then you can ask for a click to be counted by doing:
>>> count_click.delay(url='http://example.com')
Example returning results
An interface to the Web of Trust API that flushes the buffer every 100 messages, or every 10 seconds.
import requests
from urllib.parse import urlparse

from celery_batches import Batches

wot_api_target = 'https://api.mywot.com/0.4/public_link_json'

@app.task(base=Batches, flush_every=100, flush_interval=10)
def wot_api(requests):
    sig = lambda url: url
    responses = wot_api_real(
        sig(*request.args, **request.kwargs) for request in requests
    )
    # use mark_as_done to manually return response data
    for response, request in zip(responses, requests):
        app.backend.mark_as_done(request.id, response, request=request)

def wot_api_real(urls):
    domains = [urlparse(url).netloc for url in urls]
    response = requests.get(
        wot_api_target,
        params={'hosts': ('/').join(set(domains)) + '/'}
    )
    return [response.json()[domain] for domain in domains]
Using the API is done as follows:
>>> wot_api.delay('http://example.com')
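Because wot_api marks each request as done in the results backend, the AsyncResult returned by delay() can be waited on as usual (a sketch, assuming a results backend is configured):

>>> result = wot_api.delay('http://example.com')
>>> result.get(timeout=30)  # blocks until the batch containing this request flushes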
Note
If you don’t have an app instance then use the current app proxy instead:
from celery import current_app
current_app.backend.mark_as_done(request.id, response, request=request)
API
- class celery_batches.Batches¶
- Strategy(task, app, consumer)¶
- abstract = True¶
Deprecated attribute abstract here for compatibility.
- apply(args=None, kwargs=None, *_args, **_kwargs)¶
Execute this task locally as a batch of size 1, by blocking until the task returns.
- Arguments:
args (Tuple): positional arguments passed on to the task.
kwargs (Dict): keyword arguments passed on to the task.
- Returns:
celery.result.EagerResult: pre-evaluated result.
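For example, calling the count_click task defined above eagerly (a sketch; useful in tests, since no worker or broker is involved):

>>> result = count_click.apply(kwargs={'url': 'http://example.com'})
>>> result.get()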
- apply_buffer(requests, args=(), kwargs={})¶
- flush(requests)¶
- flush_every = 10¶
Maximum number of messages in the buffer.
- flush_interval = 30¶
Timeout in seconds after which the buffer is flushed even if it is not full.
- ignore_result = False¶
If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None¶
Default task priority.
- rate_limit = None¶
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).
- reject_on_worker_lost = None¶
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc.). Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>¶
Task request stack, the current request will be the topmost.
- run(requests)¶
The body of the task executed by workers.
- serializer = 'json'¶
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False¶
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = False¶
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful when there are long-running tasks and there’s a need to report which task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True¶
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- class celery_batches.SimpleRequest(id, name, args, kwargs, delivery_info, hostname, reply_to, correlation_id)¶
A request to execute a task.
A list of SimpleRequest instances is provided to the batch task during execution. This must be pickleable (if using the prefork pool), but generally should have the same properties as Request.
- args = ()¶
positional arguments
- chord = None¶
TODO
- correlation_id = None¶
used similarly to reply_to
- delivery_info = None¶
message delivery information.
- classmethod from_request(request)¶
- hostname = None¶
worker node name
- id = None¶
task id
- kwargs = {}¶
keyword arguments
- name = None¶
task name
- reply_to = None¶
used by the rpc backend when failures are reported by the parent process
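For illustration, a batch task can read these fields directly from each request (a minimal sketch reusing the Batches setup from the examples above; the flush settings and output format are arbitrary):

from celery_batches import Batches

@app.task(base=Batches, flush_every=50, flush_interval=5)
def log_requests(requests):
    for request in requests:
        # Each item is a SimpleRequest carrying the original call.
        print('task {0} ({1}): args={2} kwargs={3}'.format(
            request.id, request.name, request.args, request.kwargs))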