Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement graceful handling of WDQS rate-limiting #48

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
release: python manage.py migrate
web: gunicorn commons_api.wsgi:application
worker: celery -A commons_api worker --beat --without-heartbeat -X shapefiles -n default-worker@%h
worker: celery -A commons_api worker --beat --without-heartbeat -Q celery,wdqs -n default-worker@%h
# celery_beat: celery -A commons_api beat --without-heartbeat
# shapefiles_worker: celery -A commons_api worker --without-heartbeat -c 1 -Q shapefiles -n shapefiles-worker@%h --max-tasks-per-child=1

1 change: 1 addition & 0 deletions commons_api/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@

MEDIA_ROOT = os.environ.get('MEDIA_ROOT') or os.path.expanduser('~/media')

WDQS_RETRIES = int(os.environ.get('WDQS_RETRIES', 5))
WDQS_URL = 'https://query.wikidata.org/sparql'

ENABLE_MODERATION = bool(os.environ.get('ENABLE_MODERATION'))
Expand Down
18 changes: 6 additions & 12 deletions commons_api/wikidata/tasks/country.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
import celery
from SPARQLWrapper import SPARQLWrapper, JSON
from django.conf import settings
from django.template.loader import get_template

from commons_api.wikidata.utils import item_uri_to_id
from commons_api.wikidata import utils
from commons_api.wikidata import models


__all__ = ['refresh_country_list']


@celery.shared_task
def refresh_country_list():
sparql = SPARQLWrapper(settings.WDQS_URL)
sparql.setQuery(get_template('wikidata/query/country_list.rq').render())
sparql.setReturnFormat(JSON)
results = sparql.query().convert()
@celery.shared_task(bind=True, queue='wdqs')
@utils.queries_wikidata
def refresh_country_list(self, rate_limiting_handler):
results = utils.templated_wikidata_query('wikidata/query/country_list.rq', {}, rate_limiting_handler)
seen_ids = set()
for result in results['results']['bindings']:
id = item_uri_to_id(result['item'])
id = utils.item_uri_to_id(result['item'])
country = models.Country.objects.for_id_and_label(id, str(result['itemLabel']['value']))
country.iso_3166_1_code = result['itemCode']['value'].upper() if result.get('itemCode') else None
country.save()
Expand Down
35 changes: 18 additions & 17 deletions commons_api/wikidata/tasks/legislature.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,18 @@
import celery
import collections
import itertools
from SPARQLWrapper import SPARQLWrapper, JSON
from django.conf import settings
from django.template.loader import get_template

from commons_api.wikidata.namespaces import WD
from commons_api.wikidata.utils import item_uri_to_id, statement_uri_to_id, get_date, templated_wikidata_query
from .. import models
from .. import models, utils


@with_periodic_queuing_task(superclass=models.Country)
@celery.shared_task
def refresh_legislatures(id, queued_at):
@celery.shared_task(bind=True, queue='wdqs')
@utils.queries_wikidata
def refresh_legislatures(self, id, queued_at, rate_limiting_handler):
country = models.Country.objects.get(id=id, refresh_legislatures_last_queued=queued_at)
results = templated_wikidata_query('wikidata/query/legislature_list.rq', {'country': country})
# print(get_template('wikidata/query/legislature_list.rq').render())
# print(len(results['results']['bindings']))
results = templated_wikidata_query('wikidata/query/legislature_list.rq', {'country': country},
rate_limiting_handler)
legislature_positions = collections.defaultdict(list)
legislative_terms = collections.defaultdict(list)
for result in results['results']['bindings']:
Expand Down Expand Up @@ -52,7 +48,8 @@ def refresh_legislatures(id, queued_at):
for position in positions]

results = templated_wikidata_query('wikidata/query/legislature_terms_list.rq',
{'house_positions': house_positions})
{'house_positions': house_positions},
rate_limiting_handler)
for result in results['results']['bindings']:
if 'termSpecificPositionLabel' in result:
term_specific_position = models.Position.objects.for_id_and_label(
Expand Down Expand Up @@ -83,12 +80,14 @@ def refresh_legislatures(id, queued_at):


@with_periodic_queuing_task(superclass=models.LegislativeHouse)
@celery.shared_task
def refresh_members(id, queued_at):
@celery.shared_task(bind=True, queue='wdqs')
@utils.queries_wikidata
def refresh_members(self, id, queued_at, rate_limiting_handler):
house = models.LegislativeHouse.objects.get(id=id, refresh_members_last_queued=queued_at)

results = templated_wikidata_query('wikidata/query/legislature_memberships.rq',
{'positions': house.positions.all()})
{'positions': house.positions.all()},
rate_limiting_handler)
seen_statement_ids = set()
for i, (statement, rows) in enumerate(itertools.groupby(results['results']['bindings'],
key=lambda row: row['statement']['value'])):
Expand Down Expand Up @@ -182,12 +181,14 @@ def refresh_members(id, queued_at):


@with_periodic_queuing_task(superclass=models.LegislativeHouse)
@celery.shared_task
def refresh_districts(id, queued_at):
@celery.shared_task(bind=True, queue='wdqs')
@utils.queries_wikidata
def refresh_districts(self, id, queued_at, rate_limiting_handler):
house = models.LegislativeHouse.objects.get(id=id, refresh_districts_last_queued=queued_at)

results = templated_wikidata_query('wikidata/query/legislature_constituencies.rq',
{'house': house})
{'house': house},
rate_limiting_handler)

for result in results['results']['bindings']:
electoral_district = models.ElectoralDistrict.objects.for_id_and_label(item_uri_to_id(result['constituency']),
Expand Down
9 changes: 6 additions & 3 deletions commons_api/wikidata/tasks/wikidata_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@


@with_periodic_queuing_task
@celery.shared_task
def refresh_labels(app_label, model, ids=None, queued_at=None):
@celery.shared_task(bind=True, queue='wdqs')
@utils.queries_wikidata
def refresh_labels(self, app_label, model, ids=None, queued_at=None, rate_limiting_handler=None):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function refresh_labels has 6 arguments (exceeds 4 allowed). Consider refactoring.

"""Refreshes all labels for the given model"""
queryset = get_wikidata_model_by_name(app_label, model).objects.all()
if queued_at is not None:
Expand All @@ -20,7 +21,9 @@ def refresh_labels(app_label, model, ids=None, queued_at=None):
queryset = queryset.objects.filter(id__in=ids)
for items in utils.split_every(queryset, 250):
items = {item.id: item for item in items}
results = utils.templated_wikidata_query('wikidata/query/labels.rq', {'ids': sorted(items)})
results = utils.templated_wikidata_query('wikidata/query/labels.rq',
{'ids': sorted(items)},
rate_limiting_handler)
for id, rows in itertools.groupby(results['results']['bindings'],
key=lambda row: row['id']['value']):
id = utils.item_uri_to_id(id)
Expand Down
1 change: 1 addition & 0 deletions commons_api/wikidata/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .moderation import *
from .popolo import *
from .serializers import *
from .wdqs_rate_limiting import *
from .updating import *
from .utils import *
from .views import *
2 changes: 1 addition & 1 deletion commons_api/wikidata/tests/updating.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ def testRefreshForModelRefreshesMatchingLastQueued(self, templated_wikidata_quer
}]}
}
wikidata_item.refresh_labels('wikidata', 'country', queued_at=self.refresh_labels_last_queued)
templated_wikidata_query.assert_called_once_with('wikidata/query/labels.rq', {'ids': [self.country.id]})
templated_wikidata_query.assert_called_once_with('wikidata/query/labels.rq', {'ids': [self.country.id]}, None)
self.country.refresh_from_db()
self.assertEqual({'en': 'France', 'de': 'Frankreich'}, self.country.labels)
56 changes: 56 additions & 0 deletions commons_api/wikidata/tests/wdqs_rate_limiting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import http.client
import uuid
from unittest import mock
from urllib.error import HTTPError

from celery.app.task import Context
from django.conf import settings
from django.test import TestCase

from commons_api import celery_app
from commons_api.wikidata.tasks import refresh_country_list


class WDQSRateLimitingTestCase(TestCase):
    """Tests for graceful handling of WDQS 429 (Too Many Requests) responses."""

    # The delay the mocked WDQS asks clients to wait before retrying.
    RETRY_AFTER = 10

    def _too_many_requests_error(self):
        """Return the HTTPError WDQS raises when it rate-limits a client."""
        return HTTPError(url=None,
                         code=http.client.TOO_MANY_REQUESTS,
                         hdrs={'Retry-After': str(self.RETRY_AFTER)},
                         msg='', fp=None)

    @mock.patch('SPARQLWrapper.Wrapper.urlopener')
    @mock.patch('time.sleep')
    def testRetriesIfTooManyRequests(self, time_sleep, urlopen):
        urlopen.side_effect = self._too_many_requests_error()
        with self.assertRaises(HTTPError):
            refresh_country_list()

        # One urlopen attempt per retry; we sleep between attempts, but not after the last.
        self.assertEqual(settings.WDQS_RETRIES, urlopen.call_count)
        self.assertEqual(settings.WDQS_RETRIES - 1, time_sleep.call_count)
        time_sleep.assert_has_calls([mock.call(self.RETRY_AFTER)] * (settings.WDQS_RETRIES - 1))

    @mock.patch('SPARQLWrapper.Wrapper.urlopener')
    @mock.patch('time.sleep')
    def testSuspendsConsuming(self, time_sleep, urlopen):
        urlopen.side_effect = self._too_many_requests_error()
        # Simulate execution inside a worker so the task sees a routing key and hostname.
        refresh_country_list._default_request = Context(id=str(uuid.uuid4()),
                                                        called_directly=False,
                                                        delivery_info={'routing_key': 'wdqs'},
                                                        hostname='nodename')

        with mock.patch.object(celery_app.control, 'cancel_consumer') as cancel_consumer, \
                mock.patch.object(celery_app.control, 'add_consumer') as add_consumer:
            manager = mock.Mock()
            manager.attach_mock(cancel_consumer, 'cancel_consumer')
            manager.attach_mock(add_consumer, 'add_consumer')
            manager.attach_mock(time_sleep, 'sleep')
            with self.assertRaises(HTTPError):
                refresh_country_list.run()
            # Consumption is cancelled once, we wait out each retry (WDQS_RETRIES - 1 sleeps,
            # not a hard-coded count, so an env override of WDQS_RETRIES doesn't break this
            # test), then consumption resumes.
            manager.assert_has_calls(
                [mock.call.cancel_consumer('wdqs', connection=mock.ANY, destination=['nodename'])]
                + [mock.call.sleep(self.RETRY_AFTER)] * (settings.WDQS_RETRIES - 1)
                + [mock.call.add_consumer('wdqs', connection=mock.ANY, destination=['nodename'])])
99 changes: 95 additions & 4 deletions commons_api/wikidata/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
import logging
import time

import celery.app
import celery.task
import functools
import http.client
import itertools
from typing import Mapping
import urllib.error
from typing import Callable, Mapping

import re
import SPARQLWrapper
from django.conf import settings
from django.dispatch import Signal
from django.template.loader import get_template
from django.utils import translation

from . import models
from .namespaces import WD, WDS


logger = logging.getLogger(__name__)


def lang_dict(terms):
return {term.language: str(term) for term in terms}

Expand Down Expand Up @@ -83,9 +94,89 @@ def group(first):
yield group(first)


def templated_wikidata_query(query_name, context):
wdqs_rate_limiting = Signal(['retry_after'])


def templated_wikidata_query(query_name: str, context: dict,
                             rate_limiting_handler: Callable[[bool], None] = None) -> dict:
    """Constructs a query for Wikidata using django.template and returns the parsed results

    If the query elicits a `429 Too Many Requests` response, it retries up to `settings.WDQS_RETRIES` times,
    sleeping for the server-requested `Retry-After` period between attempts, and calls the
    rate_limiting_handler callback if provided to signal the start and end of a "stop querying" period.

    :param query_name: A template name that can be loaded by Django's templating system
    :param context: A template context dict to use when rendering the query
    :param rate_limiting_handler: A function to handle rate-limiting requests. Should suspend all querying if called
        with `True`, and resume it if called with `False`.
    :returns: The parsed SRJ results as a basic Python data structure
    :raises urllib.error.HTTPError: if the last allowed attempt is still rate-limited, or on any
        non-429 HTTP error
    """

    rate_limiting_handler = rate_limiting_handler or (lambda suspend: None)
    sparql = SPARQLWrapper.SPARQLWrapper(settings.WDQS_URL)
    sparql.setMethod(SPARQLWrapper.POST)
    sparql.setQuery(get_template(query_name).render(context))
    sparql.setReturnFormat(SPARQLWrapper.JSON)
    has_suspended = False
    try:
        for i in range(1, settings.WDQS_RETRIES + 1):
            try:
                logger.info("Performing query %r (attempt %d/%d)", query_name, i, settings.WDQS_RETRIES)
                response = sparql.query()
            except urllib.error.HTTPError as e:
                if e.code == http.client.TOO_MANY_REQUESTS and i < settings.WDQS_RETRIES:
                    if not has_suspended:
                        has_suspended = True
                        rate_limiting_handler(True)
                    # Retry-After may be an integer delay in seconds or an HTTP-date
                    # (RFC 7231 §7.1.3); fall back to 60s if it's absent or not an integer
                    # rather than crashing the retry path with a ValueError.
                    try:
                        retry_after = int(e.headers.get('Retry-After', 60))
                    except (TypeError, ValueError):
                        retry_after = 60
                    time.sleep(retry_after)
                else:
                    # Out of retries, or an unrelated HTTP error: propagate to the caller.
                    raise
            else:
                return response.convert()
    finally:
        # Always resume querying if we told the handler to suspend it, even when raising.
        if has_suspended:
            rate_limiting_handler(False)


def queries_wikidata(task_func):
    """Decorator for task functions that query Wikidata

    This decorator passes a `rate_limiting_handler` keyword argument to the wrapped task that should
    be passed to `templated_wikidata_query` to handle rate-limiting requests from WDQS by suspending
    the execution of tasks that query Wikidata. This is achieved by having celery cancel consumption
    of the given queue by the worker if `suspend` is True, and resume it otherwise.

    This behaviour doesn't occur if the task was called directly — i.e. not in a worker.

    Tasks that query Wikidata should be separated from other tasks by being sent to a different
    queue, by e.g.

        @celery.shared_task(bind=True, queue='wdqs')
        @utils.queries_wikidata
        def task_function(self, …, rate_limiting_handler=None):
            ...
    """
    @functools.wraps(task_func)
    def new_task_func(self: celery.task.Task, *args, **kwargs):
        def handle_ratelimiting(suspend: bool) -> None:
            app = self.app
            # Celery routes to the right queue using the default exchange and a routing key, so the
            # routing key tells us our queue name.
            # See <https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default>.
            queue = self.request.delivery_info['routing_key']
            # This identifies the current celery worker
            nodename = self.request.hostname
            with app.connection_or_acquire() as conn:
                if suspend:
                    logger.info("WDQS rate-limiting started; WDQS task consumption suspended")
                    app.control.cancel_consumer(queue, connection=conn, destination=[nodename])
                    self.update_state(state='RATE_LIMITED')
                else:
                    logger.info("WDQS rate-limiting finished; WDQS task consumption resumed")
                    app.control.add_consumer(queue, connection=conn, destination=[nodename])
                    self.update_state(state='ACTIVE')

        # Only use a handler if executing in a celery worker; direct calls get None so
        # templated_wikidata_query just retries without touching queue consumption.
        rate_limiting_handler = handle_ratelimiting if not self.request.called_directly else None

        return task_func(self, *args, rate_limiting_handler=rate_limiting_handler, **kwargs)

    return new_task_func