Skip to content

Commit

Permalink
Merge pull request #44 from dval-in/sgiffard/43-events-periodically-c…
Browse files Browse the repository at this point in the history
…heck-and-update-events

Events periodically check and update events
  • Loading branch information
Mulugruntz authored Feb 24, 2024
2 parents bcb665e + f59402e commit 6a4d149
Show file tree
Hide file tree
Showing 17 changed files with 668 additions and 41 deletions.
6 changes: 6 additions & 0 deletions dvalin-tools/.env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
DVALIN_S3_ENDPOINT=s3-api.dval.in
DVALIN_S3_ACCESS_KEY=your_dvalin_s3_access_key
DVALIN_S3_SECRET_KEY=your_dvalin_s3_secret_key
DVALIN_CELERY_BROKER_URL=redis://redis:6379/0
DVALIN_CELERY_RESULT_BACKEND=redis://redis:6379/0
DVALIN_GIT_USER_EMAIL=your_git_user_email
DVALIN_GIT_USER_NAME=your_git_user_name
DVALIN_GIT_GITHUB_USERNAME=your_github_username
DVALIN_GIT_PRIVATE_ACCESS_TOKEN=your_git_private_access_token
69 changes: 69 additions & 0 deletions dvalin-tools/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# ---- Build stage: install build toolchain and Python dependencies ----
FROM python:3.11-alpine AS builder
LABEL authors="Samuel Giffard <[email protected]>"

# Build dependencies needed to compile native extensions (cffi/cryptography, etc.)
RUN apk add --no-cache \
    gcc \
    musl-dev \
    python3-dev \
    libffi-dev \
    openssl-dev \
    cargo \
    && python -m pip install --upgrade pip

# Pin Poetry for reproducible builds
ENV POETRY_VERSION=1.7.1
RUN pip install "poetry==$POETRY_VERSION"

# Set the working directory
WORKDIR /usr/src/app

# Copy only the dependency manifests first so this layer stays cached
# until the dependencies actually change
COPY poetry.lock pyproject.toml ./

# Install dependencies into the system site-packages (no virtualenv) so they
# can be copied wholesale into the runtime stages.
# NOTE: --only main replaces the deprecated --no-dev flag, and --no-root skips
# installing the project itself (its sources are not copied yet at this point).
RUN poetry config virtualenvs.create false \
    && poetry install --only main --no-root --no-interaction --no-ansi

# Copy the rest of the application code
COPY . .

# ---- Runtime stage (local development) ----
FROM python:3.11-alpine AS runtime-localdev
LABEL authors="Samuel Giffard <[email protected]>"

# Create a non-root user and drop privileges
RUN adduser -D celeryuser
USER celeryuser

# Set the working directory
WORKDIR /usr/src/app

# Copy installed packages and console scripts from the builder stage
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# Copy application code from the builder stage, owned by the runtime user
COPY --from=builder --chown=celeryuser:celeryuser /usr/src/app/dvalin_tools ./dvalin_tools

# ---- Runtime stage (production) ----
FROM runtime-localdev AS runtime-prod

# Location where the dvalin-data repository is cloned inside the image
ARG DVALIN_REPO_ROOT_DIR=${DVALIN_REPO_ROOT_DIR:-/usr/src/repo/dvalin-data}

USER root
# git + git-lfs are required at runtime to pull/push the data repository
RUN apk fix && \
    apk --no-cache --update add git git-lfs gpg less openssh patch && \
    git lfs install

USER celeryuser

WORKDIR ${DVALIN_REPO_ROOT_DIR}

RUN git clone https://github.com/dval-in/dvalin-data.git ${DVALIN_REPO_ROOT_DIR}

WORKDIR /usr/src/app

COPY --chown=celeryuser:celeryuser __scraper_cache__ /usr/src/__scraper_cache__
25 changes: 24 additions & 1 deletion dvalin-tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
cp .env.example .env
```

## Usage
## Usage (manual)

- Run the tool:
- Windows:
Expand All @@ -32,3 +32,26 @@
```bash
dvalin-event-scraper update --mode=IMAGES_SAVE_TO_S3
```

## Usage (automation)

Use docker-compose for automation.

### Local development

The local dev mounts the current project into the container, so you can edit the code
and run the tool without rebuilding the container.
You can use the following command to run the tool in local-dev:

```bash
docker compose -f docker-compose.yml -f docker-compose.dev.yml -p dvalin-tools up -d
```

### Production

The production environment bases its data on the GitHub repository.
You can use the following command to run the tool in production:

```bash
docker compose -f docker-compose.yml -p dvalin-tools up -d
```
13 changes: 13 additions & 0 deletions dvalin-tools/docker-compose.dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Local-development override: layer on top of docker-compose.yml with
#   docker compose -f docker-compose.yml -f docker-compose.dev.yml -p dvalin-tools up -d
version: '3.8'

services:
  event_agent:
    build:
      context: .
      dockerfile: Dockerfile
      # Dev image stops before the production stage (no baked-in git clone)
      target: runtime-localdev
    environment:
      - DVALIN_REPO_ROOT_DIR=/usr/src/repo/dvalin-data
    volumes:
      # Mount the repository checkout so code edits are live in the container
      - ..:/usr/src/repo/dvalin-data
      # Share the scraper cache with the host to avoid re-downloading assets
      - ./__scraper_cache__:/usr/src/app/__scraper_cache__
29 changes: 29 additions & 0 deletions dvalin-tools/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Production stack: the event-polling Celery worker plus a Redis broker.
version: '3.8'

services:
  event_agent:
    build:
      context: .
      dockerfile: Dockerfile
      # Production image includes the baked-in clone of dvalin-data
      target: runtime-prod
    restart: unless-stopped
    # -B embeds the beat scheduler in the worker process (single-node setup)
    command: celery -A dvalin_tools.agents.event_agent worker --loglevel=info -B
    environment:
      # Values come from the host environment / .env (see .env.example)
      - DVALIN_CELERY_BROKER_URL=${DVALIN_CELERY_BROKER_URL}
      - DVALIN_CELERY_RESULT_BACKEND=${DVALIN_CELERY_RESULT_BACKEND}
      - DVALIN_S3_ENDPOINT=${DVALIN_S3_ENDPOINT}
      - DVALIN_S3_ACCESS_KEY=${DVALIN_S3_ACCESS_KEY}
      - DVALIN_S3_SECRET_KEY=${DVALIN_S3_SECRET_KEY}
      - DVALIN_GIT_USER_EMAIL=${DVALIN_GIT_USER_EMAIL}
      - DVALIN_GIT_USER_NAME=${DVALIN_GIT_USER_NAME}
      - DVALIN_GIT_GITHUB_USERNAME=${DVALIN_GIT_GITHUB_USERNAME}
      - DVALIN_GIT_PRIVATE_ACCESS_TOKEN=${DVALIN_GIT_PRIVATE_ACCESS_TOKEN}
      - DVALIN_REPO_ROOT_DIR=/usr/src/repo/dvalin-data
    depends_on:
      - redis

  redis:
    image: "redis:alpine"
    ports:
      - "6379:6379"
    restart: unless-stopped
Empty file.
107 changes: 107 additions & 0 deletions dvalin-tools/dvalin_tools/agents/event_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import asyncio
from pathlib import Path

from celery import Celery
from celery.schedules import crontab

from dvalin_tools.lib.fs_lock import fs_lock
from dvalin_tools.lib.languages import LANGUAGE_CODE_TO_DIR, LanguageCode
from dvalin_tools.lib.repository import (
initialize_git_repo,
persist_on_remote,
prepare_local_auto_branch,
)
from dvalin_tools.lib.settings import DvalinSettings
from dvalin_tools.models.common import Game
from dvalin_tools.models.events import EventFile, EventI18N, MessageType
from dvalin_tools.scrapers.events import (
get_all_events,
update_event_files,
write_events,
)

# Application-wide settings; presumably populated from DVALIN_* environment
# variables (see .env.example) — confirm against DvalinSettings.
settings = DvalinSettings()

# Celery application wired to the broker/backend configured in the settings.
app = Celery(
    "event_agent",
    broker=settings.celery.broker_url,
    backend=settings.celery.result_backend,
)
# Keep retrying the broker connection at startup (e.g. Redis not up yet).
app.conf.broker_connection_retry_on_startup = True


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs) -> None:
    """Initialize the git repository once, then schedule the event poller.

    Runs after Celery finishes configuring; `process_new_events` is scheduled
    every minute via a crontab entry.
    """
    initialize_git_repo(settings.repo_root_dir)
    sender.add_periodic_task(crontab(minute="*/1"), process_new_events.s())


def get_latest_event_file(data_dir: Path) -> Path:
    """Return the most recent English event file under *data_dir*.

    Event files live in <data_dir>/<EN dir>/Event/YYYY/MM/YYYY-MM.json, so the
    lexicographically greatest path is also the chronologically latest one.

    Raises:
        FileNotFoundError: if no event file exists yet. (The previous
            ``sorted(...)[-1]`` failed with an opaque IndexError in that case.)
    """
    event_dir_en = data_dir / LANGUAGE_CODE_TO_DIR[LanguageCode.ENGLISH] / "Event"
    try:
        # max() picks the same element as sorted(...)[-1] without sorting everything.
        return max(event_dir_en.glob("*/*/*.json"))
    except ValueError:
        raise FileNotFoundError(f"No event files found under {event_dir_en}") from None


def get_last_event_post_id(data_dir: Path) -> int:
    """Return the post id of the newest event already persisted on disk."""
    latest_event_file = get_latest_event_file(data_dir)
    raw = latest_event_file.read_text(encoding="utf-8")

    if not raw.strip():
        raise ValueError(f"File {latest_event_file} is empty")

    event_file = EventFile.model_validate_json(raw)
    newest_event = next(event_file.iter_chronologically(reverse=True))

    return int(newest_event.post_id)


@app.task
def process_new_events() -> None:
    """Celery entry point: poll for new events, guarded by a filesystem lock.

    The lock ensures only one poll runs at a time; a concurrent invocation
    simply reports that and exits.
    """
    with fs_lock("process_new_events") as lock_acquired:
        if lock_acquired:
            asyncio.run(process_new_events_async())
        else:
            print("Another task already has the lock")


async def process_new_events_async() -> None:
    """Fetch events newer than the last persisted one and push them upstream.

    Steps: prepare the local auto branch, find the newest post id already on
    disk, scrape anything newer, write and enrich the event files, and finally
    persist the modified files to the remote repository.
    """
    print("Checking for new events...")
    prepare_local_auto_branch(settings.repo_root_dir)
    data_dir = settings.data_path
    latest_post_id = get_last_event_post_id(data_dir)
    # Scrape at most 25 posts, stopping once the already-known post id is reached.
    events = await get_all_events(
        Game.GENSHIN_IMPACT, MessageType.INFO, limit=25, stop_at_post_id=latest_post_id
    )
    if not events:
        print("No new events")
        return

    print(f"Retrieved {len(events)} new events")
    modified_event_files = write_events(events, data_dir)
    await update_event_files(modified_event_files)
    print("New events processed")
    # Commit/push only when files actually changed on disk.
    if modified_event_files:
        persist_on_remote(
            settings.repo_root_dir,
            modified_event_files,
            get_commit_message_body(events),
        )


def get_commit_message_body(events: list[EventI18N]) -> str:
    """Build a commit-message body with one bullet line per event."""
    bullet_lines = [
        f"* [{event.created_at:%Y-%m-%d %H:%M}] "
        f"{event.subject} "
        f"([{event.post_id}]({event.article_url}))"
        for event in events
    ]
    # The trailing empty entry makes the joined message end with a newline.
    return "\n".join(["Contains the following events:", *bullet_lines, ""])


if __name__ == "__main__":
    # Allow launching the Celery worker by running this module directly.
    app.start()
6 changes: 5 additions & 1 deletion dvalin-tools/dvalin_tools/lib/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from itertools import islice
from pathlib import PurePath
from pathlib import Path, PurePath

from httpx import URL, Headers

Expand Down Expand Up @@ -70,3 +70,7 @@ def determine_content_type(headers: Headers, url: URL) -> str:
return ENRICH_CONTENT_TYPE.get(
PurePath(url.path).suffix[1:], "application/octet-stream"
)


def is_in_docker() -> bool:
    """Return True when running inside a Docker container.

    Docker creates a /.dockerenv marker file at the container root; its
    presence is used as the detection signal.
    """
    dockerenv_marker = Path("/.dockerenv")
    return dockerenv_marker.exists()
1 change: 0 additions & 1 deletion dvalin-tools/dvalin_tools/lib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@

# dvalin-tools/ — three levels up from this file (lib/ -> dvalin_tools/ -> dvalin-tools/).
ROOT_DIR_DVALIN_TOOLS = Path(__file__).parent.parent.parent
# Repository root containing the dvalin-tools project.
ROOT_DIR_DVALIN_DATA = ROOT_DIR_DVALIN_TOOLS.parent
DATA_DIR = ROOT_DIR_DVALIN_DATA / "data"
49 changes: 49 additions & 0 deletions dvalin-tools/dvalin_tools/lib/fs_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""A simple FS-based lock for Celery tasks.
The goal is to prevent having the same tasks running concurrently, if one
task is already running, the next one should be skipped.
"""

from contextlib import contextmanager
from pathlib import Path
from tempfile import gettempdir


class FSLock:
    """File-based lock: the lock is held exactly while the lock file exists.

    NOTE(review): the lock is not reentrant, and a crash between acquire()
    and release() leaves a stale lock file behind (no PID/expiry tracking).
    """

    # Shared directory for all lock files, under the system temp directory.
    FS_LOCK_DIR = Path(gettempdir()) / "dvalin_tools"

    def __init__(self, lock_file_name: str) -> None:
        self.FS_LOCK_DIR.mkdir(exist_ok=True)
        self.lock_file = self.FS_LOCK_DIR / f"{lock_file_name}.lock"

    def acquire(self) -> bool:
        """Try to take the lock; return True on success, False if already held.

        Uses touch(exist_ok=False), i.e. O_CREAT | O_EXCL, so the existence
        check and the file creation are one atomic operation. The previous
        exists()-then-touch() sequence had a race window in which two
        processes could both believe they acquired the lock.
        """
        try:
            self.lock_file.touch(exist_ok=False)
        except FileExistsError:
            return False
        return True

    def release(self) -> None:
        """Release the lock by removing the lock file."""
        self.lock_file.unlink()


@contextmanager
def fs_lock(lock_file_name: str):
    """Yield True if the named lock was acquired, False otherwise.

    The lock is released on exit only when this context actually acquired it.
    """
    lock = FSLock(lock_file_name)
    if not lock.acquire():
        # Someone else holds the lock; nothing to release on exit.
        yield False
        return
    try:
        yield True
    finally:
        lock.release()


if __name__ == "__main__":
    # Smoke test: the outer context acquires the lock, so the nested attempt
    # with the same name must report it as already held.
    with fs_lock("my_lock") as lock_acquired:
        if lock_acquired:
            print("Lock acquired")
            with fs_lock("my_lock") as lock_acquired2:
                if lock_acquired2:
                    print("Should not be printed")
                else:
                    print("Lock already acquired")
    print("Lock released")
Loading

0 comments on commit 6a4d149

Please sign in to comment.