Add Celery as an executor and refactor threading logic
- Implement Celery for task execution
- Refactor code to improve thread handling and structure
Mohammad Torkashvand committed Nov 13, 2024
1 parent 64fb491 commit 12895fe
Showing 16 changed files with 336 additions and 199 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.0.3
+current_version = 2.0.0
 commit = False
 tag = False
 parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(rc(?P<build>\d+))?
2 changes: 0 additions & 2 deletions .github/workflows/run-unit-tests.yaml
@@ -26,8 +26,6 @@ jobs:
           FLIT_ROOT_INSTALL: 1
       - name: Run Unit tests
         run: pytest --cov-branch --cov=lso --cov-report=xml
-        env:
-          SETTINGS_FILENAME: dummy.json
       - name: "Upload coverage to Codecov"
         uses: codecov/codecov-action@v3
         with:
33 changes: 27 additions & 6 deletions README.md
@@ -25,19 +25,20 @@ Use the Docker image to then spin up an environment. An example Docker compose f
 services:
   lso:
     image: my-lso:latest
+    env_file:
+      .env # Load default environment variables from the .env file
     environment:
-      SETTINGS_FILENAME: /app/config.json
-      ANSIBLE_ROLES_PATH: /app/lso/ansible_roles
+      ANSIBLE_ROLES_PATH: /app/lso/ansible_roles # Set specific Ansible roles path
     volumes:
-      - "/home/user/config.json:/app/config.json:ro"
       - "/home/user/ansible_inventory:/opt/ansible_inventory:ro"
       - "~/.ssh/id_ed25519.pub:/root/.ssh/id_ed25519.pub:ro"
       - "~/.ssh/id_ed25519:/root/.ssh/id_ed25519:ro"
 ```
 This will expose the API on port 8000. The container requires some more files to be mounted:
-* A `config.json` that references to the location where the Ansible playbooks are stored **inside the container**.
+* An .env file: Sets default environment variables, like ANSIBLE_PLAYBOOKS_ROOT_DIR for the location of Ansible playbooks **inside the container**.
+* Environment variables: Specific configurations, such as ANSIBLE_ROLES_PATH, can be directly set in the environment section. This is ideal for values you may want to override without modifying the .env file.
 * An Ansible inventory for all host and group variables that are used in the playbooks
 * A public/private key pair for SSH authentication on external machines that are targeted by Ansible playbooks.
 * Any Ansible-specific configuration (such as `collections_path`, `roles_path`, etc.) should be set using
@@ -75,10 +76,30 @@ As an alternative, below are a set of instructions for installing and running LS

 ### Running the app

-* Create a settings file, see `config.json.example` for an example.
+* Set required environment variables; see `env.example` for reference.
 * If necessary, set the environment variable `ANSIBLE_HOME` to a custom path.
 * Run the app like this (`app.py` starts the server on port 44444):

 ```bash
-SETTINGS_FILENAME=/absolute/path/to/config.json python -m lso.app
+source .env && python -m lso.app
 ```
+
+### Task Execution Options
+1. Celery (Distributed Execution)
+
+- For distributed task execution, set `EXECUTOR=celery`.
+- Add Celery config in your environment variables:
+
+```bash
+CELERY_BROKER_URL=redis://localhost:6379/0
+CELERY_RESULT_BACKEND=redis://localhost:6379/0
+WORKER_QUEUE_NAME=lso-worker-queue # default value is None so you don't need this by default.
+```
+- Start a Celery worker:
+
+```bash
+celery -A lso.worker worker --loglevel=info -Q lso-worker-queue
+```
+2. ThreadPoolExecutor (Local Execution)
+
+For local concurrent tasks, set `EXECUTOR=threadpool` and configure `MAX_THREAD_POOL_WORKERS`.
3 changes: 0 additions & 3 deletions config.json.example

This file was deleted.

22 changes: 22 additions & 0 deletions env.example
@@ -0,0 +1,22 @@
# Environment configuration for LSO application

# Ansible configuration
ANSIBLE_PLAYBOOKS_ROOT_DIR=/path/to/ansible/playbooks

# Executor configuration
EXECUTOR=threadpool # Options: "threadpool", "celery"
MAX_THREAD_POOL_WORKERS=10

# Request settings
REQUEST_TIMEOUT_SEC=10

# Celery configuration
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
CELERY_TIMEZONE=Europe/Amsterdam
CELERY_ENABLE_UTC=True
CELERY_RESULT_EXPIRES=3600
WORKER_QUEUE_NAME=lso-worker-queue

# Debug/Testing
TESTING=False
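
A note on the README's `source .env && python -m lso.app`: in a POSIX shell, variables assigned while sourcing a file are only passed on to the Python process if they are exported (for example via `export` lines or `set -a`). As one possible alternative, the sketch below loads an `env.example`-style file from Python. `parse_env_text` and `load_env_file` are hypothetical helpers that are not part of LSO, and the parser assumes plain unquoted `KEY=VALUE` lines with optional trailing `# comments`.

```python
import os
from pathlib import Path


def parse_env_text(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines; hypothetical helper, not part of LSO."""
    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        # Drop a trailing "# comment"; assumes values never contain " #".
        line = raw_line.split(" #", 1)[0].strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def load_env_file(path: Path) -> None:
    """Copy parsed values into os.environ without overriding existing ones."""
    for key, value in parse_env_text(path.read_text()).items():
        os.environ.setdefault(key, value)


# A few lines in the same shape as env.example.
sample = """
# Executor configuration
EXECUTOR=threadpool # Options: "threadpool", "celery"
MAX_THREAD_POOL_WORKERS=10
CELERY_BROKER_URL=redis://localhost:6379/0
"""
parsed = parse_env_text(sample)
```

Because `lso/config.py` in this commit instantiates `settings = Config()` at import time, a loader like this would have to run before `lso.config` is first imported.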
10 changes: 2 additions & 8 deletions lso/__init__.py
@@ -20,16 +20,13 @@
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware

-from lso import config, environment
+from lso import environment
 from lso.routes.default import router as default_router
 from lso.routes.playbook import router as playbook_router


 def create_app() -> FastAPI:
-    """Override default settings with those found in the file read from environment variable `SETTINGS_FILENAME`.
-    :return: a new flask app instance
-    """
+    """Initialise the :term:`LSO` app."""
     app = FastAPI()

     app.add_middleware(
@@ -39,9 +36,6 @@ def create_app() -> FastAPI:
     app.include_router(default_router, prefix="/api")
     app.include_router(playbook_router, prefix="/api/playbook")

-    # test that configuration parameters are loaded and available
-    config.load()

     environment.setup_logging()

     logging.info("FastAPI app initialized")
66 changes: 22 additions & 44 deletions lso/config.py
@@ -11,60 +11,38 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-"""A module for loading configuration data, including a configuration schema that data is validated against.
+"""Module for loading and managing configuration settings for the LSO app.
-Data is loaded from a file, the location of which may be specified when using :func:`load_from_file`.
-Configuration file location can also be loaded from environment variable ``$SETTINGS_FILENAME``, which is default
-behaviour in :func:`load`.
+Uses `pydantic`'s `BaseSettings` to load settings from environment variables.
 """

-import json
 import os
-from pathlib import Path
+from enum import Enum

-import jsonschema
-from pydantic import BaseModel
+from pydantic_settings import BaseSettings

-CONFIG_SCHEMA = {
-    "$schema": "http://json-schema.org/draft-07/schema#",
-    "type": "object",
-    "properties": {"ansible_playbooks_root_dir": {"type": "string"}},
-    "required": ["ansible_playbooks_root_dir"],
-    "additionalProperties": False,
-}
-DEFAULT_REQUEST_TIMEOUT = 10

+class ExecutorType(Enum):
+    """Enum representing the types of executors available for task execution."""

-class Config(BaseModel):
-    """Simple Configuration class.
+    WORKER = "celery"
+    THREADPOOL = "threadpool"

-    Contains the root directory at which Ansible playbooks are present.
-    """

-    ansible_playbooks_root_dir: str
+class Config(BaseSettings):
+    """The set of parameters required for running :term:`LSO`."""

+    TESTING: bool = False
+    ANSIBLE_PLAYBOOKS_ROOT_DIR: str = "/path/to/ansible/playbooks"
+    EXECUTOR: ExecutorType = ExecutorType.THREADPOOL
+    MAX_THREAD_POOL_WORKERS: int = min(32, (os.cpu_count() or 1) + 4)
+    REQUEST_TIMEOUT_SEC: int = 10
+    CELERY_BROKER_URL: str = "redis://localhost:6379/0"
+    CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"
+    CELERY_TIMEZONE: str = "Europe/Amsterdam"
+    CELERY_ENABLE_UTC: bool = True
+    CELERY_RESULT_EXPIRES: int = 3600
+    WORKER_QUEUE_NAME: str | None = None

-def load_from_file(file: Path) -> Config:
-    """Load, validate and return configuration parameters.

-    Input is validated against this JSON schema:
-    .. asjson:: lso.config.CONFIG_SCHEMA
-    :param file: :class:`Path` object that produces the configuration file.
-    :return: a dict containing the parsed configuration parameters.
-    """
-    config = json.loads(file.read_text())
-    jsonschema.validate(config, CONFIG_SCHEMA)
-    return Config(**config)


-def load() -> Config:
-    """Load a configuration file, located at the path specified in the environment variable ``$SETTINGS_FILENAME``.
-    Loading and validating the file is performed by :func:`load_from_file`.
-    :return: a dict containing the parsed configuration parameters
-    """
-    assert "SETTINGS_FILENAME" in os.environ, "Environment variable SETTINGS_FILENAME not set"  # noqa: S101
-    return load_from_file(Path(os.environ["SETTINGS_FILENAME"]))
+settings = Config()
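
The idea behind the new `Config` class (typed values read from environment variables, with defaults when a variable is unset) can be illustrated with the standard library alone. The sketch below is not pydantic-settings and not LSO code: `SketchSettings` and `load_settings` are hypothetical names, and only three of the fields are mirrored. One detail it makes visible is that the `EXECUTOR` value `celery` resolves to the enum member named `WORKER`, since enum lookup is by value.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass
from enum import Enum


class ExecutorType(Enum):
    """Mirrors the enum added in lso/config.py."""

    WORKER = "celery"
    THREADPOOL = "threadpool"


@dataclass(frozen=True)
class SketchSettings:
    """Hypothetical stand-in for a small subset of the Config fields."""

    executor: ExecutorType
    max_thread_pool_workers: int
    request_timeout_sec: int


def load_settings(env: Mapping[str, str] = os.environ) -> SketchSettings:
    """Read settings from a mapping, falling back to the defaults shown in the diff."""
    default_workers = min(32, (os.cpu_count() or 1) + 4)
    return SketchSettings(
        # Enum lookup by value; an unknown executor name raises ValueError.
        executor=ExecutorType(env.get("EXECUTOR", "threadpool")),
        max_thread_pool_workers=int(env.get("MAX_THREAD_POOL_WORKERS", default_workers)),
        request_timeout_sec=int(env.get("REQUEST_TIMEOUT_SEC", 10)),
    )


celery_settings = load_settings({"EXECUTOR": "celery", "MAX_THREAD_POOL_WORKERS": "4"})
default_settings = load_settings({})
```

Passing the mapping explicitly keeps the sketch easy to exercise in tests without touching the real process environment.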
74 changes: 29 additions & 45 deletions lso/playbook.py
@@ -13,28 +13,39 @@

"""Module that gathers common API responses and data models."""

import logging
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any

import ansible_runner
import requests
from fastapi import status
from fastapi.responses import JSONResponse
from pydantic import HttpUrl

from lso import config
from lso.config import DEFAULT_REQUEST_TIMEOUT
from lso.config import ExecutorType, settings
from lso.tasks import run_playbook_proc_task

logger = logging.getLogger(__name__)
_executor = None


def get_thread_pool() -> ThreadPoolExecutor:
"""Get and optionally initialise a ThreadPoolExecutor.
Returns:
ThreadPoolExecutor
"""
global _executor # noqa: PLW0603
if _executor is None:
_executor = ThreadPoolExecutor(max_workers=settings.MAX_THREAD_POOL_WORKERS)

return _executor


def get_playbook_path(playbook_name: str) -> Path:
"""Get the path of a playbook on the local filesystem."""
config_params = config.load()
return Path(config_params.ansible_playbooks_root_dir) / playbook_name
return Path(settings.ANSIBLE_PLAYBOOKS_ROOT_DIR) / playbook_name


def playbook_launch_success(job_id: str) -> JSONResponse:
@@ -56,32 +67,6 @@ def playbook_launch_error(reason: str, status_code: int = status.HTTP_400_BAD_RE
     return JSONResponse(content={"error": reason}, status_code=status_code)


-def _run_playbook_proc(
-    job_id: str, playbook_path: str, extra_vars: dict, inventory: dict[str, Any] | str, callback: str
-) -> None:
-    """Run a playbook, internal function.
-    :param str job_id: Identifier of the job that's executed.
-    :param str playbook_path: Ansible playbook to be executed.
-    :param dict extra_vars: Extra variables passed to the Ansible playbook.
-    :param str callback: Callback URL to return output to when execution is completed.
-    :param dict[str, Any] | str inventory: Ansible inventory to run the playbook against.
-    """
-    ansible_playbook_run = ansible_runner.run(playbook=playbook_path, inventory=inventory, extravars=extra_vars)
-
-    payload = {
-        "status": ansible_playbook_run.status,
-        "job_id": job_id,
-        "output": ansible_playbook_run.stdout.readlines(),
-        "return_code": int(ansible_playbook_run.rc),
-    }
-
-    request_result = requests.post(callback, json=payload, timeout=DEFAULT_REQUEST_TIMEOUT)
-    if not status.HTTP_200_OK <= request_result.status_code < status.HTTP_300_MULTIPLE_CHOICES:
-        msg = f"Callback failed: {request_result.text}"
-        logger.error(msg)
-
-
 def run_playbook(
     playbook_path: Path,
     extra_vars: dict[str, Any],
@@ -107,16 +92,15 @@ def run_playbook(
         return playbook_launch_error(reason=msg, status_code=status.HTTP_400_BAD_REQUEST)

     job_id = str(uuid.uuid4())
-    thread = threading.Thread(
-        target=_run_playbook_proc,
-        kwargs={
-            "job_id": job_id,
-            "playbook_path": str(playbook_path),
-            "inventory": inventory,
-            "extra_vars": extra_vars,
-            "callback": callback,
-        },
-    )
-    thread.start()
+    if settings.EXECUTOR == ExecutorType.THREADPOOL:
+        executor = get_thread_pool()
+        executor_handle = executor.submit(
+            run_playbook_proc_task, job_id, str(playbook_path), extra_vars, inventory, str(callback)
+        )
+        if settings.TESTING:
+            executor_handle.result()
+
+    elif settings.EXECUTOR == ExecutorType.WORKER:
+        run_playbook_proc_task.delay(job_id, str(playbook_path), extra_vars, inventory, str(callback))

     return playbook_launch_success(job_id=job_id)
2 changes: 1 addition & 1 deletion lso/routes/playbook.py
@@ -60,7 +60,7 @@ class PlaybookRunParams(BaseModel):
     """Parameters for executing an Ansible playbook."""

     #: The filename of a playbook that's executed. It should be present inside the directory defined in the
-    #: configuration option ``ansible_playbooks_root_dir``.
+    #: configuration option ``ANSIBLE_PLAYBOOKS_ROOT_DIR``.
     playbook_name: str
     #: The address where LSO should call back to upon completion.
     callback: HttpUrl
60 changes: 60 additions & 0 deletions lso/tasks.py
@@ -0,0 +1,60 @@
# Copyright 2023-2024 GÉANT Vereniging.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module defines tasks for executing Ansible playbooks asynchronously using Celery.
The primary task, `run_playbook_proc_task`, runs an Ansible playbook and sends a POST request with
the results to a specified callback URL.
"""

import logging
from typing import Any

import ansible_runner
import requests
from starlette import status

from lso.config import settings
from lso.worker import RUN_PLAYBOOK, celery

logger = logging.getLogger(__name__)


@celery.task(name=RUN_PLAYBOOK)  # type: ignore[misc]
def run_playbook_proc_task(
    job_id: str, playbook_path: str, extra_vars: dict[str, Any], inventory: dict[str, Any] | str, callback: str
) -> None:
    """Celery task to run a playbook.
    :param str job_id: Identifier of the job being executed.
    :param str playbook_path: Path to the playbook to be executed.
    :param dict[str, Any] extra_vars: Extra variables to pass to the playbook.
    :param dict[str, Any] | str inventory: Inventory to run the playbook against.
    :param HttpUrl callback: Callback URL for status updates.
    :return: None
    """
    msg = f"playbook_path: {playbook_path}"
    logger.info(msg)
    ansible_playbook_run = ansible_runner.run(playbook=playbook_path, inventory=inventory, extravars=extra_vars)

    payload = {
        "status": ansible_playbook_run.status,
        "job_id": job_id,
        "output": ansible_playbook_run.stdout.readlines(),
        "return_code": int(ansible_playbook_run.rc),
    }

    request_result = requests.post(str(callback), json=payload, timeout=settings.REQUEST_TIMEOUT_SEC)
    if not status.HTTP_200_OK <= request_result.status_code < status.HTTP_300_MULTIPLE_CHOICES:
        msg = f"Callback failed: {request_result.text}, url: {callback}"
        logger.error(msg)
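
The callback check at the end of the task treats any 2xx response as success. Below is a small sketch of that range test, using `http.HTTPStatus` from the standard library in place of Starlette's constants; `is_success` is a hypothetical helper name, not something defined in LSO.

```python
from http import HTTPStatus


def is_success(status_code: int) -> bool:
    """Return True for any 2xx status code, mirroring the range check in the task."""
    return HTTPStatus.OK <= status_code < HTTPStatus.MULTIPLE_CHOICES


results = {code: is_success(code) for code in (200, 204, 299, 300, 404, 500)}
```

Note that this check only sees responses that actually arrive. On a timeout or connection failure, `requests.post` raises an exception instead of returning a response, so that path bypasses the logging branch.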

Codecov warning (codecov/patch): added lines 59-60 of lso/tasks.py were not covered by tests.