Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create ResourcePolicyDocument with internet reachable check #321

Merged
merged 1 commit into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 15 additions & 59 deletions cloudsplaining/scan/assume_role_policy_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,38 @@
# Licensed under the BSD 3-Clause license.
# For full license text, see the LICENSE file in the repo root
# or https://opensource.org/licenses/BSD-3-Clause
from __future__ import annotations

import logging
from typing import Dict, Any, List

from cloudsplaining.scan.resource_policy_document import (
ResourcePolicyDocument,
ResourceStatement,
)
from cloudsplaining.shared.constants import SERVICE_PREFIXES_WITH_COMPUTE_ROLES

logger = logging.getLogger(__name__)


class AssumeRolePolicyDocument:
"""
Holds the AssumeRole/Trust Policy document
class AssumeRolePolicyDocument(ResourcePolicyDocument):
"""Holds the AssumeRole/Trust Policy document

It is a specialized version of a Resource-based policy
"""

def __init__(self, policy: Dict[str, Any]) -> None:
def __init__(self, policy: dict[str, Any]) -> None:
statement_structure = policy.get("Statement", [])
self.policy = policy
self.statements = []
# We would actually need to define a proper base class with a generic type for statements
self.statements: list[AssumeRoleStatement] = [] # type:ignore[assignment]
# leaving here but excluding from tests because IAM Policy grammar dictates that it must be a list
if not isinstance(statement_structure, list): # pragma: no cover
statement_structure = [statement_structure]

for statement in statement_structure:
self.statements.append(AssumeRoleStatement(statement))

@property
def json(self) -> Dict[str, Any]:
"""Return the AssumeRole Policy in JSON"""
return self.policy

@property
def role_assumable_by_compute_services(self) -> List[str]:
"""Determines whether or not the role is assumed from a compute service, and if so which ones."""
Expand All @@ -45,17 +48,13 @@ def role_assumable_by_compute_services(self) -> List[str]:
return assumable_by_compute_services


class AssumeRoleStatement:
class AssumeRoleStatement(ResourceStatement):
"""
Statements in an AssumeRole/Trust Policy document
"""

def __init__(self, statement: Dict[str, Any]) -> None:
self.json = statement
self.statement = statement
self.effect = statement["Effect"]
self.actions = self._assume_role_actions()
self.principals = self._principals()
super().__init__(statement=statement)

# self.not_principal = statement.get("NotPrincipal")
if statement.get("NotPrincipal"):
Expand All @@ -76,49 +75,6 @@ def _assume_role_actions(self) -> List[str]:

return [actions]

def _principals(self) -> List[str]:
"""Extracts all principals from IAM statement.
Should handle these cases:
"Principal": "value"
"Principal": ["value"]
"Principal": { "AWS": "value" }
"Principal": { "AWS": ["value", "value"] }
"Principal": { "Federated": "value" }
"Principal": { "Federated": ["value", "value"] }
"Principal": { "Service": "value" }
"Principal": { "Service": ["value", "value"] }
Return: Set of principals
"""
principals: List[str] = []
principal = self.statement.get("Principal", None)
if not principal:
# It is possible not to define a principal, AWS ignores these statements.
return principals # pragma: no cover

if isinstance(principal, dict):

if "AWS" in principal:
if isinstance(principal["AWS"], list):
principals.extend(principal["AWS"])
else:
principals.append(principal["AWS"])

if "Federated" in principal:
if isinstance(principal["Federated"], list):
principals.extend(principal["Federated"])
else:
principals.append(principal["Federated"])

if "Service" in principal:
if isinstance(principal["Service"], list):
principals.extend(principal["Service"])
else:
principals.append(principal["Service"])
else:
principals.append(principal)
# principals = list(principals).sort()
return principals

@property
def role_assumable_by_compute_services(self) -> List[str]:
"""Determines whether or not the role is assumed from a compute service, and if so which ones."""
Expand Down
260 changes: 260 additions & 0 deletions cloudsplaining/scan/resource_policy_document.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
"""Represents the Resource-based policy"""
from __future__ import annotations

import logging
import re
from typing import Any

from policy_sentry.util.arns import ARN


CONDITION_KEY_CATEGORIES = {
"aws:sourcearn": "arn",
"aws:principalarn": "arn",
"aws:sourceowner": "account",
"aws:sourceaccount": "account",
"aws:principalaccount": "account",
"aws:principalorgid": "organization",
"aws:principalorgpaths": "organization",
"kms:calleraccount": "account",
"aws:userid": "userid",
"aws:sourceip": "cidr",
"aws:sourcevpc": "vpc",
"aws:sourcevpce": "vpce",
# a key for SAML Federation trust policy.
# https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-idp_saml.html
# https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_providers_create_saml_assertions.html
"saml:aud": "saml-endpoint",
}
RELEVANT_CONDITION_OPERATORS_PATTERN = re.compile(
"((ForAllValues|ForAnyValue):)?(ARN(Equals|Like)|String(Equals|Like)(IgnoreCase)?|IpAddress)(IfExists)?",
re.IGNORECASE,
)

logger = logging.getLogger(__name__)


class ResourcePolicyDocument:
"""Holds the Resource Policy document"""

def __init__(self, policy: dict[str, Any]) -> None:
statement_structure = policy.get("Statement", [])
self.policy = policy
self.statements = []
# leaving here but excluding from tests because IAM Policy grammar dictates that it must be a list
if not isinstance(statement_structure, list): # pragma: no cover
statement_structure = [statement_structure]

for statement in statement_structure:
self.statements.append(ResourceStatement(statement))

@property
def json(self) -> dict[str, Any]:
"""Return the Resource Policy in JSON"""
return self.policy

@property
def internet_accessible_actions(self) -> list[str]:
result = []
for statement in self.statements:
actions = statement.internet_accessible_actions
if actions:
result.extend(actions)

return result


class ResourceStatement:
"""Statements in a Resource Policy document"""

def __init__(self, statement: dict[str, Any]) -> None:
self.json = statement
self.statement = statement
self.effect = statement["Effect"]
self.actions = self._actions()
self.principals = self._principals()
self.conditions = self._conditions()

def _actions(self) -> list[str]:
"""Extracts all actions"""
actions = self.statement.get("Action", [])
if not actions:
return []

if isinstance(actions, list):
return actions

return [actions]

def _principals(self) -> list[str]:
"""Extracts all principals from IAM statement.
Should handle these cases:
"Principal": "value"
"Principal": ["value"]
"Principal": { "AWS": "value" }
"Principal": { "AWS": ["value", "value"] }
"Principal": { "Federated": "value" }
"Principal": { "Federated": ["value", "value"] }
"Principal": { "Service": "value" }
"Principal": { "Service": ["value", "value"] }
Return: Set of principals
"""
principals: list[str] = []
principal = self.statement.get("Principal", None)
if not principal:
# It is possible not to define a principal, AWS ignores these statements.
return principals # pragma: no cover

if isinstance(principal, dict):
if "AWS" in principal:
if isinstance(principal["AWS"], list):
principals.extend(principal["AWS"])
else:
principals.append(principal["AWS"])

if "Federated" in principal:
if isinstance(principal["Federated"], list):
principals.extend(principal["Federated"])
else:
principals.append(principal["Federated"])

if "Service" in principal:
if isinstance(principal["Service"], list):
principals.extend(principal["Service"])
else:
principals.append(principal["Service"])
else:
principals.append(principal)

return principals

# Adapted version of policyuniverse's _condition_entries, here:
# https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/statement.py#L146
def _conditions(self) -> list[tuple[str, Any]]:
"""Extracts any ARNs, Account Numbers, UserIDs, Usernames, CIDRs, VPCs, and VPC Endpoints from a condition block.

Ignores any negated condition operators like StringNotLike.
Ignores weak condition keys like referer, date, etc.

Reason: A condition is meant to limit the principal in a statement. Often, resource policies use a wildcard principal
and rely exclusively on the Condition block to limit access.

We would want to alert if the Condition had no limitations (like a non-existent Condition block), or very weak
limitations. Any negation would be weak, and largely equivelant to having no condition block whatsoever.

The alerting code that relies on this data must ensure the condition has at least one of the following:
- A limiting ARN
- Account Identifier
- AWS Organization Principal Org ID
- User ID
- Source IP / CIDR
- VPC
- VPC Endpoint

https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html
"""

conditions: list[tuple[str, Any]] = []
condition = self.statement.get("Condition")
if not condition:
return conditions

for condition_operator, condition_context in condition.items():
if RELEVANT_CONDITION_OPERATORS_PATTERN.match(condition_operator):
for key, value in condition_context.items():
key_lower = key.lower()
if key_lower in CONDITION_KEY_CATEGORIES:
if isinstance(value, list):
conditions.extend(
(CONDITION_KEY_CATEGORIES[key_lower], v) for v in value
)
else:
conditions.append(
(CONDITION_KEY_CATEGORIES[key_lower], value)
)

return conditions

@property
def internet_accessible_actions(self) -> list[str]:
"""Determines whether the actions can be used by everyone"""

# compared to policyuniverse's implementation,
# there is no need to check for the existence of 'NotPrincipal',
# because it is not support with self.effect == "Allow"
if self.effect == "Deny":
return []

for entry in self.conditions:
if self._is_condition_entry_internet_accessible(entry=entry):
return self.actions

if self.conditions:
# this means we have conditions, but they protect the policy to be accessible by everyone
return []

for principal in self.principals:
if self._arn_internet_accessible(arn=principal):
return self.actions

return []

# Adapted version of policyuniverse's _is_condition_entry_internet_accessible and the called methods, here:
# https://github.com/Netflix-Skunkworks/policyuniverse/blob/master/policyuniverse/statement.py#L301
# and onwards
def _is_condition_entry_internet_accessible(self, entry: tuple[str, Any]) -> bool:
category, condition_value = entry

if category == "arn":
return self._arn_internet_accessible(arn=condition_value)
elif category == "cidr":
return self._cidr_internet_accessible(cidr=condition_value)
elif category == "organization":
return self._organization_internet_accessible(org=condition_value)
elif category == "userid":
return self._userid_internet_accessible(userid=condition_value)

return "*" in condition_value

def _arn_internet_accessible(self, arn: str) -> bool:
if "*" == arn:
return True

if not arn.startswith("arn:"):
# probably an account ID or AWS service
return False

try:
parsed_arn = ARN(provided_arn=arn)
except Exception:
logger.info(f"ARN {arn} is not parsable")
return "*" in arn

if parsed_arn.service_prefix == "s3":
# S3 ARNs don't have account numbers
return False

if not parsed_arn.account and not parsed_arn.service_prefix:
logger.info(f"ARN {arn} is not valid")
return True

if parsed_arn.account == "*":
return True

return False

def _cidr_internet_accessible(self, cidr: str) -> bool:
return cidr.endswith("/0")

def _organization_internet_accessible(self, org: str) -> bool:
if "o-*" in org:
return True
return False

def _userid_internet_accessible(self, userid: str) -> bool:
# Trailing wildcards are okay for user IDs:
# AROAIIIIIIIIIIIIIIIII:*
if userid.find("*") == len(userid) - 1:
# note: this will also return False for a zero-length userid
return False
return True
Loading