Add Celery as an executor and refactor threading logic #41

Open · wants to merge 7 commits into base: main
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.0.3
current_version = 2.0.0
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(rc(?P<build>\d+))?
2 changes: 0 additions & 2 deletions .github/workflows/run-unit-tests.yaml
@@ -26,8 +26,6 @@ jobs:
FLIT_ROOT_INSTALL: 1
- name: Run Unit tests
run: pytest --cov-branch --cov=lso --cov-report=xml
env:
SETTINGS_FILENAME: dummy.json
- name: "Upload coverage to Codecov"
uses: codecov/codecov-action@v3
with:
33 changes: 26 additions & 7 deletions README.md
@@ -25,19 +25,18 @@ Use the Docker image to then spin up an environment. An example Docker compose f
services:
lso:
image: my-lso:latest
environment:
SETTINGS_FILENAME: /app/config.json
ANSIBLE_ROLES_PATH: /app/lso/ansible_roles
env_file:
.env # Load default environment variables from the .env file
volumes:
- "/home/user/config.json:/app/config.json:ro"
- "/home/user/ansible_inventory:/opt/ansible_inventory:ro"
- "~/.ssh/id_ed25519.pub:/root/.ssh/id_ed25519.pub:ro"
- "~/.ssh/id_ed25519:/root/.ssh/id_ed25519:ro"
```

This exposes the API on port 8000. The container requires several additional files to be mounted:

* A `config.json` that references to the location where the Ansible playbooks are stored **inside the container**.
* An `.env` file that sets default environment variables, such as `ANSIBLE_PLAYBOOKS_ROOT_DIR` for the location of the Ansible playbooks **inside the container**.
* Environment variables: specific settings, such as `ANSIBLE_ROLES_PATH`, can be set directly in the `environment` section. This is ideal for values you may want to override without modifying the `.env` file.
* An Ansible inventory with all host and group variables that are used in the playbooks.
* A public/private key pair for SSH authentication on external machines that are targeted by Ansible playbooks.
* Any Ansible-specific configuration (such as `collections_path`, `roles_path`, etc.) should be set using
@@ -75,10 +74,30 @@ As an alternative, below are a set of instructions for installing and running LS

### Running the app

* Create a settings file, see `config.json.example` for an example.
* Set required environment variables; see `env.example` for reference.
* If necessary, set the environment variable `ANSIBLE_HOME` to a custom path.
* Run the app like this (`app.py` starts the server on port 44444):

```bash
SETTINGS_FILENAME=/absolute/path/to/config.json python -m lso.app
source .env && python -m lso.app
```

### Task Execution Options

1. Celery (Distributed Execution)

- For distributed task execution, set `EXECUTOR=celery`.
- Add Celery config in your environment variables:

```bash
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
WORKER_QUEUE_NAME=lso-worker-queue # optional; the default is None
```
- Start a Celery worker:

```bash
celery -A lso.worker worker --loglevel=info -Q lso-worker-queue
```
2. ThreadPoolExecutor (Local Execution)

For local concurrent tasks, set `EXECUTOR=threadpool` and configure `MAX_THREAD_POOL_WORKERS`.
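The choice between the two execution options can be pictured with a small, self-contained sketch. The `submit_job` helper and its environment-variable handling below are illustrative only, not the PR's actual code:

```python
import os
from concurrent.futures import ThreadPoolExecutor

_pool = None  # shared pool, created lazily


def submit_job(func, *args):
    """Route work based on the EXECUTOR setting (sketch, not the real LSO API)."""
    global _pool
    if os.environ.get("EXECUTOR", "threadpool") == "threadpool":
        if _pool is None:
            max_workers = int(os.environ.get("MAX_THREAD_POOL_WORKERS", "10"))
            _pool = ThreadPoolExecutor(max_workers=max_workers)
        return _pool.submit(func, *args)
    # With EXECUTOR=celery, func would be a Celery task and this branch
    # would call func.delay(*args), handing the job to a worker via the broker.
    raise NotImplementedError("celery path needs a running broker")


future = submit_job(lambda x: x * 2, 21)
print(future.result())  # 42
```

Both paths return a handle (a `Future` or a Celery `AsyncResult`), so callers can stay agnostic about where the work actually runs.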
3 changes: 0 additions & 3 deletions config.json.example

This file was deleted.

23 changes: 23 additions & 0 deletions env.example
@@ -0,0 +1,23 @@
# Environment configuration for LSO application

# Ansible configuration
ANSIBLE_PLAYBOOKS_ROOT_DIR="/path/to/ansible/playbooks"
ANSIBLE_ROLES_PATH="/app/lso/ansible_roles" # Set specific Ansible roles path

# Executor configuration
EXECUTOR="threadpool" # Options: "threadpool", "celery"
MAX_THREAD_POOL_WORKERS=10

# Request settings
REQUEST_TIMEOUT_SEC=10

# Celery configuration
CELERY_BROKER_URL="redis://localhost:6379/0"
CELERY_RESULT_BACKEND="redis://localhost:6379/0"
CELERY_TIMEZONE="Europe/Amsterdam"
CELERY_ENABLE_UTC=True
CELERY_RESULT_EXPIRES=3600
WORKER_QUEUE_NAME="lso-worker-queue"

# Debug/Testing
TESTING=False
12 changes: 3 additions & 9 deletions lso/__init__.py
@@ -20,17 +20,14 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from lso import config, environment
from lso import environment
from lso.routes.default import router as default_router
from lso.routes.playbook import router as playbook_router


def create_app() -> FastAPI:
"""Override default settings with those found in the file read from environment variable `SETTINGS_FILENAME`.

:return: a new flask app instance
"""
app = FastAPI()
"""Initialise the :term:`LSO` app."""
app = FastAPI(docs_url="/api/doc", redoc_url="/api/redoc", openapi_url="/api/openapi.json")

app.add_middleware(
CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]
@@ -39,9 +36,6 @@ def create_app() -> FastAPI:
app.include_router(default_router, prefix="/api")
app.include_router(playbook_router, prefix="/api/playbook")

# test that configuration parameters are loaded and available
config.load()

environment.setup_logging()

logging.info("FastAPI app initialized")
Expand Down
66 changes: 22 additions & 44 deletions lso/config.py
@@ -11,60 +11,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""A module for loading configuration data, including a configuration schema that data is validated against.
"""Module for loading and managing configuration settings for the LSO app.

Data is loaded from a file, the location of which may be specified when using :func:`load_from_file`.
Configuration file location can also be loaded from environment variable ``$SETTINGS_FILENAME``, which is default
behaviour in :func:`load`.
Uses `pydantic`'s `BaseSettings` to load settings from environment variables.
"""

import json
import os
from pathlib import Path
from enum import Enum

import jsonschema
from pydantic import BaseModel
from pydantic_settings import BaseSettings

CONFIG_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {"ansible_playbooks_root_dir": {"type": "string"}},
"required": ["ansible_playbooks_root_dir"],
"additionalProperties": False,
}
DEFAULT_REQUEST_TIMEOUT = 10

class ExecutorType(Enum):
"""Enum representing the types of executors available for task execution."""

class Config(BaseModel):
"""Simple Configuration class.
WORKER = "celery"
THREADPOOL = "threadpool"

Contains the root directory at which Ansible playbooks are present.
"""

ansible_playbooks_root_dir: str
class Config(BaseSettings):
"""The set of parameters required for running :term:`LSO`."""

TESTING: bool = False
ANSIBLE_PLAYBOOKS_ROOT_DIR: str = "/path/to/ansible/playbooks"
EXECUTOR: ExecutorType = ExecutorType.THREADPOOL
MAX_THREAD_POOL_WORKERS: int = min(32, (os.cpu_count() or 1) + 4)
REQUEST_TIMEOUT_SEC: int = 10
CELERY_BROKER_URL: str = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"
CELERY_TIMEZONE: str = "Europe/Amsterdam"
CELERY_ENABLE_UTC: bool = True
CELERY_RESULT_EXPIRES: int = 3600
WORKER_QUEUE_NAME: str | None = None

def load_from_file(file: Path) -> Config:
"""Load, validate and return configuration parameters.

Input is validated against this JSON schema:

.. asjson:: lso.config.CONFIG_SCHEMA

:param file: :class:`Path` object that produces the configuration file.
:return: a dict containing the parsed configuration parameters.
"""
config = json.loads(file.read_text())
jsonschema.validate(config, CONFIG_SCHEMA)
return Config(**config)


def load() -> Config:
"""Load a configuration file, located at the path specified in the environment variable ``$SETTINGS_FILENAME``.

Loading and validating the file is performed by :func:`load_from_file`.

:return: a dict containing the parsed configuration parameters
"""
assert "SETTINGS_FILENAME" in os.environ, "Environment variable SETTINGS_FILENAME not set" # noqa: S101
return load_from_file(Path(os.environ["SETTINGS_FILENAME"]))
settings = Config()
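Replacing the JSON-schema validation with `BaseSettings` means each field is populated from an identically named environment variable, and the `EXECUTOR` field is matched against the enum's *value*. A stdlib-only sketch of that value-to-member mapping (the real lookup is performed by pydantic during validation):

```python
import os
from enum import Enum


class ExecutorType(Enum):
    """Mirrors the enum added in lso/config.py."""

    WORKER = "celery"
    THREADPOOL = "threadpool"


# pydantic-settings resolves an enum-typed field roughly like this:
raw = os.environ.get("EXECUTOR", "threadpool")
executor = ExecutorType(raw)  # "celery" maps to ExecutorType.WORKER
print(executor)
```

An unrecognised value raises `ValueError` here, just as pydantic would reject it with a validation error at startup.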
104 changes: 29 additions & 75 deletions lso/playbook.py
@@ -13,81 +13,44 @@

"""Module that gathers common API responses and data models."""

import logging
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any

import ansible_runner
import requests
from fastapi import status
from fastapi.responses import JSONResponse
from pydantic import HttpUrl

from lso import config
from lso.config import DEFAULT_REQUEST_TIMEOUT
from lso.config import ExecutorType, settings
from lso.tasks import run_playbook_proc_task

logger = logging.getLogger(__name__)
_executor = None


def get_playbook_path(playbook_name: str) -> Path:
"""Get the path of a playbook on the local filesystem."""
config_params = config.load()
return Path(config_params.ansible_playbooks_root_dir) / playbook_name


def playbook_launch_success(job_id: str) -> JSONResponse:
"""Return a :class:`PlaybookLaunchResponse` for the successful start of a playbook execution.

:return JSONResponse: A playbook launch response that's successful.
"""
return JSONResponse(content={"job_id": job_id}, status_code=status.HTTP_201_CREATED)
def get_thread_pool() -> ThreadPoolExecutor:
"""Get and optionally initialise a ThreadPoolExecutor.

    :return: the shared :class:`ThreadPoolExecutor` instance.

def playbook_launch_error(reason: str, status_code: int = status.HTTP_400_BAD_REQUEST) -> JSONResponse:
"""Return a :class:`PlaybookLaunchResponse` for the erroneous start of a playbook execution.

:param str reason: The reason why a request has failed.
:param status status_code: The HTTP status code that should be associated with this request. Defaults to HTTP 400:
Bad request.
:return JSONResponse: A playbook launch response that's unsuccessful.
"""
return JSONResponse(content={"error": reason}, status_code=status_code)

global _executor # noqa: PLW0603
if _executor is None:
_executor = ThreadPoolExecutor(max_workers=settings.MAX_THREAD_POOL_WORKERS)

def _run_playbook_proc(
job_id: str, playbook_path: str, extra_vars: dict, inventory: dict[str, Any] | str, callback: str
) -> None:
"""Run a playbook, internal function.
return _executor

:param str job_id: Identifier of the job that's executed.
:param str playbook_path: Ansible playbook to be executed.
:param dict extra_vars: Extra variables passed to the Ansible playbook.
:param str callback: Callback URL to return output to when execution is completed.
:param dict[str, Any] | str inventory: Ansible inventory to run the playbook against.
"""
ansible_playbook_run = ansible_runner.run(playbook=playbook_path, inventory=inventory, extravars=extra_vars)

payload = {
"status": ansible_playbook_run.status,
"job_id": job_id,
"output": ansible_playbook_run.stdout.readlines(),
"return_code": int(ansible_playbook_run.rc),
}

request_result = requests.post(callback, json=payload, timeout=DEFAULT_REQUEST_TIMEOUT)
if not status.HTTP_200_OK <= request_result.status_code < status.HTTP_300_MULTIPLE_CHOICES:
msg = f"Callback failed: {request_result.text}"
logger.error(msg)
def get_playbook_path(playbook_name: Path) -> Path:
"""Get the path of a playbook on the local filesystem."""
return Path(settings.ANSIBLE_PLAYBOOKS_ROOT_DIR) / playbook_name


def run_playbook(
playbook_path: Path,
extra_vars: dict[str, Any],
inventory: dict[str, Any] | str,
callback: HttpUrl,
) -> JSONResponse:
) -> uuid.UUID:
"""Run an Ansible playbook against a specified inventory.

:param Path playbook_path: playbook to be executed.
Expand All @@ -98,25 +61,16 @@ def run_playbook(
    :return: the job ID of the scheduled playbook run.
    :rtype: :class:`uuid.UUID`
"""
if not Path.exists(playbook_path):
msg = f"Filename '{playbook_path}' does not exist."
return playbook_launch_error(reason=msg, status_code=status.HTTP_404_NOT_FOUND)

if not ansible_runner.utils.isinventory(inventory):
msg = "Invalid inventory provided. Should be a string, or JSON object."
return playbook_launch_error(reason=msg, status_code=status.HTTP_400_BAD_REQUEST)

job_id = str(uuid.uuid4())
thread = threading.Thread(
target=_run_playbook_proc,
kwargs={
"job_id": job_id,
"playbook_path": str(playbook_path),
"inventory": inventory,
"extra_vars": extra_vars,
"callback": callback,
},
)
thread.start()

return playbook_launch_success(job_id=job_id)
job_id = uuid.uuid4()
if settings.EXECUTOR == ExecutorType.THREADPOOL:
executor = get_thread_pool()
executor_handle = executor.submit(
run_playbook_proc_task, str(job_id), str(playbook_path), extra_vars, inventory, str(callback)
)
if settings.TESTING:
executor_handle.result()

elif settings.EXECUTOR == ExecutorType.WORKER:
run_playbook_proc_task.delay(str(job_id), str(playbook_path), extra_vars, inventory, str(callback))

return job_id
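The lazily created module-level pool used by `get_thread_pool` follows a common singleton pattern; a condensed sketch, simplified from the diff with `max_workers` as a plain parameter instead of a settings field:

```python
from concurrent.futures import ThreadPoolExecutor

_executor = None  # module-level cache for the shared pool


def get_thread_pool(max_workers: int = 10) -> ThreadPoolExecutor:
    """Create the shared pool on the first call, then reuse it."""
    global _executor
    if _executor is None:
        _executor = ThreadPoolExecutor(max_workers=max_workers)
    return _executor


# Repeated calls hand back the same pool, so worker threads are shared
# across all playbook runs instead of spawning a new thread per request.
print(get_thread_pool() is get_thread_pool())  # True
```

Deferring pool creation to first use also means importing the module stays cheap when the Celery executor is selected and the pool is never needed.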