-
Notifications
You must be signed in to change notification settings - Fork 44
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: add gabi script collecting cve stats
- Loading branch information
1 parent
7f167ce
commit d9aca04
Showing
2 changed files
with
240 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,240 @@ | ||
#!/usr/bin/env python3 | ||
import os | ||
import subprocess | ||
import sys | ||
from datetime import date | ||
|
||
import requests | ||
import yaml | ||
|
||
KUBE_CONFIG_FILE = os.path.expanduser("~/.kube/config") | ||
|
||
GABI_CFG = {"url": "", "headers": {}} | ||
IMPACTS = {"5": "important", "7": "critical"} | ||
|
||
|
||
def run_oc(args): | ||
cmd = ["oc"] | ||
cmd.extend(args) | ||
cmd.extend(["-o", "yaml"]) | ||
result = subprocess.run(cmd, capture_output=True) | ||
return yaml.safe_load(result.stdout) | ||
|
||
|
||
def set_gabi_url(): | ||
routes = run_oc(["get", "route"]) | ||
for route in routes["items"]: | ||
if route["metadata"]["name"].startswith("gabi-"): | ||
GABI_CFG["url"] = f"https://{route['spec']['host']}/query" | ||
return | ||
print("Gabi URL not found! Are you logged in? (oc login)", file=sys.stderr) | ||
sys.exit(2) | ||
|
||
|
||
def set_gabi_token(): | ||
with open(KUBE_CONFIG_FILE, "r") as kube_config: | ||
kube_config = yaml.safe_load(kube_config.read()) | ||
context_users = {} | ||
for context in kube_config["contexts"]: | ||
context_users[context["name"]] = context["context"]["user"] | ||
current_context_name = kube_config["current-context"] | ||
current_context_user = context_users[current_context_name] | ||
for user in kube_config["users"]: | ||
if user["name"] == current_context_user: | ||
GABI_CFG["headers"]["Authorization"] = f"Bearer {user['user']['token']}" | ||
return | ||
print("Gabi token not found! Are you logged in? (oc login)", file=sys.stderr) | ||
sys.exit(2) | ||
|
||
|
||
def query(query): | ||
tries = 0 | ||
data = {"query": query} | ||
while tries <= 5: | ||
r = requests.post(GABI_CFG["url"], headers=GABI_CFG["headers"], json=data) | ||
if r.status_code == 200: | ||
return r.json()["result"] | ||
else: | ||
print(f"Query failed: {query}, HTTP code: {r.status_code}", file=sys.stderr) | ||
tries += 1 | ||
sys.exit(3) | ||
|
||
|
||
def validate_date(date_text): | ||
try: | ||
date.fromisoformat(date_text) | ||
except ValueError: | ||
return False | ||
return True | ||
|
||
|
||
def main(): | ||
if len(sys.argv) != 2 or not validate_date(sys.argv[1]): | ||
print(f"Invalid date format! Example run: {sys.argv[0]} 2023-01-01", file=sys.stderr) | ||
sys.exit(1) | ||
|
||
start_date = sys.argv[1] | ||
|
||
set_gabi_url() | ||
set_gabi_token() | ||
|
||
print(f"Gabi URL: {GABI_CFG['url']}") | ||
print(f"Start date: {start_date}") | ||
print("") | ||
|
||
top10cves = query( | ||
""" | ||
select cve.cve, cve.cvss3_score, count(*) | ||
from system_vulnerabilities_active sv join | ||
cve_metadata cve on sv.cve_id = cve.id | ||
where when_mitigated is null or ( | ||
mitigation_reason is null and rule_id in ( | ||
select id from insights_rule where active = true and rule_only = false | ||
) | ||
) | ||
group by cve.cve, cve.cvss3_score | ||
order by 3 desc | ||
limit 10; | ||
""" | ||
)[1:] | ||
print("Top 10 CVEs with most system hits:") | ||
for cve in top10cves: | ||
print(f"{cve[0]} (CVSS: {cve[1]}): {cve[2]}") | ||
print("") | ||
|
||
all_accounts = query( | ||
""" | ||
select rh_account_id, cnt | ||
from | ||
( | ||
select rh_account_id, count(id) as cnt | ||
from system_platform | ||
group by 1 | ||
) t1 join | ||
( | ||
select id | ||
from rh_account | ||
where cve_cache_keepalive is not null | ||
) t2 on t1.rh_account_id = t2.id; | ||
""" | ||
)[1:] | ||
all_account_cnt = len(all_accounts) | ||
print(f"All actively used accounts: {all_account_cnt}") | ||
|
||
accounts = query( | ||
f""" | ||
select rh_account_id, cnt | ||
from | ||
( | ||
select rh_account_id, count(id) as cnt | ||
from system_platform | ||
group by 1 | ||
) t1 join | ||
( | ||
select id | ||
from rh_account | ||
where cve_cache_keepalive is not null and | ||
cve_cache_keepalive >= '{start_date}' | ||
) t2 on t1.rh_account_id = t2.id | ||
where t1.cnt >= 10; | ||
""" | ||
)[1:] | ||
account_cnt = len(accounts) | ||
system_cnt = 0 | ||
for acc in accounts: | ||
system_cnt += int(acc[1]) | ||
print(f"Actively used accounts since {start_date} with >= 10 systems: {account_cnt}") | ||
print(f"Total systems in these accounts: {system_cnt}") | ||
print("") | ||
|
||
critical_cnt = 0 | ||
important_cnt = 0 | ||
exploited_cnt = 0 | ||
critical_cnt_gt5 = 0 | ||
important_cnt_gt5 = 0 | ||
exploited_cnt_gt3 = 0 | ||
critical_systems_cnt_gt5 = 0 | ||
important_systems_cnt_gt5 = 0 | ||
exploited_systems_cnt_gt5 = 0 | ||
for idx, account in enumerate(sorted(accounts)): | ||
cnts = query( | ||
f""" | ||
select impact_id, count(distinct cve_id), count(distinct system_id) | ||
from system_vulnerabilities_active sv join | ||
cve_metadata cve on sv.cve_id = cve.id | ||
where sv.rh_account_id = {account[0]} and | ||
sv.first_reported >= '{start_date}' and | ||
cve.impact_id in (5, 7) and ( | ||
sv.when_mitigated is null or ( | ||
sv.mitigation_reason is null and sv.rule_id in ( | ||
select id from insights_rule ir where ir.active = true and ir.rule_only = false | ||
) | ||
) | ||
) | ||
group by impact_id; | ||
""" | ||
)[1:] | ||
for impact_id, cnt, cnt_sys in cnts: | ||
impact = IMPACTS[impact_id] | ||
cnt = int(cnt) | ||
cnt_sys = int(cnt_sys) | ||
# print(f"acc {account[0]} {impact}: {cnt}, systems: {cnt_sys}") | ||
if impact == "critical": | ||
if cnt > 0: | ||
critical_cnt += 1 | ||
if cnt >= 5: | ||
critical_cnt_gt5 += 1 | ||
critical_systems_cnt_gt5 += cnt_sys | ||
elif impact == "important": | ||
if cnt > 0: | ||
important_cnt += 1 | ||
if cnt >= 5: | ||
important_cnt_gt5 += 1 | ||
important_systems_cnt_gt5 += cnt_sys | ||
else: | ||
print(f"ERR: unexpected impact: {impact}") | ||
sys.exit(2) | ||
|
||
cnts = query( | ||
f""" | ||
select count(distinct cve_id), count(distinct system_id) | ||
from system_vulnerabilities_active sv join | ||
cve_metadata cve on sv.cve_id = cve.id | ||
where sv.rh_account_id = {account[0]} and | ||
sv.first_reported >= '{start_date}' and | ||
cve.exploit_data is not null and ( | ||
sv.when_mitigated is null or ( | ||
sv.mitigation_reason is null and sv.rule_id in ( | ||
select id from insights_rule ir where ir.active = true and ir.rule_only = false | ||
) | ||
) | ||
); | ||
""" | ||
) | ||
cnt = int(cnts[1][0]) | ||
cnt_sys = int(cnts[1][1]) | ||
# print(f"acc {account[0]} exploited: {cnt}, systems: {cnt_sys}") | ||
if cnt > 0: | ||
exploited_cnt += 1 | ||
if cnt >= 3: | ||
exploited_cnt_gt3 += 1 | ||
exploited_systems_cnt_gt5 += cnt_sys | ||
|
||
if ((idx + 1) % 10) == 0: | ||
print(f"{idx+1}/{account_cnt} account counts done...") | ||
print("Account counts done.") | ||
print("") | ||
|
||
print(f"Accounts with Critical CVEs: {critical_cnt}") | ||
print(f"Accounts with Important CVEs: {important_cnt}") | ||
print(f"Accounts with Exploited CVEs: {exploited_cnt}") | ||
print(f"Accounts with >=5 Critical CVEs: {critical_cnt_gt5}") | ||
print(f"Accounts with >=5 Important CVEs: {important_cnt_gt5}") | ||
print(f"Accounts with >=3 Exploited CVEs: {exploited_cnt_gt3}") | ||
print(f"Systems in accounts with >=5 Critical CVEs: {critical_systems_cnt_gt5}") | ||
print(f"Systems in accounts with >=5 Important CVEs: {important_systems_cnt_gt5}") | ||
print(f"Systems in accounts with >=3 Exploited CVEs: {exploited_systems_cnt_gt5}") | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |