Skip to content

Commit

Permalink
feat: add recalc admin api
Browse files Browse the repository at this point in the history
RHINENG-15130
  • Loading branch information
psegedy committed Jan 14, 2025
1 parent a14acd4 commit fd09b4c
Showing 7 changed files with 151 additions and 3 deletions.
3 changes: 3 additions & 0 deletions conf/reevaluation.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
MESSAGE_TOPIC=vulnerability.evaluator.recalc
RE_EVALUATION_KAFKA_BATCH_SIZE=10000
RE_EVALUATION_KAFKA_BATCHES=10
3 changes: 0 additions & 3 deletions conf/vmaas-sync.env
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
POSTGRES_USER=ve_db_user_vmaas_sync
POSTGRES_PASSWORD=ve_db_user_vmaas_sync_pwd
MESSAGE_TOPIC=vulnerability.evaluator.recalc
PROMETHEUS_PORT=8087
ENABLE_RE_EVALUATION=YES
RE_EVALUATION_KAFKA_BATCH_SIZE=10000
RE_EVALUATION_KAFKA_BATCHES=10
DEFAULT_PAGE_SIZE=5000
DEFAULT_REPO_PAGE_SIZE=200
6 changes: 6 additions & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
@@ -146,6 +146,12 @@ objects:
value: ${ENV_NAME}
- name: UNLEASH_BOOTSTRAP_FILE
value: ${UNLEASH_BOOTSTRAP_FILE}
- name: MESSAGE_TOPIC
value: vulnerability.evaluator.recalc
- name: RE_EVALUATION_KAFKA_BATCH_SIZE
value: '10000'
- name: RE_EVALUATION_KAFKA_BATCHES
value: '10'
resources:
limits:
cpu: ${CPU_LIMIT_MANAGER_ADMIN}
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@ services:
- ./conf/common.env
- ./conf/manager_base.env
- ./conf/database.env
- ./conf/reevaluation.env
ports:
- 8400:8000
depends_on:
@@ -90,6 +91,7 @@ services:
env_file:
- ./conf/common.env
- ./conf/vmaas-sync.env
- ./conf/reevaluation.env
networks:
- default
- vmaas_default
52 changes: 52 additions & 0 deletions manager.admin.spec.yaml
Original file line number Diff line number Diff line change
@@ -507,6 +507,58 @@ paths:
200:
description: Partition was truncated

/recalc/accounts:
put:
summary: Trigger recalc of all systems in specified account org_ids.
description: Trigger recalc of all systems in specified account org_ids. Admin interface, available only to admin users.
operationId: manager.admin_handler.RecalcAccounts.put
x-methodName: recalcAccounts
security:
- ApiKeyAuthAdmin: []
requestBody:
description: Account org_ids to be re-evaluated.
required: true
content:
application/vnd.api+json:
schema:
type: object
properties:
org_ids:
type: array
items:
type: string
description: List of org_ids to be recalculated.
example: ['123456', '654321']
responses:
200:
description: Systems from account are scheduled for recalculation.

/recalc/systems:
put:
summary: Trigger recalc of specified inventory_ids.
description: Trigger recalc of specified inventory_ids. Admin interface, available only to admin users.
operationId: manager.admin_handler.RecalcSystems.put
x-methodName: recalcSystems
security:
- ApiKeyAuthAdmin: []
requestBody:
description: System inventory_ids to be re-evaluated.
required: true
content:
application/vnd.api+json:
schema:
type: object
properties:
inventory_ids:
type: array
items:
type: string
description: List of inventory_ids to be recalculated.
example: ['INV-ID-0000-1234', 'INV-ID-0000-5678']
responses:
200:
description: Systems are scheduled for recalculation.

components:
parameters:
inventory_id:
1 change: 1 addition & 0 deletions manager/admin.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@


init_logging(num_servers=CFG.gunicorn_workers)

# gunicorn expects an object called "application" hence the pylint disable
application = create_app({CFG.default_route: "manager.admin.spec.yaml", # pylint: disable=invalid-name
"": "manager.healthz.spec.yaml"})
87 changes: 87 additions & 0 deletions manager/admin_handler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
"""
Module for admin API endpoints
"""
import asyncio
import json
import subprocess
from datetime import datetime
from datetime import timezone

import requests
from connexion import context
from peewee import DatabaseError

from common import mqueue
from common.config import Config
from common.constants import EvaluatorMessageType
from common.logging import get_logger
from common.peewee_model import DB
from common.peewee_model import Announcement
@@ -28,6 +33,9 @@
CFG = Config()

LOGGER = get_logger(__name__)
EVENT_LOOP = asyncio.new_event_loop()
EVALUATOR_QUEUE = mqueue.MQWriter(CFG.evaluator_recalc_topic, loop=EVENT_LOOP)
BATCH_SEMAPHORE = asyncio.BoundedSemaphore(CFG.re_evaluation_kafka_batches)


class GetMissingInInventory(GetRequest):
@@ -463,3 +471,82 @@ def handle_delete(cls, **kwargs):
LOGGER.error("Internal server error: %s", exc)
return "Error", 500
return "Ok", 200


class RecalcBase:

@classmethod
def _create_kafka_msg_task(cls, rows, loop):
msgs = [
{
"type": EvaluatorMessageType.RE_EVALUATE_SYSTEM,
"host": {"id": str(inventory_id), "org_id": org_id},
"timestamp": str(datetime.now(timezone.utc)),
}
for inventory_id, org_id in rows
]
task = EVALUATOR_QUEUE.send_list(msgs, loop=loop)
return task, len(msgs)


class RecalcAccounts(PutRequest, RecalcBase):
"""PUT to /v1/recalc/accounts"""

_endpoint_name = r"/v1/recalc/accounts"

@classmethod
def handle_put(cls, **kwargs):
"""Trigger recalc of all systems in specified account org_ids."""
loop = EVENT_LOOP
total_scheduled = 0
LOGGER.info("kwargs: %s", kwargs)
accounts = kwargs.get("body", {}).get("org_ids", [])
if not accounts:
return "No org_ids provided", 400

with DB.cursor() as cur:
cur.execute("""
SELECT inventory_id, org_id
FROM system_platform sp JOIN rh_account acc ON sp.rh_account_id = acc.id
WHERE org_id = ANY(%s)""", (accounts,))
while True:
loop.run_until_complete(BATCH_SEMAPHORE.acquire())
rows = cur.fetchmany(size=CFG.re_evaluation_kafka_batch_size)
if not rows:
BATCH_SEMAPHORE.release()
break
task, msg_count = cls._create_kafka_msg_task(rows, loop)
total_scheduled += msg_count
task.add_done_callback(lambda x: BATCH_SEMAPHORE.release())
loop.run_until_complete(task)

return f"{total_scheduled} systems scheduled for re-evaluation", 200


class RecalcSystems(PutRequest, RecalcBase):
"""PUT to /v1/recalc/systems"""

_endpoint_name = r"/v1/recalc/systems"

@classmethod
def handle_put(cls, **kwargs):
"""Trigger recalc of specified inventory_ids."""
loop = EVENT_LOOP
total_scheduled = 0
inventory_ids = kwargs.get("body", {}).get("inventory_ids", [])
if not inventory_ids:
return "No inventory_ids provided", 400

with DB.cursor() as cur:
cur.execute("""
SELECT inventory_id, org_id
FROM system_platform sp JOIN rh_account acc ON sp.rh_account_id = acc.id
WHERE inventory_id = ANY(%s::uuid[])""", (inventory_ids,))
rows = cur.fetchall()
if not rows:
return f"{total_scheduled} systems scheduled for re-evaluation", 200

task, total_scheduled = cls._create_kafka_msg_task(rows, loop)
loop.run_until_complete(task)

return f"{total_scheduled} systems scheduled for re-evaluation", 200

0 comments on commit fd09b4c

Please sign in to comment.