Skip to content

Commit

Permalink
Make notifier more resilient to API errors (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastiaanZ authored Jul 19, 2023
1 parent 62a70aa commit fcf377c
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 44 deletions.
8 changes: 3 additions & 5 deletions EuroPythonBot/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class Config(metaclass=Singleton):
def __init__(self):
# Configuration file
config = None
base_path = Path(__file__).resolve().parent
self.CONFIG_PATH = self._get_config_path(base_path)
self.BASE_PATH = Path(__file__).resolve().parent
self.CONFIG_PATH = self._get_config_path(self.BASE_PATH)
with open(self.CONFIG_PATH) as f:
config = toml.loads(f.read())

Expand All @@ -50,9 +50,7 @@ def __init__(self):
self.LOG_LEVEL = config.get("logging", {}).get("LOG_LEVEL", "INFO")

# Mapping
with open(
base_path.joinpath(base_path.joinpath(self.TICKET_TO_ROLES_JSON))
) as ticket_to_roles_file:
with self.BASE_PATH.joinpath(self.TICKET_TO_ROLES_JSON).open() as ticket_to_roles_file:
ticket_to_roles = json.load(ticket_to_roles_file)

self.TICKET_TO_ROLE = ticket_to_roles
Expand Down
8 changes: 5 additions & 3 deletions EuroPythonBot/extensions/programme_notifications/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ async def setup(bot: commands.Bot) -> None:
if config.timewarp:
_logger.info("Time warping is enabled! Time traveling to the conference days.")
now = arrow.now(tz=config.timezone)
# Diff with notifications of first round on first conference day
diff = arrow.get("2023-07-19T10:39:45+02:00") - now
# Diff with some point in time during the conference
diff = arrow.get("2023-07-19T15:59:45+02:00") - now

def _get_now() -> arrow.Arrow:
return arrow.now(tz=config.timezone) + diff
Expand Down Expand Up @@ -68,8 +68,10 @@ def _create_aiohttp_session() -> aiohttp.ClientSession:
"""Create a ClientSession and return it."""
ssl_context = ssl.create_default_context(cafile=certifi.where())
connector = aiohttp.TCPConnector(ssl=ssl_context)
timeout = aiohttp.ClientTimeout(total=20)
return aiohttp.ClientSession(
connector=connector,
headers={"User-Agent": "EuroPython Programme Notifier/2023.1"},
headers={"User-Agent": "EP2023 Programme Notifier/2023.2"},
raise_for_status=True,
timeout=timeout,
)
2 changes: 1 addition & 1 deletion EuroPythonBot/extensions/programme_notifications/cog.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def stats(self, context: commands.Context) -> None:
"""
await context.send(f"There are currently {len(self._notifier)} scheduled notifications.")

@tasks.loop(minutes=15.0)
@tasks.loop(minutes=30.0)
async def _update_schedule(self) -> None:
"""Update the schedule from Pretalx."""
_logger.info("Starting the periodic schedule update...")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
schedule.json -merge -text

Large diffs are not rendered by default.

57 changes: 51 additions & 6 deletions EuroPythonBot/extensions/programme_notifications/services/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import hashlib
import json
import logging
import pathlib
from collections.abc import Iterable
from typing import Any, Protocol, TypeVar

Expand All @@ -17,18 +18,22 @@
import attrs
import cattrs
import yarl
from attrs import validators

from extensions.programme_notifications import configuration, exceptions
from extensions.programme_notifications.domain import discord, europython

_DEFAULT_SCHEDULE_CACHE_PATH: pathlib.Path = (
pathlib.Path(__file__).resolve().parent / "_cached" / "schedule.json"
)
_logger = logging.getLogger(f"bot.{__name__}")
_T = TypeVar("_T")


class IApiClient(Protocol):
"""Protocol for an API client."""

async def fetch_schedule(self) -> europython.Schedule:
async def fetch_schedule(self) -> "ScheduleResponse":
"""Fetch the latest schedule."""

async def fetch_session_details(self, session_id: str) -> tuple[yarl.URL, str]:
Expand All @@ -44,23 +49,55 @@ class ApiClient:

session: aiohttp.ClientSession = attrs.field(kw_only=True)
config: configuration.NotifierConfiguration = attrs.field(kw_only=True)

async def fetch_schedule(self) -> europython.Schedule:
_schedule_cache_path: pathlib.Path = attrs.field(
kw_only=True,
default=_DEFAULT_SCHEDULE_CACHE_PATH,
validator=validators.instance_of(pathlib.Path),
)

@_schedule_cache_path.validator
def _cache_path_exists_validator(self, attribute: str, value: pathlib.Path):
"""Validate that the schedule cache path exists."""
del attribute # unused
if not value.exists():
raise ValueError("The path '%s' does not exist!")

async def fetch_schedule(self) -> "ScheduleResponse":
"""Fetch the schedule from the Pretalx API.
:return: A `europython.Schedule` instance,
"""
url = self.config.pretalx_schedule_url
async with self.session.get(url=url, raise_for_status=True) as response:
response_content = await response.read()
try:
response_content = await self._fetch_schedule(url)
except Exception:
_logger.exception("Fetching the schedule failed, returned cached version.")
response_content = self._cached_schedule_response_content
from_cache = True
else:
_logger.info("Fetched schedule from Pretalx, not using cache.")
from_cache = False

raw_schedule = json.loads(response_content)
return europython.Schedule(
schedule = europython.Schedule(
sessions=self._convert(raw_schedule["slots"], europython.Session),
breaks=self._convert(raw_schedule["breaks"], europython.Break),
version=raw_schedule["version"],
schedule_hash=hashlib.sha1(response_content).hexdigest(),
)
return ScheduleResponse(schedule=schedule, from_cache=from_cache)

async def _fetch_schedule(self, url: str) -> bytes:
"""Fetch the schedule from Pretalx."""
_logger.info("Making call to Pretalx API.")
async with self.session.get(url=url, raise_for_status=True) as response:
response_content = await response.read()
return response_content

@functools.cached_property
def _cached_schedule_response_content(self) -> bytes:
"""Get and cache the cached schedule response content."""
return self._schedule_cache_path.read_bytes()

def _convert(self, raw_instances: Iterable[dict[str, Any]], target_cls: type[_T]) -> list[_T]:
"""Convert the iterable of instance values to a class instance.
Expand Down Expand Up @@ -129,3 +166,11 @@ async def execute_webhook(self, message: discord.WebhookMessage, *, webhook: str
webhook=webhook, status=exc.status, message=exc.message
) from None
_logger.info("Delivered webhook message to webhook %r", webhook)


@attrs.define(frozen=True)
class ScheduleResponse:
"""A response returned by `fetch_sessions`."""

schedule: europython.Schedule
from_cache: bool
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,18 @@ async def schedule_notifications(self, force: bool = False) -> None:
fetched schedule has the same hash as the previously fetched
schedule.
"""
new_schedule = await self._api_client.fetch_schedule()
if new_schedule.schedule_hash == self._previous_schedule_hash and not force:
_logger.info("Scheduled hasn't changed; not rescheduling notifications.")
try:
response = await self._api_client.fetch_schedule()
except Exception:
_logger.exception("Fetching the schedule failed!")
return

if not self._should_update(response) and not force:
_logger.info("No changed schedule available; not rescheduling notifications.")
return

new_schedule = response.schedule
_logger.info("Schedule has changed, updating notifications!")
self._scheduler.cancel_all()
sessions = list(services.filter_conference_days(new_schedule.sessions, self._config))
self._session_information.refresh_from_sessions(sessions)
Expand All @@ -63,6 +70,35 @@ async def schedule_notifications(self, force: bool = False) -> None:
self._previous_schedule_hash = new_schedule.schedule_hash
_logger.info("Scheduled notifications!")

def _should_update(self, api_response: api.ScheduleResponse) -> bool:
"""Check if this response should result in new notifications.
:param api_response: The API response
:return: `True` if notifications need to be updated, `False`
otherwise
"""
if self._previous_schedule_hash is None:
# The absence of a hash indicates that there was no schedule
# yet, which means we should always update.
_logger.info("No schedule cache yet, we should update the notifications.")
return True

if api_response.from_cache:
# We already have a previous schedule hash, indicating that
# notifications are in place, so there's no need to fall
# back to a statically cached version of the schedule that
# may already be outdated.
_logger.info(
"This is a cached schedule response, but we already have notifications in place, so"
" there's no need to fallback to the cached schedule for notifications."
)
return False

# Only update if the hash of the newly fetched schedule is
# different from the hash of the schedule that was used to
# schedule the notifications.
return self._previous_schedule_hash != api_response.schedule.schedule_hash

def __len__(self) -> int:
"""Return the number of scheduled notifications."""
return len(self._scheduler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ async def fetch_session(self, code: str) -> europython.Session:
"""
session = self._session_repository.get(code)
if session.url is None or session.experience is None:
session.url, session.experience = await self._api_client.fetch_session_details(code)
try:
session.url, session.experience = await self._api_client.fetch_session_details(code)
except Exception:
_logger.exception("Fetching addition session details failed!")

session.livestream_url = self._get_livestream_url(session)
session.discord_channel_id = self._get_discord_channel_id(session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ def schedule_tasks_at(self, *coroutines: Coroutine[None, None, None], at: arrow.

def cancel_all(self) -> None:
"""Cancel all scheduled tasks."""
_logger.info("Cancelling all tasks...")
i = 0
for task in self._tasks:
task.cancel()
i += 1
_logger.info("Cancelled %r tasks.", i)

def _schedule_task_at(self, coro: Coroutine[None, None, None], at: arrow.Arrow) -> asyncio.Task:
"""Schedule a task at the specified datetime.
Expand Down
23 changes: 17 additions & 6 deletions tests/programme_notifications/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,42 @@
@pytest.fixture
def bytes_from_data_file(request: pytest.FixtureRequest) -> bytes:
"""Return bytes from a test _data file for a parameterized test."""
return _get_datafile(getattr(request, "param"))
return _get_data_file(getattr(request, "param"))


@pytest.fixture
def get_bytes_from_data_file() -> Callable[[str], bytes]:
"""Allow tests to retrieve bytes from test _data files."""
return _get_datafile
return _get_data_file


def _get_datafile(filename: str) -> bytes:
@pytest.fixture
def get_data_file_path() -> Callable[[str], pathlib.Path]:
"""Get the path to a datafile."""
return _get_data_file_path


def _get_data_file(filename: str) -> bytes:
"""Get a _data file from the test _data directory."""
return (_DATA_DIR / filename).read_bytes()
return _get_data_file_path(filename).read_bytes()


def _get_data_file_path(filename: str) -> pathlib.Path:
"""Get the path to a datafile."""
return _DATA_DIR / filename


@pytest.fixture
def pretalx_response_stub() -> bytes:
"""Get a pretalx response stub with an actual cached response."""
return _get_datafile("pretalx_schedule_response_20230701.testdata.json")
return _get_data_file("pretalx_schedule_response_20230701.testdata.json")


@pytest.fixture
def europython_response_stub(request: pytest.FixtureRequest) -> bytes:
"""Get a pretalx response stub with an actual cached response."""
identifier = getattr(request, "param", "session_response_20230702")
return _get_datafile(f"europython_{identifier}.testdata.json")
return _get_data_file(f"europython_{identifier}.testdata.json")


@pytest.fixture
Expand Down
Loading

0 comments on commit fcf377c

Please sign in to comment.