From 33f8e98f3a0494b1322cd520addd3dfd38ebbbca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Sas=C3=A1k?= Date: Fri, 24 Nov 2023 13:44:43 +0100 Subject: [PATCH] feat(evaluator): skip old kafka evaluation messages --- evaluator/common.py | 2 +- evaluator/evaluator.py | 4 +++- evaluator/processor.py | 17 ++++++++++++----- grouper/queue.py | 3 +++ vmaas_sync/vmaas_sync.py | 11 ++++++++++- 5 files changed, 29 insertions(+), 8 deletions(-) diff --git a/evaluator/common.py b/evaluator/common.py index d76d57f2f..9e14dc3f2 100644 --- a/evaluator/common.py +++ b/evaluator/common.py @@ -54,7 +54,7 @@ # cve coupled with its package_name and cpe, from vmaas CveUnpatched = namedtuple("Cve", ["cve", "package_name", "cpe"]) # system_platform row taken from DB -SystemPlatform = namedtuple("SystemPlatform", ["id", "inventory_id", "rh_account_id", "vmaas_json", "rule_results"]) +SystemPlatform = namedtuple("SystemPlatform", ["id", "inventory_id", "rh_account_id", "vmaas_json", "rule_results", "last_evaluation"]) # single vulnerability stored inside db VulnerabilityDB = namedtuple( "VulnerabilityDB", diff --git a/evaluator/evaluator.py b/evaluator/evaluator.py index ab9f8772e..bd5f91298 100644 --- a/evaluator/evaluator.py +++ b/evaluator/evaluator.py @@ -6,6 +6,7 @@ import signal from aiokafka import ConsumerRecord +from dateutil import parser from psycopg_pool.pool_async import AsyncConnectionPool from .common import CFG @@ -62,6 +63,7 @@ async def _consume_message(self, msg: ConsumerRecord): inventory_id = msg_dict["host"]["id"] org_id = msg_dict["host"]["org_id"] request_id = msg_dict.get("platform_metadata", {}).get("request_id") + timestamp = parser.parse(ts) if (ts := msg_dict.get("timestamp")) else None try: msg_type = EvaluatorMessageType(msg_dict.get("type")) @@ -69,7 +71,7 @@ async def _consume_message(self, msg: ConsumerRecord): LOGGER.error("received unknown message type: %s", msg_type) return - await self.processor.evaluate_system(inventory_id, org_id, request_id) + await self.processor.evaluate_system(inventory_id, org_id, request_id, timestamp) async def consume_message(self, msg: ConsumerRecord): """Consume message for evaluation, wrapper for semaphore""" diff --git a/evaluator/processor.py b/evaluator/processor.py index 0f5ca35db..32715f603 100644 --- a/evaluator/processor.py +++ b/evaluator/processor.py @@ -2,8 +2,10 @@ Processing logic of grouped evaluator message for single system """ import asyncio +from datetime import datetime from typing import Dict from typing import List +from typing import Optional from typing import Tuple from psycopg import AsyncConnection @@ -62,7 +64,7 @@ async def _lock_system(self, inventory_id: str, conn: AsyncConnection) -> System async with conn.cursor(row_factory=dict_row) as cur: await cur.execute( """ - SELECT id, rh_account_id, vmaas_json, rule_results + SELECT id, rh_account_id, vmaas_json, rule_results, last_evaluation FROM system_platform WHERE inventory_id = %s AND when_deleted IS NULL @@ -73,7 +75,9 @@ async def _lock_system(self, inventory_id: str, conn: AsyncConnection) -> System ) row = await cur.fetchone() if row: - return SystemPlatform(row["id"], inventory_id, row["rh_account_id"], row["vmaas_json"], row["rule_results"]) + return SystemPlatform( + row["id"], inventory_id, row["rh_account_id"], row["vmaas_json"], row["rule_results"], row["last_evaluation"] + ) return None async def _load_db_system_vulnerabilities(self, system_platform: SystemPlatform, conn: AsyncConnection) -> Dict[str, VulnerabilityDB]: @@ -216,7 +220,7 @@ async def _compare_sys_vulns( return to_insert, to_update, to_delete - async def _evaluate_system(self, inventory_id: str, org_id: str): + async def _evaluate_system(self, inventory_id: str, org_id: str, request_timestamp: Optional[datetime]): """Evaluate vulnerabilities for single system, and update DB""" async with self.db_pool.connection() as conn: async with conn.transaction(): @@ -230,6 +234,9 @@ async def _evaluate_system(self, inventory_id: str, org_id: str): "skipping evaluation, due to empty vmaas_json and rule_results, system: %s, org_id: %s", inventory_id, org_id ) return + if request_timestamp and system_platform.last_evaluation and request_timestamp < system_platform.last_evaluation: + LOGGER.info("skipping evaluation, kafka message is older than system was lastly evaluated") + return # start both task asynchronously to speed up sys_vuln_rows_db, sys_vuln_rows = await asyncio.gather( @@ -259,14 +266,14 @@ async def _evaluate_system(self, inventory_id: str, org_id: str): send_remediations_update(self.remediations_results, inventory_id, fixable_sys_vuln_rows) send_notifications(self.evaluator_results, new_system_vulns, [], [], system_platform.rh_account_id, org_id) - async def evaluate_system(self, inventory_id: str, org_id: str, request_id: str): + async def evaluate_system(self, inventory_id: str, org_id: str, request_id: str, request_timestamp: datetime): """Evaluate single system""" EVAL_COUNT.inc() msg = {"platform_metadata": {"request_id": request_id}, "host": {"org_id": org_id, "id": inventory_id}} try: with EVAL_TIME.time(): LOGGER.info("evaluating system: %s, org_id: %s", inventory_id, org_id) - await self._evaluate_system(inventory_id, org_id) + await self._evaluate_system(inventory_id, org_id, request_timestamp) except EvaluatorException as ex: LOGGER.error(str(ex)) send_msg_to_payload_tracker(self.payload_tracker, msg, "error", status_msg="evaluation failed", loop=self.loop) diff --git a/grouper/queue.py b/grouper/queue.py index 638f81ecf..b073c19a7 100644 --- a/grouper/queue.py +++ b/grouper/queue.py @@ -3,6 +3,8 @@ message to evaluator """ import asyncio +from datetime import datetime +from datetime import timezone from typing import Dict from .common import ADVISOR_QUEUE_SIZE @@ -151,6 +153,7 @@ async def _send_for_evaluation(self, item: QueueItem, org_id: str, inventory_id: "org_id": org_id, }, "platform_metadata": {"request_id": item.request_id}, + "timestamp": str(datetime.now(timezone.utc)), } if (not item.inventory_changed and not item.advisor_changed) and not CFG.disable_optimisation: UNCHANGED_SYSTEM.inc() diff --git a/vmaas_sync/vmaas_sync.py b/vmaas_sync/vmaas_sync.py index 3e7375559..80aae27e4 100644 --- a/vmaas_sync/vmaas_sync.py +++ b/vmaas_sync/vmaas_sync.py @@ -1,6 +1,8 @@ """VMaaS sync module.""" import asyncio import datetime as dt +from datetime import datetime +from datetime import timezone from typing import Any from typing import Dict from typing import Tuple @@ -300,7 +302,14 @@ def re_evaluate_systems(): if not rows: BATCH_SEMAPHORE.release() break - msgs = [{"type": "re-evaluate_system", "host": {"id": inventory_id, "org_id": org_id}} for inventory_id, _, org_id in rows] + msgs = [ + { + "type": "re-evaluate_system", + "host": {"id": inventory_id, "org_id": org_id}, + "timestamp": str(datetime.now(timezone.utc)), + } + for inventory_id, _, org_id in rows + ] total_scheduled += len(msgs) future = EVALUATOR_QUEUE.send_list(msgs, loop=loop) future.add_done_callback(lambda x: BATCH_SEMAPHORE.release())