From b8b60e6bc53edb5db9f2f090b7b149390bd7d501 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20De=20la=20Torre=20Vico?= Date: Tue, 12 Nov 2024 21:00:09 +0100 Subject: [PATCH] feat(prowler-check-kreator): `ProwlerChecKreator` first version (#5099) Co-authored-by: Sergio --- docs/tutorials/prowler-check-kreator.md | 36 +++ mkdocs.yml | 1 + util/__init__.py | 0 util/prowler_check_kreator/__init__.py | 0 .../lib/llms/__init__.py | 0 util/prowler_check_kreator/lib/llms/gemini.py | 239 ++++++++++++++ .../lib/metadata_types.py | 246 ++++++++++++++ util/prowler_check_kreator/lib/templates.py | 132 ++++++++ .../prowler_check_kreator.py | 304 ++++++++++++++++++ 9 files changed, 958 insertions(+) create mode 100644 docs/tutorials/prowler-check-kreator.md create mode 100644 util/__init__.py create mode 100644 util/prowler_check_kreator/__init__.py create mode 100644 util/prowler_check_kreator/lib/llms/__init__.py create mode 100644 util/prowler_check_kreator/lib/llms/gemini.py create mode 100644 util/prowler_check_kreator/lib/metadata_types.py create mode 100644 util/prowler_check_kreator/lib/templates.py create mode 100644 util/prowler_check_kreator/prowler_check_kreator.py diff --git a/docs/tutorials/prowler-check-kreator.md b/docs/tutorials/prowler-check-kreator.md new file mode 100644 index 00000000000..c1a53c12350 --- /dev/null +++ b/docs/tutorials/prowler-check-kreator.md @@ -0,0 +1,36 @@ + +# Prowler Check Kreator + +???+ note + Currently, this tool is only available for creating checks for the AWS provider. + +**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates: + +- A dedicated folder for the check. +- The main check script. +- A metadata file with essential details. +- A folder and file structure for testing the check. + +## Usage + +To use the tool, execute the main script with the following command: + +```bash +python util/prowler_check_kreator/prowler_check_kreator.py +``` +Parameters: + +- ``: Currently only AWS is supported. +- ``: The name you wish to assign to the new check. + +## AI integration + +This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata. + +???+ note + For this feature to work, you must have the library `google-generativeai` installed in your Python environment. + +???+ warning + AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing. + +To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key). diff --git a/mkdocs.yml b/mkdocs.yml index bc9ce5604b3..170a9c3d9d4 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -65,6 +65,7 @@ nav: - Pentesting: tutorials/pentesting.md - Parallel Execution: tutorials/parallel-execution.md - Developer Guide: developer-guide/introduction.md + - Prowler Check Kreator: tutorials/prowler-check-kreator.md - AWS: - Authentication: tutorials/aws/authentication.md - Assume Role: tutorials/aws/role-assumption.md diff --git a/util/__init__.py b/util/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/util/prowler_check_kreator/__init__.py b/util/prowler_check_kreator/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/util/prowler_check_kreator/lib/llms/__init__.py b/util/prowler_check_kreator/lib/llms/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/util/prowler_check_kreator/lib/llms/gemini.py b/util/prowler_check_kreator/lib/llms/gemini.py new file mode 100644 index 00000000000..42e47c8c383 --- /dev/null +++ b/util/prowler_check_kreator/lib/llms/gemini.py @@ -0,0 +1,239 @@ +import json +import os + +import google.generativeai as genai + +from util.prowler_check_kreator.lib.metadata_types import ( + get_metadata_placeholder_resource_type, + get_metadata_valid_check_type, + get_metadata_valid_resource_type, +) + + +class Gemini: + def __init__(self, model: str = "gemini-1.5-flash"): + if os.getenv("GEMINI_API_KEY"): + self.api_key = os.getenv("GEMINI_API_KEY") + else: + self.api_key = input( + "GEMINI_API_KEY is not set, please enter the API key: " + ) + if not self.api_key: + raise Exception("GEMINI_API_KEY is required") + + if model not in ["gemini-1.5-flash", "gemini-1.5-pro", "gemini-1.0-pro"]: + raise Exception("Invalid Gemini AI model") + + self.model_name = model + self.generation_config = { + "temperature": 0, + "top_p": 1, + "top_k": 1, + } + self.safety_settings = [ + { + "category": "HARM_CATEGORY_HARASSMENT", + "threshold": "BLOCK_MEDIUM_AND_ABOVE", + }, + { + "category": "HARM_CATEGORY_HATE_SPEECH", + "threshold": "BLOCK_MEDIUM_AND_ABOVE", + }, + { + "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", + "threshold": "BLOCK_MEDIUM_AND_ABOVE", + }, + { + "category": "HARM_CATEGORY_DANGEROUS_CONTENT", + "threshold": "BLOCK_MEDIUM_AND_ABOVE", + }, + ] + self._configure_genai() + + def _configure_genai(self): + """Configure the Gemini AI model.""" + try: + genai.configure(api_key=self.api_key) + except Exception as e: + raise Exception(f"Error configuring Gemini AI: {e}") + + def _generate_content(self, prompt_parts: list) -> str: + """Generate content using Gemini AI based on provided prompts.""" + try: + model = genai.GenerativeModel( + model_name=self.model_name, + generation_config=self.generation_config, + safety_settings=self.safety_settings, + ) + response = model.generate_content(prompt_parts) + if response: + return response.text + else: + raise Exception("Error generating content with Gemini AI") + except Exception as e: + raise Exception(f"Error generating content with Gemini AI: {e}") + + def _prepare_check_prompt(self, check_name: str, context: str) -> list: + """Prepare the prompt for generating the check.""" + + prompt_parts = [ + "You are a AWS cybersecurity engineer working in Prowler, an open-source Cloud Security tool to audit Cloud environments in an automated way.", + f"Your task is to create a new security check called '{check_name}' based on the following context:\n{context}", + "TA check is a Python class that inherits from the Check class and has only one method called execute.\n", + "The execute method must return a list of Check_Report_AWS objects.\n", + "Limit to Python code only.\n", + "Use the following check as inspiration about the format to create the new check:\n", + "ec2_instance_port_ssh_exposed_to_internet:", + "from prowler.lib.check.models import Check, Check_Report_AWS\nfrom prowler.providers.aws.services.ec2.ec2_client import ec2_client\nfrom prowler.providers.aws.services.ec2.lib.instance import get_instance_public_status\nfrom prowler.providers.aws.services.ec2.lib.security_groups import check_security_group\nfrom prowler.providers.aws.services.vpc.vpc_client import vpc_client\n\n\nclass ec2_instance_port_ssh_exposed_to_internet(Check):\n\t# EC2 Instances with SSH port 22 open to the Internet will be flagged as FAIL with a severity of medium if the instance has no public IP, high if the instance has a public IP but is in a private subnet, and critical if the instance has a public IP and is in a public subnet.\n\tdef execute(self):\n\t\tfindings = []\n\t\tcheck_ports = [22]\n\t\tfor instance in ec2_client.instances:\n\t\t\treport = Check_Report_AWS(self.metadata())\n\t\t\treport.region = instance.region\n\t\t\treport.status = 'PASS'\n\t\t\treport.status_extended = f'Instance {instance.id} does not have SSH port 22 open to the Internet.'\n\t\t\treport.resource_id = instance.id\n\t\t\treport.resource_arn = instance.arn\n\t\t\treport.resource_tags = instance.tags\n\t\t\tis_open_port = False\n\t\t\tif instance.security_groups:\n\t\t\t\tfor sg in ec2_client.security_groups.values():\n\t\t\t\t\tif sg.id in instance.security_groups:\n\t\t\t\t\t\tfor ingress_rule in sg.ingress_rules:\n\t\t\t\t\t\t\tif check_security_group(\n\t\t\t\t\t\t\t\tingress_rule, 'tcp', check_ports, any_address=True\n\t\t\t\t\t\t\t):\n\t\t\t\t\t\t\t\t# The port is open, now check if the instance is in a public subnet with a public IP\n\t\t\t\t\t\t\t\treport.status = 'FAIL'\n\t\t\t\t\t\t\t\t(\n\t\t\t\t\t\t\t\t\treport.status_extended,\n\t\t\t\t\t\t\t\t\treport.check_metadata.Severity,\n\t\t\t\t\t\t\t\t) = get_instance_public_status(\n\t\t\t\t\t\t\t\t\tvpc_client.vpc_subnets, instance, 'SSH'\n\t\t\t\t\t\t\t\t)\n\t\t\t\t\t\t\t\tis_open_port = True\n\t\t\t\t\t\t\t\tbreak\n\t\t\t\t\t\tif is_open_port:\n\t\t\t\t\t\t\tbreak\n\t\t\tfindings.append(report)\n\t\treturn findings\n", + "s3_bucket_default_encryption:", + "from prowler.lib.check.models import Check, Check_Report_AWS\nfrom prowler.providers.aws.services.s3.s3_client import s3_client\n\n\nclass s3_bucket_default_encryption(Check):\n\tdef execute(self):\n\t\tfindings = []\n\t\tfor arn, bucket in s3_client.buckets.items():\n\t\t\treport = Check_Report_AWS(self.metadata())\n\t\t\treport.region = bucket.region\n\t\t\treport.resource_id = bucket.name\n\t\t\treport.resource_arn = arn\n\t\t\treport.resource_tags = bucket.tags\n\t\t\tif bucket.encryption:\n\t\t\t\treport.status = 'PASS'\n\t\t\t\treport.status_extended = f'S3 Bucket {bucket.name} has Server Side Encryption with {bucket.encryption}.'\n\t\t\telse:\n\t\t\t\treport.status = 'FAIL'\n\t\t\t\treport.status_extended = f'S3 Bucket {bucket.name} does not have Server Side Encryption enabled.'\n\t\t\tfindings.append(report)\n\t\treturn findings\n", + "bedrock_guardrail_prompt_attack_filter_enabled:", + "from prowler.lib.check.models import Check, Check_Report_AWS\nfrom prowler.providers.aws.services.bedrock.bedrock_client import bedrock_client\n\n\nclass bedrock_guardrail_prompt_attack_filter_enabled(Check):\n\tdef execute(self):\n\t\tfindings = []\n\t\tfor guardrail in bedrock_client.guardrails.values():\n\t\t\treport = Check_Report_AWS(self.metadata())\n\t\t\treport.region = guardrail.region\n\t\t\treport.resource_id = guardrail.id\n\t\t\treport.resource_arn = guardrail.arn\n\t\t\treport.resource_tags = guardrail.tags\n\t\t\treport.status = 'PASS'\n\t\t\treport.status_extended = f'Bedrock Guardrail {guardrail.name} is configured to detect and block prompt attacks with a HIGH strength.'\n\t\t\tif not guardrail.prompt_attack_filter_strength:\n\t\t\t\treport.status = 'FAIL'\n\t\t\t\treport.status_extended = f'Bedrock Guardrail {guardrail.name} is not configured to block prompt attacks.'\n\t\t\telif guardrail.prompt_attack_filter_strength != 'HIGH':\n\t\t\t\treport.status = 'FAIL'\n\t\t\t\treport.status_extended = f'Bedrock Guardrail {guardrail.name} is configured to block prompt attacks but with a filter strength of {guardrail.prompt_attack_filter_strength}, not HIGH.'\n\t\t\tfindings.append(report)\n\n\t\treturn findings", + "cloudwatch_alarm_actions_enabled:", + "from prowler.lib.check.models import Check, Check_Report_AWS\nfrom prowler.providers.aws.services.cloudwatch.cloudwatch_client import (\n\tcloudwatch_client,\n)\n\n\nclass cloudwatch_alarm_actions_enabled(Check):\n\tdef execute(self):\n\t\tfindings = []\n\t\tfor metric_alarm in cloudwatch_client.metric_alarms:\n\t\t\treport = Check_Report_AWS(self.metadata())\n\t\t\treport.region = metric_alarm.region\n\t\t\treport.resource_id = metric_alarm.name\n\t\t\treport.resource_arn = metric_alarm.arn\n\t\t\treport.resource_tags = metric_alarm.tags\n\t\t\treport.status = 'PASS'\n\t\t\treport.status_extended = (\n\t\t\t\tf'CloudWatch metric alarm {metric_alarm.name} has actions enabled.'\n\t\t\t)\n\t\t\tif not metric_alarm.actions_enabled:\n\t\t\t\treport.status = 'FAIL'\n\t\t\t\treport.status_extended = f'CloudWatch metric alarm {metric_alarm.name} does not have actions enabled.'\n\t\t\tfindings.append(report)\n\t\treturn findings", + "awslambda_function_not_publicly_accessible:", + "from prowler.lib.check.models import Check, Check_Report_AWS\nfrom prowler.providers.aws.services.awslambda.awslambda_client import awslambda_client\nfrom prowler.providers.aws.services.iam.lib.policy import is_policy_public\n\n\nclass awslambda_function_not_publicly_accessible(Check):\n\tdef execute(self):\n\t\tfindings = []\n\t\tfor function in awslambda_client.functions.values():\n\t\t\treport = Check_Report_AWS(self.metadata())\n\t\t\treport.region = function.region\n\t\t\treport.resource_id = function.name\n\t\t\treport.resource_arn = function.arn\n\t\t\treport.resource_tags = function.tags\n\n\t\t\treport.status = 'PASS'\n\t\t\treport.status_extended = f'Lambda function {function.name} has a policy resource-based policy not public.'\n\t\t\tif is_policy_public(\n\t\t\t\tfunction.policy,\n\t\t\t\tawslambda_client.audited_account,\n\t\t\t\tis_cross_account_allowed=True,\n\t\t\t):\n\t\t\t\treport.status = 'FAIL'\n\t\t\t\treport.status_extended = f'Lambda function {function.name} has a policy resource-based policy with public access.'\n\n\t\t\tfindings.append(report)\n\n\t\treturn findings", + f"{check_name}:", + ] + return prompt_parts + + def _prepare_test_prompt(self, check_name: str) -> list: + """Prepare the prompt for generating the test.""" + + prompt_parts = [ + "You are a AWS cybersecurity engineer working in Prowler, an open-source Cloud Security tool to audit Cloud environments in an automated way.", + f"Your task is to create a new unit test for the security check '{check_name}'.", + "The test must have one or more methods that start with the word 'test'.", + "The test methods must use the assert statement to check the results of the check.", + "I need the answer only with Python formatted text.", + "Use the following test as inspiration to create the new test: ", + "ec2_instance_port_ssh_exposed_to_internet:", + "from unittest import mock\n\nfrom boto3 import client, resource\nfrom moto import mock_aws\n\nfrom tests.providers.aws.utils import (\n\tAWS_REGION_EU_WEST_1,\n\tAWS_REGION_US_EAST_1,\n\tset_mocked_aws_provider,\n)\n\n\nclass Test_ec2_instance_port_ssh_exposed_to_internet:\n\t@mock_aws\n\tdef test_no_ec2_instances(self):\n\t\t# Create EC2 Mocked Resources\n\t\tec2_client = client('ec2', region_name=AWS_REGION_US_EAST_1)\n\t\tec2_client.create_vpc(CidrBlock='10.0.0.0/16')\n\n\t\tfrom prowler.providers.aws.services.ec2.ec2_service import EC2\n\t\tfrom prowler.providers.aws.services.vpc.vpc_service import VPC\n\n\t\taws_provider = set_mocked_aws_provider(\n\t\t\t[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]\n\t\t)\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet.ec2_client',\n\t\t\tnew=EC2(aws_provider),\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet.vpc_client',\n\t\t\tnew=VPC(aws_provider),\n\t\t):\n\t\t\t# Test Check\n\t\t\tfrom prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet import (\n\t\t\t\tec2_instance_port_ssh_exposed_to_internet,\n\t\t\t)\n\n\t\t\tcheck = ec2_instance_port_ssh_exposed_to_internet()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 0\n\n\t@mock_aws\n\tdef test_ec2_instance_no_port_exposed(self):\n\t\t# Create EC2 Mocked Resources\n\t\tec2_client = client('ec2', region_name=AWS_REGION_US_EAST_1)\n\t\tec2_resource = resource('ec2', region_name=AWS_REGION_US_EAST_1)\n\t\tvpc_id = ec2_client.create_vpc(CidrBlock='10.0.0.0/16')['Vpc']['VpcId']\n\t\tdefault_sg = ec2_client.describe_security_groups(GroupNames=['default'])[\n\t\t\t'SecurityGroups'\n\t\t][0]\n\t\tdefault_sg_id = default_sg['GroupId']\n\t\tec2_client.authorize_security_group_ingress(\n\t\t\tGroupId=default_sg_id,\n\t\t\tIpPermissions=[\n\t\t\t\t{\n\t\t\t\t\t'IpProtocol': 'tcp',\n\t\t\t\t\t'FromPort': 22,\n\t\t\t\t\t'ToPort': 22,\n\t\t\t\t\t'IpRanges': [{'CidrIp': '123.123.123.123/32'}],\n\t\t\t\t}\n\t\t\t],\n\t\t)\n\t\tsubnet_id = ec2_client.create_subnet(VpcId=vpc_id, CidrBlock='10.0.0.0/16')[\n\t\t\t'Subnet'\n\t\t]['SubnetId']\n\t\tinstance_id = ec2_resource.create_instances(\n\t\t\tImageId='ami-12345678',\n\t\t\tMinCount=1,\n\t\t\tMaxCount=1,\n\t\t\tInstanceType='t2.micro',\n\t\t\tSecurityGroupIds=[default_sg_id],\n\t\t\tSubnetId=subnet_id,\n\t\t\tTagSpecifications=[\n\t\t\t\t{'ResourceType': 'instance', 'Tags': [{'Key': 'Name', 'Value': 'test'}]}\n\t\t\t],\n\t\t)[0].id\n\n\t\tfrom prowler.providers.aws.services.ec2.ec2_service import EC2\n\t\tfrom prowler.providers.aws.services.vpc.vpc_service import VPC\n\n\t\taws_provider = set_mocked_aws_provider(\n\t\t\t[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]\n\t\t)\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet.ec2_client',\n\t\t\tnew=EC2(aws_provider),\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet.vpc_client',\n\t\t\tnew=VPC(aws_provider),\n\t\t):\n\t\t\t# Test Check\n\t\t\tfrom prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet import (\n\t\t\t\tec2_instance_port_ssh_exposed_to_internet,\n\t\t\t)\n\n\t\t\tcheck = ec2_instance_port_ssh_exposed_to_internet()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 1\n\t\t\tassert result[0].status == 'PASS'\n\t\t\tassert (\n\t\t\t\tresult[0].status_extended\n\t\t\t\t== f'Instance {instance_id} does not have SSH port 22 open to the Internet.'\n\t\t\t)\n\t\t\tassert result[0].resource_id == instance_id\n\t\t\tassert (\n\t\t\t\tresult[0].resource_arn\n\t\t\t\t== f'arn:{aws_provider.identity.partition}:ec2:{AWS_REGION_US_EAST_1}:{aws_provider.identity.account}:instance/{instance_id}'\n\t\t\t)\n\t\t\tassert result[0].resource_tags == [{'Key': 'Name', 'Value': 'test'}]\n\t\t\tassert result[0].region == AWS_REGION_US_EAST_1\n\t\t\tassert result[0].check_metadata.Severity == 'critical'\n\n\t@mock_aws\n\tdef test_ec2_instance_exposed_port_in_private_subnet(self):\n\t\t# Create EC2 Mocked Resources\n\t\tec2_client = client('ec2', region_name=AWS_REGION_US_EAST_1)\n\t\tec2_resource = resource('ec2', region_name=AWS_REGION_US_EAST_1)\n\t\tvpc_id = ec2_client.create_vpc(CidrBlock='10.0.0.0/16')['Vpc']['VpcId']\n\t\tdefault_sg = ec2_client.describe_security_groups(GroupNames=['default'])[\n\t\t\t'SecurityGroups'\n\t\t][0]\n\t\tdefault_sg_id = default_sg['GroupId']\n\t\tec2_client.authorize_security_group_ingress(\n\t\t\tGroupId=default_sg_id,\n\t\t\tIpPermissions=[\n\t\t\t\t{\n\t\t\t\t\t'IpProtocol': 'tcp',\n\t\t\t\t\t'FromPort': 22,\n\t\t\t\t\t'ToPort': 22,\n\t\t\t\t\t'IpRanges': [{'CidrIp': '0.0.0.0/0'}],\n\t\t\t\t}\n\t\t\t],\n\t\t)\n\t\tsubnet_id = ec2_client.create_subnet(VpcId=vpc_id, CidrBlock='10.0.0.0/16')[\n\t\t\t'Subnet'\n\t\t]['SubnetId']\n\t\tinstance_id = ec2_resource.create_instances(\n\t\t\tImageId='ami-12345678',\n\t\t\tMinCount=1,\n\t\t\tMaxCount=1,\n\t\t\tInstanceType='t2.micro',\n\t\t\tSecurityGroupIds=[default_sg_id],\n\t\t\tSubnetId=subnet_id,\n\t\t\tTagSpecifications=[\n\t\t\t\t{'ResourceType': 'instance', 'Tags': [{'Key': 'Name', 'Value': 'test'}]}\n\t\t\t],\n\t\t)[0].id\n\n\t\tfrom prowler.providers.aws.services.ec2.ec2_service import EC2\n\t\tfrom prowler.providers.aws.services.vpc.vpc_service import VPC\n\n\t\taws_provider = set_mocked_aws_provider(\n\t\t\t[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]\n\t\t)\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet.ec2_client',\n\t\t\tnew=EC2(aws_provider),\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet.vpc_client',\n\t\t\tnew=VPC(aws_provider),\n\t\t):\n\t\t\t# Test Check\n\t\t\tfrom prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet import (\n\t\t\t\tec2_instance_port_ssh_exposed_to_internet,\n\t\t\t)\n\n\t\t\tcheck = ec2_instance_port_ssh_exposed_to_internet()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 1\n\t\t\tassert result[0].status == 'FAIL'\n\t\t\tassert (\n\t\t\t\tresult[0].status_extended\n\t\t\t\t== f'Instance {instance_id} has SSH exposed to 0.0.0.0/0 but with no public IP address.'\n\t\t\t)\n\t\t\tassert result[0].resource_id == instance_id\n\t\t\tassert (\n\t\t\t\tresult[0].resource_arn\n\t\t\t\t== f'arn:{aws_provider.identity.partition}:ec2:{AWS_REGION_US_EAST_1}:{aws_provider.identity.account}:instance/{instance_id}'\n\t\t\t)\n\t\t\tassert result[0].resource_tags == [{'Key': 'Name', 'Value': 'test'}]\n\t\t\tassert result[0].region == AWS_REGION_US_EAST_1\n\t\t\tassert result[0].check_metadata.Severity == 'medium'", + "s3_bucket_default_encryption:", + "from unittest import mock\n\nfrom boto3 import client\nfrom moto import mock_aws\n\nfrom tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider\n\n\nclass Test_s3_bucket_default_encryption:\n\t@mock_aws\n\tdef test_bucket_no_encryption(self):\n\t\ts3_client_us_east_1 = client('s3', region_name=AWS_REGION_US_EAST_1)\n\t\tbucket_name_us = 'bucket_test_us'\n\t\ts3_client_us_east_1.create_bucket(Bucket=bucket_name_us)\n\n\t\tfrom prowler.providers.aws.services.s3.s3_service import S3\n\n\t\taws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t):\n\t\t\twith mock.patch(\n\t\t\t\t'prowler.providers.aws.services.s3.s3_bucket_default_encryption.s3_bucket_default_encryption.s3_client',\n\t\t\t\tnew=S3(aws_provider),\n\t\t\t):\n\t\t\t\t# Test Check\n\t\t\t\tfrom prowler.providers.aws.services.s3.s3_bucket_default_encryption.s3_bucket_default_encryption import (\n\t\t\t\t\ts3_bucket_default_encryption,\n\t\t\t\t)\n\n\t\t\t\tcheck = s3_bucket_default_encryption()\n\t\t\t\tresult = check.execute()\n\n\t\t\t\tassert len(result) == 1\n\t\t\t\tassert result[0].status == 'FAIL'\n\t\t\t\tassert (\n\t\t\t\t\tresult[0].status_extended\n\t\t\t\t\t== f'S3 Bucket {bucket_name_us} does not have Server Side Encryption enabled.'\n\t\t\t\t)\n\t\t\t\tassert result[0].resource_id == bucket_name_us\n\t\t\t\tassert (\n\t\t\t\t\tresult[0].resource_arn\n\t\t\t\t\t== f'arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}'\n\t\t\t\t)\n\t\t\t\tassert result[0].region == AWS_REGION_US_EAST_1\n\n\t@mock_aws\n\tdef test_bucket_kms_encryption(self):\n\t\ts3_client_us_east_1 = client('s3', region_name=AWS_REGION_US_EAST_1)\n\t\tbucket_name_us = 'bucket_test_us'\n\t\ts3_client_us_east_1.create_bucket(\n\t\t\tBucket=bucket_name_us, ObjectOwnership='BucketOwnerEnforced'\n\t\t)\n\t\tsse_config = {\n\t\t\t'Rules': [\n\t\t\t\t{\n\t\t\t\t\t'ApplyServerSideEncryptionByDefault': {\n\t\t\t\t\t\t'SSEAlgorithm': 'aws:kms',\n\t\t\t\t\t\t'KMSMasterKeyID': '12345678',\n\t\t\t\t\t}\n\t\t\t\t}\n\t\t\t]\n\t\t}\n\n\t\ts3_client_us_east_1.put_bucket_encryption(\n\t\t\tBucket=bucket_name_us, ServerSideEncryptionConfiguration=sse_config\n\t\t)\n\n\t\tfrom prowler.providers.aws.services.s3.s3_service import S3\n\n\t\taws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t):\n\t\t\twith mock.patch(\n\t\t\t\t'prowler.providers.aws.services.s3.s3_bucket_default_encryption.s3_bucket_default_encryption.s3_client',\n\t\t\t\tnew=S3(aws_provider),\n\t\t\t):\n\t\t\t\t# Test Check\n\t\t\t\tfrom prowler.providers.aws.services.s3.s3_bucket_default_encryption.s3_bucket_default_encryption import (\n\t\t\t\t\ts3_bucket_default_encryption,\n\t\t\t\t)\n\n\t\t\t\tcheck = s3_bucket_default_encryption()\n\t\t\t\tresult = check.execute()\n\n\t\t\t\tassert len(result) == 1\n\t\t\t\tassert result[0].status == 'PASS'\n\t\t\t\tassert (\n\t\t\t\t\tresult[0].status_extended\n\t\t\t\t\t== f'S3 Bucket {bucket_name_us} has Server Side Encryption with aws:kms.'\n\t\t\t\t)\n\t\t\t\tassert result[0].resource_id == bucket_name_us\n\t\t\t\tassert (\n\t\t\t\t\tresult[0].resource_arn\n\t\t\t\t\t== f'arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}'\n\t\t\t\t)\n\t\t\t\tassert result[0].region == AWS_REGION_US_EAST_1", + "cloudwatch_alarm_actions_enabled:", + "from unittest import mock\n\nfrom boto3 import client\nfrom moto import mock_aws\n\nfrom tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider\n\n\nclass Test_cloudwatch_alarm_actions_enabled:\n\t@mock_aws\n\tdef test_no_cloudwatch_alarms(self):\n\t\tcloudwatch_client = client('cloudwatch', region_name=AWS_REGION_US_EAST_1)\n\t\tcloudwatch_client.metric_alarms = []\n\n\t\tfrom prowler.providers.aws.services.cloudwatch.cloudwatch_service import (\n\t\t\tCloudWatch,\n\t\t)\n\n\t\taws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.cloudwatch.cloudwatch_alarm_actions_enabled.cloudwatch_alarm_actions_enabled.cloudwatch_client',\n\t\t\tnew=CloudWatch(aws_provider),\n\t\t):\n\n\t\t\tfrom prowler.providers.aws.services.cloudwatch.cloudwatch_alarm_actions_enabled.cloudwatch_alarm_actions_enabled import (\n\t\t\t\tcloudwatch_alarm_actions_enabled,\n\t\t\t)\n\n\t\t\tcheck = cloudwatch_alarm_actions_enabled()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 0\n\n\t@mock_aws\n\tdef test_cloudwatch_alarms_actions_enabled(self):\n\t\tcloudwatch_client = client('cloudwatch', region_name=AWS_REGION_US_EAST_1)\n\t\tcloudwatch_client.put_metric_alarm(\n\t\t\tAlarmName='test_alarm',\n\t\t\tAlarmDescription='Test alarm',\n\t\t\tActionsEnabled=True,\n\t\t\tAlarmActions=['arn:aws:sns:us-east-1:123456789012:my-sns-topic'],\n\t\t\tEvaluationPeriods=1,\n\t\t\tComparisonOperator='GreaterThanThreshold',\n\t\t)\n\n\t\tfrom prowler.providers.aws.services.cloudwatch.cloudwatch_service import (\n\t\t\tCloudWatch,\n\t\t)\n\n\t\taws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.cloudwatch.cloudwatch_alarm_actions_enabled.cloudwatch_alarm_actions_enabled.cloudwatch_client',\n\t\t\tnew=CloudWatch(aws_provider),\n\t\t):\n\n\t\t\tfrom prowler.providers.aws.services.cloudwatch.cloudwatch_alarm_actions_enabled.cloudwatch_alarm_actions_enabled import (\n\t\t\t\tcloudwatch_alarm_actions_enabled,\n\t\t\t)\n\n\t\t\tcheck = cloudwatch_alarm_actions_enabled()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 1\n\t\t\tassert result[0].status == 'PASS'\n\t\t\tassert (\n\t\t\t\tresult[0].status_extended\n\t\t\t\t== 'CloudWatch metric alarm test_alarm has actions enabled.'\n\t\t\t)\n\t\t\tassert result[0].resource_id == 'test_alarm'\n\t\t\tassert (\n\t\t\t\tresult[0].resource_arn\n\t\t\t\t== 'arn:aws:cloudwatch:us-east-1:123456789012:alarm:test_alarm'\n\t\t\t)\n\t\t\tassert result[0].region == AWS_REGION_US_EAST_1\n\t\t\tassert result[0].resource_tags == []\n\n\t@mock_aws\n\tdef test_cloudwatch_alarms_actions_disabled(self):\n\t\tcloudwatch_client = client('cloudwatch', region_name=AWS_REGION_US_EAST_1)\n\t\tcloudwatch_client.put_metric_alarm(\n\t\t\tAlarmName='test_alarm',\n\t\t\tAlarmDescription='Test alarm',\n\t\t\tActionsEnabled=False,\n\t\t\tAlarmActions=['arn:aws:sns:us-east-1:123456789012:my-sns-topic'],\n\t\t\tEvaluationPeriods=1,\n\t\t\tComparisonOperator='GreaterThanThreshold',\n\t\t)\n\n\t\tfrom prowler.providers.aws.services.cloudwatch.cloudwatch_service import (\n\t\t\tCloudWatch,\n\t\t)\n\n\t\taws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.cloudwatch.cloudwatch_alarm_actions_enabled.cloudwatch_alarm_actions_enabled.cloudwatch_client',\n\t\t\tnew=CloudWatch(aws_provider),\n\t\t):\n\n\t\t\tfrom prowler.providers.aws.services.cloudwatch.cloudwatch_alarm_actions_enabled.cloudwatch_alarm_actions_enabled import (\n\t\t\t\tcloudwatch_alarm_actions_enabled,\n\t\t\t)\n\n\t\t\tcheck = cloudwatch_alarm_actions_enabled()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 1\n\t\t\tassert result[0].status == 'FAIL'\n\t\t\tassert (\n\t\t\t\tresult[0].status_extended\n\t\t\t\t== 'CloudWatch metric alarm test_alarm does not have actions enabled.'\n\t\t\t)\n\t\t\tassert result[0].resource_id == 'test_alarm'\n\t\t\tassert (\n\t\t\t\tresult[0].resource_arn\n\t\t\t\t== 'arn:aws:cloudwatch:us-east-1:123456789012:alarm:test_alarm'\n\t\t\t)\n\t\t\tassert result[0].region == AWS_REGION_US_EAST_1\n\t\t\tassert result[0].resource_tags == []", + "awslambda_function_not_publicly_accessible:", + "from json import dumps\nfrom unittest import mock\n\nfrom boto3 import client\nfrom moto import mock_aws\n\nfrom prowler.providers.aws.services.awslambda.awslambda_service import Function\nfrom tests.providers.aws.utils import (\n\tAWS_ACCOUNT_NUMBER,\n\tAWS_REGION_EU_WEST_1,\n\tset_mocked_aws_provider,\n)\n\n\nclass Test_awslambda_function_not_publicly_accessible:\n\t@mock_aws\n\tdef test_no_functions(self):\n\t\taws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])\n\n\t\tfrom prowler.providers.aws.services.awslambda.awslambda_service import Lambda\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client',\n\t\t\tnew=Lambda(aws_provider),\n\t\t):\n\t\t\t# Test Check\n\t\t\tfrom prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (\n\t\t\t\tawslambda_function_not_publicly_accessible,\n\t\t\t)\n\n\t\t\tcheck = awslambda_function_not_publicly_accessible()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 0\n\n\t@mock_aws\n\tdef test_function_public(self):\n\t\t# Create the mock IAM role\n\t\tiam_client = client('iam', region_name=AWS_REGION_EU_WEST_1)\n\t\trole_name = 'test-role'\n\t\tassume_role_policy_document = {\n\t\t\t'Version': '2012-10-17',\n\t\t\t'Statement': [\n\t\t\t\t{\n\t\t\t\t\t'Effect': 'Allow',\n\t\t\t\t\t'Principal': {'Service': 'lambda.amazonaws.com'},\n\t\t\t\t\t'Action': 'sts:AssumeRole',\n\t\t\t\t}\n\t\t\t],\n\t\t}\n\t\trole_arn = iam_client.create_role(\n\t\t\tRoleName=role_name,\n\t\t\tAssumeRolePolicyDocument=dumps(assume_role_policy_document),\n\t\t)['Role']['Arn']\n\n\t\tfunction_name = 'test-lambda'\n\n\t\t# Create the lambda function using boto3 client\n\t\tlambda_client = client('lambda', region_name=AWS_REGION_EU_WEST_1)\n\t\tfunction_arn = lambda_client.create_function(\n\t\t\tFunctionName=function_name,\n\t\t\tRuntime='nodejs4.3',\n\t\t\tRole=role_arn,\n\t\t\tHandler='index.handler',\n\t\t\tCode={'ZipFile': b'fileb://file-path/to/your-deployment-package.zip'},\n\t\t\tDescription='Test Lambda function',\n\t\t\tTimeout=3,\n\t\t\tMemorySize=128,\n\t\t\tPublish=True,\n\t\t\tTags={'tag1': 'value1', 'tag2': 'value2'},\n\t\t)['FunctionArn']\n\n\t\t# Attach the policy to the lambda function with a wildcard principal\n\t\tlambda_client.add_permission(\n\t\t\tFunctionName=function_name,\n\t\t\tStatementId='public-access',\n\t\t\tAction='lambda:InvokeFunction',\n\t\t\tPrincipal='*',\n\t\t)\n\n\t\taws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])\n\n\t\tfrom prowler.providers.aws.services.awslambda.awslambda_service import Lambda\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client',\n\t\t\tnew=Lambda(aws_provider),\n\t\t):\n\t\t\t# Test Check\n\t\t\tfrom prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (\n\t\t\t\tawslambda_function_not_publicly_accessible,\n\t\t\t)\n\n\t\t\tcheck = awslambda_function_not_publicly_accessible()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 1\n\t\t\tassert result[0].region == AWS_REGION_EU_WEST_1\n\t\t\tassert result[0].resource_id == function_name\n\t\t\tassert result[0].resource_arn == function_arn\n\t\t\tassert result[0].status == 'FAIL'\n\t\t\tassert (\n\t\t\t\tresult[0].status_extended\n\t\t\t\t== f'Lambda function {function_name} has a policy resource-based policy with public access.'\n\t\t\t)\n\t\t\tassert result[0].resource_tags == [{'tag1': 'value1', 'tag2': 'value2'}]\n\n\t@mock_aws\n\tdef test_function_public_with_source_account(self):\n\t\t# Create the mock IAM role\n\t\tiam_client = client('iam', region_name=AWS_REGION_EU_WEST_1)\n\t\trole_name = 'test-role'\n\t\tassume_role_policy_document = {\n\t\t\t'Version': '2012-10-17',\n\t\t\t'Statement': [\n\t\t\t\t{\n\t\t\t\t\t'Effect': 'Allow',\n\t\t\t\t\t'Principal': {'Service': 'lambda.amazonaws.com'},\n\t\t\t\t\t'Action': 'sts:AssumeRole',\n\t\t\t\t}\n\t\t\t],\n\t\t}\n\t\trole_arn = iam_client.create_role(\n\t\t\tRoleName=role_name,\n\t\t\tAssumeRolePolicyDocument=dumps(assume_role_policy_document),\n\t\t)['Role']['Arn']\n\n\t\tfunction_name = 'test-lambda'\n\n\t\t# Create the lambda function using boto3 client\n\t\tlambda_client = client('lambda', region_name=AWS_REGION_EU_WEST_1)\n\t\tfunction_arn = lambda_client.create_function(\n\t\t\tFunctionName=function_name,\n\t\t\tRuntime='nodejs4.3',\n\t\t\tRole=role_arn,\n\t\t\tHandler='index.handler',\n\t\t\tCode={'ZipFile': b'fileb://file-path/to/your-deployment-package.zip'},\n\t\t\tDescription='Test Lambda function',\n\t\t\tTimeout=3,\n\t\t\tMemorySize=128,\n\t\t\tPublish=True,\n\t\t\tTags={'tag1': 'value1', 'tag2': 'value2'},\n\t\t)['FunctionArn']\n\n\t\t# Attach the policy to the lambda function with a wildcard principal\n\t\tlambda_client.add_permission(\n\t\t\tFunctionName=function_name,\n\t\t\tStatementId='non-public-access',\n\t\t\tAction='lambda:InvokeFunction',\n\t\t\tPrincipal='*',\n\t\t\tSourceArn=function_arn,\n\t\t\tSourceAccount=AWS_ACCOUNT_NUMBER,\n\t\t)\n\n\t\taws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])\n\n\t\tfrom prowler.providers.aws.services.awslambda.awslambda_service import Lambda\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client',\n\t\t\tnew=Lambda(aws_provider),\n\t\t):\n\t\t\t# Test Check\n\t\t\tfrom prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (\n\t\t\t\tawslambda_function_not_publicly_accessible,\n\t\t\t)\n\n\t\t\tcheck = awslambda_function_not_publicly_accessible()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 1\n\t\t\tassert result[0].region == AWS_REGION_EU_WEST_1\n\t\t\tassert result[0].resource_id == function_name\n\t\t\tassert result[0].resource_arn == function_arn\n\t\t\tassert result[0].status == 'PASS'\n\t\t\tassert (\n\t\t\t\tresult[0].status_extended\n\t\t\t\t== f'Lambda function {function_name} has a policy resource-based policy not public.'\n\t\t\t)\n\t\t\tassert result[0].resource_tags == [{'tag1': 'value1', 'tag2': 'value2'}]\n\n\t@mock_aws\n\tdef test_function_not_public(self):\n\t\t# Create the mock IAM role\n\t\tiam_client = client('iam', region_name=AWS_REGION_EU_WEST_1)\n\t\trole_name = 'test-role'\n\t\tassume_role_policy_document = {\n\t\t\t'Version': '2012-10-17',\n\t\t\t'Statement': [\n\t\t\t\t{\n\t\t\t\t\t'Effect': 'Allow',\n\t\t\t\t\t'Principal': {'Service': 'lambda.amazonaws.com'},\n\t\t\t\t\t'Action': 'sts:AssumeRole',\n\t\t\t\t}\n\t\t\t],\n\t\t}\n\t\trole_arn = iam_client.create_role(\n\t\t\tRoleName=role_name,\n\t\t\tAssumeRolePolicyDocument=dumps(assume_role_policy_document),\n\t\t)['Role']['Arn']\n\n\t\tfunction_name = 'test-lambda'\n\n\t\t# Create the lambda function using boto3 client\n\t\tlambda_client = client('lambda', region_name=AWS_REGION_EU_WEST_1)\n\t\tfunction_arn = lambda_client.create_function(\n\t\t\tFunctionName=function_name,\n\t\t\tRuntime='nodejs4.3',\n\t\t\tRole=role_arn,\n\t\t\tHandler='index.handler',\n\t\t\tCode={'ZipFile': b'fileb://file-path/to/your-deployment-package.zip'},\n\t\t\tDescription='Test Lambda function',\n\t\t\tTimeout=3,\n\t\t\tMemorySize=128,\n\t\t\tPublish=True,\n\t\t\tTags={'tag1': 'value1', 'tag2': 'value2'},\n\t\t)['FunctionArn']\n\n\t\t# Attach the policy to the lambda function with a specific AWS account number as principal\n\t\tlambda_client.add_permission(\n\t\t\tFunctionName=function_name,\n\t\t\tStatementId='public-access',\n\t\t\tAction='lambda:InvokeFunction',\n\t\t\tPrincipal=AWS_ACCOUNT_NUMBER,\n\t\t\tSourceArn=function_arn,\n\t\t)\n\n\t\taws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])\n\n\t\tfrom prowler.providers.aws.services.awslambda.awslambda_service import Lambda\n\n\t\twith mock.patch(\n\t\t\t'prowler.providers.common.provider.Provider.get_global_provider',\n\t\t\treturn_value=aws_provider,\n\t\t), mock.patch(\n\t\t\t'prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client',\n\t\t\tnew=Lambda(aws_provider),\n\t\t):\n\t\t\t# Test Check\n\t\t\tfrom prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (\n\t\t\t\tawslambda_function_not_publicly_accessible,\n\t\t\t)\n\n\t\t\tcheck = awslambda_function_not_publicly_accessible()\n\t\t\tresult = check.execute()\n\n\t\t\tassert len(result) == 1\n\t\t\tassert result[0].region == AWS_REGION_EU_WEST_1\n\t\t\tassert result[0].resource_id == function_name\n\t\t\tassert result[0].resource_arn == function_arn\n\t\t\tassert result[0].status == 'PASS'\n\t\t\tassert (\n\t\t\t\tresult[0].status_extended\n\t\t\t\t== f'Lambda function {function_name} has a policy resource-based policy not public.'\n\t\t\t)\n\t\t\tassert result[0].resource_tags == [{'tag1': 'value1', 'tag2': 'value2'}]", + f"{check_name}:", + ] + return prompt_parts + + def _prepare_metadata_prompt(self, metadata: dict, context: str) -> list: + """Prepare the prompt for generating the metadata.""" + + metadata.pop("SubServiceName", None) + metadata["Remediation"]["Code"].pop("NativeIaC", None) + metadata["Remediation"]["Code"].pop("Other", None) + metadata["Remediation"]["Code"].pop("Terraform", None) + metadata.pop("DependsOn", None) + metadata.pop("RelatedTo", None) + + valid_prowler_categories = [ + "encryption", + "forensics-ready", + "internet-exposed", + "logging", + "redundancy", + "secrets", + "thread-detection", + "trustboundaries", + "vulnerability-management", + ] + + metadata_placeholder_resource_type = get_metadata_placeholder_resource_type( + metadata.get("Provider") + ) + + prompt_parts = [ + "Your task is to fill the metadata for a new cybersecurity check in Prowler (a Cloud Security tool).", + "The metadata is a JSON object with the following fields: ", + json.dumps(metadata, indent=2), + "Use the following context sources as inspiration to fill the metadata: ", + context, + "The field CheckType should be filled following the format: 'namespace/category/classifier', where namespace, category, and classifier are the values from the following dict: ", + json.dumps( + get_metadata_valid_check_type(metadata.get("Provider")), indent=2 + ), + "One example of a valid CheckType value is: 'Software and Configuration Checks/Vulnerabilities/CVE'. If you don't have a valid value for CheckType, you can leave it empty.", + f"The field ResourceType must be one of the following values (if there is not a valid value, you can put '{metadata_placeholder_resource_type}'): ", + ", ".join(get_metadata_valid_resource_type(metadata.get("Provider"))), + "If you don't have a valid value for ResourceType, you can leave it empty.", + f"The field Category must be one or more of the following values: {', '.join(valid_prowler_categories)}.", + "I need the answer only with JSON formatted text.", + ] + return prompt_parts + + def generate_check(self, check_name: str, context: str) -> str: + """Fill the check with Gemini AI.""" + check = "" + + prompt_parts = self._prepare_check_prompt(check_name, context) + check = ( + self._generate_content(prompt_parts) + .replace("python", "") + .replace("```", "") + .strip() + ) + + return check + + def generate_test(self, check_name: str): + """Fill the test with Gemini AI.""" + test = "" + + prompt_parts = self._prepare_test_prompt( + check_name, + ) + test = ( + self._generate_content(prompt_parts) + .replace("python", "") + .replace("```", "") + .strip() + ) + + return test + + def generate_metadata(self, metadata: dict, context: str) -> dict: + """Fill the metadata with Gemini AI.""" + if not metadata: + return {} + + prompt_parts = self._prepare_metadata_prompt(metadata, context) + filled_metadata_json = self._generate_content(prompt_parts) + + # Parse the generated JSON and re-add the removed fields + filled_metadata = json.loads( + filled_metadata_json.replace("\n", "") + .replace("json", "") + .replace("JSON", "") + .replace("```", "") + .strip() + ) + + # Add the removed fields back in the same order + filled_metadata["Remediation"]["Code"]["NativeIaC"] = "" + filled_metadata["Remediation"]["Code"]["Other"] = "" + filled_metadata["Remediation"]["Code"]["Terraform"] = "" + + # Insert key SubServiceName after ServiceName key and RelatedTo and DependsOn just before Notes key + + ordered_filled_metadata = {} + + for key, value in filled_metadata.items(): + if key == "Notes": + ordered_filled_metadata["DependsOn"] = [] + ordered_filled_metadata["RelatedTo"] = [] + ordered_filled_metadata[key] = value + if key == "ServiceName": + ordered_filled_metadata["SubServiceName"] = "" + + # Check that resource type is valid + if filled_metadata["ResourceType"]: + valid_resource_types = get_metadata_valid_resource_type( + filled_metadata["Provider"] + ) + if filled_metadata["ResourceType"] not in valid_resource_types: + ordered_filled_metadata["ResourceType"] = "Other" + + return ordered_filled_metadata diff --git a/util/prowler_check_kreator/lib/metadata_types.py b/util/prowler_check_kreator/lib/metadata_types.py new file mode 100644 index 00000000000..9491472e9e7 --- /dev/null +++ b/util/prowler_check_kreator/lib/metadata_types.py @@ -0,0 +1,246 @@ +def get_metadata_valid_check_type(provider: str = "aws") -> list: + """Get the valid check types for the provider + + Args: + provider: The Prowler provider. + + Returns: + A list of valid check types for the given provider. + """ + check_types = [] + + if provider == "aws": + check_types = [ + { + "namespace": "Software and Configuration Checks", + "children": [ + { + "category": "Vulnerabilities", + "children": [{"classifier": "CVE"}], + }, + { + "category": "AWS Security Best Practices", + "children": [ + {"classifier": "Network Reachability"}, + {"classifier": "Runtime Behavior Analysis"}, + ], + }, + { + "category": "Industry and Regulatory Standards", + "children": [ + {"classifier": "AWS Foundational Security Best Practices"}, + {"classifier": "CIS Host Hardening Benchmarks"}, + {"classifier": "CIS AWS Foundations Benchmark"}, + {"classifier": "PCI-DSS"}, + {"classifier": "Cloud Security Alliance Controls"}, + {"classifier": "ISO 90001 Controls"}, + {"classifier": "ISO 27001 Controls"}, + {"classifier": "ISO 27017 Controls"}, + {"classifier": "ISO 27018 Controls"}, + {"classifier": "SOC 1"}, + {"classifier": "SOC 2"}, + {"classifier": "HIPAA Controls (USA)"}, + {"classifier": "NIST 800-53 Controls (USA)"}, + {"classifier": "NIST CSF Controls (USA)"}, + {"classifier": "IRAP Controls (Australia)"}, + {"classifier": "K-ISMS Controls (Korea)"}, + {"classifier": "MTCS Controls (Singapore)"}, + {"classifier": "FISC Controls (Japan)"}, + {"classifier": "My Number Act Controls (Japan)"}, + {"classifier": "ENS Controls (Spain)"}, + {"classifier": "Cyber Essentials Plus Controls (UK)"}, + {"classifier": "G-Cloud Controls (UK)"}, + {"classifier": "C5 Controls (Germany)"}, + {"classifier": "IT-Grundschutz Controls (Germany)"}, + {"classifier": "GDPR Controls (Europe)"}, + {"classifier": "TISAX Controls (Europe)"}, + ], + }, + {"category": "Patch Management"}, + ], + }, + { + "namespace": "TTPs", + "children": [ + {"category": "Initial Access"}, + {"category": "Execution"}, + {"category": "Persistence"}, + {"category": "Privilege Escalation"}, + {"category": "Defense Evasion"}, + {"category": "Credential Access"}, + {"category": "Discovery"}, + {"category": "Lateral Movement"}, + {"category": "Collection"}, + {"category": "Command and Control"}, + ], + }, + { + "namespace": "Effects", + "children": [ + {"category": "Data Exposure"}, + {"category": "Data Exfiltration"}, + {"category": "Data Destruction"}, + {"category": "Denial of Service"}, + {"category": "Resource Consumption"}, + ], + }, + { + "namespace": "Unusual Behaviors", + "children": [ + {"category": "Application"}, + {"category": "Network Flow"}, + {"category": "IP address"}, + {"category": "User"}, + {"category": "VM"}, + {"category": "Container"}, + {"category": "Serverless"}, + {"category": "Process"}, + {"category": "Database"}, + {"category": "Data"}, + ], + }, + { + "namespace": "Sensitive Data Identifications", + "children": [ + {"category": "PII"}, + {"category": "Passwords"}, + {"category": "Legal"}, + {"category": "Financial"}, + {"category": "Security"}, + {"category": "Business"}, + ], + }, + ] + + return check_types + + +def get_metadata_valid_resource_type(provider: str = "aws") -> set: + """Get the valid resource types for the provider + + Args: + provider: The Prowler provider. + + Returns: + A set of valid resource types for the given provider. + """ + valid_resource_types = set() + + if provider == "aws": + valid_resource_types = { + "AwsIamAccessKey", + "AwsElbLoadBalancer", + "AwsRedshiftCluster", + "AwsEventsEndpoint", + "AwsElbv2LoadBalancer", + "AwsAutoScalingLaunchConfiguration", + "AwsWafv2RuleGroup", + "AwsWafRegionalRule", + "AwsCloudFrontDistribution", + "AwsWafRegionalWebAcl", + "AwsWafRateBasedRule", + "AwsCertificateManagerCertificate", + "AwsKmsKey", + "AwsDmsEndpoint", + "AwsLambdaLayerVersion", + "AwsIamRole", + "AwsElasticBeanstalkEnvironment", + "AwsBackupBackupPlan", + "AwsEc2ClientVpnEndpoint", + "AwsEcrContainerImage", + "AwsSqsQueue", + "AwsIamGroup", + "AwsOpenSearchServiceDomain", + "AwsApiGatewayV2Api", + "AwsCloudTrailTrail", + "AwsWafWebAcl", + "AwsEc2Subnet", + "AwsEc2VpcPeeringConnection", + "AwsEc2VpcEndpointService", + "AwsCodeBuildProject", + "AwsLambdaFunction", + "AwsNetworkFirewallRuleGroup", + "AwsDmsReplicationInstance", + "AwsRdsEventSubscription", + "AwsCloudWatchAlarm", + "AwsS3AccountPublicAccessBlock", + "AwsWafRegionalRateBasedRule", + "AwsRdsDbInstance", + "AwsEksCluster", + "AwsXrayEncryptionConfig", + "AwsWafv2WebAcl", + "AwsWafRuleGroup", + "AwsBackupBackupVault", + "AwsKinesisStream", + "AwsNetworkFirewallFirewallPolicy", + "AwsEc2NetworkInterface", + "AwsEcsTaskDefinition", + "AwsMskCluster", + "AwsApiGatewayRestApi", + "AwsS3Object", + "AwsRdsDbSnapshot", + "AwsBackupRecoveryPoint", + "AwsWafRule", + "AwsS3AccessPoint", + "AwsApiGatewayV2Stage", + "AwsGuardDutyDetector", + "AwsEfsAccessPoint", + "AwsEcsContainer", + "AwsEcsTask", + "AwsS3Bucket", + "AwsSageMakerNotebookInstance", + "AwsNetworkFirewallFirewall", + "AwsStepFunctionStateMachine", + "AwsIamUser", + "AwsAppSyncGraphQLApi", + "AwsApiGatewayStage", + "AwsEcrRepository", + "AwsEcsService", + "AwsEc2Vpc", + "AwsAmazonMQBroker", + "AwsWafRegionalRuleGroup", + "AwsEventSchemasRegistry", + "AwsRoute53HostedZone", + "AwsEventsEventbus", + "AwsDmsReplicationTask", + "AwsEc2Instance", + "AwsEcsCluster", + "AwsRdsDbSecurityGroup", + "AwsCloudFormationStack", + "AwsSnsTopic", + "AwsDynamoDbTable", + "AwsRdsDbCluster", + "AwsEc2Eip", + "AwsEc2RouteTable", + "AwsEc2TransitGateway", + "AwsElasticSearchDomain", + "AwsEc2LaunchTemplate", + "AwsEc2Volume", + "AwsAthenaWorkGroup", + "AwsSecretsManagerSecret", + "AwsEc2SecurityGroup", + "AwsIamPolicy", + "AwsSsmPatchCompliance", + "AwsAutoScalingAutoScalingGroup", + "AwsEc2NetworkAcl", + "AwsRdsDbClusterSnapshot", + } + + return valid_resource_types + + +def get_metadata_placeholder_resource_type(provider: str = "aws") -> str: + """Get the placeholder for the resource type for the provider + + Args: + provider: The Prowler provider. + + Returns: + A placeholder for the resource type for the given provider. + """ + placeholder = "" + + if provider == "aws": + placeholder = "Other" + + return placeholder diff --git a/util/prowler_check_kreator/lib/templates.py b/util/prowler_check_kreator/lib/templates.py new file mode 100644 index 00000000000..d02a469818d --- /dev/null +++ b/util/prowler_check_kreator/lib/templates.py @@ -0,0 +1,132 @@ +def load_check_template(provider: str, service: str, check_name: str) -> str: + """Load the template for the check file. + + Args: + provider (str): The provider of the service. + service (str): The service to check. + check_name (str): The name of the check. + + Returns: + A check template used when the user does not want to generate the check with AI. + + Raises: + ValueError: If the provider is not implemented yet. + """ + if provider == "aws": + return f""" +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.{service}.{service}_client import {service}_client +from typing import List + + +class {check_name}(Check): + def execute(self) -> List[Check_Report_AWS]: + findings = [] + for , in {service}_client..items(): + report = Check_Report_AWS(self.metadata()) + report.region = .region + report.resource_id = .name + report.resource_arn = + report.resource_tags = .tags + report.status = "FAIL" + report.status_extended = f"..." + + if : + report.status = "PASS" + report.status_extended = f"..." + + findings.append(report) + + return findings +""" + else: + raise ValueError(f"Template for {provider} not implemented yet") + + +def load_test_template(provider: str, service: str, check_name: str) -> str: + """Load the template for the test file. + + Args: + provider: The provider of the service (e.g., "aws"). + service: The service to check (e.g., "s3"). + check_name: The name of the check (e.g., "check_bucket_encryption"). + + Returns: + A test template used when the user does not want to generate the check with AI. + + Raises: + ValueError: If the template for the given provider is not implemented. + """ + if provider == "aws": + return f""" +from unittest import mock + +from boto3 import client +from moto import mock_aws + +from tests.providers.aws.utils import ( + AWS_REGION_EU_WEST_1, + set_mocked_aws_provider, +) + + +class Test_{check_name}: + @mock_aws + def test_(self): + from prowler.providers.aws.services.{service}.{service}_service import + + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.{service}.{check_name}.{check_name}.{service}_client", + new=(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.{service}.{check_name}.{check_name} import ( + {check_name}, + ) + + check = {check_name}() + result = check.execute() + + assert len(result) == 0 + + @mock_aws + def test_one_compliant_{service}(self): + {service}_client = client("{service}", region_name=AWS_REGION_EU_WEST_1) + # Create a compliant resource + + from prowler.providers.aws.services.{service}.{service}_service import + + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.{service}.{check_name}.{check_name}.{service}_client", + new=(aws_provider), + ): + from prowler.providers.aws.services.{service}.{check_name}.{check_name} import ( + {check_name}, + ) + + check = {check_name}() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].status_extended == "..." + assert result[0].region == AWS_REGION_EU_WEST_1 + assert result[0].resource_id == .id + assert ( + result[0].resource_arn + == f"arn:(aws_partition):{service}:(region):(account_id):(resource)" + ) + assert result[0].resource_tags == .tags +""" + else: + raise ValueError(f"Template for {provider} not implemented yet") diff --git a/util/prowler_check_kreator/prowler_check_kreator.py b/util/prowler_check_kreator/prowler_check_kreator.py new file mode 100644 index 00000000000..24fa6140f67 --- /dev/null +++ b/util/prowler_check_kreator/prowler_check_kreator.py @@ -0,0 +1,304 @@ +#!/usr/bin/env python3 +import json +import os +import sys + +from util.prowler_check_kreator.lib.templates import ( + load_check_template, + load_test_template, +) + + +class ProwlerCheckKreator: + def __init__(self, provider: str, check_name: str): + # Validate provider + + SUPPORTED_PROVIDERS = {"aws"} + + if provider in SUPPORTED_PROVIDERS: + self._provider = provider + else: + raise ValueError( + f"Invalid provider. Supported providers: {', '.join(SUPPORTED_PROVIDERS)}" + ) + + # Find the Prowler folder + self._prowler_folder = os.path.abspath( + os.path.join(os.path.dirname(__file__), os.pardir, os.pardir) + ) + + # Validate if service exists for the selected provider + service_name = check_name.split("_")[0] + + service_path = os.path.join( + self._prowler_folder, + "prowler/providers/", + provider, + "services/", + service_name, + ) + + if os.path.exists(service_path): + self._service_name = service_name + else: + raise ValueError( + f"Service {service_name} does not exist for {provider}. Please introduce a valid service" + ) + + # Ask user if want to use Gemini for all the process + + user_input = ( + input( + "Do you want to use Gemini to create the check and metadata? Type 'yes'/'no' and press enter: " + ) + .strip() + .lower() + ) + + if user_input == "yes": + # Let the user to use the model that he wants + supported_models = [ + "gemini-1.5-flash", + "gemini-1.5-pro", + "gemini-1.0-pro", + ] + + print("Select the model that you want to use:") + for i, model in enumerate(supported_models): + print(f"{i + 1}. {model}") + + user_input = input( + "Type the number of the model and press enter (default is 1): " + ).strip() + + if not user_input: + model_index = 1 + else: + model_index = int(user_input) + + if model_index < 1 or model_index > len(supported_models): + raise ValueError("Invalid model selected.") + + model_name = supported_models[model_index - 1] + + if "gemini" in model_name: + from util.prowler_check_kreator.lib.llms.gemini import Gemini + + self._model = Gemini(model_name) + + # Provide some context about the check to create + self._context = ( + input( + "Please provide some context to generate the check and metadata:\n" + ) + ).strip() + + else: + raise ValueError("Invalid model selected.") + elif user_input == "no": + self._model = None + self._context = "" + else: + raise ValueError("Invalid input. Please type 'yes' or 'no'.") + + if not self._check_exists(check_name): + self._check_name = check_name + self._check_path = os.path.join( + self._prowler_folder, + "prowler/providers/", + provider, + "services/", + service_name, + check_name, + ) + else: + # Check already exists, give the user the possibility to continue or not + user_input = ( + input( + f"Some files of {check_name} already exists. Do you want to continue and overwrite it? Type 'yes' if you want to continue: " + ) + .strip() + .lower() + ) + + if user_input == "yes": + self._check_name = check_name + self._check_path = os.path.join( + self._prowler_folder, + "prowler/providers/", + provider, + "services/", + service_name, + check_name, + ) + else: + raise ValueError(f"Check {check_name} already exists.") + + def kreate_check(self) -> None: + """Create a new check in Prowler""" + + # Create the check + print(f"Creating check {self._check_name} for {self._provider}") + + # Inside the check folder, create the check files: __init__.py, check_name.py, and check_name.metadata.json + os.makedirs(self._check_path, exist_ok=True) + + with open(os.path.join(self._check_path, "__init__.py"), "w") as f: + f.write("") + + self._write_check_file() + self._write_metadata_file() + + # Create test directory if it does not exist + test_folder = os.path.join( + self._prowler_folder, + "tests/providers/", + self._provider, + "services/", + self._service_name, + self._check_name, + ) + + os.makedirs(test_folder, exist_ok=True) + + self._write_test_file() + + print(f"Check {self._check_name} created successfully") + + def _check_exists(self, check_name: str) -> bool: + """Ensure if any file related to the check already exists. + + Args: + check_name: The name of the check. + + Returns: + True if the check already exists, False otherwise. + """ + + # Get the check path + check_path = os.path.join( + self._prowler_folder, + "prowler/providers/", + self._provider, + "services/", + self._service_name, + check_name, + ) + + # Get the test path + _test_path = os.path.join( + self._prowler_folder, + "tests/providers/", + self._provider, + "services/", + self._service_name, + check_name, + ) + + # Check if exits check.py, check_metadata.json or check_test.py + return ( + os.path.exists(check_path) + or os.path.exists(os.path.join(check_path, "__init__.py")) + or os.path.exists(os.path.join(check_path, f"{check_name}.py")) + or os.path.exists(os.path.join(check_path, f"{check_name}.metadata.json")) + or os.path.exists(_test_path) + ) + + def _write_check_file(self) -> None: + """Write the check file""" + + if self._model is None: + check_content = load_check_template( + self._provider, self._service_name, self._check_name + ) + else: + check_content = self._model.generate_check( + check_name=self._check_name, context=self._context + ) + + with open(os.path.join(self._check_path, f"{self._check_name}.py"), "w") as f: + f.write(check_content) + + def _write_metadata_file(self) -> None: + """Write the metadata file""" + + metadata_template = { + "Provider": self._provider, + "CheckID": self._check_name, + "CheckTitle": "", + "CheckType": [], + "ServiceName": self._service_name, + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "", + "ResourceType": "", + "Description": "", + "Risk": "", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "", + }, + "Recommendation": {"Text": "", "Url": ""}, + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + } + + if self._model is None: + filled_metadata = metadata_template + else: + filled_metadata = self._model.generate_metadata( + metadata_template, self._context + ) + + with open( + os.path.join(self._check_path, f"{self._check_name}.metadata.json"), "w" + ) as f: + f.write(json.dumps(filled_metadata, indent=2)) + + def _write_test_file(self) -> None: + """Write the test file""" + + test_folder = os.path.join( + self._prowler_folder, + "tests/providers/", + self._provider, + "services/", + self._service_name, + self._check_name, + ) + + if self._model is None: + test_template = load_test_template( + self._provider, self._service_name, self._check_name + ) + else: + test_template = self._model.generate_test(self._check_name) + + with open(os.path.join(test_folder, f"{self._check_name}_test.py"), "w") as f: + f.write(test_template) + + +if __name__ == "__main__": + try: + if len(sys.argv) != 3: + raise ValueError( + "Invalid arguments. Usage: python prowler_check_kreator.py " + ) + + prowler_check_creator = ProwlerCheckKreator(sys.argv[1], sys.argv[2]) + + sys.exit(prowler_check_creator.kreate_check()) + + except ValueError as e: + print(f"Error: {e}") + sys.exit(1) + except Exception as e: + print(f"Unexpected error: {e}") + sys.exit(1)