From bff4e5fb2410b54dc14bc3cb70a7ea798842cc8b Mon Sep 17 00:00:00 2001 From: Patrik Segedy Date: Thu, 9 Jan 2025 17:15:41 +0100 Subject: [PATCH] fix: add recalc admin api RHINENG-15130 --- conf/manager-admin.env | 3 ++ deploy/clowdapp.yaml | 6 +++ docker-compose.yml | 1 + manager.admin.spec.yaml | 52 ++++++++++++++++++ manager/admin.py | 1 + manager/admin_handler.py | 111 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 174 insertions(+) create mode 100644 conf/manager-admin.env diff --git a/conf/manager-admin.env b/conf/manager-admin.env new file mode 100644 index 000000000..a2d53395c --- /dev/null +++ b/conf/manager-admin.env @@ -0,0 +1,3 @@ +MESSAGE_TOPIC=vulnerability.evaluator.recalc +RE_EVALUATION_KAFKA_BATCH_SIZE=10000 +RE_EVALUATION_KAFKA_BATCHES=10 diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml index d167be29e..7700a4e27 100644 --- a/deploy/clowdapp.yaml +++ b/deploy/clowdapp.yaml @@ -146,6 +146,12 @@ objects: value: ${ENV_NAME} - name: UNLEASH_BOOTSTRAP_FILE value: ${UNLEASH_BOOTSTRAP_FILE} + - name: MESSAGE_TOPIC + value: vulnerability.evaluator.recalc + - name: RE_EVALUATION_KAFKA_BATCH_SIZE + value: '10000' + - name: RE_EVALUATION_KAFKA_BATCHES + value: '10' resources: limits: cpu: ${CPU_LIMIT_MANAGER_ADMIN} diff --git a/docker-compose.yml b/docker-compose.yml index c7fb40872..d480a58ce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -59,6 +59,7 @@ services: - ./conf/common.env - ./conf/manager_base.env - ./conf/database.env + - ./conf/manager-admin.env ports: - 8400:8000 depends_on: diff --git a/manager.admin.spec.yaml b/manager.admin.spec.yaml index 9f2ac0253..3d8b148e8 100644 --- a/manager.admin.spec.yaml +++ b/manager.admin.spec.yaml @@ -507,6 +507,58 @@ paths: 200: description: Partition was truncated + /recalc/accounts: + put: + summary: Trigger recalc of all systems in specified account org_ids. + description: Trigger recalc of all systems in specified account org_ids. Admin interface, available only to admin users. + operationId: manager.admin_handler.RecalcAccounts.put + x-methodName: recalcAccounts + security: + - ApiKeyAuthAdmin: [] + requestBody: + description: Account org_ids to be re-evaluated. + required: true + content: + application/vnd.api+json: + schema: + type: object + properties: + org_ids: + type: array + items: + type: string + description: List of org_ids to be recalculated. + example: ['123456', '654321'] + responses: + 200: + description: Systems from account are scheduled for recalculation. + + /recalc/systems: + put: + summary: Trigger recalc of specified inventory_ids. + description: Trigger recalc of specified inventory_ids. Admin interface, available only to admin users. + operationId: manager.admin_handler.RecalcSystems.put + x-methodName: recalcSystems + security: + - ApiKeyAuthAdmin: [] + requestBody: + description: System inventory_ids to be re-evaluated. + required: true + content: + application/vnd.api+json: + schema: + type: object + properties: + inventory_ids: + type: array + items: + type: string + description: List of inventory_ids to be recalculated. + example: ['INV-ID-0000-1234', 'INV-ID-0000-5678'] + responses: + 200: + description: Systems are scheduled for recalculation. + components: parameters: inventory_id: diff --git a/manager/admin.py b/manager/admin.py index 64d70a7e5..86d6d1e3e 100755 --- a/manager/admin.py +++ b/manager/admin.py @@ -13,6 +13,7 @@ init_logging(num_servers=CFG.gunicorn_workers) + # gunicorn expects an object called "application" hence the pylint disable application = create_app({CFG.default_route: "manager.admin.spec.yaml", # pylint: disable=invalid-name "": "manager.healthz.spec.yaml"}) diff --git a/manager/admin_handler.py b/manager/admin_handler.py index 98d8da7ad..51033e71c 100644 --- a/manager/admin_handler.py +++ b/manager/admin_handler.py @@ -1,14 +1,19 @@ """ Module for admin API endpoints """ +import asyncio import json import subprocess +from datetime import datetime +from datetime import timezone import requests from connexion import context from peewee import DatabaseError +from common import mqueue from common.config import Config +from common.constants import EvaluatorMessageType from common.logging import get_logger from common.peewee_model import DB from common.peewee_model import Announcement @@ -28,6 +33,9 @@ CFG = Config() LOGGER = get_logger(__name__) +EVENT_LOOP = asyncio.new_event_loop() +EVALUATOR_QUEUE = mqueue.MQWriter(CFG.evaluator_recalc_topic, loop=EVENT_LOOP) +BATCH_SEMAPHORE = asyncio.BoundedSemaphore(CFG.re_evaluation_kafka_batches) class GetMissingInInventory(GetRequest): @@ -463,3 +471,106 @@ def handle_delete(cls, **kwargs): LOGGER.error("Internal server error: %s", exc) return "Error", 500 return "Ok", 200 + + +class RecalcBase: + + @classmethod + def _create_event(cls): + with DB.transaction(): + with DB.cursor() as cur: + cur.execute("""insert into recalc_event (changed_packages) values ('{}') returning id""") + ret = cur.fetchone() + if not ret: + raise None + return ret[0] + + +class RecalcAccounts(PutRequest, RecalcBase): + """PUT to /v1/recalc/accounts""" + + _endpoint_name = r"/v1/recalc/accounts" + + @classmethod + def handle_put(cls, **kwargs): + """Trigger recalc of all systems in specified account org_ids.""" + loop = EVENT_LOOP + total_scheduled = 0 + LOGGER.info("kwargs: %s", kwargs) + accounts = kwargs.get("body", {}).get("org_ids", []) + if not accounts: + return "No org_ids provided", 400 + + event_id = cls._create_event() + if not event_id: + return "Error creating recalc_event", 500 + + with DB.cursor() as cur: + cur.execute(""" + SELECT inventory_id, org_id + FROM system_platform sp JOIN rh_account acc ON sp.rh_account_id = acc.id + WHERE org_id = ANY(%s)""", (accounts,)) + while True: + loop.run_until_complete(BATCH_SEMAPHORE.acquire()) + rows = cur.fetchmany(size=CFG.re_evaluation_kafka_batch_size) + if not rows: + BATCH_SEMAPHORE.release() + break + msgs = [ + { + "type": EvaluatorMessageType.RE_EVALUATE_SYSTEM, + "host": {"id": str(inventory_id), "org_id": org_id}, + "timestamp": str(datetime.now(timezone.utc)), + "recalc_event_id": event_id, + } + for inventory_id, org_id in rows + ] + total_scheduled += len(msgs) + task = EVALUATOR_QUEUE.send_list(msgs, loop=loop) + task.add_done_callback(lambda x: BATCH_SEMAPHORE.release()) + loop.run_until_complete(task) + + return f"{total_scheduled} systems scheduled for re-evaluation", 200 + + +class RecalcSystems(PutRequest, RecalcBase): + """PUT to /v1/recalc/systems""" + + _endpoint_name = r"/v1/recalc/systems" + + @classmethod + def handle_put(cls, **kwargs): + """Trigger recalc of specified inventory_ids.""" + loop = EVENT_LOOP + total_scheduled = 0 + inventory_ids = kwargs.get("body", {}).get("inventory_ids", []) + if not inventory_ids: + return "No inventory_ids provided", 400 + + event_id = cls._create_event() + if not event_id: + return "Error creating recalc_event", 500 + + with DB.cursor() as cur: + cur.execute(""" + SELECT inventory_id, org_id + FROM system_platform sp JOIN rh_account acc ON sp.rh_account_id = acc.id + WHERE inventory_id = ANY(%s::uuid[])""", (inventory_ids,)) + rows = cur.fetchall() + if not rows: + return f"{total_scheduled} systems scheduled for re-evaluation", 200 + + msgs = [ + { + "type": EvaluatorMessageType.RE_EVALUATE_SYSTEM, + "host": {"id": str(inventory_id), "org_id": org_id}, + "timestamp": str(datetime.now(timezone.utc)), + "recalc_event_id": event_id, + } + for inventory_id, org_id in rows + ] + total_scheduled += len(msgs) + task = EVALUATOR_QUEUE.send_list(msgs, loop=loop) + loop.run_until_complete(task) + + return f"{total_scheduled} systems scheduled for re-evaluation", 200