Skip to content

Commit

Permalink
fix: add recalc admin api
Browse files Browse the repository at this point in the history
RHINENG-15130
  • Loading branch information
psegedy committed Jan 9, 2025
1 parent 32414d0 commit bff4e5f
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 0 deletions.
3 changes: 3 additions & 0 deletions conf/manager-admin.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
MESSAGE_TOPIC=vulnerability.evaluator.recalc
RE_EVALUATION_KAFKA_BATCH_SIZE=10000
RE_EVALUATION_KAFKA_BATCHES=10
6 changes: 6 additions & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ objects:
value: ${ENV_NAME}
- name: UNLEASH_BOOTSTRAP_FILE
value: ${UNLEASH_BOOTSTRAP_FILE}
- name: MESSAGE_TOPIC
value: vulnerability.evaluator.recalc
- name: RE_EVALUATION_KAFKA_BATCH_SIZE
value: '10000'
- name: RE_EVALUATION_KAFKA_BATCHES
value: '10'
resources:
limits:
cpu: ${CPU_LIMIT_MANAGER_ADMIN}
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ services:
- ./conf/common.env
- ./conf/manager_base.env
- ./conf/database.env
- ./conf/manager-admin.env
ports:
- 8400:8000
depends_on:
Expand Down
52 changes: 52 additions & 0 deletions manager.admin.spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,58 @@ paths:
200:
description: Partition was truncated

/recalc/accounts:
put:
summary: Trigger recalc of all systems in specified account org_ids.
description: Trigger recalc of all systems in specified account org_ids. Admin interface, available only to admin users.
operationId: manager.admin_handler.RecalcAccounts.put
x-methodName: recalcAccounts
security:
- ApiKeyAuthAdmin: []
requestBody:
description: Account org_ids to be re-evaluated.
required: true
content:
application/vnd.api+json:
schema:
type: object
properties:
org_ids:
type: array
items:
type: string
description: List of org_ids to be recalculated.
example: ['123456', '654321']
responses:
200:
description: Systems from account are scheduled for recalculation.

/recalc/systems:
put:
summary: Trigger recalc of specified inventory_ids.
description: Trigger recalc of specified inventory_ids. Admin interface, available only to admin users.
operationId: manager.admin_handler.RecalcSystems.put
x-methodName: recalcSystems
security:
- ApiKeyAuthAdmin: []
requestBody:
description: System inventory_ids to be re-evaluated.
required: true
content:
application/vnd.api+json:
schema:
type: object
properties:
inventory_ids:
type: array
items:
type: string
description: List of inventory_ids to be recalculated.
example: ['INV-ID-0000-1234', 'INV-ID-0000-5678']
responses:
200:
description: Systems are scheduled for recalculation.

components:
parameters:
inventory_id:
Expand Down
1 change: 1 addition & 0 deletions manager/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@


init_logging(num_servers=CFG.gunicorn_workers)

# gunicorn expects an object called "application" hence the pylint disable
application = create_app({CFG.default_route: "manager.admin.spec.yaml", # pylint: disable=invalid-name
"": "manager.healthz.spec.yaml"})
111 changes: 111 additions & 0 deletions manager/admin_handler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
"""
Module for admin API endpoints
"""
import asyncio
import json
import subprocess
from datetime import datetime
from datetime import timezone

import requests
from connexion import context
from peewee import DatabaseError

from common import mqueue
from common.config import Config
from common.constants import EvaluatorMessageType
from common.logging import get_logger
from common.peewee_model import DB
from common.peewee_model import Announcement
Expand All @@ -28,6 +33,9 @@
CFG = Config()

LOGGER = get_logger(__name__)
EVENT_LOOP = asyncio.new_event_loop()
EVALUATOR_QUEUE = mqueue.MQWriter(CFG.evaluator_recalc_topic, loop=EVENT_LOOP)
BATCH_SEMAPHORE = asyncio.BoundedSemaphore(CFG.re_evaluation_kafka_batches)


class GetMissingInInventory(GetRequest):
Expand Down Expand Up @@ -463,3 +471,106 @@ def handle_delete(cls, **kwargs):
LOGGER.error("Internal server error: %s", exc)
return "Error", 500
return "Ok", 200


class RecalcBase:

@classmethod
def _create_event(cls):
with DB.transaction():
with DB.cursor() as cur:
cur.execute("""insert into recalc_event (changed_packages) values ('{}') returning id""")
ret = cur.fetchone()
if not ret:
raise None
return ret[0]


class RecalcAccounts(PutRequest, RecalcBase):
"""PUT to /v1/recalc/accounts"""

_endpoint_name = r"/v1/recalc/accounts"

@classmethod
def handle_put(cls, **kwargs):
"""Trigger recalc of all systems in specified account org_ids."""
loop = EVENT_LOOP
total_scheduled = 0
LOGGER.info("kwargs: %s", kwargs)
accounts = kwargs.get("body", {}).get("org_ids", [])
if not accounts:
return "No org_ids provided", 400

event_id = cls._create_event()
if not event_id:
return "Error creating recalc_event", 500

with DB.cursor() as cur:
cur.execute("""
SELECT inventory_id, org_id
FROM system_platform sp JOIN rh_account acc ON sp.rh_account_id = acc.id
WHERE org_id = ANY(%s)""", (accounts,))
while True:
loop.run_until_complete(BATCH_SEMAPHORE.acquire())
rows = cur.fetchmany(size=CFG.re_evaluation_kafka_batch_size)
if not rows:
BATCH_SEMAPHORE.release()
break
msgs = [
{
"type": EvaluatorMessageType.RE_EVALUATE_SYSTEM,
"host": {"id": str(inventory_id), "org_id": org_id},
"timestamp": str(datetime.now(timezone.utc)),
"recalc_event_id": event_id,
}
for inventory_id, org_id in rows
]
total_scheduled += len(msgs)
task = EVALUATOR_QUEUE.send_list(msgs, loop=loop)
task.add_done_callback(lambda x: BATCH_SEMAPHORE.release())
loop.run_until_complete(task)

return f"{total_scheduled} systems scheduled for re-evaluation", 200


class RecalcSystems(PutRequest, RecalcBase):
"""PUT to /v1/recalc/systems"""

_endpoint_name = r"/v1/recalc/systems"

@classmethod
def handle_put(cls, **kwargs):
"""Trigger recalc of specified inventory_ids."""
loop = EVENT_LOOP
total_scheduled = 0
inventory_ids = kwargs.get("body", {}).get("inventory_ids", [])
if not inventory_ids:
return "No inventory_ids provided", 400

event_id = cls._create_event()
if not event_id:
return "Error creating recalc_event", 500

with DB.cursor() as cur:
cur.execute("""
SELECT inventory_id, org_id
FROM system_platform sp JOIN rh_account acc ON sp.rh_account_id = acc.id
WHERE inventory_id = ANY(%s::uuid[])""", (inventory_ids,))
rows = cur.fetchall()
if not rows:
return f"{total_scheduled} systems scheduled for re-evaluation", 200

msgs = [
{
"type": EvaluatorMessageType.RE_EVALUATE_SYSTEM,
"host": {"id": str(inventory_id), "org_id": org_id},
"timestamp": str(datetime.now(timezone.utc)),
"recalc_event_id": event_id,
}
for inventory_id, org_id in rows
]
total_scheduled += len(msgs)
task = EVALUATOR_QUEUE.send_list(msgs, loop=loop)
loop.run_until_complete(task)

return f"{total_scheduled} systems scheduled for re-evaluation", 200

0 comments on commit bff4e5f

Please sign in to comment.