Skip to content

Commit

Permalink
feat(dms): add new check `dms_endpoint_neptune_iam_authorization_enab…
Browse files Browse the repository at this point in the history
…led` (#5549)

Co-authored-by: Sergio Garcia <[email protected]>
  • Loading branch information
danibarranqueroo and MrCloudSec authored Nov 5, 2024
1 parent 6ff1c43 commit ea03808
Show file tree
Hide file tree
Showing 10 changed files with 487 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "dms_endpoint_neptune_iam_authorization_enabled",
"CheckTitle": "Check if DMS endpoints for Neptune databases have IAM authorization enabled.",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "dms",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:dms:region:account-id:endpoint/endpoint-id",
"Severity": "medium",
"ResourceType": "AwsDmsEndpoint",
"Description": "This control checks whether an AWS DMS endpoint for an Amazon Neptune database is configured with IAM authorization. The control fails if the DMS endpoint doesn't have IAM authorization enabled.",
"Risk": "Without IAM authorization, DMS endpoints for Neptune databases may lack granular access control, increasing the risk of unauthorized access to sensitive data.",
"RelatedUrl": "https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Neptune.html",
"Remediation": {
"Code": {
"CLI": "aws dms modify-endpoint --endpoint-arn <endpoint-arn> --service-access-role-arn <iam-role-arn>",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/dms-controls.html#dms-10",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable IAM authorization on DMS endpoints for Neptune databases by specifying a service role in the ServiceAccessRoleARN parameter.",
"Url": "https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Neptune.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from typing import List

from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.dms.dms_client import dms_client


class dms_endpoint_neptune_iam_authorization_enabled(Check):
"""
Check if AWS DMS Endpoints for Neptune have IAM authorization enabled.
This class verifies whether each AWS DMS Endpoint configured for Neptune has IAM authorization enabled
by checking the `NeptuneSettings.IamAuthEnabled` property in the endpoint's configuration.
"""

def execute(self) -> List[Check_Report_AWS]:
"""
Execute the DMS Neptune IAM authorization enabled check.
Iterates over all DMS Endpoints and generates a report indicating whether
each Neptune endpoint has IAM authorization enabled.
Returns:
List[Check_Report_AWS]: A list of report objects with the results of the check.
"""
findings = []
for endpoint_arn, endpoint in dms_client.endpoints.items():
if endpoint.engine_name == "neptune":
report = Check_Report_AWS(self.metadata())
report.resource_id = endpoint.id
report.resource_arn = endpoint_arn
report.region = endpoint.region
report.resource_tags = endpoint.tags
report.status = "FAIL"
report.status_extended = f"DMS Endpoint {endpoint.id} for Neptune databases does not have IAM authorization enabled."
if endpoint.neptune_iam_auth_enabled:
report.status = "PASS"
report.status_extended = f"DMS Endpoint {endpoint.id} for Neptune databases has IAM authorization enabled."

findings.append(report)

return findings
8 changes: 7 additions & 1 deletion prowler/providers/aws/services/dms/dms_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ def _describe_endpoints(self, regional_client):
id=endpoint["EndpointIdentifier"],
region=regional_client.region,
ssl_mode=endpoint.get("SslMode", False),
neptune_iam_auth_enabled=endpoint.get(
"NeptuneSettings", {}
).get("IamAuthEnabled", False),
engine_name=endpoint["EngineName"],
)
except Exception as error:
logger.error(
Expand All @@ -94,6 +98,8 @@ class Endpoint(BaseModel):
region: str
ssl_mode: str
tags: Optional[list]
neptune_iam_auth_enabled: bool = False
engine_name: str


class RepInstance(BaseModel):
Expand All @@ -106,4 +112,4 @@ class RepInstance(BaseModel):
security_groups: list[str] = []
multi_az: bool
region: str
tags: Optional[list]
tags: Optional[list] = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
from unittest import mock

import botocore
from boto3 import client
from moto import mock_aws

from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)

make_api_call = botocore.client.BaseClient._make_api_call

DMS_ENDPOINT_NAME = "dms-endpoint"
DMS_ENDPOINT_ARN = f"arn:aws:dms:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:endpoint:{DMS_ENDPOINT_NAME}"
DMS_INSTANCE_NAME = "rep-instance"
DMS_INSTANCE_ARN = (
f"arn:aws:dms:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:rep:{DMS_INSTANCE_NAME}"
)


def mock_make_api_call_enabled_not_neptune(self, operation_name, kwarg):
if operation_name == "DescribeEndpoints":
return {
"Endpoints": [
{
"EndpointIdentifier": DMS_ENDPOINT_NAME,
"EndpointArn": DMS_ENDPOINT_ARN,
"SslMode": "require",
"NeptuneSettings": {
"IamAuthEnabled": True,
},
"EngineName": "oracle",
}
]
}
elif operation_name == "ListTagsForResource":
if kwarg["ResourceArn"] == DMS_INSTANCE_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "rep-instance"},
{"Key": "Owner", "Value": "admin"},
]
}
elif kwarg["ResourceArn"] == DMS_ENDPOINT_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "dms-endpoint"},
{"Key": "Owner", "Value": "admin"},
]
}
return make_api_call(self, operation_name, kwarg)


def mock_make_api_call_enabled(self, operation_name, kwarg):
if operation_name == "DescribeEndpoints":
return {
"Endpoints": [
{
"EndpointIdentifier": DMS_ENDPOINT_NAME,
"EndpointArn": DMS_ENDPOINT_ARN,
"SslMode": "require",
"NeptuneSettings": {
"IamAuthEnabled": True,
},
"EngineName": "neptune",
}
]
}
elif operation_name == "ListTagsForResource":
if kwarg["ResourceArn"] == DMS_INSTANCE_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "rep-instance"},
{"Key": "Owner", "Value": "admin"},
]
}
elif kwarg["ResourceArn"] == DMS_ENDPOINT_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "dms-endpoint"},
{"Key": "Owner", "Value": "admin"},
]
}
return make_api_call(self, operation_name, kwarg)


def mock_make_api_call_not_enabled(self, operation_name, kwarg):
if operation_name == "DescribeEndpoints":
return {
"Endpoints": [
{
"EndpointIdentifier": DMS_ENDPOINT_NAME,
"EndpointArn": DMS_ENDPOINT_ARN,
"SslMode": "require",
"NeptuneSettings": {
"IamAuthEnabled": False,
},
"EngineName": "neptune",
}
]
}
elif operation_name == "ListTagsForResource":
if kwarg["ResourceArn"] == DMS_INSTANCE_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "rep-instance"},
{"Key": "Owner", "Value": "admin"},
]
}
elif kwarg["ResourceArn"] == DMS_ENDPOINT_ARN:
return {
"TagList": [
{"Key": "Name", "Value": "dms-endpoint"},
{"Key": "Owner", "Value": "admin"},
]
}
return make_api_call(self, operation_name, kwarg)


class Test_dms_endpoint_neptune_iam_authorization_enabled:
@mock_aws
def test_no_dms_endpoints(self):
dms_client = client("dms", region_name=AWS_REGION_US_EAST_1)
dms_client.endpoints = {}

from prowler.providers.aws.services.dms.dms_service import DMS

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.dms.dms_endpoint_neptune_iam_authorization_enabled.dms_endpoint_neptune_iam_authorization_enabled.dms_client",
new=DMS(aws_provider),
):
# Test Check
from prowler.providers.aws.services.dms.dms_endpoint_neptune_iam_authorization_enabled.dms_endpoint_neptune_iam_authorization_enabled import (
dms_endpoint_neptune_iam_authorization_enabled,
)

check = dms_endpoint_neptune_iam_authorization_enabled()
result = check.execute()

assert len(result) == 0

@mock_aws
def test_dms_not_neptune_iam_auth_enabled(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_enabled_not_neptune,
):

from prowler.providers.aws.services.dms.dms_service import DMS

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.dms.dms_endpoint_neptune_iam_authorization_enabled.dms_endpoint_neptune_iam_authorization_enabled.dms_client",
new=DMS(aws_provider),
):
# Test Check
from prowler.providers.aws.services.dms.dms_endpoint_neptune_iam_authorization_enabled.dms_endpoint_neptune_iam_authorization_enabled import (
dms_endpoint_neptune_iam_authorization_enabled,
)

check = dms_endpoint_neptune_iam_authorization_enabled()
result = check.execute()

assert len(result) == 0

@mock_aws
def test_dms_neptune_iam_auth_not_enabled(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_not_enabled,
):

from prowler.providers.aws.services.dms.dms_service import DMS

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.dms.dms_endpoint_neptune_iam_authorization_enabled.dms_endpoint_neptune_iam_authorization_enabled.dms_client",
new=DMS(aws_provider),
):
# Test Check
from prowler.providers.aws.services.dms.dms_endpoint_neptune_iam_authorization_enabled.dms_endpoint_neptune_iam_authorization_enabled import (
dms_endpoint_neptune_iam_authorization_enabled,
)

check = dms_endpoint_neptune_iam_authorization_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
"DMS Endpoint dms-endpoint for Neptune databases does not have IAM authorization enabled."
)
assert result[0].resource_id == "dms-endpoint"
assert (
result[0].resource_arn
== "arn:aws:dms:us-east-1:123456789012:endpoint:dms-endpoint"
)
assert result[0].resource_tags == [
{
"Key": "Name",
"Value": "dms-endpoint",
},
{
"Key": "Owner",
"Value": "admin",
},
]
assert result[0].region == "us-east-1"

@mock_aws
def test_dms_neptune_iam_auth_enabled(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_enabled,
):

from prowler.providers.aws.services.dms.dms_service import DMS

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.dms.dms_endpoint_neptune_iam_authorization_enabled.dms_endpoint_neptune_iam_authorization_enabled.dms_client",
new=DMS(aws_provider),
):
# Test Check
from prowler.providers.aws.services.dms.dms_endpoint_neptune_iam_authorization_enabled.dms_endpoint_neptune_iam_authorization_enabled import (
dms_endpoint_neptune_iam_authorization_enabled,
)

check = dms_endpoint_neptune_iam_authorization_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
"DMS Endpoint dms-endpoint for Neptune databases has IAM authorization enabled."
)
assert result[0].resource_id == "dms-endpoint"
assert (
result[0].resource_arn
== "arn:aws:dms:us-east-1:123456789012:endpoint:dms-endpoint"
)
assert result[0].resource_tags == [
{
"Key": "Name",
"Value": "dms-endpoint",
},
{
"Key": "Owner",
"Value": "admin",
},
]
assert result[0].region == "us-east-1"
Loading

0 comments on commit ea03808

Please sign in to comment.