From d94155d809540f2dceda30ccb5ac709cef84d213 Mon Sep 17 00:00:00 2001
From: Samuel Giffard
Date: Fri, 16 Feb 2024 16:18:56 +0100
Subject: [PATCH] feat: Events: Add agent, in Docker

Signed-off-by: Samuel Giffard
---
 dvalin-tools/Dockerfile                       |  48 +++++++
 dvalin-tools/docker-compose.yml               |  25 ++++
 dvalin-tools/dvalin_tools/agents/__init__.py  |   0
 .../dvalin_tools/agents/event_agent.py        | 128 +++++++++++++++++++
 dvalin-tools/dvalin_tools/lib/fs_lock.py      |  75 +++++++++++
 dvalin-tools/dvalin_tools/models/events.py    |   6 +-
 dvalin-tools/pyproject.toml                   |   2 +
 7 files changed, 283 insertions(+), 1 deletion(-)
 create mode 100644 dvalin-tools/Dockerfile
 create mode 100644 dvalin-tools/docker-compose.yml
 create mode 100644 dvalin-tools/dvalin_tools/agents/__init__.py
 create mode 100644 dvalin-tools/dvalin_tools/agents/event_agent.py
 create mode 100644 dvalin-tools/dvalin_tools/lib/fs_lock.py

diff --git a/dvalin-tools/Dockerfile b/dvalin-tools/Dockerfile
new file mode 100644
index 00000000000..a4f33a5c0f6
--- /dev/null
+++ b/dvalin-tools/Dockerfile
@@ -0,0 +1,48 @@
+# Build Stage
+FROM python:3.11-alpine as builder
+LABEL authors="Samuel Giffard "
+
+# Install build dependencies
+RUN apk add --no-cache \
+    gcc \
+    musl-dev \
+    python3-dev \
+    libffi-dev \
+    openssl-dev \
+    cargo \
+    && python -m pip install --upgrade pip
+
+# Install Poetry
+ENV POETRY_VERSION=1.7.1
+RUN pip install "poetry==$POETRY_VERSION"
+
+# Set the working directory
+WORKDIR /usr/src/app
+
+# Copy project files
+COPY poetry.lock pyproject.toml ./
+
+# Install dependencies using Poetry in a way that they can be copied later
+RUN poetry config virtualenvs.create false \
+    && poetry install --without dev --no-interaction --no-ansi
+
+# Copy the rest of the application code
+COPY . .
+
+# Runtime Stage
+FROM python:3.11-alpine as runtime
+LABEL authors="Samuel Giffard "
+
+# Create a non-root user
+RUN adduser -D celeryuser
+USER celeryuser
+
+# Set the working directory
+WORKDIR /usr/src/app
+
+# Copy installed packages from the builder stage
+COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
+
+# Copy application code from the builder stage
+COPY --from=builder --chown=celeryuser:celeryuser /usr/src/app .
diff --git a/dvalin-tools/docker-compose.yml b/dvalin-tools/docker-compose.yml
new file mode 100644
index 00000000000..1ad875427e4
--- /dev/null
+++ b/dvalin-tools/docker-compose.yml
@@ -0,0 +1,25 @@
+version: '3.8'
+
+services:
+  event_agent:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    restart: unless-stopped
+    command: celery -A dvalin_tools.agents.event_agent worker --loglevel=info -B
+    environment:
+      - DVALIN_CELERY_BROKER_URL=${DVALIN_CELERY_BROKER_URL}
+      - DVALIN_CELERY_RESULT_BACKEND=${DVALIN_CELERY_RESULT_BACKEND}
+      - DVALIN_S3_ENDPOINT=${DVALIN_S3_ENDPOINT}
+      - DVALIN_S3_ACCESS_KEY=${DVALIN_S3_ACCESS_KEY}
+      - DVALIN_S3_SECRET_KEY=${DVALIN_S3_SECRET_KEY}
+    depends_on:
+      - redis
+    volumes:
+      - ../data:/data
+      - ./__scraper_cache__:/usr/src/__scraper_cache__
+
+  redis:
+    image: "redis:alpine"
+    ports:
+      - "6379:6379"
diff --git a/dvalin-tools/dvalin_tools/agents/__init__.py b/dvalin-tools/dvalin_tools/agents/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/dvalin-tools/dvalin_tools/agents/event_agent.py b/dvalin-tools/dvalin_tools/agents/event_agent.py
new file mode 100644
index 00000000000..6dad01b7971
--- /dev/null
+++ b/dvalin-tools/dvalin_tools/agents/event_agent.py
@@ -0,0 +1,128 @@
+"""Celery worker tasks that periodically retrieve and write new events."""
+
+import asyncio
+from pathlib import Path
+
+from celery import Celery, chain
+from celery.schedules import crontab
+
+from dvalin_tools.lib.common import is_in_docker
+from dvalin_tools.lib.constants import DATA_DIR
+from dvalin_tools.lib.fs_lock import fs_lock
+from dvalin_tools.lib.languages import LANGUAGE_CODE_TO_DIR, LanguageCode
+from dvalin_tools.lib.settings import DvalinSettings
+from dvalin_tools.models.common import Game
+from dvalin_tools.models.events import EventFile, MessageType
+from dvalin_tools.scrapers.events import (
+    get_all_events,
+    update_event_files,
+    write_events,
+)
+
+settings = DvalinSettings()
+
+app = Celery(
+    "event_agent",
+    broker=settings.celery.broker_url,
+    backend=settings.celery.result_backend,
+)
+app.conf.broker_connection_retry_on_startup = True
+
+
+@app.on_after_configure.connect
+def setup_periodic_tasks(sender, **kwargs) -> None:
+    """Schedule ``event_flow`` to run every minute."""
+    sender.add_periodic_task(crontab(minute="*/1"), event_flow.s())
+
+
+def get_latest_event_file(data_dir: Path) -> Path:
+    """Return the latest English event file under ``data_dir``.
+
+    Raises:
+        FileNotFoundError: If there is no event file yet.
+    """
+    event_dir_en = data_dir / LANGUAGE_CODE_TO_DIR[LanguageCode.ENGLISH] / "Event"
+    # This dir contains YYYY/MM/YYYY-MM.json files, so the greatest path is
+    # the latest one.
+    latest_event_file = max(event_dir_en.glob("*/*/*.json"), default=None)
+    if latest_event_file is None:
+        raise FileNotFoundError(f"No event file found in {event_dir_en}")
+    return latest_event_file
+
+
+def get_last_event_post_id(data_dir: Path) -> int:
+    """Return the highest post ID stored in the latest event file.
+
+    Raises:
+        ValueError: If the latest event file is blank or holds no event.
+    """
+    latest_event_file = get_latest_event_file(data_dir)
+    contents = latest_event_file.read_text(encoding="utf-8")
+
+    if not contents.strip():
+        raise ValueError(f"File {latest_event_file} is empty")
+
+    existing_events = EventFile.model_validate_json(contents)
+    # A bare next() would raise StopIteration on an event-less file, which
+    # turns into a RuntimeError once it escapes the calling coroutine.
+    latest_event = next(existing_events.iter_chronologically(reverse=True), None)
+    if latest_event is None:
+        raise ValueError(f"File {latest_event_file} contains no events")
+
+    return int(latest_event.post_id)
+
+
+@app.task
+def event_flow() -> None:
+    """Queue ``check_new_events`` chained into ``process_new_events``."""
+    chain(check_new_events.s(), process_new_events.s()).delay()
+
+
+@app.task
+def check_new_events() -> bool:
+    """Return True if the lock was acquired, False if another run holds it.
+
+    NOTE: no actual lookup for new events is performed here.
+    """
+    with fs_lock("check_new_events") as lock_acquired:
+        if not lock_acquired:
+            print("Another task already has the lock")
+            return False
+        print("Checking for new events")
+        return True
+
+
+@app.task
+def process_new_events(there_are_new_events: bool) -> None:
+    """Run the event processing while holding the ``process_new_events`` lock.
+
+    Args:
+        there_are_new_events: Result of ``check_new_events``; nothing is done
+            when it is False.
+    """
+    with fs_lock("process_new_events") as lock_acquired:
+        if not lock_acquired:
+            print("Another task already has the lock")
+            return
+
+        if there_are_new_events:
+            print("Processing new events")
+            asyncio.run(process_new_events_async())
+
+
+async def process_new_events_async() -> None:
+    """Retrieve events until the last stored post ID is reached, then write them."""
+    print("Processing new events async")
+    data_dir = Path("/data") if is_in_docker() else DATA_DIR
+    latest_post_id = get_last_event_post_id(data_dir)
+    events = await get_all_events(
+        Game.GENSHIN_IMPACT, MessageType.INFO, limit=25, stop_at_post_id=latest_post_id
+    )
+    print(f"Retrieved {len(events)} new events")
+    modified_event_files = write_events(events, data_dir)
+    await update_event_files(modified_event_files)
+    print("New events processed")
+
+
+if __name__ == "__main__":
+    app.start()
diff --git a/dvalin-tools/dvalin_tools/lib/fs_lock.py b/dvalin-tools/dvalin_tools/lib/fs_lock.py
new file mode 100644
index 00000000000..e90dddba97b
--- /dev/null
+++ b/dvalin-tools/dvalin_tools/lib/fs_lock.py
@@ -0,0 +1,75 @@
+"""A simple FS-based lock for Celery tasks.
+
+The goal is to prevent the same task from running concurrently: if one
+task is already running, the next one should be skipped.
+"""
+
+from collections.abc import Iterator
+from contextlib import contextmanager
+from pathlib import Path
+from tempfile import gettempdir
+
+
+class FSLock:
+    """Non-blocking lock that is held for as long as its lock file exists.
+
+    NOTE: nothing here removes a lock file left behind by a process that
+    died before calling ``release``; the lock stays taken until it is deleted.
+    """
+
+    # Directory holding every lock file.
+    FS_LOCK_DIR = Path(gettempdir()) / "dvalin_tools"
+
+    def __init__(self, lock_file_name: str) -> None:
+        """Set up the lock named ``lock_file_name`` without acquiring it."""
+        self.FS_LOCK_DIR.mkdir(parents=True, exist_ok=True)
+        self.lock_file = self.FS_LOCK_DIR / f"{lock_file_name}.lock"
+
+    def acquire(self) -> bool:
+        """Try to take the lock without waiting.
+
+        Returns:
+            True if this call created the lock file, False if it already
+            existed.
+        """
+        try:
+            # Create-if-absent in one step: a separate exists() check before
+            # touch() would let two workers both believe they hold the lock.
+            self.lock_file.touch(exist_ok=False)
+        except FileExistsError:
+            return False
+        return True
+
+    def release(self) -> None:
+        """Delete the lock file; an already missing file is not an error."""
+        self.lock_file.unlink(missing_ok=True)
+
+
+@contextmanager
+def fs_lock(lock_file_name: str) -> Iterator[bool]:
+    """Hold the lock named ``lock_file_name`` while the ``with`` body runs.
+
+    Yields:
+        True if the lock was acquired, False if it is already held. The body
+        runs in both cases, so callers must check the yielded value.
+    """
+    lock = FSLock(lock_file_name)
+    lock_acquired = lock.acquire()
+    try:
+        yield lock_acquired
+    finally:
+        # Only the holder may remove the lock file.
+        if lock_acquired:
+            lock.release()
+
+
+if __name__ == "__main__":
+    with fs_lock("my_lock") as lock_acquired:
+        if lock_acquired:
+            print("Lock acquired")
+        with fs_lock("my_lock") as lock_acquired2:
+            if lock_acquired2:
+                print("Should not be printed")
+            else:
+                print("Lock already acquired")
+    print("Lock released")
diff --git a/dvalin-tools/dvalin_tools/models/events.py b/dvalin-tools/dvalin_tools/models/events.py
index cd89a4ac7cc..80f15d3c4c4 100644
--- a/dvalin-tools/dvalin_tools/models/events.py
+++ b/dvalin-tools/dvalin_tools/models/events.py
@@ -171,7 +171,11 @@ class EventFile(RootModel):
     root: set[EventLocalized] = Field(default_factory=set)
 
     def __iter__(self) -> Iterator[EventLocalized]:
-        return iter(sorted(self.root, key=lambda x: int(x.post_id)))
+        return self.iter_chronologically()
+
+    def iter_chronologically(self, reverse: bool = False) -> Iterator[EventLocalized]:
+        """Iterate over the events by numeric post ID, descending if ``reverse``."""
+        return iter(sorted(self.root, key=lambda x: int(x.post_id), reverse=reverse))
 
     @model_serializer
     def file_serialize(self) -> list[EventLocalized]:
diff --git a/dvalin-tools/pyproject.toml b/dvalin-tools/pyproject.toml
index 977413d20a7..ce5a4aeb5e7 100644
--- a/dvalin-tools/pyproject.toml
+++ b/dvalin-tools/pyproject.toml
@@ -15,6 +15,8 @@ pydantic = {extras = ["dotenv"], version = "^2.6.1"}
 tqdm = "^4.66.1"
 tenacity = "^8.2.3"
 minio = "^7.2.3"
+celery = {extras = ["redis"], version = "^5.3.6"}
+pydantic-settings = "^2.1.0"
 
 [tool.poetry.group.dev.dependencies]