diff --git a/cve_bin_tool/cvedb.py b/cve_bin_tool/cvedb.py index e8e4fde342..0ffd045cf3 100644 --- a/cve_bin_tool/cvedb.py +++ b/cve_bin_tool/cvedb.py @@ -11,6 +11,7 @@ import datetime import json import logging +import re import shutil import sqlite3 import tempfile @@ -643,9 +644,11 @@ def metric_finder(self, cursor, cve): metric = "unknown" else: cursor.execute(query, [cve.get("CVSS_version")]) - # Fetch all the results of the query and use 'map' to extract only the 'metrics_name' from the result + # Fetch all the results of the query and use 'map' to extract only the + # 'metrics_name' from the result metric = list(map(lambda x: x[0], cursor.fetchall())) - # Since the query is expected to return a single result, extract the first item from the list and store it in 'metric' + # Since the query is expected to return a single result, extract the first + # item from the list and store it in 'metric' metric = metric[0] return metric @@ -752,6 +755,32 @@ def update_vendors(self, cve_data): return updated_severity, updated_affected + async def cpe2vendors(self, cpe_string): + match = re.match(r"cpe:2\.3:[aho]:(.*?):", cpe_string) + if match: + yield match.group(1) + + async def find_vendor_from_purl(self, purl): + param1 = f"pkg:{purl.type}/{purl.name}" + param2 = f"pkg:{purl.type}/%/{purl.name}" + + query = """ + SELECT cpe FROM purl2cpe WHERE purl LIKE ? + UNION + SELECT cpe FROM purl2cpe WHERE purl LIKE ? + """ + try: + vendors = set() + async with self.get_db() as db: + async with db.execute(query, (param1, param2)) as cursor: + for row in await cursor.fetchall(): + async for vendor in self.cpe2vendors(row[0]): + vendors.add(vendor) + return vendors + except Exception as err: + LOGGER.opt(exception=err).debug("Unable to access purl2cpe database.") + return set() + def db_open_and_get_cursor(self) -> sqlite3.Cursor: """Opens connection to sqlite database, returns cursor object.""" diff --git a/cve_bin_tool/sbom_manager/parse.py b/cve_bin_tool/sbom_manager/parse.py index 2827bae083..8069ad8cfb 100644 --- a/cve_bin_tool/sbom_manager/parse.py +++ b/cve_bin_tool/sbom_manager/parse.py @@ -392,26 +392,28 @@ def parse_ext_ref(self, ext_ref) -> (str | None, str | None, str | None): LOGGER.debug("Nothing found") return [None, None, None] - def decode_purl(self, purl) -> (str | None, str | None, str | None): + async def decode_purl(self, purl) -> (str | None, str | None, str | None): """ - Decode a Package URL (purl) to extract version information. + Decode a Package URL (purl) to extract vendor, product, and version information. Args: - purl (str): Package URL (purl) string. Returns: - - Tuple[str | None, str | None, str | None]: A tuple containing the vendor (which is always None for purl), - product, and version information extracted from the purl string, or None if the purl is invalid or incomplete. - + - Tuple[str | None, str | None, str | None]: A tuple containing vendor, product, and version extracted + from the purl string. """ - vendor = None # Because the vendor and product identifiers in the purl don't always align - product = None # with the CVE DB, only the version is parsed. - version = None - # Process purl identifier purl_info = PackageURL.from_string(purl).to_dict() version = purl_info.get("version") + product = purl_info.get("name") + + vendorlist, success = self.cvedb.find_vendor_from_purl( + PackageURL.from_string(purl), version + ) + + vendor = vendorlist[0].product_info.vendor if success and vendorlist else None - return [vendor or None, product or None, version or None] + return [vendor, product, version] if __name__ == "__main__":