From 3dfaa7740f554f6136634d6486d26bb4d053c039 Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 11:36:24 +0100 Subject: [PATCH 01/12] Introduce functions to process profiles selections This commits move all profile related rules from variables.py to a new file called profiles.py and update the profile functions to make them more generic and reusable. This also simplifies the variables.py files which is now only used to process variables files, options and respective values. Signed-off-by: Marcus Burghardt --- ssg/profiles.py | 249 +++++++++++++++++++++++++++++++++++++++++++++++ ssg/variables.py | 235 +------------------------------------------- 2 files changed, 252 insertions(+), 232 deletions(-) create mode 100644 ssg/profiles.py diff --git a/ssg/profiles.py b/ssg/profiles.py new file mode 100644 index 00000000000..8c77b05517e --- /dev/null +++ b/ssg/profiles.py @@ -0,0 +1,249 @@ +from __future__ import absolute_import +from __future__ import print_function + +import os +import sys +import yaml + +from .controls import ControlsManager +from .products import ( + get_profile_files_from_root, + load_product_yaml, + product_yaml_path, +) + +if sys.version_info >= (3, 9): + list_type = list # Python 3.9+ supports built-in generics + dict_type = dict +else: + from typing import List as list_type # Fallback for older versions + from typing import Dict as dict_type + +class ProfileSelections: + """ + A class to represent profile with sections of rules and variables. + + Attributes: + ----------- + profile_id : str + The unique identifier for the profile. + product : str + The product associated with the profile. + variables : dict + A dictionary containing the variables for the profile. + """ + def __init__(self, profile_id, profile_title, product): + self.profile_id = profile_id + self.profile_title = profile_title + self.product = product + self.rules = [] + self.unselected_rules = [] + self.variables = {} + + +def _load_product_yaml(content_dir: str, product: str) -> object: + """ + Load the product YAML file and return its content as a Python object. + + Args: + content_dir (str): The directory where the content is stored. + product (str): The name of the product. + + Returns: + object: The loaded YAML content as a Python object. + """ + file_yaml_path = product_yaml_path(content_dir, product) + return load_product_yaml(file_yaml_path) + + +def _load_yaml_profile_file(file_path: str) -> dict_type: + """ + Load the content of a YAML file intended to profiles definitions. + + It is not necessary to process macros in this case. + + Args: + file_path (str): The path to the YAML file. + + Returns: + dict: The content of the YAML file as a dictionary. + """ + with open(file_path, 'r') as file: + try: + return yaml.safe_load(file) + except yaml.YAMLError as e: + print(f"Error loading YAML profile file {file_path}: {e}") + return {} + + +def _get_extended_profile_path(profiles_files: list, profile_name: str) -> str: + """ + Retrieve the full path of a profile file from a list of profile file paths. + + Args: + profiles_files (list of str): A list of file paths where profile files are located. + profile_name (str): The name of the profile to search for. + + Returns: + str: The full path of the profile file if found, otherwise None. + """ + profile_file = f"{profile_name}.profile" + profile_path = next((path for path in profiles_files if profile_file in path), None) + return profile_path + + +def _process_profile_extension(profile: ProfileSelections, profile_yaml: dict, + profiles_files: list, policies: dict) -> ProfileSelections: + """ + Processes the extension of a profile by recursively checking if the profile extends another + profile and updating the profile selections accordingly. + + Args: + profile (ProfileSelections): The profile object to be processed. + profile_yaml (dict): The YAML content of the current profile. + profiles_files (list): List of profile file paths. + policies (dict): The policies defined in the current profile. + + Returns: + ProfileSelections: The updated profile object. + """ + extended_profile = profile_yaml.get("extends") + if isinstance(extended_profile, str): + extended_profile = _get_extended_profile_path(profiles_files, extended_profile) + if extended_profile is not None: + profile_yaml = _load_yaml_profile_file(extended_profile) + return _process_profile(profile, profile_yaml, profiles_files, policies) + return profile + + +def _process_controls(profile: ProfileSelections, control_line: str, policies: dict) -> ProfileSelections: + """ + Process a control file inheritance to update profile selections based on the given policies. + + Args: + profile (ProfileSelections): The profile object to be processed. + control_line (str): A string representing the control line, which contains a policy ID and + optionally a level, separated by colons. + policies (dict): A dictionary of policies, where each key is a policy ID and each value is + a policy object containing the controls. + + Returns: + ProfileSelections: The updated profile object. + + Raises: + KeyError: If the policy ID from the control line is not found in the policies dictionary. + """ + if control_line.count(":") == 2: + policy_id, _, level = control_line.split(":") + else: + policy_id, _ = control_line.split(":") + level = None + + try: + policy = policies[policy_id] + except KeyError: + print(f"Policy {policy_id} not found") + return profile + + for control in policy.controls: + if level in control.levels: + for rule in control.rules: + if "=" in rule: + variable_name, variable_value = rule.split('=', 1) + # When a profile extends a control file, the variables explicitly defined in + # profiles files must be honored, so don't update variables already defined. + if variable_name not in profile.variables: + profile.variables[variable_name] = variable_value + elif rule not in profile.unselected_rules and rule not in profile.rules: + profile.rules.append(rule) + return profile + + +def _process_selections(profile: ProfileSelections, profile_yaml: dict, policies: dict) -> ProfileSelections: + """ + Processes the selections from the profile YAML and updates the profile accordingly. + + Args: + profile (ProfileSelections): The profile object to be processed. + profile_yaml (dict): A dictionary containing the profile YAML data. + policies (dict): A dictionary containing policy information. + + Returns: + profile: The updated profile object. + """ + selections = profile_yaml.get("selections") + for selected in selections: + if selected.startswith("!"): + profile.unselected_rules.append(selected[1:]) + elif "=" in selected: + variable_name, variable_value = selected.split('=', 1) + profile.variables[variable_name] = variable_value + elif ":" in selected: + profile = _process_controls(profile, selected, policies) + else: + profile.rules.append(selected) + return profile + + +def _process_profile(profile: ProfileSelections, profile_yaml: dict, profiles_files: list, + policies: dict) -> ProfileSelections: + """ + Processes a profile by handling profile extensions, and processing selections. + + Args: + profile (ProfileSelections): The profile object to be processed. + profile_yaml (dict): The YAML content of the profile. + profiles_files (list): A list of profile file paths. + policies (dict): A dictionary of policies defined by control files. + + Returns: + ProfileSelections: The processed profile object. + """ + profile = _process_profile_extension(profile, profile_yaml, profiles_files, policies) + profile = _process_selections(profile, profile_yaml, policies) + return profile + + +def _load_controls_manager(controls_dir: str, product_yaml: dict) -> object: + """ + Loads and initializes a ControlsManager instance. + + Args: + controls_dir (str): The directory containing control files. + product_yaml (dict): The product configuration in YAML format. + + Returns: + object: An instance of ControlsManager with loaded controls. + """ + control_mgr = ControlsManager(controls_dir, product_yaml) + control_mgr.load() + return control_mgr + + +def get_profiles_from_products(content_dir: str, products: list) -> list_type: + """ + Retrieves profiles with respective variables from the given products. + + Args: + content_dir (str): The directory containing the content. + products (list): A list of product names to retrieve profiles from. + + Returns: + list: A list of ProfileVariables objects containing profile variables for each product. + """ + profiles = [] + controls_dir = os.path.join(content_dir, 'controls') + + for product in products: + product_yaml = _load_product_yaml(content_dir, product) + profiles_files = get_profile_files_from_root(product_yaml, product_yaml) + controls_manager = _load_controls_manager(controls_dir, product_yaml) + for file in profiles_files: + profile_id = os.path.basename(file).split('.profile')[0] + profile_yaml = _load_yaml_profile_file(file) + profile_title = profile_yaml.get("title") + profile = ProfileSelections(profile_id, profile_title, product) + profile = _process_profile(profile, profile_yaml, profiles_files, controls_manager.policies) + profiles.append(profile) + + return profiles diff --git a/ssg/variables.py b/ssg/variables.py index 66757e01df1..92d81205199 100644 --- a/ssg/variables.py +++ b/ssg/variables.py @@ -4,16 +4,10 @@ import glob import os import sys -import yaml from collections import defaultdict from .constants import BENCHMARKS -from .controls import ControlsManager -from .products import ( - get_profile_files_from_root, - load_product_yaml, - product_yaml_path, -) +from .profiles import get_profiles_from_products from .yaml import open_and_macro_expand @@ -111,235 +105,12 @@ def get_variable_options(content_dir: str, variable_id: str = None) -> dict_type return all_options -class ProfileVariables: - """ - A class to represent profile variables. - - Attributes: - ----------- - profile_id : str - The unique identifier for the profile. - product : str - The product associated with the profile. - variables : dict - A dictionary containing the variables for the profile. - """ - def __init__(self, profile_id, product, variables): - self.profile_id = profile_id - self.product = product - self.variables = variables - - -def _load_product_yaml(content_dir: str, product: str) -> object: - """ - Load the product YAML file and return its content as a Python object. - - Args: - content_dir (str): The directory where the content is stored. - product (str): The name of the product. - - Returns: - object: The loaded YAML content as a Python object. - """ - file_yaml_path = product_yaml_path(content_dir, product) - return load_product_yaml(file_yaml_path) - - -def _load_yaml_profile_file(file_path: str) -> dict_type: - """ - Load the content of a YAML file intended to profiles definitions. - - It is not necessary to process macros in this case. - - Args: - file_path (str): The path to the YAML file. - - Returns: - dict: The content of the YAML file as a dictionary. - """ - with open(file_path, 'r') as file: - try: - return yaml.safe_load(file) - except yaml.YAMLError as e: - print(f"Error loading YAML profile file {file_path}: {e}") - return {} - - -def _get_extended_profile_path(profiles_files: list, profile_name: str) -> str: - """ - Retrieve the full path of a profile file from a list of profile file paths. - - Args: - profiles_files (list of str): A list of file paths where profile files are located. - profile_name (str): The name of the profile to search for. - - Returns: - str: The full path of the profile file if found, otherwise None. - """ - profile_file = f"{profile_name}.profile" - profile_path = next((path for path in profiles_files if profile_file in path), None) - return profile_path - - -def _process_profile_extension(profiles_files: list, profile_yaml: dict, - profile_variables: dict, policies: dict) -> dict_type: - """ - Processes the extension of a profile by recursively checking if the profile extends another - profile and updating the profile variables accordingly. - - Args: - profiles_files (list): List of profile file paths. - profile_yaml (dict): The YAML content of the current profile. - profile_variables (dict): The variables already defined in the current profile. - policies (dict): The policies defined in the current profile. - - Returns: - dict: The updated profile variables after processing the extended profile, - or the original profile variables if no extension is found. - """ - extended_profile = profile_yaml.get("extends") - if isinstance(extended_profile, str): - extended_profile = _get_extended_profile_path(profiles_files, extended_profile) - if extended_profile is not None: - return _process_profile(profiles_files, extended_profile, policies, profile_variables) - return profile_variables - - -def _process_controls(control_line: str, profile_variables: dict, policies: dict) -> dict_type: - """ - Process a control file inheritance to update profile variables based on the given policies. - - Args: - control_line (str): A string representing the control line, which contains a policy ID and - optionally a level, separated by colons. - profile_variables (dict): A dictionary of profile variables to be updated. - policies (dict): A dictionary of policies, where each key is a policy ID and each value is - a policy object containing the controls. - - Returns: - dict: The updated profile variables dictionary. - - Raises: - KeyError: If the policy ID from the control line is not found in the policies dictionary. - """ - if control_line.count(":") == 2: - policy_id, _, level = control_line.split(":") - else: - policy_id, _ = control_line.split(":") - level = None - - try: - policy = policies[policy_id] - except KeyError: - print(f"Policy {policy_id} not found") - return profile_variables - - for control in policy.controls: - if level in control.levels: - for rule in control.rules: - if "=" in rule: - variable_name, variable_value = rule.split('=', 1) - # When a profile extends a control file, the variables explicitly defined in - # profiles files must be honored, so don't update variables already defined. - if variable_name not in profile_variables: - profile_variables[variable_name] = variable_value - return profile_variables - - -def _process_selections(profile_yaml: dict, profile_variables: dict, policies: dict) -> dict_type: - """ - Processes the selections from the profile YAML and updates the profile variables accordingly. - - Args: - profile_yaml (dict): A dictionary containing the profile YAML data. - profile_variables (dict): A dictionary to store the profile variables. - policies (dict): A dictionary containing policy information. - - Returns: - dict: The updated profile variables dictionary. - """ - selections = profile_yaml.get("selections") - for selected in selections: - if "=" in selected and "!" not in selected: - variable_name, variable_value = selected.split('=', 1) - profile_variables[variable_name] = variable_value - elif ":" in selected: - profile_variables = _process_controls(selected, profile_variables, policies) - return profile_variables - - -def _process_profile(profiles_files: list, file: str, policies: dict, - profile_variables={}) -> dict_type: - """ - Processes a profile by loading its YAML file, handling profile extensions, and processing - selections. - - Args: - profiles_files (list): A list of profile file paths. - file (str): The path to the profile file to be processed. - policies (dict): A dictionary of policies defined by control files. - profile_variables (dict, optional): A dictionary of profile variables. Defaults to empty. - - Returns: - dict: A dictionary containing the processed profile variables. - """ - profile_yaml = _load_yaml_profile_file(file) - profile_variables = _process_profile_extension(profiles_files, profile_yaml, - profile_variables, policies) - profile_variables = _process_selections(profile_yaml, profile_variables, policies) - return profile_variables - - -def _load_controls_manager(controls_dir: str, product_yaml: dict) -> object: - """ - Loads and initializes a ControlsManager instance. - - Args: - controls_dir (str): The directory containing control files. - product_yaml (dict): The product configuration in YAML format. - - Returns: - object: An instance of ControlsManager with loaded controls. - """ - control_mgr = ControlsManager(controls_dir, product_yaml) - control_mgr.load() - return control_mgr - - -def _get_profiles_from_products(content_dir: str, products: list) -> list_type: - """ - Retrieves profiles with respective variables from the given products. - - Args: - content_dir (str): The directory containing the content. - products (list): A list of product names to retrieve profiles from. - - Returns: - list: A list of ProfileVariables objects containing profile variables for each product. - """ - profiles = [] - controls_dir = os.path.join(content_dir, 'controls') - - for product in products: - product_yaml = _load_product_yaml(content_dir, product) - profiles_files = get_profile_files_from_root(product_yaml, product_yaml) - controls_manager = _load_controls_manager(controls_dir, product_yaml) - for file in profiles_files: - profile_id = os.path.basename(file).split('.profile')[0] - profile_variables = _process_profile(profiles_files, file, controls_manager.policies) - profile = ProfileVariables(profile_id, product, profile_variables) - profiles.append(profile) - - return profiles - - def _get_variables_from_profiles(profiles: list) -> dict_type: """ Extracts variables from a list of profiles and organizes them into a nested dictionary. Args: - profiles (list): A list of profile objects, each containing variables, product, and id - attributes. + profiles (list): A list of profile objects, each containing selections and id attributes. Returns: dict: A nested dictionary where the first level keys are variable names, the second level @@ -383,7 +154,7 @@ def get_variables_by_products(content_dir: str, products: list) -> dict_type[str dict: A dictionary where keys are variable names and values are dictionaries of product-profile pairs. """ - profiles = _get_profiles_from_products(content_dir, products) + profiles = get_profiles_from_products(content_dir, products) profiles_variables = _get_variables_from_profiles(profiles) return _convert_defaultdict_to_dict(profiles_variables) From 351313cf7fd909be46457bbcda48b2997af8e27e Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 12:10:14 +0100 Subject: [PATCH 02/12] Fix criteria to process controls rules It was not considering the case when a profile inherits all levels of a control file, which is also a common case. Signed-off-by: Marcus Burghardt --- ssg/profiles.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ssg/profiles.py b/ssg/profiles.py index 8c77b05517e..116694150c4 100644 --- a/ssg/profiles.py +++ b/ssg/profiles.py @@ -137,7 +137,7 @@ def _process_controls(profile: ProfileSelections, control_line: str, policies: d policy_id, _, level = control_line.split(":") else: policy_id, _ = control_line.split(":") - level = None + level = 'all' try: policy = policies[policy_id] @@ -146,7 +146,7 @@ def _process_controls(profile: ProfileSelections, control_line: str, policies: d return profile for control in policy.controls: - if level in control.levels: + if level == 'all' or level in control.levels: for rule in control.rules: if "=" in rule: variable_name, variable_value = rule.split('=', 1) From b885e2f19763aa8ef949f0980ba0cc6c11df5eb4 Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 12:20:52 +0100 Subject: [PATCH 03/12] Include function to allow sorted selections It may be cases where the integration may or may not benefit of sorted selections, so the capability was included as optional where the default is false, meaning the result will be ordered by the original order they where defined in profiles and control files. Signed-off-by: Marcus Burghardt --- ssg/profiles.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/ssg/profiles.py b/ssg/profiles.py index 116694150c4..723be94cb48 100644 --- a/ssg/profiles.py +++ b/ssg/profiles.py @@ -220,7 +220,24 @@ def _load_controls_manager(controls_dir: str, product_yaml: dict) -> object: return control_mgr -def get_profiles_from_products(content_dir: str, products: list) -> list_type: +def _sort_profiles_selections(profiles: list) -> ProfileSelections: + """ + Sorts profiles selections (rules and variables) by selections ids. + + Args: + profiles (list): A list of ProfileSelections objects to be sorted. + + Returns: + ProfileSelections: The sorted list of ProfileSelections objects. + """ + for profile in profiles: + profile.rules = sorted(profile.rules) + profile.unselected_rules = sorted(profile.unselected_rules) + profile.variables = dict(sorted(profile.variables.items())) + return profiles + + +def get_profiles_from_products(content_dir: str, products: list, sorted: bool = False) -> list_type: """ Retrieves profiles with respective variables from the given products. @@ -246,4 +263,7 @@ def get_profiles_from_products(content_dir: str, products: list) -> list_type: profile = _process_profile(profile, profile_yaml, profiles_files, controls_manager.policies) profiles.append(profile) + if sorted: + profiles = _sort_profiles_selections(profiles) + return profiles From b882b85ac10f13446d64b826301c250a42fe652e Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 12:57:10 +0100 Subject: [PATCH 04/12] Allow more efficient to process variables With the introduction of profiles.py, it sounds more straight forward in some cases to collect all profiles selections first and then only work with the variables options and values. So the function get_variables_from_profiles is now external to optmize cases where profiles selections were already obtained. Signed-off-by: Marcus Burghardt --- ssg/variables.py | 7 ++--- tests/unit/ssg-module/test_variables.py | 35 +++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/ssg/variables.py b/ssg/variables.py index 92d81205199..5871aaecd6c 100644 --- a/ssg/variables.py +++ b/ssg/variables.py @@ -105,7 +105,7 @@ def get_variable_options(content_dir: str, variable_id: str = None) -> dict_type return all_options -def _get_variables_from_profiles(profiles: list) -> dict_type: +def get_variables_from_profiles(profiles: list) -> dict_type: """ Extracts variables from a list of profiles and organizes them into a nested dictionary. @@ -144,7 +144,8 @@ def get_variables_by_products(content_dir: str, products: list) -> dict_type[str Retrieve variables by products from the specified content root directory. This function collects profiles for the given products and extracts variables from these - profiles. + profiles. If you already have a list of Profiles obtained by get_profiles_from_products() + defined in profiles.py, consider to use get_variables_from_profiles() instead. Args: content_dir (str): The root directory of the content. @@ -155,7 +156,7 @@ def get_variables_by_products(content_dir: str, products: list) -> dict_type[str product-profile pairs. """ profiles = get_profiles_from_products(content_dir, products) - profiles_variables = _get_variables_from_profiles(profiles) + profiles_variables = get_variables_from_profiles(profiles) return _convert_defaultdict_to_dict(profiles_variables) diff --git a/tests/unit/ssg-module/test_variables.py b/tests/unit/ssg-module/test_variables.py index 89e8c77a70b..91fb752bca4 100644 --- a/tests/unit/ssg-module/test_variables.py +++ b/tests/unit/ssg-module/test_variables.py @@ -7,6 +7,7 @@ get_variable_files, get_variable_options, get_variables_by_products, + get_variables_from_profiles, get_variable_values, ) @@ -79,3 +80,37 @@ def test_get_variable_values(tmp_path): result = get_variable_values(str(content_dir), profiles_variables) assert result["test"]["product1"]["profile1"] == "value1" assert result["test"]["product2"]["profile2"] == "value2" + + +def test_get_variables_from_profiles(): + class MockProfile: + def __init__(self, product, profile_id, variables): + self.product = product + self.profile_id = profile_id + self.variables = variables + + profiles = [ + MockProfile("product1", "profile1", {"var1": "value1", "var2": "value2"}), + MockProfile("product1", "profile2", {"var1": "value3"}), + MockProfile("product2", "profile1", {"var2": "value4"}), + ] + + expected_result = { + "var1": { + "product1": { + "profile1": "value1", + "profile2": "value3", + } + }, + "var2": { + "product1": { + "profile1": "value2", + }, + "product2": { + "profile1": "value4", + } + } + } + + result = get_variables_from_profiles(profiles) + assert result == expected_result From 4369acfd91acc9214ea0e8fee053f0faa6ae4cc1 Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 14:26:52 +0100 Subject: [PATCH 05/12] Include unit tests for new external function It was tricky to mock a test scenario for this function without creating a very complex test. That was the reason I opted to use real content taking into account the predictability of the project, which I considered enough for this particular unit test. Signed-off-by: Marcus Burghardt --- tests/unit/ssg-module/test_profiles.py | 40 ++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 tests/unit/ssg-module/test_profiles.py diff --git a/tests/unit/ssg-module/test_profiles.py b/tests/unit/ssg-module/test_profiles.py new file mode 100644 index 00000000000..26a81296884 --- /dev/null +++ b/tests/unit/ssg-module/test_profiles.py @@ -0,0 +1,40 @@ +import os +import pytest + +from ssg.products import ( + get_all, + get_profile_files_from_root, +) +from ssg.profiles import ( + get_profiles_from_products, + _load_product_yaml, +) + + +# The get_profiles_from_products function interacts with many objects and other functions that +# would be complex to mock. So it will be tested with a real content directory. To make it +# predictable, all existing products will be collected and the first rhel product will be used +# for testing. The decision to use a rhel product is that I am more used to them and I know their +# profiles also use control files. +content_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) + +def get_first_rhel_product_from_products_dir(): + products = get_all(content_dir) + rhel_products = [product for product in products.linux if "rhel" in product] + return rhel_products[0] + + +def count_profiles_in_products_dir(product): + product_yaml = _load_product_yaml(content_dir, product) + profiles_files = get_profile_files_from_root(product_yaml, product_yaml) + return len(profiles_files) + + +def test_get_profiles_from_products(): + products = [get_first_rhel_product_from_products_dir()] + profiles = get_profiles_from_products(content_dir, products, sorted=True) + + assert len(profiles) == count_profiles_in_products_dir(products[0]) + assert 'rhel' in profiles[0].product + assert len(profiles[0].rules) > 0 + assert len(profiles[0].variables) > 0 From 43a93955be69d96c705c9fb2f089afccc273bc1d Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 14:58:36 +0100 Subject: [PATCH 06/12] Fix TypeError since profiles have no selections The merit of this commit is for @qduanmu. She discovered and fixed this in PR#12792 but I was refactoring the library in the meantime, so it was incorporated here. Signed-off-by: Marcus Burghardt --- ssg/profiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ssg/profiles.py b/ssg/profiles.py index 723be94cb48..b71f02a47db 100644 --- a/ssg/profiles.py +++ b/ssg/profiles.py @@ -171,7 +171,7 @@ def _process_selections(profile: ProfileSelections, profile_yaml: dict, policies Returns: profile: The updated profile object. """ - selections = profile_yaml.get("selections") + selections = profile_yaml.get("selections", []) for selected in selections: if selected.startswith("!"): profile.unselected_rules.append(selected[1:]) From be49ad88b0a5482f6a21d598902ed483a5dc1691 Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 16:37:43 +0100 Subject: [PATCH 07/12] Minor updates in alignment to project style guide Signed-off-by: Marcus Burghardt --- ssg/profiles.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/ssg/profiles.py b/ssg/profiles.py index b71f02a47db..dace934d9b2 100644 --- a/ssg/profiles.py +++ b/ssg/profiles.py @@ -19,6 +19,7 @@ from typing import List as list_type # Fallback for older versions from typing import Dict as dict_type + class ProfileSelections: """ A class to represent profile with sections of rules and variables. @@ -116,7 +117,8 @@ def _process_profile_extension(profile: ProfileSelections, profile_yaml: dict, return profile -def _process_controls(profile: ProfileSelections, control_line: str, policies: dict) -> ProfileSelections: +def _process_controls(profile: ProfileSelections, control_line: str, + policies: dict) -> ProfileSelections: """ Process a control file inheritance to update profile selections based on the given policies. @@ -159,7 +161,8 @@ def _process_controls(profile: ProfileSelections, control_line: str, policies: d return profile -def _process_selections(profile: ProfileSelections, profile_yaml: dict, policies: dict) -> ProfileSelections: +def _process_selections(profile: ProfileSelections, profile_yaml: dict, + policies: dict) -> ProfileSelections: """ Processes the selections from the profile YAML and updates the profile accordingly. @@ -237,7 +240,8 @@ def _sort_profiles_selections(profiles: list) -> ProfileSelections: return profiles -def get_profiles_from_products(content_dir: str, products: list, sorted: bool = False) -> list_type: +def get_profiles_from_products(content_dir: str, products: list, + sorted: bool = False) -> list_type: """ Retrieves profiles with respective variables from the given products. @@ -260,7 +264,8 @@ def get_profiles_from_products(content_dir: str, products: list, sorted: bool = profile_yaml = _load_yaml_profile_file(file) profile_title = profile_yaml.get("title") profile = ProfileSelections(profile_id, profile_title, product) - profile = _process_profile(profile, profile_yaml, profiles_files, controls_manager.policies) + profile = _process_profile(profile, profile_yaml, profiles_files, + controls_manager.policies) profiles.append(profile) if sorted: From 7e7e11cc717d31e4fb771c49c4078c7f50931369 Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 17:09:37 +0100 Subject: [PATCH 08/12] Break the _process_controls to improve readability It was broken in smaller functions for a better readability with less complexity. Signed-off-by: Marcus Burghardt --- ssg/profiles.py | 114 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 91 insertions(+), 23 deletions(-) diff --git a/ssg/profiles.py b/ssg/profiles.py index dace934d9b2..f1422077b2a 100644 --- a/ssg/profiles.py +++ b/ssg/profiles.py @@ -5,7 +5,7 @@ import sys import yaml -from .controls import ControlsManager +from .controls import ControlsManager, Policy from .products import ( get_profile_files_from_root, load_product_yaml, @@ -117,6 +117,92 @@ def _process_profile_extension(profile: ProfileSelections, profile_yaml: dict, return profile +def _parse_control_line(control_line: str) -> tuple[str, str]: + """ + Parses a control line string and returns a tuple containing the first and third parts of the + string, separated by a colon. If the string does not contain three parts, the second element + of the tuple defaults to 'all'. + + Args: + control_line (str): The control line string to be parsed. + + Returns: + tuple[str, str]: A tuple containing the first part of the control line and either the + third part or 'all' if the third part is not present. + """ + parts = control_line.split(":") + if len(parts) == 3: + return parts[0], parts[2] + return parts[0], 'all' + + +def _process_selected_variable(profile: ProfileSelections, variable: str) -> None: + """ + Processes a selected variable and updates the profile's variables. + + Args: + profile (ProfileSelections): The profile object containing variables. + variable (str): The variable in the format 'name=value'. + + Raises: + ValueError: If the variable is not in the correct format. + """ + variable_name, variable_value = variable.split('=', 1) + if variable_name not in profile.variables: + profile.variables[variable_name] = variable_value + + +def _process_selected_rule(profile: ProfileSelections, rule: str) -> None: + """ + Adds a rule to the profile's selected rules if it is not already selected or unselected. + + Args: + profile (ProfileSelections): The profile containing selected and unselected rules. + rule (str): The rule to be added to the profile's selected rules. + + Returns: + None + """ + if rule not in profile.unselected_rules and rule not in profile.rules: + profile.rules.append(rule) + + +def _process_control(profile: ProfileSelections, control: object) -> None: + """ + Processes a control by iterating through its rules and applying the appropriate processing + function. Not that at this level rules list in control can include both variables and rules. + The function distinguishes between variable and rules based on the presence of an '=' + character in the rule. + + Args: + profile (ProfileSelections): The profile selections to be processed. + control: The control object containing rules to be processed. + """ + for rule in control.rules: + if "=" in rule: + _process_selected_variable(profile, rule) + else: + _process_selected_rule(profile, rule) + + +def _update_profile_with_policy(profile: ProfileSelections, policy: Policy, level: str) -> None: + """ + Updates the given profile with controls from the specified policy based on the provided level. + + Args: + profile (ProfileSelections): The profile to be updated. + policy (Policy): The policy containing controls to update the profile with. + level (str): The level of controls to be processed. If 'all', all controls are processed. + Otherwise, only controls matching the specified level are processed. + + Returns: + None + """ + for control in policy.controls: + if level == 'all' or level in control.levels: + _process_control(profile, control) + + def _process_controls(profile: ProfileSelections, control_line: str, policies: dict) -> ProfileSelections: """ @@ -131,33 +217,15 @@ def _process_controls(profile: ProfileSelections, control_line: str, Returns: ProfileSelections: The updated profile object. - - Raises: - KeyError: If the policy ID from the control line is not found in the policies dictionary. """ - if control_line.count(":") == 2: - policy_id, _, level = control_line.split(":") - else: - policy_id, _ = control_line.split(":") - level = 'all' + policy_id, level = _parse_control_line(control_line) + policy = policies.get(policy_id) - try: - policy = policies[policy_id] - except KeyError: + if policy is None: print(f"Policy {policy_id} not found") return profile - for control in policy.controls: - if level == 'all' or level in control.levels: - for rule in control.rules: - if "=" in rule: - variable_name, variable_value = rule.split('=', 1) - # When a profile extends a control file, the variables explicitly defined in - # profiles files must be honored, so don't update variables already defined. - if variable_name not in profile.variables: - profile.variables[variable_name] = variable_value - elif rule not in profile.unselected_rules and rule not in profile.rules: - profile.rules.append(rule) + _update_profile_with_policy(profile, policy, level) return profile From 0c74b573342fc3639b3641eacc2478eeec73ddde Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 17:29:33 +0100 Subject: [PATCH 09/12] Include product title in profiles objects The product title is also useful to give more context about the product id and the cost to include it in the profile objects is minimal, so it was included to simplify collection of profiles information. Signed-off-by: Marcus Burghardt --- ssg/profiles.py | 20 ++++++++++++-------- ssg/variables.py | 2 +- tests/unit/ssg-module/test_profiles.py | 2 +- tests/unit/ssg-module/test_variables.py | 5 ++--- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/ssg/profiles.py b/ssg/profiles.py index f1422077b2a..92b5ab2660b 100644 --- a/ssg/profiles.py +++ b/ssg/profiles.py @@ -28,15 +28,18 @@ class ProfileSelections: ----------- profile_id : str The unique identifier for the profile. - product : str - The product associated with the profile. - variables : dict - A dictionary containing the variables for the profile. - """ - def __init__(self, profile_id, profile_title, product): + profile_title : str + The profile title associated with the profile id. + product_id : str + The product id associated with the profile. + product_title : str + The product title associated with the product id. + """ + def __init__(self, profile_id, profile_title, product_id, product_title): self.profile_id = profile_id self.profile_title = profile_title - self.product = product + self.product_id = product_id + self.product_title = product_title self.rules = [] self.unselected_rules = [] self.variables = {} @@ -325,13 +328,14 @@ def get_profiles_from_products(content_dir: str, products: list, for product in products: product_yaml = _load_product_yaml(content_dir, product) + product_title = product_yaml.get("full_name") profiles_files = get_profile_files_from_root(product_yaml, product_yaml) controls_manager = _load_controls_manager(controls_dir, product_yaml) for file in profiles_files: profile_id = os.path.basename(file).split('.profile')[0] profile_yaml = _load_yaml_profile_file(file) profile_title = profile_yaml.get("title") - profile = ProfileSelections(profile_id, profile_title, product) + profile = ProfileSelections(profile_id, profile_title, product, product_title) profile = _process_profile(profile, profile_yaml, profiles_files, controls_manager.policies) profiles.append(profile) diff --git a/ssg/variables.py b/ssg/variables.py index 5871aaecd6c..49dd5553741 100644 --- a/ssg/variables.py +++ b/ssg/variables.py @@ -120,7 +120,7 @@ def get_variables_from_profiles(profiles: list) -> dict_type: variables = defaultdict(lambda: defaultdict(dict)) for profile in profiles: for variable, value in profile.variables.items(): - variables[variable][profile.product][profile.profile_id] = value + variables[variable][profile.product_id][profile.profile_id] = value return variables diff --git a/tests/unit/ssg-module/test_profiles.py b/tests/unit/ssg-module/test_profiles.py index 26a81296884..c1ac79afd64 100644 --- a/tests/unit/ssg-module/test_profiles.py +++ b/tests/unit/ssg-module/test_profiles.py @@ -35,6 +35,6 @@ def test_get_profiles_from_products(): profiles = get_profiles_from_products(content_dir, products, sorted=True) assert len(profiles) == count_profiles_in_products_dir(products[0]) - assert 'rhel' in profiles[0].product + assert 'rhel' in profiles[0].product_id assert len(profiles[0].rules) > 0 assert len(profiles[0].variables) > 0 diff --git a/tests/unit/ssg-module/test_variables.py b/tests/unit/ssg-module/test_variables.py index 91fb752bca4..2cc9ebc03e2 100644 --- a/tests/unit/ssg-module/test_variables.py +++ b/tests/unit/ssg-module/test_variables.py @@ -81,11 +81,10 @@ def test_get_variable_values(tmp_path): assert result["test"]["product1"]["profile1"] == "value1" assert result["test"]["product2"]["profile2"] == "value2" - def test_get_variables_from_profiles(): class MockProfile: - def __init__(self, product, profile_id, variables): - self.product = product + def __init__(self, product_id, profile_id, variables): + self.product_id = product_id self.profile_id = profile_id self.variables = variables From 40c0d09bd546959fe84c29c05e17a7ae7ba99dba Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 20:29:20 +0100 Subject: [PATCH 10/12] Fix compatibility issue with types Signed-off-by: Marcus Burghardt --- ssg/profiles.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/ssg/profiles.py b/ssg/profiles.py index 92b5ab2660b..d2a1af208d7 100644 --- a/ssg/profiles.py +++ b/ssg/profiles.py @@ -12,12 +12,15 @@ product_yaml_path, ) + if sys.version_info >= (3, 9): - list_type = list # Python 3.9+ supports built-in generics - dict_type = dict + dict_type = dict # Python 3.9+ supports built-in generics + list_type = list + tuple_type = tuple else: - from typing import List as list_type # Fallback for older versions - from typing import Dict as dict_type + from typing import Dict as dict_type # Fallback for older versions + from typing import List as list_type + from typing import Tuple as tuple_type class ProfileSelections: @@ -120,7 +123,7 @@ def _process_profile_extension(profile: ProfileSelections, profile_yaml: dict, return profile -def _parse_control_line(control_line: str) -> tuple[str, str]: +def _parse_control_line(control_line: str) -> tuple_type[str, str]: """ Parses a control line string and returns a tuple containing the first and third parts of the string, separated by a colon. If the string does not contain three parts, the second element From f3bef115d7abba664d1981fd75dcf7abcd928519 Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 22:10:52 +0100 Subject: [PATCH 11/12] Create a cache for variables yaml For now there are specific functions to return variables options and values but there are more properties in variables files that may be interesting, such as title and description. To avoid multiple reads of the same file, it was introduced a cache for variables yaml content. Signed-off-by: Marcus Burghardt --- ssg/variables.py | 50 ++++++++++++++++++------- tests/unit/ssg-module/test_variables.py | 1 + 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/ssg/variables.py b/ssg/variables.py index 49dd5553741..7d8d3813965 100644 --- a/ssg/variables.py +++ b/ssg/variables.py @@ -19,8 +19,9 @@ from typing import Dict as dict_type -# Cache variable files to avoid multiple reads +# Cache variable files and respective content to avoid multiple reads _var_files_cache = {} +_vars_content_cache = {} def get_variable_files_in_folder(content_dir: str, subfolder: str) -> list_type[str]: @@ -63,12 +64,42 @@ def get_variable_files(content_dir: str) -> list_type[str]: return variable_files +def _get_variables_content(content_dir: str) -> dict_type: + """ + Retrieve the content of all variable files from the specified content root directory. + + Args: + content_dir (str): The root directory containing benchmark directories. + + Returns: + dict: A dictionary where keys are variable IDs and values are the content of the variable + files. + """ + if content_dir in _vars_content_cache: + return _vars_content_cache[content_dir] + + variables_content = {} + + for var_file in get_variable_files(content_dir): + try: + yaml_content = open_and_macro_expand(var_file) + except Exception as e: + print(f"Error processing file {var_file}: {e}") + continue + + var_id = os.path.basename(var_file).split('.var')[0] + variables_content[var_id] = yaml_content + + _vars_content_cache[content_dir] = variables_content + return variables_content + + def get_variable_options(content_dir: str, variable_id: str = None) -> dict_type: """ Retrieve the options for specific or all variables from the content root directory. If `variable_id` is provided, returns options for that variable only. - If `variable_id` is not provided, returns a dictionary of all variable options. + If `variable_id` is not provided, returns a dictionary of all variables with their options. Args: content_dir (str): The root directory containing benchmark directories. @@ -79,18 +110,11 @@ def get_variable_options(content_dir: str, variable_id: str = None) -> dict_type dict: If `variable_id` is None, a dictionary where keys are variable IDs and values are their options. Otherwise, a dictionary of options for the specified variable. """ - all_variable_files = get_variable_files(content_dir) + variables_content = _get_variables_content(content_dir) all_options = {} - for var_file in all_variable_files: - try: - yaml_content = open_and_macro_expand(var_file) - except Exception as e: - print(f"Error processing file {var_file}: {e}") - continue - - var_id = os.path.basename(var_file).split('.var')[0] - options = yaml_content.get("options", {}) + for var_id, var_yaml in variables_content.items(): + options = var_yaml.get("options", {}) if variable_id: if var_id == variable_id: @@ -121,7 +145,7 @@ def get_variables_from_profiles(profiles: list) -> dict_type: for profile in profiles: for variable, value in profile.variables.items(): variables[variable][profile.product_id][profile.profile_id] = value - return variables + return _convert_defaultdict_to_dict(variables) def _convert_defaultdict_to_dict(dictionary: defaultdict) -> dict_type: diff --git a/tests/unit/ssg-module/test_variables.py b/tests/unit/ssg-module/test_variables.py index 2cc9ebc03e2..cf8da878055 100644 --- a/tests/unit/ssg-module/test_variables.py +++ b/tests/unit/ssg-module/test_variables.py @@ -81,6 +81,7 @@ def test_get_variable_values(tmp_path): assert result["test"]["product1"]["profile1"] == "value1" assert result["test"]["product2"]["profile2"] == "value2" + def test_get_variables_from_profiles(): class MockProfile: def __init__(self, product_id, profile_id, variables): From 8dd74d89964e745864a3759bced6c453604f6875 Mon Sep 17 00:00:00 2001 From: Marcus Burghardt Date: Thu, 9 Jan 2025 22:28:24 +0100 Subject: [PATCH 12/12] Introduce generic function to get variables properties Signed-off-by: Marcus Burghardt --- ssg/variables.py | 17 +++++++++++++++++ tests/unit/ssg-module/test_variables.py | 22 +++++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/ssg/variables.py b/ssg/variables.py index 7d8d3813965..3196ad9acf5 100644 --- a/ssg/variables.py +++ b/ssg/variables.py @@ -94,6 +94,23 @@ def _get_variables_content(content_dir: str) -> dict_type: return variables_content +def get_variable_property(content_dir: str, variable_id: str, property_name: str) -> str: + """ + Retrieve a specific property of a variable from the content root directory. + + Args: + content_dir (str): The root directory containing benchmark directories. + variable_id (str): The ID of the variable to retrieve the property for. + property_name (str): The name of the property to retrieve. + + Returns: + str: The value of the specified property for the variable. + """ + variables_content = _get_variables_content(content_dir) + variable_content = variables_content.get(variable_id, {}) + return variable_content.get(property_name, '') + + def get_variable_options(content_dir: str, variable_id: str = None) -> dict_type: """ Retrieve the options for specific or all variables from the content root directory. diff --git a/tests/unit/ssg-module/test_variables.py b/tests/unit/ssg-module/test_variables.py index cf8da878055..1858b5d36bd 100644 --- a/tests/unit/ssg-module/test_variables.py +++ b/tests/unit/ssg-module/test_variables.py @@ -6,6 +6,7 @@ get_variable_files_in_folder, get_variable_files, get_variable_options, + get_variable_property, get_variables_by_products, get_variables_from_profiles, get_variable_values, @@ -25,7 +26,10 @@ def setup_test_files(base_dir, benchmark_dirs, create_txt_file=False): path = base_dir / benchmark_dir os.makedirs(path, exist_ok=True) var_file = path / "test.var" - var_file.write_text("options:\n default: value\n option1: value1\n option2: value2\n") + var_file.write_text( + "options:\n default: value\n option1: value1\n option2: value2\n" + "title: Test Title\ndescription: Test Description\n" + ) if create_txt_file: txt_file = path / "test.txt" txt_file.write_text("options:\n option: value\n") @@ -114,3 +118,19 @@ def __init__(self, product_id, profile_id, variables): result = get_variables_from_profiles(profiles) assert result == expected_result + +def test_get_variable_property(tmp_path): + content_dir = tmp_path / "content" + benchmark_dirs = ["app", "app/rules"] + setup_test_files(content_dir, benchmark_dirs) + + result = get_variable_property(str(content_dir), "test", "title") + assert result == "Test Title" + + # Test for a non-existent property + result = get_variable_property(str(content_dir), "test", "non_existent_property") + assert result == "" + + # Test for a non-existent variable + result = get_variable_property(str(content_dir), "non_existent_variable", "property_name") + assert result == ""