Skip to content

Commit

Permalink
Add Celery as an executor and refactor threadng logic
Browse files Browse the repository at this point in the history
- Implement Celery for task execution
- Refactor code to improve thread handling and structure
  • Loading branch information
Mohammad Torkashvand committed Nov 11, 2024
1 parent 64fb491 commit cb6aaff
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 145 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/run-unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ jobs:
FLIT_ROOT_INSTALL: 1
- name: Run Unit tests
run: pytest --cov-branch --cov=lso --cov-report=xml
env:
SETTINGS_FILENAME: dummy.json
- name: "Upload coverage to Codecov"
uses: codecov/codecov-action@v3
with:
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ services:
lso:
image: my-lso:latest
environment:
SETTINGS_FILENAME: /app/config.json
ANSIBLE_ROLES_PATH: /app/lso/ansible_roles
volumes:
- "/home/user/config.json:/app/config.json:ro"
Expand Down Expand Up @@ -80,5 +79,5 @@ As an alternative, below are a set of instructions for installing and running LS
* Run the app like this (`app.py` starts the server on port 44444):

```bash
SETTINGS_FILENAME=/absolute/path/to/config.json python -m lso.app
python -m lso.app
```
3 changes: 0 additions & 3 deletions config.json.example

This file was deleted.

24 changes: 24 additions & 0 deletions env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Environment configuration for LSO application

# General settings
LSO_ENV_PREFIX=LSO_

# Ansible configuration
LSO_ANSIBLE_PLAYBOOKS_ROOT_DIR=/path/to/ansible/playbooks

# Executor configuration
LSO_EXECUTOR=threadpool # Options: "threadpool", "celery"
LSO_MAX_THREAD_POOL_WORKERS=10

# Request settings
LSO_DEFAULT_REQUEST_TIMEOUT_SEC=10

# Celery configuration
LSO_CELERY_BROKER_URL=redis://localhost:6379/0
LSO_CELERY_RESULT_BACKEND=redis://localhost:6379/0
LSO_CELERY_TIMEZONE=Europe/Amsterdam
LSO_CELERY_ENABLE_UTC=True
LSO_CELERY_RESULT_EXPIRES=3600

# Debug/Testing
LSO_TESTING=False
10 changes: 2 additions & 8 deletions lso/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from lso import config, environment
from lso import environment
from lso.routes.default import router as default_router
from lso.routes.playbook import router as playbook_router


def create_app() -> FastAPI:
"""Override default settings with those found in the file read from environment variable `SETTINGS_FILENAME`.
:return: a new flask app instance
"""
"""Initialise the :term:`LSO` app."""
app = FastAPI()

app.add_middleware(
Expand All @@ -39,9 +36,6 @@ def create_app() -> FastAPI:
app.include_router(default_router, prefix="/api")
app.include_router(playbook_router, prefix="/api/playbook")

# test that configuration parameters are loaded and available
config.load()

environment.setup_logging()

logging.info("FastAPI app initialized")
Expand Down
67 changes: 26 additions & 41 deletions lso/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,60 +11,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""A module for loading configuration data, including a configuration schema that data is validated against.
"""Module for loading and managing configuration settings for the LSO app.
Data is loaded from a file, the location of which may be specified when using :func:`load_from_file`.
Configuration file location can also be loaded from environment variable ``$SETTINGS_FILENAME``, which is default
behaviour in :func:`load`.
Uses `pydantic`'s `BaseSettings` to load settings from environment variables.
"""

import json
import logging
import os
from pathlib import Path
from enum import Enum

import jsonschema
from pydantic import BaseModel
from pydantic_settings import BaseSettings

CONFIG_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {"ansible_playbooks_root_dir": {"type": "string"}},
"required": ["ansible_playbooks_root_dir"],
"additionalProperties": False,
}
DEFAULT_REQUEST_TIMEOUT = 10
logger = logging.getLogger(__name__)


class Config(BaseModel):
"""Simple Configuration class.
class ExecutorType(Enum):
"""Enum representing the types of executors available for task execution."""

Contains the root directory at which Ansible playbooks are present.
"""
WORKER = "celery"
THREADPOOL = "threadpool"

ansible_playbooks_root_dir: str

class Config(BaseSettings):
"""The set of parameters required for running :term:`LSO`."""

def load_from_file(file: Path) -> Config:
"""Load, validate and return configuration parameters.
TESTING: bool = False
ANSIBLE_PLAYBOOKS_ROOT_DIR: str = "/path/to/ansible/playbooks"
EXECUTOR: ExecutorType = ExecutorType.THREADPOOL
MAX_THREAD_POOL_WORKERS: int = min(32, (os.cpu_count() or 1) + 4)
DEFAULT_REQUEST_TIMEOUT_SEC: int = 10
CELERY_BROKER_URL: str = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"
CELERY_TIMEZONE: str = "Europe/Amsterdam"
CELERY_ENABLE_UTC: bool = True
CELERY_RESULT_EXPIRES: int = 3600

Input is validated against this JSON schema:
class Config:
"""Sets the prefix for environment variables used in the main Config class."""

.. asjson:: lso.config.CONFIG_SCHEMA
env_prefix = os.environ.get("LSO_ENV_PREFIX", "LSO_")

:param file: :class:`Path` object that produces the configuration file.
:return: a dict containing the parsed configuration parameters.
"""
config = json.loads(file.read_text())
jsonschema.validate(config, CONFIG_SCHEMA)
return Config(**config)


def load() -> Config:
"""Load a configuration file, located at the path specified in the environment variable ``$SETTINGS_FILENAME``.
Loading and validating the file is performed by :func:`load_from_file`.
:return: a dict containing the parsed configuration parameters
"""
assert "SETTINGS_FILENAME" in os.environ, "Environment variable SETTINGS_FILENAME not set" # noqa: S101
return load_from_file(Path(os.environ["SETTINGS_FILENAME"]))
settings = Config()
61 changes: 43 additions & 18 deletions lso/playbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,42 @@
"""Module that gathers common API responses and data models."""

import logging
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any

import ansible_runner
import requests
from celery import shared_task
from fastapi import status
from fastapi.responses import JSONResponse
from pydantic import HttpUrl

from lso import config
from lso.config import DEFAULT_REQUEST_TIMEOUT
from lso.config import ExecutorType, settings

logger = logging.getLogger(__name__)

_lso_executor = None


def get_thread_pool() -> ThreadPoolExecutor:
"""Get and optionally initialise a ThreadPoolExecutor.
Returns:
ThreadPoolExecutor
"""
global _lso_executor # noqa: PLW0603
if _lso_executor is None:
_lso_executor = ThreadPoolExecutor(max_workers=settings.MAX_THREAD_POOL_WORKERS)

return _lso_executor


def get_playbook_path(playbook_name: str) -> Path:
"""Get the path of a playbook on the local filesystem."""
config_params = config.load()
return Path(config_params.ansible_playbooks_root_dir) / playbook_name
return Path(settings.ANSIBLE_PLAYBOOKS_ROOT_DIR) / playbook_name


def playbook_launch_success(job_id: str) -> JSONResponse:
Expand All @@ -57,7 +72,7 @@ def playbook_launch_error(reason: str, status_code: int = status.HTTP_400_BAD_RE


def _run_playbook_proc(
job_id: str, playbook_path: str, extra_vars: dict, inventory: dict[str, Any] | str, callback: str
job_id: str, playbook_path: str, extra_vars: dict, inventory: dict[str, Any] | str, callback: HttpUrl
) -> None:
"""Run a playbook, internal function.
Expand All @@ -76,12 +91,28 @@ def _run_playbook_proc(
"return_code": int(ansible_playbook_run.rc),
}

request_result = requests.post(callback, json=payload, timeout=DEFAULT_REQUEST_TIMEOUT)
request_result = requests.post(str(callback), json=payload, timeout=settings.DEFAULT_REQUEST_TIMEOUT_SEC)
if not status.HTTP_200_OK <= request_result.status_code < status.HTTP_300_MULTIPLE_CHOICES:
msg = f"Callback failed: {request_result.text}"
logger.error(msg)


@shared_task # type: ignore[misc]
def run_playbook_proc_task(
job_id: str, playbook_path: str, extra_vars: dict[str, Any], inventory: dict[str, Any] | str, callback: HttpUrl
) -> None:
"""Celery task to run a playbook.
:param str job_id: Identifier of the job being executed.
:param str playbook_path: Path to the playbook to be executed.
:param dict[str, Any] extra_vars: Extra variables to pass to the playbook.
:param dict[str, Any] | str inventory: Inventory to run the playbook against.
:param HttpUrl callback: Callback URL for status updates.
:return: None
"""
_run_playbook_proc(job_id, playbook_path, extra_vars, inventory, callback)


def run_playbook(
playbook_path: Path,
extra_vars: dict[str, Any],
Expand All @@ -107,16 +138,10 @@ def run_playbook(
return playbook_launch_error(reason=msg, status_code=status.HTTP_400_BAD_REQUEST)

job_id = str(uuid.uuid4())
thread = threading.Thread(
target=_run_playbook_proc,
kwargs={
"job_id": job_id,
"playbook_path": str(playbook_path),
"inventory": inventory,
"extra_vars": extra_vars,
"callback": callback,
},
)
thread.start()
if settings.EXECUTOR == ExecutorType.THREADPOOL:
executor = get_thread_pool()
executor.submit(_run_playbook_proc, job_id, str(playbook_path), extra_vars, inventory, callback)
elif settings.EXECUTOR == ExecutorType.WORKER:
run_playbook_proc_task.delay(job_id, str(playbook_path), extra_vars, inventory, callback)

return playbook_launch_success(job_id=job_id)
2 changes: 1 addition & 1 deletion lso/routes/playbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class PlaybookRunParams(BaseModel):
"""Parameters for executing an Ansible playbook."""

#: The filename of a playbook that's executed. It should be present inside the directory defined in the
#: configuration option ``ansible_playbooks_root_dir``.
#: configuration option ``ANSIBLE_PLAYBOOKS_ROOT_DIR``.
playbook_name: str
#: The address where LSO should call back to upon completion.
callback: HttpUrl
Expand Down
34 changes: 34 additions & 0 deletions lso/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Module that sets up :term:`LSO` as a Celery worker."""

from celery import Celery
from celery.signals import worker_shutting_down

from lso.config import settings

celery = Celery(
"lso-worker",
broker=settings.CELERY_BROKER_URL,
backend=settings.CELERY_RESULT_BACKEND,
include=[
"lso.schedules.task_vacuum",
],
)

if settings.TESTING:
celery.conf.update(backend=settings.CELERY_RESULT_BACKEND, task_ignore_result=False)
else:
celery.conf.update(task_ignore_result=True)

celery.conf.update(
result_expires=settings.CELERY_RESULT_EXPIRES,
worker_prefetch_multiplier=1,
worker_send_task_event=True,
task_send_sent_event=True,
redbeat_redis_url=settings.CELERY_BROKER_URL,
)


@worker_shutting_down.connect # type: ignore[misc]
def worker_shutting_down_handler(sig, how, exitcode, **kwargs) -> None: # type: ignore[no-untyped-def] # noqa: ARG001
"""Handle the Celery worker shutdown event."""
celery.close()
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ dependencies = [
"ansible~=10.5.0",
"fastapi~=0.115.2",
"httpx~=0.27.0",
"jsonschema~=4.21.1",
"uvicorn[standard]~=0.32.0",
"requests~=2.31.0"
"requests~=2.31.0",
"pydantic-settings~=2.5.2",
"celery~=5.3.6",
"redis==5.0.3",
]

readme = "README.md"
Expand Down
Loading

0 comments on commit cb6aaff

Please sign in to comment.