Skip to content

Commit

Permalink
feat(sns): sns topics no http subscriptions (#4095)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergio <[email protected]>
  • Loading branch information
Davidm4r and MrCloudSec authored May 28, 2024
1 parent 533f7cb commit 98b7df6
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 0 deletions.
39 changes: 39 additions & 0 deletions prowler/providers/aws/services/sns/sns_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, provider):
self.__threading_call__(self.__list_topics__)
self.__get_topic_attributes__(self.regional_clients)
self.__list_tags_for_resource__()
self.__list_subscriptions_by_topic__()

def __list_topics__(self, regional_client):
logger.info("SNS - listing topics...")
Expand Down Expand Up @@ -74,6 +75,43 @@ def __list_tags_for_resource__(self):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def __list_subscriptions_by_topic__(self):
logger.info("SNS - Listing subscriptions by topic...")
try:
for topic in self.topics:
try:
regional_client = self.regional_clients[topic.region]
response = regional_client.list_subscriptions_by_topic(
TopicArn=topic.arn
)
subscriptions: list[Subscription] = [
Subscription(
id=sub["SubscriptionArn"].split(":")[-1],
arn=sub["SubscriptionArn"],
owner=sub["Owner"],
protocol=sub["Protocol"],
endpoint=sub["Endpoint"],
)
for sub in response["Subscriptions"]
]
topic.subscriptions = subscriptions
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class Subscription(BaseModel):
id: str
arn: str
owner: str
protocol: str
endpoint: str


class Topic(BaseModel):
name: str
Expand All @@ -82,3 +120,4 @@ class Topic(BaseModel):
policy: dict = None
kms_master_key_id: str = None
tags: Optional[list] = []
subscriptions: Optional[list[Subscription]] = []
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "sns_subscription_not_using_http_endpoints",
"CheckTitle": "Ensure there are no SNS subscriptions using HTTP endpoints",
"CheckType": [],
"ServiceName": "sns",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:sns:region:account-id:topic",
"Severity": "high",
"ResourceType": "AwsSNSTopic",
"Description": "Ensure there are no SNS subscriptions using HTTP endpoints",
"Risk": "When you use HTTPS, messages are automatically encrypted during transit, even if the SNS topic itself isn't encrypted. Without HTTPS, a network-based attacker can eavesdrop on network traffic or manipulate it using an attack such as man-in-the-middle.",
"RelatedUrl": "https://docs.aws.amazon.com/sns/latest/dg/sns-security-best-practices.html#enforce-encryption-data-in-transit",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "To enforce only encrypted connections over HTTPS, add the aws:SecureTransport condition in the IAM policy that's attached to unencrypted SNS topics. This forces message publishers to use HTTPS instead of HTTP",
"Url": "https://docs.aws.amazon.com/sns/latest/dg/sns-security-best-practices.html#enforce-encryption-data-in-transit"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.sns.sns_client import sns_client


class sns_subscription_not_using_http_endpoints(Check):
def execute(self):
findings = []
for topic in sns_client.topics:
for subscription in topic.subscriptions:
if subscription.arn == "PendingConfirmation":
continue
report = Check_Report_AWS(self.metadata())
report.region = topic.region
report.resource_id = subscription.id
report.resource_arn = subscription.arn
report.resource_tags = topic.tags
report.resource_details = topic.arn
report.status = "PASS"
report.status_extended = (
f"Subscription {subscription.arn} is using an HTTPS endpoint."
)

if subscription.protocol == "http":
report.status = "FAIL"
report.status_extended = (
f"Subscription {subscription.arn} is using an HTTP endpoint."
)

findings.append(report)

return findings
25 changes: 25 additions & 0 deletions tests/providers/aws/services/sns/sns_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,28 @@ def test__get_topic_attributes__(self):
assert sns.topics[0].region == AWS_REGION_EU_WEST_1
assert sns.topics[0].policy
assert sns.topics[0].kms_master_key_id == kms_key_id

@mock_aws
def test__list_subscriptions_by_topic__(self):
sns_client = client("sns", region_name=AWS_REGION_EU_WEST_1)
topic_response = sns_client.create_topic(Name=topic_name)
topic_arn = topic_response["TopicArn"]

# Create subscriptions for the topic
sns_client.subscribe(
TopicArn=topic_arn, Protocol="http", Endpoint="http://www.endpoint.com"
)
sns_client.subscribe(
TopicArn=topic_arn, Protocol="https", Endpoint="https://www.endpoint.com"
)

aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
sns = SNS(aws_provider)

assert len(sns.topics) == 1
assert sns.topics[0].arn == topic_arn
assert len(sns.topics[0].subscriptions) == 2
assert sns.topics[0].subscriptions[0].protocol == "http"
assert sns.topics[0].subscriptions[1].protocol == "https"
assert sns.topics[0].subscriptions[0].endpoint == "http://www.endpoint.com"
assert sns.topics[0].subscriptions[1].endpoint == "https://www.endpoint.com"
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
from unittest import mock
from uuid import uuid4

from prowler.providers.aws.services.sns.sns_service import Subscription, Topic
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1

kms_key_id = str(uuid4())
topic_name = "test-topic"
topic_arn = f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}"
subscription_id_1 = str(uuid4())
subscription_id_2 = str(uuid4())
subscription_arn_1 = f"{topic_arn}:{subscription_id_1}"
subscription_arn_2 = f"{topic_arn}:{subscription_id_2}"


class Test_sns_subscription_not_using_http_endpoints:
def test_no_topics(self):
sns_client = mock.MagicMock
sns_client.topics = []
with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
):
from prowler.providers.aws.services.sns.sns_subscription_not_using_http_endpoints.sns_subscription_not_using_http_endpoints import (
sns_subscription_not_using_http_endpoints,
)

check = sns_subscription_not_using_http_endpoints()
result = check.execute()
assert len(result) == 0

def test_no_subscriptions(self):
sns_client = mock.MagicMock
subscriptions = []
sns_client.topics = []
sns_client.topics.append(
Topic(
arn=topic_arn,
name=topic_name,
kms_master_key_id=kms_key_id,
region=AWS_REGION_EU_WEST_1,
subscriptions=subscriptions,
)
)

with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
):
from prowler.providers.aws.services.sns.sns_subscription_not_using_http_endpoints.sns_subscription_not_using_http_endpoints import (
sns_subscription_not_using_http_endpoints,
)

check = sns_subscription_not_using_http_endpoints()
result = check.execute()
assert len(result) == 0

def test_subscriptions_with_pending_confirmation(self):
sns_client = mock.MagicMock
subscriptions = []
subscriptions.append(
Subscription(
id="PendingConfirmation",
arn="PendingConfirmation",
owner=AWS_ACCOUNT_NUMBER,
protocol="https",
endpoint="https://www.endpoint.com",
)
)
sns_client.topics = []
sns_client.topics.append(
Topic(
arn=topic_arn,
name=topic_name,
kms_master_key_id=kms_key_id,
region=AWS_REGION_EU_WEST_1,
subscriptions=subscriptions,
)
)

with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
):
from prowler.providers.aws.services.sns.sns_subscription_not_using_http_endpoints.sns_subscription_not_using_http_endpoints import (
sns_subscription_not_using_http_endpoints,
)

check = sns_subscription_not_using_http_endpoints()
result = check.execute()
assert len(result) == 0

def test_subscriptions_with_https(self):
sns_client = mock.MagicMock
subscriptions = []
subscriptions.append(
Subscription(
id=subscription_id_1,
arn=subscription_arn_1,
owner=AWS_ACCOUNT_NUMBER,
protocol="https",
endpoint="https://www.endpoint.com",
)
)
sns_client.topics = []
sns_client.topics.append(
Topic(
arn=topic_arn,
name=topic_name,
kms_master_key_id=kms_key_id,
region=AWS_REGION_EU_WEST_1,
subscriptions=subscriptions,
)
)

with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
):
from prowler.providers.aws.services.sns.sns_subscription_not_using_http_endpoints.sns_subscription_not_using_http_endpoints import (
sns_subscription_not_using_http_endpoints,
)

check = sns_subscription_not_using_http_endpoints()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Subscription {subscription_arn_1} is using an HTTPS endpoint."
)
assert result[0].resource_id == subscription_id_1
assert result[0].resource_arn == subscription_arn_1

def test_subscriptions_with_http(self):
sns_client = mock.MagicMock
subscriptions = []
subscriptions.append(
Subscription(
id=subscription_id_2,
arn=subscription_arn_2,
owner=AWS_ACCOUNT_NUMBER,
protocol="http",
endpoint="http://www.endpoint.com",
)
)
sns_client.topics = []
sns_client.topics.append(
Topic(
arn=topic_arn,
name=topic_name,
kms_master_key_id=kms_key_id,
region=AWS_REGION_EU_WEST_1,
subscriptions=subscriptions,
)
)

with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
):
from prowler.providers.aws.services.sns.sns_subscription_not_using_http_endpoints.sns_subscription_not_using_http_endpoints import (
sns_subscription_not_using_http_endpoints,
)

check = sns_subscription_not_using_http_endpoints()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Subscription {subscription_arn_2} is using an HTTP endpoint."
)
assert result[0].resource_id == subscription_id_2
assert result[0].resource_arn == subscription_arn_2

def test_subscriptions_with_http_and_https(self):
sns_client = mock.MagicMock
subscriptions = []
subscriptions.append(
Subscription(
id=subscription_id_1,
arn=subscription_arn_1,
owner=AWS_ACCOUNT_NUMBER,
protocol="https",
endpoint="https://www.endpoint.com",
)
)
subscriptions.append(
Subscription(
id=subscription_id_2,
arn=subscription_arn_2,
owner=AWS_ACCOUNT_NUMBER,
protocol="http",
endpoint="http://www.endpoint.com",
)
)
sns_client.topics = []
sns_client.topics.append(
Topic(
arn=topic_arn,
name=topic_name,
kms_master_key_id=kms_key_id,
region=AWS_REGION_EU_WEST_1,
subscriptions=subscriptions,
)
)

with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
):
from prowler.providers.aws.services.sns.sns_subscription_not_using_http_endpoints.sns_subscription_not_using_http_endpoints import (
sns_subscription_not_using_http_endpoints,
)

check = sns_subscription_not_using_http_endpoints()
result = check.execute()
assert len(result) == 2
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Subscription {subscription_arn_1} is using an HTTPS endpoint."
)
assert result[0].resource_id == subscription_id_1
assert result[0].resource_arn == subscription_arn_1

assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"Subscription {subscription_arn_2} is using an HTTP endpoint."
)
assert result[1].resource_id == subscription_id_2
assert result[1].resource_arn == subscription_arn_2

0 comments on commit 98b7df6

Please sign in to comment.