diff --git a/prowler/config/config.yaml b/prowler/config/config.yaml index e319f4a029..c37802b260 100644 --- a/prowler/config/config.yaml +++ b/prowler/config/config.yaml @@ -125,6 +125,7 @@ aws: # ] organizations_enabled_regions: [] organizations_trusted_delegated_administrators: [] + organizations_trusted_ids: [] # AWS ECR # aws.ecr_repositories_scan_vulnerabilities_in_latest_image diff --git a/prowler/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/__init__.py b/prowler/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/secretsmanager_has_restrictive_resource_policy.metadata.json b/prowler/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/secretsmanager_has_restrictive_resource_policy.metadata.json new file mode 100644 index 0000000000..ff7f714244 --- /dev/null +++ b/prowler/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/secretsmanager_has_restrictive_resource_policy.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "secretsmanager_has_restrictive_resource_policy", + "CheckTitle": "Ensure Secrets Manager secrets have restrictive resource-based policies.", + "CheckType": [ + "Software and Configuration Checks/AWS Security Best Practices" + ], + "ServiceName": "secretsmanager", + "SubServiceName": "", + "ResourceIdTemplate": "arn:aws:secretsmanager:region:account-id:secret:secret-name", + "Severity": "high", + "ResourceType": "AwsSecretsManagerSecret", + "Description": "This check verifies whether Secrets Manager secrets have resource-based policies that restrict access.", + "Risk": "Secrets without restrictive resource-based policies may be accessed by unauthorized entities, leading to potential data breaches.", + "RelatedUrl": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/auth-and-access_resource-policies.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Ensure that Secrets Manager policies restrict access to authorized principals only, following the Principle of Least Privilege.", + "Url": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/determine-acccess_examine-iam-policies.html" + } + }, + "Categories": [ + "access-control" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/secretsmanager_has_restrictive_resource_policy.py b/prowler/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/secretsmanager_has_restrictive_resource_policy.py new file mode 100644 index 0000000000..b5e4ecf155 --- /dev/null +++ b/prowler/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/secretsmanager_has_restrictive_resource_policy.py @@ -0,0 +1,254 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.secretsmanager.secretsmanager_client import ( + secretsmanager_client, +) +import re + + +class secretsmanager_has_restrictive_resource_policy(Check): + def execute(self): + findings = [] + organizations_trusted_ids = secretsmanager_client.audit_config.get( + "organizations_trusted_ids", [] + ) + # Regular expression to match IAM roles or users without wildcard * in their name + arn_pattern = r"arn:aws:iam::\d{12}:(role|user)/([^*]+)$" + # Regular expression to match AWS service names + service_pattern = r"^[a-z0-9-]+\.amazonaws\.com$" + + for secret in secretsmanager_client.secrets.values(): + report = Check_Report_AWS(self.metadata(), resource=secret) + report.region = secret.region + report.resource_id = secret.name + report.resource_arn = secret.arn + report.resource_tags = secret.tags + report.status = "FAIL" + # Determine the Role ARN to be used + assumed_role_config = getattr( + secretsmanager_client.provider, "_assumed_role_configuration", None + ) + if ( + assumed_role_config + and getattr(assumed_role_config, "info", None) + and getattr(assumed_role_config.info, "role_arn", None) + and getattr(assumed_role_config.info.role_arn, "arn", None) + ): + final_role_arn = assumed_role_config.info.role_arn.arn + else: + identity_arn = secretsmanager_client.provider.identity.identity_arn + if identity_arn: + # If the identity ARN is a sts assumed-role ARN, transform it + match = re.match( + r"arn:aws:sts::(\d+):assumed-role/([^/]+)/", identity_arn + ) + if match: + account_id, role_name = match.groups() + final_role_arn = f"arn:aws:iam::{account_id}:role/{role_name}" + else: + final_role_arn = identity_arn + else: + final_role_arn = "None" + + report.status_extended = ( + f"SecretsManager secret '{secret.name}' does not have a resource-based policy " + f"or access to the policy is denied for the role '{final_role_arn}'" + ) + + if secret.policy: + statements = secret.policy.get("Statement", []) + not_denied_principals = [] + not_denied_services = [] + + # Check for an explicit Deny that applies to all Principals except those defined in the Condition + has_explicit_deny_for_all = any( + "*" in self.extract_field(statement.get("Principal", {})) + and any( + action in self.extract_field(statement.get("Action", [])) + for action in ["*", "secretsmanager:*"] + ) + and self.is_valid_resource( + secret, self.extract_field(statement.get("Resource", "*")) + ) + and "Condition" in statement + # accept only the allowed Condition operators + and all( + operator in ["StringNotEquals", "StringNotEqualsIfExists"] + for operator in statement["Condition"].keys() + ) + # no PrincipalArn nor Service of the Condition must have wildcard * in its name + and all( + self.is_valid_principal( + principal_value=principal_value, + not_denied_list=not_denied_list, + pattern=pattern, + ) + for operator, condition_values in statement["Condition"].items() + for principal_key, principal_value in condition_values.items() + # only these two keys are allowed: + for mapping in [ + { + "aws:PrincipalArn": ( + not_denied_principals, + arn_pattern, + ), + "aws:PrincipalServiceName": ( + not_denied_services, + service_pattern, + ), + }.get(principal_key) + ] + # the principal_key decides which list and pattern is passed to is_valid_principal + for not_denied_list, pattern in [ + mapping if mapping is not None else (None, None) + ] + # direct tuple unpacking of the mapping + ) + for statement in statements + if statement.get("Effect") == "Deny" + ) + + # Check for Deny with "StringNotEquals":"aws:PrincipalOrgID" condition + has_deny_outside_org = ( + True + if not organizations_trusted_ids + else any( + ( + ("*" in self.extract_field(statement.get("Principal", {}))) + or ( + "NotPrincipal" in statement + and any( + allowed_service + in self.extract_field( + statement.get("NotPrincipal", {}) + ) + for allowed_service in not_denied_services + ) + ) + ) + and any( + action in self.extract_field(statement.get("Action", [])) + for action in ["*", "secretsmanager:*"] + ) + and self.is_valid_resource( + secret, self.extract_field(statement.get("Resource", "*")) + ) + and "Condition" in statement + and set(statement["Condition"].keys()) == {"StringNotEquals"} + and set(statement["Condition"]["StringNotEquals"].keys()) + == {"aws:PrincipalOrgID"} + and statement["Condition"]["StringNotEquals"][ + "aws:PrincipalOrgID" + ] + in organizations_trusted_ids + for statement in statements + if statement.get("Effect") == "Deny" + ) + ) + + # Check for "NotActions" without wildcard * for not_denied_principals and not_denied_services + failed_principals = [] + failed_services = [] + + # Validate that NotAction does not contain wildcards for specified principals + for statement in statements: + if statement.get("Effect") == "Deny": + principals = self.extract_field(statement.get("Principal", {})) + + # Check "NotAction" of Deny statements only for not_denied_principals + for principal in principals: + if principal in not_denied_principals: + if "NotAction" not in statement or any( + "*" in action + for action in self.extract_field( + statement.get("NotAction", []) + ) + ): + failed_principals.append(principal) + + # Allow-Statement for not-denied services must not have any wildcards in "Action" + # and "SourceAccount" must be the audited account + for statement in statements: + if statement.get("Effect") == "Allow": + principals = self.extract_field(statement.get("Principal", {})) + for service in principals: + if service in not_denied_services: + condition = statement.get("Condition", {}) + actions = self.extract_field( + statement.get("Action", []) + ) + if ( + "StringEquals" not in condition + or condition.get("StringEquals", {}).get( + "aws:SourceAccount" + ) + != secretsmanager_client.audited_account + or len(condition) + > 1 # only "StringEquals" is allowed + or any("*" in action for action in actions) + ): # wildcard in "Action" is not allowed + failed_services.append(service) + + has_specific_not_actions = len(failed_principals) == 0 + has_valid_service_policies = len(failed_services) == 0 + + # Determine if the policy satisfies all conditions + if ( + has_explicit_deny_for_all + and has_deny_outside_org + and has_specific_not_actions + and has_valid_service_policies + ): + report.status = "PASS" + report.status_extended = f"SecretsManager secret '{secret.name}' has a sufficiently restrictive resource-based policy." + else: + report.status = "FAIL" + report.status_extended = f"SecretsManager secret '{secret.name}' does not meet all required restrictions: " + if not has_explicit_deny_for_all: + report.status_extended += "Missing or incorrect 'Deny' statement for all Principals with wildcard Action. " + if not has_deny_outside_org: + report.status_extended += "Missing or incorrect 'Deny' statement restricting access outside 'PrincipalOrgID'. " + if not has_specific_not_actions: + report.status_extended += f"Missing field 'NotAction' or disallowed wildcard * in the 'NotAction' field of the 'Deny' statement for the specific Principal(s) {failed_principals if failed_principals else ''}. " + if not has_valid_service_policies: + report.status_extended += f"Invalid 'Allow' statements for Service Principals {failed_services if failed_services else ''}. " + + findings.append(report) + + return findings + + # Extract values from a field to return an array containing the field, + # handling single values, arrays and dict with keys "AWS" or "Service". + # If the field is empty or invalid, return the default_value in the array. + def extract_field(self, field, default_value=None): + if isinstance(field, str): + return [field] + elif isinstance(field, list): + return field + elif isinstance(field, dict): + for key in ("AWS", "Service"): + if key in field: + return [field[key]] if isinstance(field[key], str) else field[key] + return [default_value] + + def is_valid_resource(self, secret, resource): + """Check if the Resource field is valid for the given secret.""" + if resource == "*": + return True # Wildcard resource is acceptable in general cases + if isinstance(resource, list): + if "*" in resource: + return True + return all(r == secret.arn for r in resource) + return resource == secret.arn + + def is_valid_principal(self, principal_value, not_denied_list, pattern): + if not_denied_list is None or pattern is None: + return False + + principals = self.extract_field(principal_value) + for principal in principals: + if re.match(pattern, principal): + not_denied_list.append(principal) + else: + return False + + return True diff --git a/tests/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/secretsmanager_has_restrictive_resource_policy_test.py b/tests/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/secretsmanager_has_restrictive_resource_policy_test.py new file mode 100644 index 0000000000..2f7e40e331 --- /dev/null +++ b/tests/providers/aws/services/secretsmanager/secretsmanager_has_restrictive_resource_policy/secretsmanager_has_restrictive_resource_policy_test.py @@ -0,0 +1,716 @@ +import json +import pytest +from unittest import mock +from boto3 import client +from moto import mock_aws + +from prowler.providers.aws.services.secretsmanager.secretsmanager_service import ( + SecretsManager, +) +from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider + + +@pytest.fixture(scope="function") +def secretsmanager_client(): + with mock_aws(): + client_instance = client("secretsmanager", region_name=AWS_REGION_EU_WEST_1) + secret = client_instance.create_secret(Name="test-secret") + yield client_instance, secret["ARN"] + + +class TestSecretsManagerHasRestrictiveResourcePolicy: + + def test_no_secrets(self): + with mock_aws(): + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy import ( + secretsmanager_client, + ) + + secretsmanager_client.secrets.clear() + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy.secretsmanager_client", + new=SecretsManager(aws_provider), + ): + from prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy import ( + secretsmanager_has_restrictive_resource_policy, + ) + + check = secretsmanager_has_restrictive_resource_policy() + result = check.execute() + + assert len(result) == 0 + + def test_secret_with_weak_policy(self, secretsmanager_client): + client_instance, secret_arn = secretsmanager_client + client_instance.put_resource_policy( + SecretId=secret_arn, + ResourcePolicy=json.dumps( + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": "*", + "Action": "secretsmanager:GetSecretValue", + "Resource": "*", + } + ], + }, + indent=4, + ), + ) + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy.secretsmanager_client", + new=SecretsManager(aws_provider), + ): + from prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy import ( + secretsmanager_has_restrictive_resource_policy, + ) + + check = secretsmanager_has_restrictive_resource_policy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + + @pytest.mark.parametrize( + "description, remove_index, modify_element, expected_status", + [ + # test unmodified policy + ("Valid Policy", None, None, "PASS"), + # test modified statement DenyUnauthorizedPrincipals + ( + "Invalid Effect in DenyUnauthorizedPrincipals", + None, + (0, {"Effect": "Allow"}), + "FAIL", + ), + ( + "Valid Effect in DenyUnauthorizedPrincipals", + None, + (0, {"Effect": "Deny"}), + "PASS", + ), + ( + "Invalid Action in DenyUnauthorizedPrincipals", + None, + (0, {"Action": "InvalidAction"}), + "FAIL", + ), + ( + "Valid Action in DenyUnauthorizedPrincipals", + None, + (0, {"Action": "*"}), + "PASS", + ), + ( + "Invalid Resource in DenyUnauthorizedPrincipals", + None, + ( + 0, + { + "Resource": "arn:aws:secretsmanager:eu-central-1:123456789012:secret:wrong-secret" + }, + ), + "FAIL", + ), + ( + "Valid Resource in DenyUnauthorizedPrincipals", + None, + (0, {"Resource": "*"}), + "PASS", + ), + ( + "Invalid Condition Operator in DenyUnauthorizedPrincipals", + None, + ( + 0, + { + "Condition": { + "WrongOperator": { + "aws:PrincipalArn": "arn:aws:iam::123456789012:role/AccountSecurityAuditRole" + } + } + }, + ), + "FAIL", + ), + ( + "Valid Condition Operator in DenyUnauthorizedPrincipals", + None, + ( + 0, + { + "Condition": { + "StringNotEquals": { + "aws:PrincipalArn": "arn:aws:iam::123456789012:role/AccountSecurityAuditRole" + } + } + }, + ), + "PASS", + ), + ( + "Invalid Condition Key in DenyUnauthorizedPrincipals", + None, + ( + 0, + { + "Condition": { + "StringNotEquals": { + "aws:wrongKey": "arn:aws:iam::123456789012:role/AccountSecurityAuditRole" + } + } + }, + ), + "FAIL", + ), + ( + "Valid Condition Key in DenyUnauthorizedPrincipals", + None, + ( + 0, + { + "Condition": { + "StringNotEquals": { + "aws:PrincipalArn": "arn:aws:iam::123456789012:role/AccountSecurityAuditRole" + } + } + }, + ), + "PASS", + ), + ( + "Invalid Principal with wildcard in Condition in DenyUnauthorizedPrincipals", + None, + ( + 0, + { + "Condition": { + "StringNotEquals": { + "aws:PrincipalArn": "arn:aws:iam::123456789012:role/*" + } + } + }, + ), + "FAIL", + ), + ( + "Valid Principal w/o wildcard in Condition in DenyUnauthorizedPrincipals", + None, + ( + 0, + { + "Condition": { + "StringNotEquals": { + "aws:PrincipalArn": "arn:aws:iam::123456789012:role/AccountSecurityAuditRole" + } + } + }, + ), + "PASS", + ), + ( + "Invalid Service Principal in Condition in DenyUnauthorizedPrincipals", + None, + ( + 0, + { + "Condition": { + "StringNotEquals": { + "aws:PrincipalServiceName": "invalid.service.com" + } + } + }, + ), + "FAIL", + ), + ( + "Valid Service Principal in Condition in DenyUnauthorizedPrincipals", + None, + ( + 0, + { + "Condition": { + "StringNotEquals": { + "aws:PrincipalServiceName": "valid.amazonaws.com" + } + } + }, + ), + "PASS", + ), + # test modified statement DenyOutsideOrganization + ( + "Invalid Effect in DenyOutsideOrganization", + None, + (1, {"Effect": "Allow"}), + "FAIL", + ), + ( + "Valid Effect in DenyOutsideOrganization", + None, + (1, {"Effect": "Deny"}), + "PASS", + ), + ( + "Invalid Action in DenyOutsideOrganization", + None, + (1, {"Action": "secretsmanager:InvalidAction"}), + "FAIL", + ), + ( + "Valid Action in DenyOutsideOrganization", + None, + (1, {"Action": "secretsmanager:*"}), + "PASS", + ), + ( + "Invalid Resource in DenyOutsideOrganization", + None, + ( + 1, + { + "Resource": "arn:aws:secretsmanager:eu-central-1:123456789012:secret:wrong-secret" + }, + ), + "FAIL", + ), + ( + "Invalid Condition Operator in DenyOutsideOrganization", + None, + ( + 1, + { + "Condition": { + "WrongOperator": {"aws:PrincipalOrgID": "o-1234567890"} + } + }, + ), + "FAIL", + ), + ( + "Valid Condition Operator in DenyOutsideOrganization", + None, + ( + 1, + { + "Condition": { + "StringNotEquals": {"aws:PrincipalOrgID": "o-1234567890"} + } + }, + ), + "PASS", + ), + ( + "Invalid Condition Key in DenyOutsideOrganization", + None, + ( + 1, + { + "Condition": { + "StringNotEquals": {"aws:wrongKey": "o-1234567890"} + } + }, + ), + "FAIL", + ), + ( + "Valid Condition Key in DenyOutsideOrganization", + None, + ( + 1, + { + "Condition": { + "StringNotEquals": {"aws:PrincipalOrgID": "o-1234567890"} + } + }, + ), + "PASS", + ), + ( + "Invalid PrincipalOrgID in DenyOutsideOrganization", + None, + ( + 1, + { + "Condition": { + "StringNotEquals": {"aws:PrincipalOrgID": "o-invalid"} + } + }, + ), + "FAIL", + ), + ( + "Valid PrincipalOrgID in DenyOutsideOrganization", + None, + ( + 1, + { + "Condition": { + "StringNotEquals": {"aws:PrincipalOrgID": "o-1234567890"} + } + }, + ), + "PASS", + ), + # test modified statement AllowAuditPolicyRead + ( + "Invalid wildcard in NotAction in AllowAuditPolicyRead", + None, + (2, {"NotAction": "*"}), + "FAIL", + ), + ( + "No wildcard in NotAction in AllowAuditPolicyRead", + None, + (2, {"NotAction": "secretsmanager:DescribeSecret"}), + "PASS", + ), + ( + "Invalid wildcard in NotAction in AllowSecretAccessForRole2", + None, + (3, {"NotAction": "*"}), + "FAIL", + ), + ( + "No wildcard in NotAction in AllowSecretAccessForRole2", + None, + (3, {"NotAction": "secretsmanager:DescribeSecret"}), + "PASS", + ), + ( + "Invalid wildcard in NotAction in both statements", + None, + [(2, {"NotAction": "*"}), (3, {"NotAction": "secretsmanager:*"})], + "FAIL", + ), + ( + "No wildcard in NotAction in both statements", + None, + [ + (2, {"NotAction": "secretsmanager:DescribeSecret"}), + (3, {"NotAction": "secretsmanager:GetSecretValue"}), + ], + "PASS", + ), + # test policy with removed statements + ("Missing DenyUnauthorizedPrincipals", 0, None, "FAIL"), + ("Missing DenyOutsideOrganization", 1, None, "FAIL"), + # the following 2 test cases PASS because these statements are not required to make the Policy secure + # but in practice the AWS Principal will not be able to access the secret + ("Missing AllowAuditPolicyRead", 2, None, "PASS"), + ("Missing AllowSecretAccessForRole2", 3, None, "PASS"), + ], + ) + def test_secretsmanager_policies_for_principals( + self, + secretsmanager_client, + description, + remove_index, + modify_element, + expected_status, + ): + with mock_aws(): + client_instance, secret_arn = secretsmanager_client + + valid_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "DenyUnauthorizedPrincipals", + "Effect": "Deny", + "Principal": "*", + "Action": "*", + "Resource": "*", + "Condition": { + "StringNotEquals": { + "aws:PrincipalArn": [ + "arn:aws:iam::123456789012:role/AccountSecurityAuditRole", + "arn:aws:iam::123456789012:role/Role2", + ] + } + }, + }, + { + "Sid": "DenyOutsideOrganization", + "Effect": "Deny", + "Principal": "*", + "Action": "secretsmanager:*", + "Resource": "*", + "Condition": { + "StringNotEquals": {"aws:PrincipalOrgID": "o-1234567890"} + }, + }, + { + "Sid": "AllowAuditPolicyRead", + "Effect": "Deny", + "Principal": { + "AWS": "arn:aws:iam::123456789012:role/AccountSecurityAuditRole" + }, + "NotAction": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetResourcePolicy", + ], + "Resource": "*", + }, + { + "Sid": "AllowSecretAccessForRole2", + "Effect": "Deny", + "Principal": {"AWS": "arn:aws:iam::123456789012:role/Role2"}, + "NotAction": ["secretsmanager:GetSecretValue"], + "Resource": "*", + }, + ], + } + + policy_copy = json.loads(json.dumps(valid_policy)) + if remove_index is not None: + del policy_copy["Statement"][remove_index] + if modify_element is not None: + if isinstance(modify_element, list): + for index, value in modify_element: + policy_copy["Statement"][index].update(value) + else: + index, value = modify_element + policy_copy["Statement"][index].update(value) + + client_instance.put_resource_policy( + SecretId=secret_arn, ResourcePolicy=json.dumps(policy_copy) + ) + + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy.secretsmanager_client", + new=SecretsManager(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy.secretsmanager_client.audit_config", + {"organizations_trusted_ids": "o-1234567890"}, + ): + from prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy import ( + secretsmanager_has_restrictive_resource_policy, + ) + + check = secretsmanager_has_restrictive_resource_policy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == expected_status, f"Test case: {description}" + + @pytest.mark.parametrize( + "description, modify_element, expected_status", + [ + # test unmodified policy + ( + "Valid unmodified Policy with PrincipalArn and Service", + None, + "PASS", + ), + # test statement DenyOutsideOrganization + ( + "Invalid DenyOutsideOrganization using NotPrincipal with disallowed service", + (1, {"NotPrincipal": {"Service": "invalid.service.com"}}), + "FAIL", + ), + # test statement AllowAppFlowAccess + ( + "Invalid wildcard '*' in Action in AllowAppFlowAccess", + (4, {"Action": "*"}), + "FAIL", + ), + ( + "No wildcard '*' in Action in AllowAppFlowAccess", + (4, {"Action": "secretsmanager:GetSecretValue"}), + "PASS", + ), + ( + "Invalid wildcard 'secretsmanager:*' in Action in AllowAppFlowAccess", + (4, {"Action": "secretsmanager:*"}), + "FAIL", + ), + ( + "No wildcard 'secretsmanager:*' in Action in AllowAppFlowAccess", + (4, {"Action": "secretsmanager:ValidAction"}), + "PASS", + ), + ( + "Missing Condition in AllowAppFlowAccess", + (4, {"Condition": {}}), + "FAIL", + ), + ( + "Valid Condition in AllowAppFlowAccess", + ( + 4, + { + "Condition": { + "StringEquals": {"aws:SourceAccount": "123456789012"} + } + }, + ), + "PASS", + ), + ( + "Invalid Condition Operator in AllowAppFlowAccess", + ( + 4, + { + "Condition": { + "WrongOperator": {"aws:SourceAccount": "123456789012"} + } + }, + ), + "FAIL", + ), + ( + "Valid Condition Operator in AllowAppFlowAccess", + ( + 4, + { + "Condition": { + "StringEquals": {"aws:SourceAccount": "123456789012"} + } + }, + ), + "PASS", + ), + ( + "Invalid Condition Key in AllowAppFlowAccess", + (4, {"Condition": {"StringEquals": {"aws:WrongKey": "123456789012"}}}), + "FAIL", + ), + ( + "Valid Condition Key in AllowAppFlowAccess", + ( + 4, + { + "Condition": { + "StringEquals": {"aws:SourceAccount": "123456789012"} + } + }, + ), + "PASS", + ), + ], + ) + def test_secretsmanager_policies_for_services( + self, secretsmanager_client, description, modify_element, expected_status + ): + with mock_aws(): + client_instance, secret_arn = secretsmanager_client + + valid_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "DenyUnauthorizedPrincipals", + "Effect": "Deny", + "Principal": "*", + "Action": "*", + "Resource": "*", + "Condition": { + "StringNotEqualsIfExists": { + "aws:PrincipalArn": [ + "arn:aws:iam::123456789012:role/AccountSecurityAuditRole", + "arn:aws:iam::123456789012:role/Role2", + ], + "aws:PrincipalServiceName": "appflow.amazonaws.com", + } + }, + }, + { + "Sid": "DenyOutsideOrganization", + "Effect": "Deny", + "NotPrincipal": {"Service": "appflow.amazonaws.com"}, + "Action": "secretsmanager:*", + "Resource": "*", + "Condition": { + "StringNotEquals": {"aws:PrincipalOrgID": "o-1234567890"} + }, + }, + { + "Sid": "AllowAuditPolicyRead", + "Effect": "Deny", + "Principal": { + "AWS": "arn:aws:iam::123456789012:role/AccountSecurityAuditRole" + }, + "NotAction": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetResourcePolicy", + ], + "Resource": "*", + }, + { + "Sid": "AllowSecretAccessForRole2", + "Effect": "Deny", + "Principal": {"AWS": "arn:aws:iam::123456789012:role/Role2"}, + "NotAction": ["secretsmanager:GetSecretValue"], + "Resource": "*", + }, + { + "Sid": "AllowAppFlowAccess", + "Effect": "Allow", + "Principal": {"Service": "appflow.amazonaws.com"}, + "Action": [ + "secretsmanager:GetSecretValue", + "secretsmanager:PutSecretValue", + "secretsmanager:DeleteSecret", + "secretsmanager:DescribeSecret", + "secretsmanager:UpdateSecret", + ], + "Resource": "*", + "Condition": { + "StringEquals": {"aws:SourceAccount": "123456789012"} + }, + }, + ], + } + + policy_copy = json.loads(json.dumps(valid_policy)) + + if modify_element is not None: + if isinstance(modify_element, list): + for index, value in modify_element: + policy_copy["Statement"][index].update(value) + else: + index, value = modify_element + policy_copy["Statement"][index].update(value) + + client_instance.put_resource_policy( + SecretId=secret_arn, ResourcePolicy=json.dumps(policy_copy) + ) + + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy.secretsmanager_client", + new=SecretsManager(aws_provider), + ), mock.patch( + "prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy.secretsmanager_client.audit_config", + {"organizations_trusted_ids": "o-1234567890"}, + ): + from prowler.providers.aws.services.secretsmanager.secretsmanager_has_restrictive_resource_policy.secretsmanager_has_restrictive_resource_policy import ( + secretsmanager_has_restrictive_resource_policy, + ) + + check = secretsmanager_has_restrictive_resource_policy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == expected_status, f"Test case: {description}"