Skip to content

Commit

Permalink
feat: Events: Add agent, in Docker
Browse files Browse the repository at this point in the history
Signed-off-by: Samuel Giffard <[email protected]>
  • Loading branch information
Mulugruntz committed Feb 18, 2024
1 parent 699c288 commit d94155d
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 1 deletion.
48 changes: 48 additions & 0 deletions dvalin-tools/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Build Stage
FROM python:3.11-alpine as builder
LABEL authors="Samuel Giffard <[email protected]>"

# Install build dependencies
RUN apk add --no-cache \
gcc \
musl-dev \
python3-dev \
libffi-dev \
openssl-dev \
cargo \
&& python -m pip install --upgrade pip

# Install Poetry
ENV POETRY_VERSION=1.7.1
RUN pip install "poetry==$POETRY_VERSION"

# Set the working directory
WORKDIR /usr/src/app

# Copy project files
COPY poetry.lock pyproject.toml ./

# Install dependencies using Poetry in a way that they can be copied later
RUN poetry config virtualenvs.create false \
&& poetry install --no-dev --no-interaction --no-ansi

# Copy the rest of the application code
COPY . .

# Runtime Stage
FROM python:3.11-alpine as runtime
LABEL authors="Samuel Giffard <[email protected]>"

# Create a non-root user
RUN adduser -D celeryuser
USER celeryuser

# Set the working directory
WORKDIR /usr/src/app

# Copy installed packages from the builder stage
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# Copy application code from the builder stage
COPY --from=builder --chown=celeryuser:celeryuser /usr/src/app .
25 changes: 25 additions & 0 deletions dvalin-tools/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
version: '3.8'

services:
event_agent:
build:
context: .
dockerfile: Dockerfile
restart: unless-stopped
command: celery -A dvalin_tools.agents.event_agent worker --loglevel=info -B
environment:
- DVALIN_CELERY_BROKER_URL=${DVALIN_CELERY_BROKER_URL}
- DVALIN_CELERY_RESULT_BACKEND=${DVALIN_CELERY_RESULT_BACKEND}
- DVALIN_S3_ENDPOINT=${DVALIN_S3_ENDPOINT}
- DVALIN_S3_ACCESS_KEY=${DVALIN_S3_ACCESS_KEY}
- DVALIN_S3_SECRET_KEY=${DVALIN_S3_SECRET_KEY}
depends_on:
- redis
volumes:
- ../data:/data
- ./__scraper_cache__:/usr/src/__scraper_cache__

redis:
image: "redis:alpine"
ports:
- "6379:6379"
Empty file.
96 changes: 96 additions & 0 deletions dvalin-tools/dvalin_tools/agents/event_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import asyncio
from pathlib import Path

from celery import Celery, chain
from celery.schedules import crontab

from dvalin_tools.lib.common import is_in_docker
from dvalin_tools.lib.constants import DATA_DIR
from dvalin_tools.lib.fs_lock import fs_lock
from dvalin_tools.lib.languages import LANGUAGE_CODE_TO_DIR, LanguageCode
from dvalin_tools.lib.settings import DvalinSettings
from dvalin_tools.models.common import Game
from dvalin_tools.models.events import EventFile, MessageType
from dvalin_tools.scrapers.events import (
get_all_events,
update_event_files,
write_events,
)

settings = DvalinSettings()

app = Celery(
"event_agent",
broker=settings.celery.broker_url,
backend=settings.celery.result_backend,
)
app.conf.broker_connection_retry_on_startup = True


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs) -> None:
sender.add_periodic_task(crontab(minute="*/1"), event_flow.s())


def get_latest_event_file(data_dir: Path) -> Path:
event_dir_en = data_dir / LANGUAGE_CODE_TO_DIR[LanguageCode.ENGLISH] / "Event"
# This dir contains YYYY/MM/YYYY-MM.json files
# We want the latest one (so, we sort)
return sorted(event_dir_en.glob("*/*/*.json"))[-1]


def get_last_event_post_id(data_dir: Path) -> int:
latest_event_file = get_latest_event_file(data_dir)
contents = latest_event_file.read_text(encoding="utf-8")

if not contents.strip():
raise ValueError(f"File {latest_event_file} is empty")

existing_events = EventFile.model_validate_json(contents)
latest_event = next(existing_events.iter_chronologically(reverse=True))

return int(latest_event.post_id)


@app.task
def event_flow() -> None:
chain(check_new_events.s(), process_new_events.s()).delay()


@app.task
def check_new_events() -> bool:
with fs_lock("check_new_events") as lock_acquired:
if not lock_acquired:
print("Another task already has the lock")
return False
print("Checking for new events")
return True


@app.task
def process_new_events(there_are_new_events: bool) -> None:
with fs_lock("process_new_events") as lock_acquired:
if not lock_acquired:
print("Another task already has the lock")
return

if there_are_new_events:
print("Processing new events")
asyncio.run(process_new_events_async())


async def process_new_events_async() -> None:
print("Processing new events async")
data_dir = Path("/data") if is_in_docker() else DATA_DIR
latest_post_id = get_last_event_post_id(data_dir)
events = await get_all_events(
Game.GENSHIN_IMPACT, MessageType.INFO, limit=25, stop_at_post_id=latest_post_id
)
print(f"Retrieved {len(events)} new events")
modified_event_files = write_events(events, data_dir)
await update_event_files(modified_event_files)
print("New events processed")


if __name__ == "__main__":
app.start()
49 changes: 49 additions & 0 deletions dvalin-tools/dvalin_tools/lib/fs_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""A simple FS-based lock for Celery tasks.
The goal is to prevent having the same tasks running concurrently, if one
task is already running, the next one should be skipped.
"""

from contextlib import contextmanager
from pathlib import Path
from tempfile import gettempdir


class FSLock:
FS_LOCK_DIR = Path(gettempdir()) / "dvalin_tools"

def __init__(self, lock_file_name: str) -> None:
self.FS_LOCK_DIR.mkdir(exist_ok=True)
self.lock_file = self.FS_LOCK_DIR / f"{lock_file_name}.lock"

def acquire(self) -> bool:
if self.lock_file.exists():
return False
self.lock_file.touch()
return True

def release(self) -> None:
self.lock_file.unlink()


@contextmanager
def fs_lock(lock_file_name: str):
lock = FSLock(lock_file_name)
lock_acquired = lock.acquire()
try:
yield lock_acquired
finally:
if lock_acquired:
lock.release()


if __name__ == "__main__":
with fs_lock("my_lock") as lock_acquired:
if lock_acquired:
print("Lock acquired")
with fs_lock("my_lock") as lock_acquired2:
if lock_acquired2:
print("Should not be printed")
else:
print("Lock already acquired")
print("Lock released")
5 changes: 4 additions & 1 deletion dvalin-tools/dvalin_tools/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ class EventFile(RootModel):
root: set[EventLocalized] = Field(default_factory=set)

def __iter__(self) -> Iterator[EventLocalized]:
return iter(sorted(self.root, key=lambda x: int(x.post_id)))
return self.iter_chronologically()

def iter_chronologically(self, reverse: bool = False) -> Iterator[EventLocalized]:
return iter(sorted(self.root, key=lambda x: int(x.post_id), reverse=reverse))

@model_serializer
def file_serialize(self) -> list[EventLocalized]:
Expand Down
2 changes: 2 additions & 0 deletions dvalin-tools/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pydantic = {extras = ["dotenv"], version = "^2.6.1"}
tqdm = "^4.66.1"
tenacity = "^8.2.3"
minio = "^7.2.3"
celery = {extras = ["redis"], version = "^5.3.6"}
pydantic-settings = "^2.1.0"


[tool.poetry.group.dev.dependencies]
Expand Down

0 comments on commit d94155d

Please sign in to comment.