From 1cd9d2d39a1b9476c6c2dd5983054f60e467dd08 Mon Sep 17 00:00:00 2001 From: gruebel Date: Sat, 7 Oct 2023 23:18:53 +0200 Subject: [PATCH] create ResourcePolicyDocument with internet reachable check --- .../scan/assume_role_policy_document.py | 74 +---- .../scan/resource_policy_document.py | 260 ++++++++++++++++++ .../scanning/test_resource_policy_document.py | 146 ++++++++++ 3 files changed, 421 insertions(+), 59 deletions(-) create mode 100644 cloudsplaining/scan/resource_policy_document.py create mode 100644 test/scanning/test_resource_policy_document.py diff --git a/cloudsplaining/scan/assume_role_policy_document.py b/cloudsplaining/scan/assume_role_policy_document.py index 17ab393a..e8b64739 100644 --- a/cloudsplaining/scan/assume_role_policy_document.py +++ b/cloudsplaining/scan/assume_role_policy_document.py @@ -4,23 +4,31 @@ # Licensed under the BSD 3-Clause license. # For full license text, see the LICENSE file in the repo root # or https://opensource.org/licenses/BSD-3-Clause +from __future__ import annotations + import logging from typing import Dict, Any, List +from cloudsplaining.scan.resource_policy_document import ( + ResourcePolicyDocument, + ResourceStatement, +) from cloudsplaining.shared.constants import SERVICE_PREFIXES_WITH_COMPUTE_ROLES logger = logging.getLogger(__name__) -class AssumeRolePolicyDocument: - """ - Holds the AssumeRole/Trust Policy document +class AssumeRolePolicyDocument(ResourcePolicyDocument): + """Holds the AssumeRole/Trust Policy document + + It is a specialized version of a Resource-based policy """ - def __init__(self, policy: Dict[str, Any]) -> None: + def __init__(self, policy: dict[str, Any]) -> None: statement_structure = policy.get("Statement", []) self.policy = policy - self.statements = [] + # We would actually need to define a proper base class with a generic type for statements + self.statements: list[AssumeRoleStatement] = [] # type:ignore[assignment] # leaving here but excluding from tests because IAM Policy grammar dictates that it must be a list if not isinstance(statement_structure, list): # pragma: no cover statement_structure = [statement_structure] @@ -28,11 +36,6 @@ def __init__(self, policy: Dict[str, Any]) -> None: for statement in statement_structure: self.statements.append(AssumeRoleStatement(statement)) - @property - def json(self) -> Dict[str, Any]: - """Return the AssumeRole Policy in JSON""" - return self.policy - @property def role_assumable_by_compute_services(self) -> List[str]: """Determines whether or not the role is assumed from a compute service, and if so which ones.""" @@ -45,17 +48,13 @@ def role_assumable_by_compute_services(self) -> List[str]: return assumable_by_compute_services -class AssumeRoleStatement: +class AssumeRoleStatement(ResourceStatement): """ Statements in an AssumeRole/Trust Policy document """ def __init__(self, statement: Dict[str, Any]) -> None: - self.json = statement - self.statement = statement - self.effect = statement["Effect"] - self.actions = self._assume_role_actions() - self.principals = self._principals() + super().__init__(statement=statement) # self.not_principal = statement.get("NotPrincipal") if statement.get("NotPrincipal"): @@ -76,49 +75,6 @@ def _assume_role_actions(self) -> List[str]: return [actions] - def _principals(self) -> List[str]: - """Extracts all principals from IAM statement. - Should handle these cases: - "Principal": "value" - "Principal": ["value"] - "Principal": { "AWS": "value" } - "Principal": { "AWS": ["value", "value"] } - "Principal": { "Federated": "value" } - "Principal": { "Federated": ["value", "value"] } - "Principal": { "Service": "value" } - "Principal": { "Service": ["value", "value"] } - Return: Set of principals - """ - principals: List[str] = [] - principal = self.statement.get("Principal", None) - if not principal: - # It is possible not to define a principal, AWS ignores these statements. - return principals # pragma: no cover - - if isinstance(principal, dict): - - if "AWS" in principal: - if isinstance(principal["AWS"], list): - principals.extend(principal["AWS"]) - else: - principals.append(principal["AWS"]) - - if "Federated" in principal: - if isinstance(principal["Federated"], list): - principals.extend(principal["Federated"]) - else: - principals.append(principal["Federated"]) - - if "Service" in principal: - if isinstance(principal["Service"], list): - principals.extend(principal["Service"]) - else: - principals.append(principal["Service"]) - else: - principals.append(principal) - # principals = list(principals).sort() - return principals - @property def role_assumable_by_compute_services(self) -> List[str]: """Determines whether or not the role is assumed from a compute service, and if so which ones.""" diff --git a/cloudsplaining/scan/resource_policy_document.py b/cloudsplaining/scan/resource_policy_document.py new file mode 100644 index 00000000..96d3354a --- /dev/null +++ b/cloudsplaining/scan/resource_policy_document.py @@ -0,0 +1,260 @@ +"""Represents the Resource-based policy""" +from __future__ import annotations + +import logging +import re +from typing import Any + +from policy_sentry.util.arns import ARN + + +CONDITION_KEY_CATEGORIES = { + "aws:sourcearn": "arn", + "aws:principalarn": "arn", + "aws:sourceowner": "account", + "aws:sourceaccount": "account", + "aws:principalaccount": "account", + "aws:principalorgid": "organization", + "aws:principalorgpaths": "organization", + "kms:calleraccount": "account", + "aws:userid": "userid", + "aws:sourceip": "cidr", + "aws:sourcevpc": "vpc", + "aws:sourcevpce": "vpce", + # a key for SAML Federation trust policy. + # https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-idp_saml.html + # https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_providers_create_saml_assertions.html + "saml:aud": "saml-endpoint", +} +RELEVANT_CONDITION_OPERATORS_PATTERN = re.compile( + "((ForAllValues|ForAnyValue):)?(ARN(Equals|Like)|String(Equals|Like)(IgnoreCase)?|IpAddress)(IfExists)?", + re.IGNORECASE, +) + +logger = logging.getLogger(__name__) + + +class ResourcePolicyDocument: + """Holds the Resource Policy document""" + + def __init__(self, policy: dict[str, Any]) -> None: + statement_structure = policy.get("Statement", []) + self.policy = policy + self.statements = [] + # leaving here but excluding from tests because IAM Policy grammar dictates that it must be a list + if not isinstance(statement_structure, list): # pragma: no cover + statement_structure = [statement_structure] + + for statement in statement_structure: + self.statements.append(ResourceStatement(statement)) + + @property + def json(self) -> dict[str, Any]: + """Return the Resource Policy in JSON""" + return self.policy + + @property + def internet_accessible_actions(self) -> list[str]: + result = [] + for statement in self.statements: + actions = statement.internet_accessible_actions + if actions: + result.extend(actions) + + return result + + +class ResourceStatement: + """Statements in a Resource Policy document""" + + def __init__(self, statement: dict[str, Any]) -> None: + self.json = statement + self.statement = statement + self.effect = statement["Effect"] + self.actions = self._actions() + self.principals = self._principals() + self.conditions = self._conditions() + + def _actions(self) -> list[str]: + """Extracts all actions""" + actions = self.statement.get("Action", []) + if not actions: + return [] + + if isinstance(actions, list): + return actions + + return [actions] + + def _principals(self) -> list[str]: + """Extracts all principals from IAM statement. + Should handle these cases: + "Principal": "value" + "Principal": ["value"] + "Principal": { "AWS": "value" } + "Principal": { "AWS": ["value", "value"] } + "Principal": { "Federated": "value" } + "Principal": { "Federated": ["value", "value"] } + "Principal": { "Service": "value" } + "Principal": { "Service": ["value", "value"] } + Return: Set of principals + """ + principals: list[str] = [] + principal = self.statement.get("Principal", None) + if not principal: + # It is possible not to define a principal, AWS ignores these statements. + return principals # pragma: no cover + + if isinstance(principal, dict): + if "AWS" in principal: + if isinstance(principal["AWS"], list): + principals.extend(principal["AWS"]) + else: + principals.append(principal["AWS"]) + + if "Federated" in principal: + if isinstance(principal["Federated"], list): + principals.extend(principal["Federated"]) + else: + principals.append(principal["Federated"]) + + if "Service" in principal: + if isinstance(principal["Service"], list): + principals.extend(principal["Service"]) + else: + principals.append(principal["Service"]) + else: + principals.append(principal) + + return principals + + # Adapted version of policyuniverse's _condition_entries, here: + # https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/statement.py#L146 + def _conditions(self) -> list[tuple[str, Any]]: + """Extracts any ARNs, Account Numbers, UserIDs, Usernames, CIDRs, VPCs, and VPC Endpoints from a condition block. + + Ignores any negated condition operators like StringNotLike. + Ignores weak condition keys like referer, date, etc. + + Reason: A condition is meant to limit the principal in a statement. Often, resource policies use a wildcard principal + and rely exclusively on the Condition block to limit access. + + We would want to alert if the Condition had no limitations (like a non-existent Condition block), or very weak + limitations. Any negation would be weak, and largely equivelant to having no condition block whatsoever. + + The alerting code that relies on this data must ensure the condition has at least one of the following: + - A limiting ARN + - Account Identifier + - AWS Organization Principal Org ID + - User ID + - Source IP / CIDR + - VPC + - VPC Endpoint + + https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html + """ + + conditions: list[tuple[str, Any]] = [] + condition = self.statement.get("Condition") + if not condition: + return conditions + + for condition_operator, condition_context in condition.items(): + if RELEVANT_CONDITION_OPERATORS_PATTERN.match(condition_operator): + for key, value in condition_context.items(): + key_lower = key.lower() + if key_lower in CONDITION_KEY_CATEGORIES: + if isinstance(value, list): + conditions.extend( + (CONDITION_KEY_CATEGORIES[key_lower], v) for v in value + ) + else: + conditions.append( + (CONDITION_KEY_CATEGORIES[key_lower], value) + ) + + return conditions + + @property + def internet_accessible_actions(self) -> list[str]: + """Determines whether the actions can be used by everyone""" + + # compared to policyuniverse's implementation, + # there is no need to check for the existence of 'NotPrincipal', + # because it is not support with self.effect == "Allow" + if self.effect == "Deny": + return [] + + for entry in self.conditions: + if self._is_condition_entry_internet_accessible(entry=entry): + return self.actions + + if self.conditions: + # this means we have conditions, but they protect the policy to be accessible by everyone + return [] + + for principal in self.principals: + if self._arn_internet_accessible(arn=principal): + return self.actions + + return [] + + # Adapted version of policyuniverse's _is_condition_entry_internet_accessible and the called methods, here: + # https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/statement.py#L301 + # and onwards + def _is_condition_entry_internet_accessible(self, entry: tuple[str, Any]) -> bool: + category, condition_value = entry + + if category == "arn": + return self._arn_internet_accessible(arn=condition_value) + elif category == "cidr": + return self._cidr_internet_accessible(cidr=condition_value) + elif category == "organization": + return self._organization_internet_accessible(org=condition_value) + elif category == "userid": + return self._userid_internet_accessible(userid=condition_value) + + return "*" in condition_value + + def _arn_internet_accessible(self, arn: str) -> bool: + if "*" == arn: + return True + + if not arn.startswith("arn:"): + # probably an account ID or AWS service + return False + + try: + parsed_arn = ARN(provided_arn=arn) + except Exception: + logger.info(f"ARN {arn} is not parsable") + return "*" in arn + + if parsed_arn.service_prefix == "s3": + # S3 ARNs don't have account numbers + return False + + if not parsed_arn.account and not parsed_arn.service_prefix: + logger.info(f"ARN {arn} is not valid") + return True + + if parsed_arn.account == "*": + return True + + return False + + def _cidr_internet_accessible(self, cidr: str) -> bool: + return cidr.endswith("/0") + + def _organization_internet_accessible(self, org: str) -> bool: + if "o-*" in org: + return True + return False + + def _userid_internet_accessible(self, userid: str) -> bool: + # Trailing wildcards are okay for user IDs: + # AROAIIIIIIIIIIIIIIIII:* + if userid.find("*") == len(userid) - 1: + # note: this will also return False for a zero-length userid + return False + return True diff --git a/test/scanning/test_resource_policy_document.py b/test/scanning/test_resource_policy_document.py new file mode 100644 index 00000000..481b4b63 --- /dev/null +++ b/test/scanning/test_resource_policy_document.py @@ -0,0 +1,146 @@ +from cloudsplaining.scan.resource_policy_document import ResourcePolicyDocument + + +def test_resource_policy_document(): + # given + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Principal": {"AWS": "*"}, + "Effect": "Allow", + "Action": [ + "SNS:Subscribe", + "SNS:Publish", + ], + "Resource": "*", + } + ], + } + + # when + policy_doc = ResourcePolicyDocument(policy=policy) + + # then + assert policy_doc.internet_accessible_actions == ["SNS:Subscribe", "SNS:Publish"] + + +def test_resource_policy_document_restricted_by_arn(): + # given + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Principal": {"AWS": "*"}, + "Effect": "Allow", + "Action": [ + "SNS:Subscribe", + "SNS:Publish", + ], + "Resource": "*", + "Condition": { + "ForAnyValue:ARNEquals": { + "AWS:SourceArn": [ + "arn:aws:iam::012345678910:role/SomeTestRoleForTesting", + "arn:aws:iam::012345678910:role/OtherRole", + ] + }, + }, + } + ], + } + + # when + policy_doc = ResourcePolicyDocument(policy=policy) + + # then + assert policy_doc.internet_accessible_actions == [] + + +def test_resource_policy_document_restricted_by_cidr(): + # given + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Principal": {"AWS": "*"}, + "Effect": "Allow", + "Action": [ + "SNS:Subscribe", + "SNS:Publish", + ], + "Resource": "*", + "Condition": { + "IpAddress": { + "AWS:SourceIP": [ + "10.0.0.0/16", + ] + } + }, + } + ], + } + + # when + policy_doc = ResourcePolicyDocument(policy=policy) + + # then + assert policy_doc.internet_accessible_actions == [] + + +def test_resource_policy_document_restricted_by_org_id(): + # given + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Principal": {"AWS": "*"}, + "Effect": "Allow", + "Action": [ + "SNS:Subscribe", + "SNS:Publish", + ], + "Resource": "*", + "Condition": { + "StringEquals": { + "AWS:PrincipalOrgID": "o-xxxxxxxxxx", + } + }, + } + ], + } + + # when + policy_doc = ResourcePolicyDocument(policy=policy) + + # then + assert policy_doc.internet_accessible_actions == [] + + +def test_resource_policy_document_restricted_by_user_id(): + # given + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Principal": {"AWS": "*"}, + "Effect": "Allow", + "Action": [ + "SNS:Subscribe", + "SNS:Publish", + ], + "Resource": "*", + "Condition": { + "StringLike": { + "AWS:userid": "AROAI1111111111111111:*", + } + }, + } + ], + } + + # when + policy_doc = ResourcePolicyDocument(policy=policy) + + # then + assert policy_doc.internet_accessible_actions == []