-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement graceful handling of WDQS rate-limiting #48
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
release: python manage.py migrate | ||
web: gunicorn commons_api.wsgi:application | ||
worker: celery -A commons_api worker --beat --without-heartbeat -X shapefiles -n default-worker@%h | ||
worker: celery -A commons_api worker --beat --without-heartbeat -Q celery,wdqs -n default-worker@%h | ||
# celery_beat: celery -A commons_api beat --without-heartbeat | ||
# shapefiles_worker: celery -A commons_api worker --without-heartbeat -c 1 -Q shapefiles -n shapefiles-worker@%h --max-tasks-per-child=1 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import http.client | ||
import uuid | ||
from unittest import mock | ||
from urllib.error import HTTPError | ||
|
||
from celery.app.task import Context | ||
from django.conf import settings | ||
from django.test import TestCase | ||
|
||
from commons_api import celery_app | ||
from commons_api.wikidata.tasks import refresh_country_list | ||
|
||
|
||
class WDQSRateLimitingTestCase(TestCase):
    """Tests how Wikidata-querying tasks react to `429 Too Many Requests` responses from WDQS."""

    # The delay, in seconds, that the simulated WDQS response asks for in its Retry-After header.
    retry_after = 10

    def _too_many_requests(self):
        """Build the rate-limiting error that the patched urlopener raises on every call."""
        return HTTPError(url=None,
                         code=http.client.TOO_MANY_REQUESTS,
                         hdrs={'Retry-After': str(self.retry_after)},
                         msg='', fp=None)

    @mock.patch('SPARQLWrapper.Wrapper.urlopener')
    @mock.patch('time.sleep')
    def testRetriesIfTooManyRequests(self, time_sleep, urlopen):
        """A directly-called task retries up to WDQS_RETRIES times, sleeping between attempts."""
        urlopen.side_effect = self._too_many_requests()
        with self.assertRaises(HTTPError):
            refresh_country_list()

        # Every attempt hits the endpoint, but there is no sleep after the final, failing attempt.
        self.assertEqual(settings.WDQS_RETRIES, urlopen.call_count)
        self.assertEqual(settings.WDQS_RETRIES - 1, time_sleep.call_count)
        time_sleep.assert_has_calls([mock.call(self.retry_after)] * (settings.WDQS_RETRIES - 1))

    @mock.patch('SPARQLWrapper.Wrapper.urlopener')
    @mock.patch('time.sleep')
    def testSuspendsConsuming(self, time_sleep, urlopen):
        """A task run as if by a worker cancels its queue consumer while rate-limited, then re-adds it."""
        urlopen.side_effect = self._too_many_requests()

        # Pretend the task is executing inside a worker. The task object is shared module state, so put the
        # previous request back afterwards to stop this fake request leaking into other tests.
        original_request = refresh_country_list._default_request
        self.addCleanup(setattr, refresh_country_list, '_default_request', original_request)
        refresh_country_list._default_request = Context(id=str(uuid.uuid4()),
                                                        called_directly=False,
                                                        delivery_info={'routing_key': 'wdqs'},
                                                        hostname='nodename')

        with mock.patch.object(celery_app.control, 'cancel_consumer') as cancel_consumer, \
                mock.patch.object(celery_app.control, 'add_consumer') as add_consumer:
            # Attach all three mocks to one manager so that their relative call order can be asserted.
            manager = mock.Mock()
            manager.attach_mock(cancel_consumer, 'cancel_consumer')
            manager.attach_mock(add_consumer, 'add_consumer')
            manager.attach_mock(time_sleep, 'sleep')
            with self.assertRaises(HTTPError):
                refresh_country_list.run()

            # Derive the number of sleeps from the setting, as testRetriesIfTooManyRequests does, rather than
            # hard-coding it.
            expected_calls = (
                [mock.call.cancel_consumer('wdqs', connection=mock.ANY, destination=['nodename'])]
                + [mock.call.sleep(self.retry_after)] * (settings.WDQS_RETRIES - 1)
                + [mock.call.add_consumer('wdqs', connection=mock.ANY, destination=['nodename'])]
            )
            manager.assert_has_calls(expected_calls)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,27 @@ | ||
import logging | ||
import time | ||
|
||
import celery.app | ||
import celery.task | ||
import functools | ||
import http.client | ||
import itertools | ||
from typing import Mapping | ||
import urllib.error | ||
from typing import Callable, Mapping | ||
|
||
import re | ||
import SPARQLWrapper | ||
from django.conf import settings | ||
from django.dispatch import Signal | ||
from django.template.loader import get_template | ||
from django.utils import translation | ||
|
||
from . import models | ||
from .namespaces import WD, WDS | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def lang_dict(terms):
    """Map each term's language to the term's string form.

    :param terms: An iterable of objects that each have a `language` attribute
    :returns: A dict keyed by `term.language`, with `str(term)` as the values; if several terms share a
        language, the last one wins
    """
    by_language = {}
    for term in terms:
        by_language[term.language] = str(term)
    return by_language
|
||
|
@@ -83,9 +94,89 @@ def group(first): | |
yield group(first) | ||
|
||
|
||
def templated_wikidata_query(query_name, context): | ||
# Signal for announcing WDQS rate-limiting; intended to be sent with a `retry_after` keyword argument.
# The argument list is documented here rather than passed to Signal(): `providing_args` was purely
# documentational, was deprecated in Django 3.1, and is rejected by later Django versions.
wdqs_rate_limiting = Signal()
|
||
|
||
def _retry_after_seconds(headers, default=60):
    """Work out how many whole seconds a `Retry-After` response header asks us to wait.

    The header value may be either a number of seconds or an HTTP-date. A missing or unparseable header yields
    `default`. The result is never negative, because `time.sleep` rejects negative delays.

    :param headers: A mapping-like object of response headers, or None if the response had none
    :param default: The delay, in seconds, to use when there is no usable `Retry-After` header
    :returns: A non-negative number of seconds to wait before retrying
    """
    # Imported here so that this helper only relies on the standard library, not on module-level imports.
    import datetime
    import math
    from email.utils import parsedate_to_datetime

    value = headers.get('Retry-After') if headers is not None else None
    if value is None:
        return default
    value = str(value).strip()
    try:
        return max(0, int(value))
    except ValueError:
        pass  # Not the delay-in-seconds form; try the HTTP-date form below.
    try:
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if retry_at.tzinfo is None:
        # Treat a date parsed without an offset as UTC so it can be compared with the current time.
        retry_at = retry_at.replace(tzinfo=datetime.timezone.utc)
    remaining = (retry_at - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
    return max(0, math.ceil(remaining))


def templated_wikidata_query(query_name: str, context: dict,
                             rate_limiting_handler: Callable[[bool], None] = None) -> dict:
    """Constructs a query for Wikidata using django.template and returns the parsed results

    If the query elicits a `429 Too Many Requests` response, it retries up to `settings.WDQS_RETRIES` times, and
    calls the rate_limiting_handler callback if provided to signal the start and end of a "stop querying" period.
    Between attempts it sleeps for as long as the response's `Retry-After` header asks (given either as a number of
    seconds or as an HTTP-date), or for 60 seconds if that header is missing or unparseable.

    :param query_name: A template name that can be loaded by Django's templating system
    :param context: A template context dict to use when rendering the query
    :param rate_limiting_handler: A function to handle rate-limiting requests. Should suspend all querying if called
        with `True`, and resume it if called with `False`.
    :returns: The parsed SRJ results as a basic Python data structure
    :raises urllib.error.HTTPError: if the query fails with anything other than a 429, or is still being
        rate-limited on the final attempt
    """

    rate_limiting_handler = rate_limiting_handler or (lambda suspend: None)
    sparql = SPARQLWrapper.SPARQLWrapper(settings.WDQS_URL)
    sparql.setMethod(SPARQLWrapper.POST)
    sparql.setQuery(get_template(query_name).render(context))
    sparql.setReturnFormat(SPARQLWrapper.JSON)

    # Always make at least one attempt, so that a misconfigured WDQS_RETRIES below 1 can't make this function
    # silently return None without ever querying.
    max_attempts = max(1, settings.WDQS_RETRIES)
    has_suspended = False
    try:
        for attempt in range(1, max_attempts + 1):
            try:
                logger.info("Performing query %r (attempt %d/%d)", query_name, attempt, max_attempts)
                response = sparql.query()
            except urllib.error.HTTPError as e:
                # Only rate-limiting is retried, and only while there are attempts left.
                if e.code != http.client.TOO_MANY_REQUESTS or attempt >= max_attempts:
                    raise
                if not has_suspended:
                    # Set the flag first, so that the `finally` below resumes querying even if the handler fails
                    # part-way through suspending.
                    has_suspended = True
                    rate_limiting_handler(True)
                time.sleep(_retry_after_seconds(e.headers))
            else:
                return response.convert()
    finally:
        # Runs whether we return or raise, so a suspension is always followed by a resumption.
        if has_suspended:
            rate_limiting_handler(False)
|
||
|
||
def queries_wikidata(task_func):
    """Decorator for bound celery task functions that query Wikidata

    The wrapped task is called with an extra `rate_limiting_handler` keyword argument, which it should hand on to
    `templated_wikidata_query`. When WDQS starts rate-limiting, the handler is called with `True` and tells the
    current worker to stop consuming from the queue the task arrived on; when called with `False` it tells the
    worker to start consuming from that queue again.

    If the task was called directly — i.e. not in a worker — `rate_limiting_handler` is `None` and no suspension
    takes place.

    Tasks that query Wikidata should be kept apart from other tasks by sending them to their own queue, e.g.

        @celery.shared_task(bind=True, queue='wdqs')
        @utils.queries_wikidata
        def task_function(self, …, rate_limiting_handler=None):
            …
    """
    @functools.wraps(task_func)
    def new_task_func(self: celery.task.Task, *args, **kwargs):
        def handle_ratelimiting(suspend):
            app = self.app
            request = self.request
            # Celery routes to the right queue using the default exchange and a routing key, so the routing key
            # doubles as our queue name. See <https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default>.
            queue = request.delivery_info['routing_key']
            # The node name of the worker executing this task; only that worker is told to (un)subscribe.
            nodename = request.hostname
            with app.connection_or_acquire() as conn:
                if not suspend:
                    logger.info("WDQS rate-limiting finished; WDQS task consumption resumed")
                    app.control.add_consumer(queue, connection=conn, destination=[nodename])
                    self.update_state(state='ACTIVE')
                    return
                logger.info("WDQS rate-limiting started; WDQS task consumption suspended")
                app.control.cancel_consumer(queue, connection=conn, destination=[nodename])
                self.update_state(state='RATE_LIMITED')

        # A directly-called task has no worker or queue to suspend, so it gets no handler.
        if self.request.called_directly:
            rate_limiting_handler = None
        else:
            rate_limiting_handler = handle_ratelimiting

        return task_func(self, *args, rate_limiting_handler=rate_limiting_handler, **kwargs)

    return new_task_func
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function
refresh_labels
has 6 arguments (exceeds 4 allowed). Consider refactoring.