Skip to content

Commit

Permalink
fix(aws): not show findings when AccessDenieds (#3803)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrCloudSec authored Apr 29, 2024
1 parent b361524 commit 35c8ea5
Show file tree
Hide file tree
Showing 97 changed files with 2,333 additions and 1,127 deletions.
2 changes: 2 additions & 0 deletions docs/developer-guide/services.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ class <Service>(ServiceParentClass):
f"{<item>.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
```
???+note
To avoid fake findings, when Prowler can't retrieve the items, because an Access Denied or similar error, we set that items value as `None`.

#### Service Models

Expand Down
6 changes: 4 additions & 2 deletions prowler/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,10 @@ def prowler():
global_provider,
global_provider.output_options,
)
# Only display compliance table if there are findings and it is a default execution
if findings and default_execution:
# Only display compliance table if there are findings (not all MANUAL) and it is a default execution
if (
findings and not all(finding.status == "MANUAL" for finding in findings)
) and default_execution:
compliance_overview = False
if not compliance_framework:
compliance_framework = get_available_compliance_frameworks(provider)
Expand Down
13 changes: 8 additions & 5 deletions prowler/lib/outputs/summary_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ def display_summary_table(
entity_type = "Context"
audited_entities = provider.identity.context

if findings:
# Check if there are findings and that they are not all MANUAL
if findings and not all(finding.status == "MANUAL" for finding in findings):
current = {
"Service": "",
"Provider": "",
"Total": 0,
"Pass": 0,
"Critical": 0,
"High": 0,
"Medium": 0,
Expand All @@ -70,9 +72,9 @@ def display_summary_table(
):
add_service_to_table(findings_table, current)

current["Total"] = current["Muted"] = current["Critical"] = current[
"High"
] = current["Medium"] = current["Low"] = 0
current["Total"] = current["Pass"] = current["Muted"] = current[
"Critical"
] = current["High"] = current["Medium"] = current["Low"] = 0

current["Service"] = finding.check_metadata.ServiceName
current["Provider"] = finding.check_metadata.Provider
Expand All @@ -83,6 +85,7 @@ def display_summary_table(
current["Muted"] += 1
if finding.status == "PASS":
pass_count += 1
current["Pass"] += 1
elif finding.status == "FAIL":
fail_count += 1
if finding.check_metadata.Severity == "critical":
Expand Down Expand Up @@ -155,7 +158,7 @@ def add_service_to_table(findings_table, current):
)
current["Status"] = f"{Fore.RED}FAIL ({total_fails}){Style.RESET_ALL}"
else:
current["Status"] = f"{Fore.GREEN}PASS ({current['Total']}){Style.RESET_ALL}"
current["Status"] = f"{Fore.GREEN}PASS ({current['Pass']}){Style.RESET_ALL}"

findings_table["Provider"].append(current["Provider"])
findings_table["Service"].append(current["Service"])
Expand Down
31 changes: 19 additions & 12 deletions prowler/providers/aws/aws_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,18 +789,25 @@ def assume_role(

def get_aws_enabled_regions(self, current_session: Session) -> set:
"""get_aws_enabled_regions returns a set of enabled AWS regions"""

# EC2 Client to check enabled regions
service = "ec2"
default_region = self.get_default_region(service)
ec2_client = current_session.client(service, region_name=default_region)

enabled_regions = set()
# With AllRegions=False we only get the enabled regions for the account
for region in ec2_client.describe_regions(AllRegions=False).get("Regions", []):
enabled_regions.add(region.get("RegionName"))

return enabled_regions
try:
# EC2 Client to check enabled regions
service = "ec2"
default_region = self.get_default_region(service)
ec2_client = current_session.client(service, region_name=default_region)

enabled_regions = set()
# With AllRegions=False we only get the enabled regions for the account
for region in ec2_client.describe_regions(AllRegions=False).get(
"Regions", []
):
enabled_regions.add(region.get("RegionName"))

return enabled_regions
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return set()

# TODO: review this function
# Maybe this should be done within the AwsProvider and not in __main__.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@ class account_maintain_different_contact_details_to_security_billing_and_operati
Check
):
def execute(self):
report = Check_Report_AWS(self.metadata())
report.region = account_client.region
report.resource_id = account_client.audited_account
report.resource_arn = account_client.audited_account_arn
findings = []
if account_client.contact_base:
report = Check_Report_AWS(self.metadata())
report.region = account_client.region
report.resource_id = account_client.audited_account
report.resource_arn = account_client.audited_account_arn

if (
len(account_client.contact_phone_numbers)
== account_client.number_of_contacts
and len(account_client.contact_names) == account_client.number_of_contacts
# This is because the primary contact has no email field
and len(account_client.contact_emails)
== account_client.number_of_contacts - 1
):
report.status = "PASS"
report.status_extended = "SECURITY, BILLING and OPERATIONS contacts found and they are different between each other and between ROOT contact."
else:
report.status = "FAIL"
report.status_extended = "SECURITY, BILLING and OPERATIONS contacts not found or they are not different between each other and between ROOT contact."
return [report]
if (
len(account_client.contact_phone_numbers)
== account_client.number_of_contacts
and len(account_client.contact_names)
== account_client.number_of_contacts
# This is because the primary contact has no email field
and len(account_client.contact_emails)
== account_client.number_of_contacts - 1
):
report.status = "PASS"
report.status_extended = "SECURITY, BILLING and OPERATIONS contacts found and they are different between each other and between ROOT contact."
else:
report.status = "FAIL"
report.status_extended = "SECURITY, BILLING and OPERATIONS contacts not found or they are not different between each other and between ROOT contact."
findings.append(report)
return findings
55 changes: 31 additions & 24 deletions prowler/providers/aws/services/account/account_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,29 @@ def __init__(self, provider):
self.contacts_security = self.__get_alternate_contact__("SECURITY")
self.contacts_operations = self.__get_alternate_contact__("OPERATIONS")

# Set of contact phone numbers
self.contact_phone_numbers = {
self.contact_base.phone_number,
self.contacts_billing.phone_number,
self.contacts_security.phone_number,
self.contacts_operations.phone_number,
}
if self.contact_base:
# Set of contact phone numbers
self.contact_phone_numbers = {
self.contact_base.phone_number,
self.contacts_billing.phone_number,
self.contacts_security.phone_number,
self.contacts_operations.phone_number,
}

# Set of contact names
self.contact_names = {
self.contact_base.name,
self.contacts_billing.name,
self.contacts_security.name,
self.contacts_operations.name,
}
# Set of contact names
self.contact_names = {
self.contact_base.name,
self.contacts_billing.name,
self.contacts_security.name,
self.contacts_operations.name,
}

# Set of contact emails
self.contact_emails = {
self.contacts_billing.email,
self.contacts_security.email,
self.contacts_operations.email,
}
# Set of contact emails
self.contact_emails = {
self.contacts_billing.email,
self.contacts_security.email,
self.contacts_operations.email,
}

def __get_contact_information__(self):
try:
Expand All @@ -53,10 +54,16 @@ def __get_contact_information__(self):
phone_number=primary_account_contact.get("PhoneNumber"),
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return Contact(type="PRIMARY")
if error.response["Error"]["Code"] == "AccessDeniedException":
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None
else:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return Contact(type="PRIMARY")

def __get_alternate_contact__(self, contact_type: str):
try:
Expand Down
11 changes: 10 additions & 1 deletion prowler/providers/aws/services/backup/backup_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime
from typing import Optional

from botocore.client import ClientError
from pydantic import BaseModel

from prowler.lib.logger import logger
Expand Down Expand Up @@ -37,6 +38,8 @@ def __list_backup_vaults__(self, regional_client):
self.audit_resources,
)
):
if self.backup_vaults is None:
self.backup_vaults = []
self.backup_vaults.append(
BackupVault(
arn=configuration.get("BackupVaultArn"),
Expand All @@ -55,7 +58,13 @@ def __list_backup_vaults__(self, regional_client):
),
)
)

except ClientError as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
if error.response["Error"]["Code"] == "AccessDeniedException":
if not self.backup_vaults:
self.backup_vaults = None
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@
class backup_vaults_encrypted(Check):
def execute(self):
findings = []

for backup_vault in backup_client.backup_vaults:
# By default we assume that the result is fail
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = (
f"Backup Vault {backup_vault.name} is not encrypted."
)
report.resource_arn = backup_vault.arn
report.resource_id = backup_vault.name
report.region = backup_vault.region
# if it is encrypted we only change the status and the status extended
if backup_vault.encryption:
report.status = "PASS"
if backup_client.backup_vaults:
for backup_vault in backup_client.backup_vaults:
# By default we assume that the result is fail
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = (
f"Backup Vault {backup_vault.name} is encrypted."
f"Backup Vault {backup_vault.name} is not encrypted."
)
# then we store the finding
findings.append(report)
report.resource_arn = backup_vault.arn
report.resource_id = backup_vault.name
report.region = backup_vault.region
# if it is encrypted we only change the status and the status extended
if backup_vault.encryption:
report.status = "PASS"
report.status_extended = (
f"Backup Vault {backup_vault.name} is encrypted."
)
# then we store the finding
findings.append(report)

return findings
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
class backup_vaults_exist(Check):
def execute(self):
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = "No Backup Vault exist."
report.resource_arn = backup_client.backup_vault_arn_template
report.resource_id = backup_client.audited_account
report.region = backup_client.region
if backup_client.backup_vaults:
report.status = "PASS"
report.status_extended = f"At least one backup vault exists: {backup_client.backup_vaults[0].name}."
report.resource_arn = backup_client.backup_vaults[0].arn
report.resource_id = backup_client.backup_vaults[0].name
report.region = backup_client.backup_vaults[0].region
if backup_client.backup_vaults is not None:
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"
report.status_extended = "No Backup Vault exist."
report.resource_arn = backup_client.backup_vault_arn_template
report.resource_id = backup_client.audited_account
report.region = backup_client.region
if backup_client.backup_vaults:
report.status = "PASS"
report.status_extended = f"At least one backup vault exists: {backup_client.backup_vaults[0].name}."
report.resource_arn = backup_client.backup_vaults[0].arn
report.resource_id = backup_client.backup_vaults[0].name
report.region = backup_client.backup_vaults[0].region

findings.append(report)
findings.append(report)
return findings
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,29 @@
class cloudtrail_bucket_requires_mfa_delete(Check):
def execute(self):
findings = []
for trail in cloudtrail_client.trails.values():
if trail.is_logging:
trail_bucket_is_in_account = False
trail_bucket = trail.s3_bucket
report = Check_Report_AWS(self.metadata())
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.resource_tags = trail.tags
report.status = "FAIL"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) does not have MFA delete enabled."
for bucket in s3_client.buckets:
if trail_bucket == bucket.name:
trail_bucket_is_in_account = True
if bucket.mfa_delete:
report.status = "PASS"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) has MFA delete enabled."
# check if trail bucket is a cross account bucket
if not trail_bucket_is_in_account:
report.status = "MANUAL"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) is a cross-account bucket in another account out of Prowler's permissions scope, please check it manually."
if cloudtrail_client.trails is not None:
for trail in cloudtrail_client.trails.values():
if trail.is_logging:
trail_bucket_is_in_account = False
trail_bucket = trail.s3_bucket
report = Check_Report_AWS(self.metadata())
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.resource_tags = trail.tags
report.status = "FAIL"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) does not have MFA delete enabled."
for bucket in s3_client.buckets:
if trail_bucket == bucket.name:
trail_bucket_is_in_account = True
if bucket.mfa_delete:
report.status = "PASS"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) has MFA delete enabled."
# check if trail bucket is a cross account bucket
if not trail_bucket_is_in_account:
report.status = "MANUAL"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) is a cross-account bucket in another account out of Prowler's permissions scope, please check it manually."

findings.append(report)
findings.append(report)

return findings
Loading

0 comments on commit 35c8ea5

Please sign in to comment.