diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000..f1c546d --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1,3 @@ +# ignore ruff format and --fix +7d074d129ef30f64dee361531fe3d6a020cdff98 +3c5addd30d6e6f6ae34b02333d34360d773ed3bf diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml new file mode 100644 index 0000000..82ad245 --- /dev/null +++ b/.github/workflows/linter.yml @@ -0,0 +1,7 @@ +name: Linter +on: [pull_request] +jobs: + call-workflow: + uses: ISISComputingGroup/reusable-workflows/.github/workflows/linters.yml@main + with: + compare-branch: origin/master diff --git a/check_version.py b/check_version.py index b03118e..7ce98de 100644 --- a/check_version.py +++ b/check_version.py @@ -1,4 +1,7 @@ -import upgrade, sys +import sys + +import upgrade + def compare_version_number(version_to_check): latest_version, _ = upgrade.UPGRADE_STEPS[-1] @@ -8,6 +11,7 @@ def compare_version_number(version_to_check): else: return 1 + if __name__ == "__main__": version_to_check = str(sys.argv[1]) sys.exit(compare_version_number(version_to_check)) diff --git a/run_tests.py b/run_tests.py index 31072bd..b223750 100644 --- a/run_tests.py +++ b/run_tests.py @@ -17,21 +17,29 @@ # Add root path for access to server_commons import os import sys + # Set MYDIRBLOCK so that example_base can be found os.environ["MYDIRBLOCK"] = ".." sys.path.insert(0, os.path.abspath("..")) # Standard imports +import argparse import unittest + import xmlrunner -import argparse -DEFAULT_DIRECTORY = os.path.join('..', '..', '..', 'test-reports') +DEFAULT_DIRECTORY = os.path.join("..", "..", "..", "test-reports") -if __name__ == '__main__': +if __name__ == "__main__": # get output directory from command line arguments parser = argparse.ArgumentParser() - parser.add_argument('-o', '--output_dir', nargs=1, type=str, default=[DEFAULT_DIRECTORY], - help='The directory to save the test reports') + parser.add_argument( + "-o", + "--output_dir", + nargs=1, + type=str, + default=[DEFAULT_DIRECTORY], + help="The directory to save the test reports", + ) args = parser.parse_args() xml_dir = args.output_dir[0] diff --git a/src/common_upgrades/add_to_base_iocs.py b/src/common_upgrades/add_to_base_iocs.py index 98bccb1..063383b 100644 --- a/src/common_upgrades/add_to_base_iocs.py +++ b/src/common_upgrades/add_to_base_iocs.py @@ -1,8 +1,6 @@ from xml.dom import minidom from xml.parsers.expat import ExpatError -from src.local_logger import LocalLogger - IOC_FILENAME = "configurations\components\_base\iocs.xml" FILE_TO_CHECK_STR = "IOC default component file" @@ -12,9 +10,7 @@ class AddToBaseIOCs: - """ - Add the ioc autostart to _base ioc so that it autostarts - """ + """Add the ioc autostart to _base ioc so that it autostarts""" def __init__(self, ioc_to_add, add_after_ioc, xml_to_add): self._ioc_to_add = ioc_to_add @@ -22,14 +18,13 @@ def __init__(self, ioc_to_add, add_after_ioc, xml_to_add): self._xml_to_add = xml_to_add def perform(self, file_access, logger): - """ - Add the autostart of the given. + """Add the autostart of the given. Args: file_access (FileAccess): file access. logger (LocalLogger): logger. - Returns: + Returns: exit code 0 success; anything else fail. """ @@ -48,7 +43,9 @@ def perform(self, file_access, logger): return -2 modified_file_contents = self._add_ioc(ioc_file_contents, logger) - logger.info("Adding {what} ioc to autostart in {0}.".format(IOC_FILENAME, what=self._ioc_to_add)) + logger.info( + "Adding {what} ioc to autostart in {0}.".format(IOC_FILENAME, what=self._ioc_to_add) + ) if not self._check_final_file_contains_one_of_added_ioc(logger, modified_file_contents): return -3 @@ -58,8 +55,7 @@ def perform(self, file_access, logger): @staticmethod def _get_ioc_names(xml): - """ - Gets the names of all the iocs in the xml. + """Gets the names of all the iocs in the xml. Args: xml: XML to check. @@ -70,13 +66,12 @@ def _get_ioc_names(xml): return [ioc.getAttribute("name") for ioc in xml.getElementsByTagName("ioc")] def _check_final_file_contains_one_of_added_ioc(self, logger, xml): - """ - Check the file to make sure it now contains one and only one ioc added entry. - + """Check the file to make sure it now contains one and only one ioc added entry. + Args: logger (Logger): Logger to write to. xml: XML to check. - + Returns: True if ok, else False. """ @@ -90,13 +85,12 @@ def _check_final_file_contains_one_of_added_ioc(self, logger, xml): return True def _check_prerequistes_for_file(self, xml, logger): - """ - Check the file can be modified. - + """Check the file can be modified. + Args: xml: XML to check logger (Logger): logger to write errors to. - + Returns: True if everything is ok, else False. """ @@ -108,17 +102,18 @@ def _check_prerequistes_for_file(self, xml, logger): node_count = ioc_names.count(self._add_after_ioc) if node_count != 1: - logger.error(ADD_AFTER_MISSING.format(FILE_TO_CHECK_STR, node_count, self._add_after_ioc)) + logger.error( + ADD_AFTER_MISSING.format(FILE_TO_CHECK_STR, node_count, self._add_after_ioc) + ) return False return True def _add_ioc(self, ioc_xml, logger): - """ - Add IOC entry after add after ioc specified if it exists. - + """Add IOC entry after add after ioc specified if it exists. + Args: ioc_xml: XML to add to. - + Returns: The XML with the added note. """ @@ -130,6 +125,9 @@ def _add_ioc(self, ioc_xml, logger): ioc_xml.firstChild.insertBefore(ioc_xml.createTextNode("\n "), new_ioc_node) return ioc_xml - logger.error("Could not find {0} ioc in file so no {1} ioc added.".format( - self._add_after_ioc, self._ioc_to_add)) + logger.error( + "Could not find {0} ioc in file so no {1} ioc added.".format( + self._add_after_ioc, self._ioc_to_add + ) + ) return ioc_xml diff --git a/src/common_upgrades/change_macro_in_globals.py b/src/common_upgrades/change_macro_in_globals.py index 008bd39..500eb2c 100644 --- a/src/common_upgrades/change_macro_in_globals.py +++ b/src/common_upgrades/change_macro_in_globals.py @@ -1,15 +1,13 @@ import re + from src.common_upgrades.utils.constants import GLOBALS_FILENAME class ChangeMacroInGlobals(object): - """ - An interface to replace arbitrary macros in a globals.txt file - """ + """An interface to replace arbitrary macros in a globals.txt file""" def __init__(self, file_access, logger): - """ - Initialise. + """Initialise. Args: file_access: Object to allow for file access. @@ -20,22 +18,19 @@ def __init__(self, file_access, logger): self._loaded_file = self.load_globals_file() def load_globals_file(self): - """ - Loads in a globals file as a list of strings. + """Loads in a globals file as a list of strings. Returns: Globals file loaded as list of strings if globals file exists. Empty list otherwise. """ - if self._file_access.exists(GLOBALS_FILENAME): return self._file_access.open_file(GLOBALS_FILENAME) else: return [] def change_macros(self, ioc_name, macros_to_change): - """ - Changes a list of macros in the globals.txt file for a specific IOC. + """Changes a list of macros in the globals.txt file for a specific IOC. Args: ioc_name: Name of the IOC. @@ -45,7 +40,6 @@ def change_macros(self, ioc_name, macros_to_change): Returns: None """ - for old_macro, new_macro in macros_to_change: for index in self._globals_filter_generator(ioc_name): self._apply_regex_macro_change(ioc_name, old_macro, new_macro, index) @@ -53,8 +47,7 @@ def change_macros(self, ioc_name, macros_to_change): self.write_modified_globals_file() def change_ioc_name(self, old_ioc_name, new_ioc_name): - """ - Changes the name of an IOC in a globals.txt file. + """Changes the name of an IOC in a globals.txt file. Args: old_ioc_name: String, the old name of the IOC @@ -64,15 +57,13 @@ def change_ioc_name(self, old_ioc_name, new_ioc_name): None """ - for index in self._globals_filter_generator(old_ioc_name): self._change_ioc_name(old_ioc_name, new_ioc_name, index) self.write_modified_globals_file() def _globals_filter_generator(self, ioc_to_change): - """ - Returns lines containing specified IOCs from globals.txt + """Returns lines containing specified IOCs from globals.txt Generator that gives all the lines for a given IOC in globals.txt. This will match IOCs with the same name as the root plus any that have a number @@ -84,15 +75,13 @@ def _globals_filter_generator(self, ioc_to_change): Yields: Index that the ioc is on. """ - for index, line in enumerate(self._loaded_file): if line.startswith("{}_".format(ioc_to_change)): self._logger.info("Found line '{}' in {}".format(line, GLOBALS_FILENAME)) yield index def _determine_replacement_values(self, old_macro, new_macro): - """ - Determines the strings to search for and replace. + """Determines the strings to search for and replace. Args: old_macro: Old Macro object with old macro name and old macro value. @@ -101,7 +90,6 @@ def _determine_replacement_values(self, old_macro, new_macro): Returns: regex_changes: Dictionary of regex representations of the strings to search for/replace. """ - if old_macro.value is None: old_value_search = r".*" @@ -113,16 +101,17 @@ def _determine_replacement_values(self, old_macro, new_macro): old_value_search = old_macro.value new_value_replacement = new_macro.value - regex_changes = {'old_macro_search': old_macro.name, - 'new_macro_replacement': new_macro.name, - 'old_value_search': old_value_search, - 'new_value_replacement': new_value_replacement} + regex_changes = { + "old_macro_search": old_macro.name, + "new_macro_replacement": new_macro.name, + "old_value_search": old_value_search, + "new_value_replacement": new_value_replacement, + } return regex_changes def _apply_regex_macro_change(self, ioc_name, old_macro, new_macro, line_number): - """ - Applies a regular expression to modify a macro. + """Applies a regular expression to modify a macro. Args: ioc_name: Name of the IOC to @@ -130,19 +119,24 @@ def _apply_regex_macro_change(self, ioc_name, old_macro, new_macro, line_number) Returns: None """ - regex_args = self._determine_replacement_values(old_macro, new_macro) - replace_regex = re.compile(r"({}_\d\d__)({})=({})".format(ioc_name, regex_args["old_macro_search"], - regex_args["old_value_search"])) + replace_regex = re.compile( + r"({}_\d\d__)({})=({})".format( + ioc_name, regex_args["old_macro_search"], regex_args["old_value_search"] + ) + ) - self._loaded_file[line_number] = re.sub(replace_regex, r"\1{}={}".format(regex_args["new_macro_replacement"], - regex_args["new_value_replacement"]), - self._loaded_file[line_number]) + self._loaded_file[line_number] = re.sub( + replace_regex, + r"\1{}={}".format( + regex_args["new_macro_replacement"], regex_args["new_value_replacement"] + ), + self._loaded_file[line_number], + ) def _change_ioc_name(self, ioc_name, new_ioc_name, line_number): - """ - If a new name is supplied, changes the name of the IOC + """If a new name is supplied, changes the name of the IOC Args: ioc_name: String, the current name of the IOC @@ -151,17 +145,16 @@ def _change_ioc_name(self, ioc_name, new_ioc_name, line_number): Returns: None """ - if new_ioc_name is not None: - self._loaded_file[line_number] = self._loaded_file[line_number].replace(ioc_name, new_ioc_name.upper()) + self._loaded_file[line_number] = self._loaded_file[line_number].replace( + ioc_name, new_ioc_name.upper() + ) def write_modified_globals_file(self): - """ - Writes the modified globals file if it has been loaded. + """Writes the modified globals file if it has been loaded. Returns: None """ - if self._loaded_file: self._file_access.write_file(GLOBALS_FILENAME, self._loaded_file) diff --git a/src/common_upgrades/change_macros_in_xml.py b/src/common_upgrades/change_macros_in_xml.py index 07335ab..a9fae1b 100644 --- a/src/common_upgrades/change_macros_in_xml.py +++ b/src/common_upgrades/change_macros_in_xml.py @@ -5,8 +5,7 @@ def change_macro_name(macro, old_macro_name, new_macro_name): - """ - Changes the macro name of a macro xml node. + """Changes the macro name of a macro xml node. Args: macro : The macro node to change. @@ -19,8 +18,7 @@ def change_macro_name(macro, old_macro_name, new_macro_name): def change_macro_value(macro, old_macro_value, new_macro_value): - """ - Changes the macros in the given xml if a new macro value is given. + """Changes the macros in the given xml if a new macro value is given. Args: macro : The macro xml node to change. @@ -36,8 +34,7 @@ def change_macro_value(macro, old_macro_value, new_macro_value): def find_macro_with_name(macros, name_to_find): - """ - Find whether macro with name attribute equal to argument name_to_find exists + """Find whether macro with name attribute equal to argument name_to_find exists Args: macros: XML element containing list of macros @@ -50,14 +47,12 @@ def find_macro_with_name(macros, name_to_find): return True return False + class ChangeMacrosInXML(object): - """ - Changes macros in XML files. - """ + """Changes macros in XML files.""" def __init__(self, file_access, logger): - """ - Initialise. + """Initialise. Args: file_access: Object to allow for file access. @@ -66,9 +61,10 @@ def __init__(self, file_access, logger): self._file_access = file_access self._logger = logger - def add_macro(self, ioc_name, macro_to_add, pattern, description="No description", default_value=None): - """ - Add a macro with a specified name and value to all IOCs whose name begins with ioc_name, unless a macro + def add_macro( + self, ioc_name, macro_to_add, pattern, description="No description", default_value=None + ): + """Add a macro with a specified name and value to all IOCs whose name begins with ioc_name, unless a macro with that name already exists Args: @@ -93,12 +89,12 @@ def add_macro(self, ioc_name, macro_to_add, pattern, description="No description self._file_access.write_xml_file(path, ioc_xml) def change_macros(self, ioc_name, macros_to_change): - """ - Changes macros in all xml files that contain the correct macros for a specified ioc. + """Changes macros in all xml files that contain the correct macros for a specified ioc. Args: ioc_name: Name of the IOC to change macros within. macros_to_change: List of 2-tuples of old_macro and new_macro Macro classes. + Returns: None. """ @@ -108,7 +104,7 @@ def change_macros(self, ioc_name, macros_to_change): for macro in macros.getElementsByTagName("macro"): name = macro.getAttribute("name") for old_macro, new_macro in macros_to_change: - #Check if current macro name starts with name of macro to be changed + # Check if current macro name starts with name of macro to be changed if re.match(old_macro.name, name) is not None: change_macro_name(macro, old_macro.name, new_macro.name) change_macro_value(macro, old_macro.value, new_macro.value) @@ -116,8 +112,7 @@ def change_macros(self, ioc_name, macros_to_change): self._file_access.write_xml_file(path, ioc_xml) def change_ioc_name(self, old_ioc_name, new_ioc_name): - """ - Replaces all instances of old_ioc_name with new_ioc_name in an XML tree + """Replaces all instances of old_ioc_name with new_ioc_name in an XML tree Args: old_ioc_name: String, the old ioc prefix (without _XX number suffix) new_ioc_name: String, The desired new IOC prefix (without _XX number suffix) @@ -129,14 +124,15 @@ def change_ioc_name(self, old_ioc_name, new_ioc_name): for ioc in ioc_xml.getElementsByTagName("ioc"): ioc_name_with_suffix = ioc.getAttribute("name") if old_ioc_name in ioc_name_with_suffix: - ioc_replacement = ioc_name_with_suffix.replace(old_ioc_name, new_ioc_name).upper() + ioc_replacement = ioc_name_with_suffix.replace( + old_ioc_name, new_ioc_name + ).upper() ioc.setAttribute("name", ioc_replacement) self._file_access.write_xml_file(path, ioc_xml) def change_ioc_name_in_synoptics(self, old_ioc_name, new_ioc_name): - """ - Replaces instances of old_ioc_name with new_ioc_name + """Replaces instances of old_ioc_name with new_ioc_name Args: old_ioc_name: String, the old ioc prefix (without _XX number suffix) @@ -163,14 +159,15 @@ def change_ioc_name_in_synoptics(self, old_ioc_name, new_ioc_name): ioc_name_with_suffix = element.firstChild.nodeValue if old_ioc_name in ioc_name_with_suffix: - ioc_replacement = ioc_name_with_suffix.replace(old_ioc_name, new_ioc_name).upper() + ioc_replacement = ioc_name_with_suffix.replace( + old_ioc_name, new_ioc_name + ).upper() element.firstChild.replaceWholeText(ioc_replacement) self._file_access.write_xml_file(xml_path, synoptic_xml) def ioc_tag_generator(self, path, ioc_xml, ioc_to_change): - """ - Generator giving all the IOC tags in all configurations. + """Generator giving all the IOC tags in all configurations. Args: path: Path to the xml file diff --git a/src/common_upgrades/change_pvs_in_xml.py b/src/common_upgrades/change_pvs_in_xml.py index e16e83c..0c7f168 100644 --- a/src/common_upgrades/change_pvs_in_xml.py +++ b/src/common_upgrades/change_pvs_in_xml.py @@ -2,13 +2,10 @@ class ChangePVsInXML(object): - """ - Changes pvs in XML files. - """ + """Changes pvs in XML files.""" def __init__(self, file_access, logger): - """ - Initialise. + """Initialise. Args: file_access: Object to allow for file access. @@ -18,8 +15,8 @@ def __init__(self, file_access, logger): self._logger = logger def node_text_filter(self, filter_text, element_name, path, xml): - """ - A generator that gives all the instances of filter_text within the element_name elements of the input_files. + """A generator that gives all the instances of filter_text within the element_name elements of the input_files. + Args: filter_text: String, ext to find element_name: String, tag name of the elements where to look for filter_text @@ -38,8 +35,7 @@ def node_text_filter(self, filter_text, element_name, path, xml): yield node def _replace_text_in_elements(self, old_text, new_text, element_name, input_files): - """ - Replaces all instances of old_text with new_text in all element_name elements of one or more XML files + """Replaces all instances of old_text with new_text in all element_name elements of one or more XML files Args: old_text: String, old text to find new_text: String, new text to substitute @@ -54,8 +50,7 @@ def _replace_text_in_elements(self, old_text, new_text, element_name, input_file self._file_access.write_xml_file(path, xml) def change_pv_name(self, old_pv_name, new_pv_name): - """ - Replaces all instances of old_pv_name with new_pv_name in the blocks config and all synoptics + """Replaces all instances of old_pv_name with new_pv_name in the blocks config and all synoptics Args: old_pv_name: String, the old pv name new_pv_name: String, The desired new pv name @@ -65,27 +60,30 @@ def change_pv_name(self, old_pv_name, new_pv_name): self.change_pv_names_in_synoptics(old_pv_name, new_pv_name) def change_pv_name_in_blocks(self, old_pv_name, new_pv_name): - """ - Move any blocks pointing at old_pv_name to point at new_pv_name. + """Move any blocks pointing at old_pv_name to point at new_pv_name. + Args: old_pv_name: The old PV to remove references to new_pv_name: The new PV to replace it with """ - self._replace_text_in_elements(old_pv_name, new_pv_name, "read_pv", - self._file_access.get_config_files(BLOCK_FILE)) + self._replace_text_in_elements( + old_pv_name, new_pv_name, "read_pv", self._file_access.get_config_files(BLOCK_FILE) + ) def change_pv_names_in_synoptics(self, old_pv_name, new_pv_name): - """ - Move any synoptic PV targets from pointing at old_pv_name to point to new_pv_name. + """Move any synoptic PV targets from pointing at old_pv_name to point to new_pv_name. + Args: old_pv_name: The old PV to remove references to new_pv_name: The new PV to replace it with """ - self._replace_text_in_elements(old_pv_name, new_pv_name, "address", self._file_access.get_synoptic_files()) + self._replace_text_in_elements( + old_pv_name, new_pv_name, "address", self._file_access.get_synoptic_files() + ) def get_number_of_instances_of_pv(self, pv_names): - """ - Get the number of instances of a PV in the config and synoptic. + """Get the number of instances of a PV in the config and synoptic. + Args: pv_names: String, a list of pvs to search for Return: diff --git a/src/common_upgrades/sql_utilities.py b/src/common_upgrades/sql_utilities.py index b0d0e5d..b252986 100644 --- a/src/common_upgrades/sql_utilities.py +++ b/src/common_upgrades/sql_utilities.py @@ -1,16 +1,14 @@ -""" -Helpful sql utilities -""" -from getpass import getpass +"""Helpful sql utilities""" + import os import re +from getpass import getpass + import mysql.connector class SqlConnection: - """ - Class to allow sql access. Should be used in the top scope and sessions are got using get_session. - """ + """Class to allow sql access. Should be used in the top scope and sessions are got using get_session.""" _connection = None @@ -19,8 +17,8 @@ def __init__(self): @staticmethod def get_session(logger): - """ - Get the database session; creates one if needed. + """Get the database session; creates one if needed. + Args: logger: the logger to use @@ -29,8 +27,10 @@ def get_session(logger): """ while SqlConnection._connection is None: try: - root_pass = os.getenv("MYSQL_PASSWORD") or getpass("Please enter db root password: ") - SqlConnection._connection = mysql.connector.connect(user='root', password=root_pass) + root_pass = os.getenv("MYSQL_PASSWORD") or getpass( + "Please enter db root password: " + ) + SqlConnection._connection = mysql.connector.connect(user="root", password=root_pass) except Exception as e: logger.error("Failed to connect to database: {}".format(e)) return SqlConnection._connection @@ -46,8 +46,8 @@ def __exit__(self, exc_type, exc_val, exc_tb): def run_sql(logger, sql): - """ - Sends an SQL statement to the database. + """Sends an SQL statement to the database. + Args: logger: the logger to use to log messages sql: The statement to send @@ -59,9 +59,10 @@ def run_sql(logger, sql): SqlConnection.get_session(logger).commit() cursor.close() + def run_sql_list(logger, sql_list): - """ - Sends a list of SQL statement to the database. + """Sends a list of SQL statement to the database. + Args: logger: the logger to use to log messages sql_list: The statement to send @@ -75,9 +76,10 @@ def run_sql_list(logger, sql_list): SqlConnection.get_session(logger).commit() cursor.close() + def run_sql_file(logger, file): - """ - Sends an SQL statement to the database. + """Sends an SQL statement to the database. + Args: logger: the logger to use to log messages file: The file of sql statement to send @@ -89,32 +91,33 @@ def run_sql_file(logger, file): lines = [] with open(file) as f: lines = f.readlines() - statement = '' + statement = "" for line in lines: - if re.match(r'--', line) or re.match(r'#', line): + if re.match(r"--", line) or re.match(r"#", line): continue statement = statement + line - if re.search(r';[ ]*$', line): + if re.search(r";[ ]*$", line): statement_list.append(statement) - statement = '' + statement = "" logger.info(f"Applying DB schema from {file}") run_sql_list(logger, statement_list) return 0 + def add_new_user(logger, user, password): - """ - Adds a user with all permissions to the exp_user database + """Adds a user with all permissions to the exp_user database Args: logger: the logger to use to log messages user: The name of the user in form 'username'@'host' password: The password that the user will be required to use """ try: - run_sql(logger, "CREATE USER {} IDENTIFIED WITH mysql_native_password BY '{}';".format( - user, password)) - run_sql(logger, "GRANT INSERT, SELECT, UPDATE, DELETE ON exp_data.* TO {};".format( - user)) + run_sql( + logger, + "CREATE USER {} IDENTIFIED WITH mysql_native_password BY '{}';".format(user, password), + ) + run_sql(logger, "GRANT INSERT, SELECT, UPDATE, DELETE ON exp_data.* TO {};".format(user)) return 0 except Exception as e: logger.error("Failed to add user: {}".format(e)) diff --git a/src/common_upgrades/synoptics_and_device_screens.py b/src/common_upgrades/synoptics_and_device_screens.py index 20eb7b4..63328ec 100644 --- a/src/common_upgrades/synoptics_and_device_screens.py +++ b/src/common_upgrades/synoptics_and_device_screens.py @@ -2,24 +2,25 @@ class SynopticsAndDeviceScreens(object): - """ - Manipulate an instrument's synoptics and device_screens - """ + """Manipulate an instrument's synoptics and device_screens""" def __init__(self, file_access, logger): self.file_access = file_access self.logger = logger - self._update_keys_in_device_screens = partial(self._update_opi_keys_in_xml, root_tag="device", key_tag="key") - self._update_keys_in_synoptics = partial(self._update_opi_keys_in_xml, root_tag="target", key_tag="name") + self._update_keys_in_device_screens = partial( + self._update_opi_keys_in_xml, root_tag="device", key_tag="key" + ) + self._update_keys_in_synoptics = partial( + self._update_opi_keys_in_xml, root_tag="target", key_tag="name" + ) def update_opi_keys(self, keys_to_update): - """ - Update the OPI keys in all synoptics and device screens + """Update the OPI keys in all synoptics and device screens Args: keys_to_update (Dict)): The OPI keys that need updating as a dictionary with {old_key: new_key} - Returns: + Returns: exit code 0 success; anything else fail """ @@ -39,15 +40,16 @@ def update_opi_keys(self, keys_to_update): break try: if device_screens: - self._update_keys_in_device_screens(device_screens[0], device_screens[1], keys_to_update) + self._update_keys_in_device_screens( + device_screens[0], device_screens[1], keys_to_update + ) except Exception as e: self.logger.error("Cannot upgrade device screens {}: {}".format(path, e)) result = -2 return result def _update_opi_keys_in_xml(self, path, xml, keys_to_update, root_tag, key_tag): - """ - Replaces an opi key with a different key + """Replaces an opi key with a different key Args: path (String): path to file to update @@ -64,7 +66,10 @@ def _update_opi_keys_in_xml(self, path, xml, keys_to_update, root_tag, key_tag): key_element.firstChild.nodeValue = new_key if new_key != old_key: file_changed = True - self.logger.info("OPI key '{}' replaced with corresponding key '{}' in {}" - .format(old_key, new_key, path)) + self.logger.info( + "OPI key '{}' replaced with corresponding key '{}' in {}".format( + old_key, new_key, path + ) + ) if file_changed: self.file_access.write_xml_file(path, xml) diff --git a/src/common_upgrades/utils/macro.py b/src/common_upgrades/utils/macro.py index 212925a..c765f50 100644 --- a/src/common_upgrades/utils/macro.py +++ b/src/common_upgrades/utils/macro.py @@ -1,6 +1,5 @@ class Macro(object): - """ - Macro Object + """Macro Object Attributes: name: Macro name. E.g. GALILADDR. @@ -12,7 +11,7 @@ def __init__(self, name, value=None): self.__value = value def __repr__(self): - return ''.format(self.__name, self.__value) + return "".format(self.__name, self.__value) @property def name(self): diff --git a/src/file_access.py b/src/file_access.py index f288acc..440b81b 100644 --- a/src/file_access.py +++ b/src/file_access.py @@ -1,19 +1,22 @@ import os -from xml.dom import minidom import shutil +from xml.dom import minidom from xml.parsers.expat import ExpatError -from src.common_upgrades.utils.constants import CONFIG_FOLDER, COMPONENT_FOLDER, SYNOPTIC_FOLDER, \ - DEVICE_SCREEN_FILE, DEVICE_SCREENS_FOLDER + +from src.common_upgrades.utils.constants import ( + COMPONENT_FOLDER, + CONFIG_FOLDER, + DEVICE_SCREEN_FILE, + DEVICE_SCREENS_FOLDER, + SYNOPTIC_FOLDER, +) class FileAccess(object): - """ - File access for the configuration - """ + """File access for the configuration""" def __init__(self, logger, config_root): - """ - Constructor + """Constructor Args: logger: the logger to use @@ -22,23 +25,19 @@ def __init__(self, logger, config_root): """ self.config_base = config_root self._logger = logger - + def rename_file(self, filename, new_name): - """ - - Rename a file - + """Rename a file + Args: filename: current filename new_name: new filename to rename to - + """ os.rename(filename, new_name) def open_file(self, filename): - """ - - Open a file and return the object + """Open a file and return the object Args: filename: filename to open @@ -53,8 +52,7 @@ def open_file(self, filename): return lines def write_version_number(self, version, filename): - """ - Write the version number to the file + """Write the version number to the file Args: version: version to write filename: filename to write to (relative to config root) @@ -66,9 +64,8 @@ def write_version_number(self, version, filename): self._logger.info("Writing new version number {0}".format(version)) f.write("{}\n".format(version)) - def write_file(self, filename, file_contents, mode = "w", file_full=False): - """ - Write file contents (will overwrite existing files) + def write_file(self, filename, file_contents, mode="w", file_full=False): + """Write file contents (will overwrite existing files) Args: filename: filename to write to @@ -88,8 +85,7 @@ def write_file(self, filename, file_contents, mode = "w", file_full=False): f.write(file_contents) def create_directories(self, path): - """ - Create directories starting at config base path + """Create directories starting at config base path Args: path: path for directories to be created @@ -100,9 +96,7 @@ def create_directories(self, path): os.makedirs(os.path.dirname(os.path.join(self.config_base, path)), exist_ok=True) def line_exists(self, filename, string): - """ - Check if string exists as a line in file - """ + """Check if string exists as a line in file""" with open(os.path.join(self.config_base, filename), "r") as f: for line in f: if line == string: @@ -110,9 +104,7 @@ def line_exists(self, filename, string): return False def file_contains(self, filename, string): - """ - Check if a string exists in a file - """ + """Check if a string exists in a file""" with open(os.path.join(self.config_base, filename), "r") as f: for line in f: if string in line: @@ -120,8 +112,7 @@ def file_contains(self, filename, string): return False def open_xml_file(self, filename): - """ - Open a file and returns the xml it contains + """Open a file and returns the xml it contains Args: filename: filename to open @@ -132,9 +123,7 @@ def open_xml_file(self, filename): return minidom.parse(os.path.join(self.config_base, filename)) def write_xml_file(self, filename, xml): - """ - - Saves xml to a file + """Saves xml to a file Args: filename: filename to save @@ -142,29 +131,26 @@ def write_xml_file(self, filename, xml): Returns: """ - # this can not use pretty print because that will cause it to gain tabs and newlines with open(os.path.join(self.config_base, filename), mode="w") as f: self._logger.info("Writing xml file {0}".format(filename)) f.write('\n') xml.firstChild.writexml(f) - f.write('\n') + f.write("\n") def listdir(self, dir): - """ - Returns a list of files in a directory - + """Returns a list of files in a directory + Args: dir (String): The directory to list - + Return: List of file paths (strings) """ return [os.path.join(dir, f) for f in os.listdir(os.path.join(self.config_base, dir))] def remove_file(self, filename): - """ - Removes a file from the file system. + """Removes a file from the file system. Args: filename (str): The file to remove, relative to the config directory @@ -173,8 +159,7 @@ def remove_file(self, filename): os.remove(os.path.join(self.config_base, filename)) def delete_folder(self, path): - """ - Deletes a folder recursively. + """Deletes a folder recursively. Args: path (String): The folder to remove @@ -182,8 +167,7 @@ def delete_folder(self, path): shutil.rmtree(path) def is_dir(self, path): - """ - Checks whether a path is a directory or file. + """Checks whether a path is a directory or file. Args: path (str): The path relative to the configuration directory. @@ -205,8 +189,7 @@ def _get_xml(self, path): raise ExpatError("{} is invalid xml '{}'".format(path, ex)) def get_config_files(self, file_type): - """ - Generator giving all the config files of a given type. + """Generator giving all the config files of a given type. Args: file_type: The type of file that you want to get e.g. iocs.xml @@ -220,19 +203,18 @@ def get_config_files(self, file_type): yield xml_path, self._get_xml(xml_path) def get_synoptic_files(self): - """ - Generator giving all the synoptic config files + """Generator giving all the synoptic config files Yields: Tuple: The path to the synoptic file and its xml representation. """ - for synoptic_path in [filename for filename in self.listdir(SYNOPTIC_FOLDER) if filename.endswith('.xml')]: + for synoptic_path in [ + filename for filename in self.listdir(SYNOPTIC_FOLDER) if filename.endswith(".xml") + ]: yield synoptic_path, self._get_xml(synoptic_path) def get_device_screens(self): - """ - Returns the device screen file if it exists, else None. - """ + """Returns the device screen file if it exists, else None.""" device_screens_path = os.path.join(DEVICE_SCREENS_FOLDER, DEVICE_SCREEN_FILE) if os.path.exists(device_screens_path): return device_screens_path, self._get_xml(device_screens_path) @@ -240,8 +222,7 @@ def get_device_screens(self): return None def get_file_paths(self, directory: str, extension: str = None): - """ - Generator giving the paths of all files inside a directory, recursively searching all subdirectories. + """Generator giving the paths of all files inside a directory, recursively searching all subdirectories. Args: directory: The directory to search. @@ -257,10 +238,10 @@ def get_file_paths(self, directory: str, extension: str = None): class CachingFileAccess(object): - """ - Context that uses the given file access object but does not actually write to file until the context is left + """Context that uses the given file access object but does not actually write to file until the context is left without an error. """ + def __init__(self, file_access): self.cached_writes = dict() self._file_access = file_access @@ -278,8 +259,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.write() def open_xml_file(self, filename): - """ - Open a file and returns the xml it contains (returns the cached file if it exists) + """Open a file and returns the xml it contains (returns the cached file if it exists) Args: filename: filename to open @@ -293,8 +273,7 @@ def open_xml_file(self, filename): return self.old_open_method(filename) def write_xml_file(self, filename, xml): - """ - Caches a write of xml to a file + """Caches a write of xml to a file Args: filename: filename to save @@ -303,8 +282,6 @@ def write_xml_file(self, filename, xml): self.cached_writes[filename] = xml def write(self): - """ - Write all cached writes to the file. - """ + """Write all cached writes to the file.""" for filename, xml in self.cached_writes.items(): self._file_access.write_xml_file(filename, xml) diff --git a/src/git_utils.py b/src/git_utils.py index c8b5712..8f5d90a 100644 --- a/src/git_utils.py +++ b/src/git_utils.py @@ -7,8 +7,6 @@ def get_repo(working_directory): # Check repo try: return git.Repo(working_directory, search_parent_directories=True) - except Exception as e: + except Exception: # Not a valid repository raise git.NotUnderVersionControl(working_directory) - - diff --git a/src/local_logger.py b/src/local_logger.py index 5632067..3e3bc8d 100644 --- a/src/local_logger.py +++ b/src/local_logger.py @@ -1,17 +1,13 @@ +import datetime import os import sys -import datetime - class LocalLogger(object): - """ - A local logging object which will write to the screen and a file - """ + """A local logging object which will write to the screen and a file""" def __init__(self, log_dir): - """ - The logging directory in to which to write the log file + """The logging directory in to which to write the log file Args: log_dir: the directory for the file @@ -19,13 +15,14 @@ def __init__(self, log_dir): if not os.path.exists(log_dir): os.mkdir(log_dir) - log_file = os.path.join(log_dir, "upgrade_{0}.txt".format(datetime.datetime.now().strftime("%Y_%m_%d__%H_%M"))) + log_file = os.path.join( + log_dir, "upgrade_{0}.txt".format(datetime.datetime.now().strftime("%Y_%m_%d__%H_%M")) + ) self._log_file = log_file def error(self, message): - """ - Write the message as an error (to standard err with ERROR in front of it) + """Write the message as an error (to standard err with ERROR in front of it) Args: message: message to write (no new lines needed) @@ -35,12 +32,11 @@ def error(self, message): """ formatted_message = "ERROR: {0}{1}".format(message, os.linesep) with open(self._log_file, mode="a") as f: - f.write(formatted_message) + f.write(formatted_message) sys.stderr.write(formatted_message) def info(self, message): - """ - Write the message as info (to standard out with INFO in front of it) + """Write the message as info (to standard out with INFO in front of it) Args: message: message to write (no new lines needed) @@ -48,8 +44,7 @@ def info(self, message): Returns: """ - formatted_message = " INFO: {0}{1}".format(message, os.linesep) with open(self._log_file, mode="a") as f: - f.write(formatted_message) + f.write(formatted_message) sys.stdout.write(formatted_message) diff --git a/src/upgrade.py b/src/upgrade.py index d494f1f..750286c 100644 --- a/src/upgrade.py +++ b/src/upgrade.py @@ -1,27 +1,21 @@ import os from src.common_upgrades.sql_utilities import SqlConnection -from src.file_access import FileAccess -from src.local_logger import LocalLogger VERSION_FILENAME = os.path.join("configurations", "config_version.txt") class UpgradeError(Exception): - """ - There is an error in the upgrade - """ + """There is an error in the upgrade""" + pass class Upgrade(object): - """ - Use upgrade steps to upgrade a configuration - """ + """Use upgrade steps to upgrade a configuration""" def __init__(self, file_access, logger, upgrade_steps, git_repo): - """ - Constructor + """Constructor Args: file_access (FileAccess): an object to interact with files @@ -39,8 +33,7 @@ def __init__(self, file_access, logger, upgrade_steps, git_repo): self._git_repo = git_repo def get_version_number(self): - """ - Find the current version number of the repository. If there is no version number the + """Find the current version number of the repository. If there is no version number the repository is considered unversioned and the lowest version is written to the repository Returns: the version number @@ -54,8 +47,7 @@ def get_version_number(self): return initial_version_number def upgrade(self): - """ - Perform an upgrade on the configuration directory + """Perform an upgrade on the configuration directory Returns: status code 0 for success; not 0 for failure @@ -66,7 +58,6 @@ def upgrade(self): final_upgrade_version = None with SqlConnection(): for version, upgrade_step in self._upgrade_steps: - if version == current_version: upgrade = True if upgrade_step is None: @@ -75,7 +66,6 @@ def upgrade(self): if upgrade: final_upgrade_version = version if upgrade_step is not None: - self._logger.info("Upgrading from {0}".format(version)) self._logger.info("-------------------------") result = upgrade_step.perform(self._file_access, self._logger) @@ -93,10 +83,10 @@ def upgrade(self): self._logger.error("Unknown version number {0}".format(current_version)) return -1 - def _commit_tag_and_push(self, version, final=False): + def _commit_tag_and_push(self, version, final=False): self._git_repo.git.add(A=True) commit_message = f"IBEX Upgrade {'from' if not final else 'to'} {version}" self._git_repo.index.commit(commit_message) tag_name = f"{self._git_repo.active_branch}_{version}{'_upgrade' if not final else ''}" self._git_repo.create_tag(tag_name, message=commit_message, force=True) - self._git_repo.remote(name='origin').push() + self._git_repo.remote(name="origin").push() diff --git a/src/upgrade_step.py b/src/upgrade_step.py index 33d39e9..4eeb066 100644 --- a/src/upgrade_step.py +++ b/src/upgrade_step.py @@ -1,18 +1,14 @@ from abc import ABCMeta, abstractmethod -from src.file_access import FileAccess -from src.local_logger import LocalLogger class UpgradeStep(object): - """ - An upgrade step base object to be inherited from - """ + """An upgrade step base object to be inherited from""" + __metaclass__ = ABCMeta @abstractmethod def perform(self, file_access, logger): - """ - Perform the upgrade step this should be implemented + """Perform the upgrade step this should be implemented Args: file_access (FileAccess): file access diff --git a/src/upgrade_step_add_meta_tag.py b/src/upgrade_step_add_meta_tag.py index 7768ee8..3c7ff86 100644 --- a/src/upgrade_step_add_meta_tag.py +++ b/src/upgrade_step_add_meta_tag.py @@ -1,15 +1,12 @@ -from src.upgrade_step import UpgradeStep -from src.common_upgrades.utils.constants import CONFIG_FOLDER -from src.common_upgrades.utils.constants import COMPONENT_FOLDER - import os import xml.etree.ElementTree as ET +from src.common_upgrades.utils.constants import COMPONENT_FOLDER, CONFIG_FOLDER +from src.upgrade_step import UpgradeStep + class UpgradeStepAddMetaXmlElement(UpgradeStep): - """ - An upgrade step that adds a passed element to the meta.xml for a configuration. - """ + """An upgrade step that adds a passed element to the meta.xml for a configuration.""" def __init__(self, tag, tag_value): self.tag = tag @@ -17,8 +14,7 @@ def __init__(self, tag, tag_value): super(UpgradeStepAddMetaXmlElement, self).__init__() def perform(self, file_access, logger): - """ - Change meta.xml configuration schema to have self.tag element + """Change meta.xml configuration schema to have self.tag element Args: file_access (FileAccess): file access diff --git a/src/upgrade_step_check_init_inst.py b/src/upgrade_step_check_init_inst.py index ba73359..49ae381 100644 --- a/src/upgrade_step_check_init_inst.py +++ b/src/upgrade_step_check_init_inst.py @@ -1,25 +1,22 @@ -from src.file_access import FileAccess -from src.local_logger import LocalLogger -from src.upgrade_step import UpgradeStep -from src.common_upgrades.utils.constants import SCRIPTS_ROOT import os +from src.common_upgrades.utils.constants import SCRIPTS_ROOT +from src.upgrade_step import UpgradeStep + class UpgradeStepCheckInitInst(UpgradeStep): - """ - An upgrade step to check if the instrument uses the old style of loading in pre and post cmd. - This old style is via API.__localmod in init_.py in the Instrument/Settings/config/NDX/Python folder. + """An upgrade step to check if the instrument uses the old style of loading in pre and post cmd. + This old style is via API.__localmod in init_.py in the Instrument/Settings/config/NDX/Python folder. """ def search_files(self, files, root, file_access): - """ - Search files from a root folder for pre and post cmd methods. + """Search files from a root folder for pre and post cmd methods. Args: files (List[str]): The names of the files in the root directory. root (str): The root directory of the files. file_access (FileAccess): file access - + Returns: 0 if pre and post cmd methods in old style are not present; error message if they are. """ for file_name in files: @@ -27,18 +24,21 @@ def search_files(self, files, root, file_access): search_file = open(os.path.join(root, file_name)) search_file_contents = search_file.read() if "precmd" in search_file_contents or "postcmd" in search_file_contents: - return "Pre or post cmd methods found in {} these will now no longer be hooked into the command. Please ensure they are hooked using the new style of inserting these methods, " \ - "see https://github.com/ISISComputingGroup/ibex_user_manual/wiki/Pre-and-Post-Command-Hooks".format(search_file.name) + return ( + "Pre or post cmd methods found in {} these will now no longer be hooked into the command. Please ensure they are hooked using the new style of inserting these methods, " + "see https://github.com/ISISComputingGroup/ibex_user_manual/wiki/Pre-and-Post-Command-Hooks".format( + search_file.name + ) + ) return 0 def search_folder(self, folder, file_access): - """ - Search folders for the search string. + """Search folders for the search string. Args: folder (str): The folder to search through. file_access (FileAccess): file access - + Returns: 0 if pre and post cmd methods in old style are not present; error message if they are. """ file_returns = "" @@ -49,10 +49,8 @@ def search_folder(self, folder, file_access): file_returns += "{}\n".format(file_search_return) return 0 if file_returns == "" else file_returns - def perform(self, file_access, logger): - """ - Check if file exists and if the file includes pre and post cmd methods. + """Check if file exists and if the file includes pre and post cmd methods. Args: file_access (FileAccess): file access @@ -62,4 +60,3 @@ def perform(self, file_access, logger): """ return self.search_folder(SCRIPTS_ROOT, file_access) - diff --git a/src/upgrade_step_from_10p0p0.py b/src/upgrade_step_from_10p0p0.py index 7f4b0a5..030e25d 100644 --- a/src/upgrade_step_from_10p0p0.py +++ b/src/upgrade_step_from_10p0p0.py @@ -1,11 +1,10 @@ -from src.upgrade_step import UpgradeStep import os +from src.upgrade_step import UpgradeStep + class RemoveReflDeviceScreen(UpgradeStep): - """ - Remove reflectometry device screen from all configs and components - """ + """Remove reflectometry device screen from all configs and components""" path = os.path.join("configurations", "devices", "screens.xml") diff --git a/src/upgrade_step_from_11p0p0.py b/src/upgrade_step_from_11p0p0.py index 5f2b994..1a039ec 100644 --- a/src/upgrade_step_from_11p0p0.py +++ b/src/upgrade_step_from_11p0p0.py @@ -1,16 +1,18 @@ import socket -from src.upgrade_step import UpgradeStep -from src.common_upgrades.utils.macro import Macro + from src.common_upgrades.change_macros_in_xml import ChangeMacrosInXML from src.common_upgrades.change_pvs_in_xml import ChangePVsInXML +from src.common_upgrades.utils.macro import Macro +from src.upgrade_step import UpgradeStep + class RenameMercurySoftwarePressureControlMacros(UpgradeStep): - """ - SPC_... macro names have been adjusted to FLOW_SPC... names to differentiate them + """SPC_... macro names have been adjusted to FLOW_SPC... names to differentiate them from the new VTI Software Pressure Control macros that have been added with the new logic. - Rename the old macros to the new ones. + Rename the old macros to the new ones. """ + rename_macros = [ (Macro("FULL_AUTO_PRESSURE_1"), Macro("FLOW_SPC_PRESSURE_1")), (Macro("FULL_AUTO_PRESSURE_2"), Macro("FLOW_SPC_PRESSURE_2")), @@ -25,22 +27,102 @@ class RenameMercurySoftwarePressureControlMacros(UpgradeStep): ] new_macros = [ - (Macro("SPC_TYPE_1"), "^(FLOW|VTI|NONE)$", "Software pressure control method to use on Temperature 1", "NONE"), - (Macro("SPC_TYPE_2"), "^(FLOW|VTI|NONE)$", "Software pressure control method to use on Temperature 2", "NONE"), - (Macro("SPC_TYPE_3"), "^(FLOW|VTI|NONE)$", "Software pressure control method to use on Temperature 3", "NONE"), - (Macro("SPC_TYPE_4"), "^(FLOW|VTI|NONE)$", "Software pressure control method to use on Temperature 4", "NONE"), - (Macro("FLOW_SPC_TABLE_FILE"), "^\.*$", "File to load to related temperature to pressure from calibration directory other_devices.", "little_blue_cryostat.txt"), - (Macro("VTI_SPC_PRESSURE_1"), "^[1,2]$", "VTI software pressure control: The index of the pressure card to control with temp1.", ""), - (Macro("VTI_SPC_PRESSURE_2"), "^[1,2]$", "VTI software pressure control: The index of the pressure card to control with temp2.", ""), - (Macro("VTI_SPC_PRESSURE_3"), "^[1,2]$", "VTI software pressure control: The index of the pressure card to control with temp3.", ""), - (Macro("VTI_SPC_PRESSURE_4"), "^[1,2]$", "VTI software pressure control: The index of the pressure card to control with temp4.", ""), - (Macro("VTI_SPC_MIN_PRESSURE"), "^[0-9]+\.?[0-9]*$", "VTI software pressure control: minimum pressure allowed.", "0.0"), - (Macro("VTI_SPC_MAX_PRESSURE"), "^[0-9]+\.?[0-9]*$", "VTI software pressure control: maximum pressure allowed.", "0.0"), - (Macro("VTI_SPC_PRESSURE_CONSTANT"), "^[0-9]+\.?[0-9]*$", "VTI software pressure control: constant pressure to use when below cutoff point.", "5.0"), - (Macro("VTI_SPC_PRESSURE_MAX_LKUP"), "^\.*$", "VTI software pressure control: Filename for temp-based lookup table when above cutoff point.", "None.txt"), - (Macro("VTI_SPC_TEMP_CUTOFF_POINT"), "^[0-9]+\.?[0-9]*$", "VTI software pressure control: temperature to switch between using a user-set constant and a linear interpolation function.", "5.0"), - (Macro("VTI_SPC_TEMP_SCALE"), "^[0-9]+\.?[0-9]*$", "VTI software pressure control: amount to scale temp by to further control P vs T dependence.", "2.0"), - (Macro("VTI_SPC_SET_DELAY"), "^[0-9]+\.?[0-9]*$", "VTI software pressure control: delay between making adjustments to the pressure setpoint in seconds.", "10.0") + ( + Macro("SPC_TYPE_1"), + "^(FLOW|VTI|NONE)$", + "Software pressure control method to use on Temperature 1", + "NONE", + ), + ( + Macro("SPC_TYPE_2"), + "^(FLOW|VTI|NONE)$", + "Software pressure control method to use on Temperature 2", + "NONE", + ), + ( + Macro("SPC_TYPE_3"), + "^(FLOW|VTI|NONE)$", + "Software pressure control method to use on Temperature 3", + "NONE", + ), + ( + Macro("SPC_TYPE_4"), + "^(FLOW|VTI|NONE)$", + "Software pressure control method to use on Temperature 4", + "NONE", + ), + ( + Macro("FLOW_SPC_TABLE_FILE"), + "^\.*$", + "File to load to related temperature to pressure from calibration directory other_devices.", + "little_blue_cryostat.txt", + ), + ( + Macro("VTI_SPC_PRESSURE_1"), + "^[1,2]$", + "VTI software pressure control: The index of the pressure card to control with temp1.", + "", + ), + ( + Macro("VTI_SPC_PRESSURE_2"), + "^[1,2]$", + "VTI software pressure control: The index of the pressure card to control with temp2.", + "", + ), + ( + Macro("VTI_SPC_PRESSURE_3"), + "^[1,2]$", + "VTI software pressure control: The index of the pressure card to control with temp3.", + "", + ), + ( + Macro("VTI_SPC_PRESSURE_4"), + "^[1,2]$", + "VTI software pressure control: The index of the pressure card to control with temp4.", + "", + ), + ( + Macro("VTI_SPC_MIN_PRESSURE"), + "^[0-9]+\.?[0-9]*$", + "VTI software pressure control: minimum pressure allowed.", + "0.0", + ), + ( + Macro("VTI_SPC_MAX_PRESSURE"), + "^[0-9]+\.?[0-9]*$", + "VTI software pressure control: maximum pressure allowed.", + "0.0", + ), + ( + Macro("VTI_SPC_PRESSURE_CONSTANT"), + "^[0-9]+\.?[0-9]*$", + "VTI software pressure control: constant pressure to use when below cutoff point.", + "5.0", + ), + ( + Macro("VTI_SPC_PRESSURE_MAX_LKUP"), + "^\.*$", + "VTI software pressure control: Filename for temp-based lookup table when above cutoff point.", + "None.txt", + ), + ( + Macro("VTI_SPC_TEMP_CUTOFF_POINT"), + "^[0-9]+\.?[0-9]*$", + "VTI software pressure control: temperature to switch between using a user-set constant and a linear interpolation function.", + "5.0", + ), + ( + Macro("VTI_SPC_TEMP_SCALE"), + "^[0-9]+\.?[0-9]*$", + "VTI software pressure control: amount to scale temp by to further control P vs T dependence.", + "2.0", + ), + ( + Macro("VTI_SPC_SET_DELAY"), + "^[0-9]+\.?[0-9]*$", + "VTI software pressure control: delay between making adjustments to the pressure setpoint in seconds.", + "10.0", + ), ] def perform(self, file_access, logger): diff --git a/src/upgrade_step_from_12p0p0.py b/src/upgrade_step_from_12p0p0.py index c767256..473e9aa 100644 --- a/src/upgrade_step_from_12p0p0.py +++ b/src/upgrade_step_from_12p0p0.py @@ -1,26 +1,24 @@ import os import re -from src.upgrade_step import UpgradeStep from src.common_upgrades.utils.constants import SUPPORT_ROOT from src.file_access import FileAccess from src.local_logger import LocalLogger +from src.upgrade_step import UpgradeStep class UpgradeJawsForPositionAutosave(UpgradeStep): - """ - Update all batch files that load a database file using 'slits.template' to support autosave. - """ + """Update all batch files that load a database file using 'slits.template' to support autosave.""" def perform(self, file_access: FileAccess, logger: LocalLogger): result = 0 # Get database files using 'slits.template'. - database_files = [] + database_files = [] for path in file_access.get_file_paths(SUPPORT_ROOT, ".substitutions"): - if (file_access.file_contains(path, "slits.template")): + if file_access.file_contains(path, "slits.template"): database_files.append(os.path.basename(path).split(".")[0] + ".db") - + logger.info(f"Database files using slits.template: {' '.join(database_files)}") # Check if batch files load any of the database files. @@ -34,21 +32,31 @@ def perform(self, file_access: FileAccess, logger: LocalLogger): new_contents = [] for line in contents: new_line = line - - if any(re.search(fr"[\\|\/|\"]{database_file}", line) for database_file in database_files) and \ - "dbLoadRecords" in line and \ - "IFINIT_FROM_AS" not in line and \ - "IFNOTINIT_FROM_AS" not in line: + if ( + any( + re.search(rf"[\\|\/|\"]{database_file}", line) + for database_file in database_files + ) + and "dbLoadRecords" in line + and "IFINIT_FROM_AS" not in line + and "IFNOTINIT_FROM_AS" not in line + ): logger.info(f"Adding macros to {line}") - new_line = re.sub(r"\,?\"\)$", ",IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)\")", new_line) - + new_line = re.sub( + r"\,?\"\)$", + ',IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)")', + new_line, + ) + if new_line == line: - logger.error(f"Failed to modify {line}, check to see if it needs to be manually changed.") + logger.error( + f"Failed to modify {line}, check to see if it needs to be manually changed." + ) result = -1 - + new_contents.append(new_line) - + # Write if there are any changes. if contents != new_contents: file_access.write_file(path, new_contents) diff --git a/src/upgrade_step_from_12p0p1.py b/src/upgrade_step_from_12p0p1.py index 050da5c..3e6ed6d 100644 --- a/src/upgrade_step_from_12p0p1.py +++ b/src/upgrade_step_from_12p0p1.py @@ -5,11 +5,13 @@ class AddOscCollimMovingIndicator(UpgradeStep): - """ - Update oscillatingCollimator.cmd on LET and MERLIN to load stability check DB - """ + """Update oscillatingCollimator.cmd on LET and MERLIN to load stability check DB""" + path = os.path.join("configurations", "galil", "oscillatingCollimator.cmd") - new_lines = ['\n# load stability check DB', r'dbLoadRecords("$(UTILITIES)/db/check_stability.db", "P=$(MYPVPREFIX)MOT:,INP_VAL=$(MYPVPREFIX)MOT:DMC01:Galil0Bi5_STATUS,SP=$(MYPVPREFIX)MOT:DMC01:Galil0Bi5_STATUS,NSAMP=100,TOLERANCE=$(TOLERANCE=0)")\n'] + new_lines = [ + "\n# load stability check DB", + r'dbLoadRecords("$(UTILITIES)/db/check_stability.db", "P=$(MYPVPREFIX)MOT:,INP_VAL=$(MYPVPREFIX)MOT:DMC01:Galil0Bi5_STATUS,SP=$(MYPVPREFIX)MOT:DMC01:Galil0Bi5_STATUS,NSAMP=100,TOLERANCE=$(TOLERANCE=0)")\n', + ] def perform(self, file_access, logger): try: @@ -20,4 +22,3 @@ def perform(self, file_access, logger): except Exception as e: logger.error("Unable to perform upgrade, caught error: {}".format(e)) return 1 - \ No newline at end of file diff --git a/src/upgrade_step_from_12p0p2.py b/src/upgrade_step_from_12p0p2.py index 2f434b3..c58c5d9 100644 --- a/src/upgrade_step_from_12p0p2.py +++ b/src/upgrade_step_from_12p0p2.py @@ -1,14 +1,12 @@ import os -import socket -from src.upgrade_step import UpgradeStep from src.common_upgrades.sql_utilities import SqlConnection, run_sql_file from src.common_upgrades.utils.constants import EPICS_ROOT +from src.upgrade_step import UpgradeStep + class UpgradeFrom12p0p2(UpgradeStep): - """ - add sql tables for JMS2RDB - """ + """add sql tables for JMS2RDB""" def perform(self, file_access, logger): # add JMS2RDB Tables diff --git a/src/upgrade_step_from_12p0p3.py b/src/upgrade_step_from_12p0p3.py index abd2419..db61302 100644 --- a/src/upgrade_step_from_12p0p3.py +++ b/src/upgrade_step_from_12p0p3.py @@ -1,14 +1,12 @@ import os -import socket -from src.upgrade_step import UpgradeStep from src.common_upgrades.sql_utilities import SqlConnection, run_sql_file from src.common_upgrades.utils.constants import EPICS_ROOT +from src.upgrade_step import UpgradeStep + class UpgradeFrom12p0p3(UpgradeStep): - """ - add sql tables for MOXA - """ + """add sql tables for MOXA""" def perform(self, file_access, logger): # add MOXA Tables diff --git a/src/upgrade_step_from_6p0p0.py b/src/upgrade_step_from_6p0p0.py index 5bb24b0..2674c1e 100644 --- a/src/upgrade_step_from_6p0p0.py +++ b/src/upgrade_step_from_6p0p0.py @@ -4,19 +4,28 @@ from src.common_upgrades.utils.macro import Macro from src.upgrade_step import UpgradeStep + class SetDanfysikDisableAutoonoffMacros(UpgradeStep): - """ - Set the DISABLE_AUTONOFF macro to true for EMU or add it if not present. When this macro is true, + """Set the DISABLE_AUTONOFF macro to true for EMU or add it if not present. When this macro is true, settings will be displayed on the Danfysik OPI allowing automatic power turn on/off. """ + def perform(self, file_access, logger): try: hostname = socket.gethostname() ioc_name = "DFKPS" if hostname == "NDXEMU": change_macros_in_xml = ChangeMacrosInXML(file_access, logger) - change_macros_in_xml.add_macro(ioc_name, Macro("DISABLE_AUTOONOFF", "0"), "^(0|1)$", "Disable automatic PSU on/off feature", "1") - change_macros_in_xml.change_macros(ioc_name, [(Macro("DISABLE_AUTOONOFF"), Macro("DISABLE_AUTOONOFF", "0"))]) + change_macros_in_xml.add_macro( + ioc_name, + Macro("DISABLE_AUTOONOFF", "0"), + "^(0|1)$", + "Disable automatic PSU on/off feature", + "1", + ) + change_macros_in_xml.change_macros( + ioc_name, [(Macro("DISABLE_AUTOONOFF"), Macro("DISABLE_AUTOONOFF", "0"))] + ) return 0 except Exception as e: logger.error("Unable to perform upgrade, caught error: {}".format(e)) diff --git a/src/upgrade_step_from_7p2p0.py b/src/upgrade_step_from_7p2p0.py index efda868..061f0f1 100644 --- a/src/upgrade_step_from_7p2p0.py +++ b/src/upgrade_step_from_7p2p0.py @@ -1,33 +1,23 @@ -import os +from future.builtins import input -from src.upgrade_step import UpgradeStep -from src.common_upgrades.synoptics_and_device_screens import SynopticsAndDeviceScreens from src.common_upgrades.change_pvs_in_xml import ChangePVsInXML +from src.common_upgrades.synoptics_and_device_screens import SynopticsAndDeviceScreens from src.common_upgrades.utils.constants import MOTION_SET_POINTS_FOLDER from src.file_access import CachingFileAccess -from future.builtins import input +from src.upgrade_step import UpgradeStep ERROR_CODE = -1 SUCCESS_CODE = 0 class IgnoreRcpttSynoptics(UpgradeStep): - """ - Adds "rcptt_*" files to .gitignore, so that test synoptics are no longer committed. - """ + """Adds "rcptt_*" files to .gitignore, so that test synoptics are no longer committed.""" file_name = ".gitignore" - text_content = ['*.py[co]', - 'rcptt_*/', - 'rcptt_*', - '*.swp', - '*~', - '.idea/', - '.project/'] + text_content = ["*.py[co]", "rcptt_*/", "rcptt_*", "*.swp", "*~", ".idea/", ".project/"] def perform(self, file_access, logger): - """ - Perform the upgrade step + """Perform the upgrade step Args: file_access (FileAccess): file access logger (LocalLogger): logger @@ -56,13 +46,10 @@ def perform(self, file_access, logger): class UpgradeMotionSetPoints(UpgradeStep): - """ - Changes blocks to point at renamed PVs. Warns about changed setup. - """ + """Changes blocks to point at renamed PVs. Warns about changed setup.""" def perform(self, file_access, logger): - """ - Perform the upgrade step + """Perform the upgrade step Args: file_access (FileAccess): file access logger (LocalLogger): logger @@ -73,7 +60,6 @@ def perform(self, file_access, logger): logger.info("Changing motion set point PVs") with CachingFileAccess(file_access): - changer = ChangePVsInXML(file_access, logger) changer.change_pv_name("COORD1", "COORD0") @@ -92,7 +78,9 @@ def perform(self, file_access, logger): print("") print( "{} folder exists. Motion set point configuration has changed significantly" - " in this version and must be manually fixed".format(MOTION_SET_POINTS_FOLDER) + " in this version and must be manually fixed".format( + MOTION_SET_POINTS_FOLDER + ) ) print( "See https://github.com/ISISComputingGroup/ibex_developers_manual/" @@ -103,8 +91,12 @@ def perform(self, file_access, logger): # CoordX:MTR is gone, hard to automatically replace so just raise as issue if changer.get_number_of_instances_of_pv(["COORD0:MTR", "COORD1:MTR"]) > 0: - print("The PV COORDX:MTR has been found in a config/synoptic but no longer exists") - print("Manually replace with a reference to the underlying axis and rerun the upgrade") + print( + "The PV COORDX:MTR has been found in a config/synoptic but no longer exists" + ) + print( + "Manually replace with a reference to the underlying axis and rerun the upgrade" + ) raise RuntimeError("Underlying motor references") return SUCCESS_CODE @@ -115,13 +107,11 @@ def perform(self, file_access, logger): class ChangeReflOPITarget(UpgradeStep): - REFL_OPI_TARGET_OLD = "Reflectometry Front Panel" REFL_OPI_TARGET_NEW = "Reflectometry OPI" def perform(self, file_access, logger): - """ - Perform the upgrade step + """Perform the upgrade step Args: file_access (FileAccess): file access logger (LocalLogger): logger diff --git a/src/upgrade_step_from_7p4p0.py b/src/upgrade_step_from_7p4p0.py index 1ae7701..94bdf40 100644 --- a/src/upgrade_step_from_7p4p0.py +++ b/src/upgrade_step_from_7p4p0.py @@ -4,10 +4,9 @@ from src.common_upgrades.utils.macro import Macro from src.upgrade_step import UpgradeStep + class SetISOBUSForILM200(UpgradeStep): - """ - Set the ILM200 ISOBUS value to None for IMAT as they are the first to not use ISOBUS on the ILM200. - """ + """Set the ILM200 ISOBUS value to None for IMAT as they are the first to not use ISOBUS on the ILM200.""" def perform(self, file_access, logger): try: @@ -16,7 +15,13 @@ def perform(self, file_access, logger): if hostname == "NDXIMAT": ioc_name = "ILM200" change_macros_in_xml = ChangeMacrosInXML(file_access, logger) - change_macros_in_xml.add_macro(ioc_name, Macro("USE_ISOBUS", "No"), "^(Yes|No)$", "Whether to use ISOBUS for communications (default: Yes)", "Yes") + change_macros_in_xml.add_macro( + ioc_name, + Macro("USE_ISOBUS", "No"), + "^(Yes|No)$", + "Whether to use ISOBUS for communications (default: Yes)", + "Yes", + ) return 0 except Exception as e: logger.error("Unable to perform upgrade, caught error: {}".format(e)) diff --git a/src/upgrade_step_from_9p0p0.py b/src/upgrade_step_from_9p0p0.py index 243f7aa..b94ff56 100644 --- a/src/upgrade_step_from_9p0p0.py +++ b/src/upgrade_step_from_9p0p0.py @@ -1,32 +1,43 @@ import socket + from src.upgrade_step import UpgradeStep class ChangeLETCollimatorCmd(UpgradeStep): - """ - Change the LET/MERLIN collimator code to load in the new LET/MERLIN-specific db file. - """ + """Change the LET/MERLIN collimator code to load in the new LET/MERLIN-specific db file.""" def perform(self, file_access, logger): try: hostname = socket.gethostname() if hostname == "NDXLET" or hostname == "NDXMERLIN": - file_access.write_file("configurations\\galil\\oscillatingCollimator.cmd", [r'dbLoadRecords("$(MOTOREXT)/db/oscillatingCollimator_Let.db", "P=$(MYPVPREFIX)MOT:, O=$(COLLIMATOR_PV_PREFIX), M=MTR$(CNUM_PADDED)$(MNUM_PADDED), D=DMC$(CNUM_PADDED), AXIS=$(MOTOR_NUMBER), OTYP=$(OTYP=asynFloat64)")\n'], mode="a") - + file_access.write_file( + "configurations\\galil\\oscillatingCollimator.cmd", + [ + r'dbLoadRecords("$(MOTOREXT)/db/oscillatingCollimator_Let.db", "P=$(MYPVPREFIX)MOT:, O=$(COLLIMATOR_PV_PREFIX), M=MTR$(CNUM_PADDED)$(MNUM_PADDED), D=DMC$(CNUM_PADDED), AXIS=$(MOTOR_NUMBER), OTYP=$(OTYP=asynFloat64)")\n' + ], + mode="a", + ) + return 0 except Exception as e: logger.error("Unable to perform upgrade, caught error: {}".format(e)) return 1 + class RenameGalilMulCmd(UpgradeStep): - """ - Rename all galilmul1.cmd -> galilmul01.cmd - """ - def perform(self, file_access, logger): + """Rename all galilmul1.cmd -> galilmul01.cmd""" + + def perform(self, file_access, logger): try: - file_access.rename_file("configurations\\galilmul\\galilmul1.cmd", "configurations\\galilmul\\galilmul01.cmd") - file_access.rename_file("configurations\\galilmul\\galilmul2.cmd", "configurations\\galilmul\\galilmul02.cmd") - + file_access.rename_file( + "configurations\\galilmul\\galilmul1.cmd", + "configurations\\galilmul\\galilmul01.cmd", + ) + file_access.rename_file( + "configurations\\galilmul\\galilmul2.cmd", + "configurations\\galilmul\\galilmul02.cmd", + ) + return 0 - except Exception as e: + except Exception: pass diff --git a/src/upgrade_step_noop.py b/src/upgrade_step_noop.py index a262f70..62428c4 100644 --- a/src/upgrade_step_noop.py +++ b/src/upgrade_step_noop.py @@ -1,16 +1,11 @@ from src.upgrade_step import UpgradeStep -from .file_access import FileAccess -from .local_logger import LocalLogger class UpgradeStepNoOp(UpgradeStep): - """ - An upgrade step that does nothing. This can be used to add a upgrade to the latest production version. - """ + """An upgrade step that does nothing. This can be used to add a upgrade to the latest production version.""" def perform(self, file_access, logger): - """ - No nothing return sucess + """No nothing return sucess Args: file_access (FileAccess): file access diff --git a/test/mother.py b/test/mother.py index 6f58c4e..b94c115 100644 --- a/test/mother.py +++ b/test/mother.py @@ -1,14 +1,10 @@ -""" -Mother for test objects -""" +"""Mother for test objects""" from xml.dom import minidom class LoggingStub(object): - """ - Stub for logging - """ + """Stub for logging""" def __init__(self): self.log = [] @@ -23,9 +19,7 @@ def info(self, message): class FileAccessStub(object): - """ - Stub for file access - """ + """Stub for file access""" SYNOPTIC_FILENAME = "synoptic_file" @@ -78,17 +72,15 @@ def get_synoptic_files(self): def create_xml_with_iocs(iocs): - """ - Args: + """Args: iocs (list): A list of IOC names Returns: str: xml containing the supplied IOCs """ - doc = minidom.Document() top = doc.createElement("iocs") for ioc in iocs: - child = doc.createElement('ioc') + child = doc.createElement("ioc") child.setAttribute("name", ioc) top.appendChild(child) doc.appendChild(top) diff --git a/test/test_add_to_base_iocs.py b/test/test_add_to_base_iocs.py index fb16a96..ab44a34 100644 --- a/test/test_add_to_base_iocs.py +++ b/test/test_add_to_base_iocs.py @@ -1,11 +1,16 @@ import unittest -from hamcrest import * -from mock import MagicMock as Mock from xml.parsers.expat import ExpatError -from src.common_upgrades.add_to_base_iocs import AddToBaseIOCs, ALREADY_CONTAINS, FILE_TO_CHECK_STR, ADD_AFTER_MISSING -from test.mother import LoggingStub, FileAccessStub, create_xml_with_iocs +from hamcrest import * +from mock import MagicMock as Mock +from src.common_upgrades.add_to_base_iocs import ( + ADD_AFTER_MISSING, + ALREADY_CONTAINS, + FILE_TO_CHECK_STR, + AddToBaseIOCs, +) +from test.mother import FileAccessStub, LoggingStub, create_xml_with_iocs TEST_XML_TO_ADD = """\ @@ -38,7 +43,10 @@ def test_GIVEN_invalid_xml_WHEN_adding_ioc_THEN_error(self): result = adder.perform(self.file_access, self.logger) assert_that(result, is_(-1), "result") - assert_that(self.logger.log_err, has_item("IOC file appears not be valid XML, error '{}'".format(error_str))) + assert_that( + self.logger.log_err, + has_item("IOC file appears not be valid XML, error '{}'".format(error_str)), + ) def test_GIVEN_xml_with_no_iocs_WHEN_get_ioc_names_THEN_empty_list(self): xml = create_xml_with_iocs(list()) @@ -75,31 +83,43 @@ def test_GIVEN_xml_already_containing_ioc_to_add_WHEN_checking_prerequisites_THE adder = AddToBaseIOCs(ioc_to_add, "", "") - self._assert_prerequiste_fails(adder, xml, ALREADY_CONTAINS.format(FILE_TO_CHECK_STR, ioc_to_add)) + self._assert_prerequiste_fails( + adder, xml, ALREADY_CONTAINS.format(FILE_TO_CHECK_STR, ioc_to_add) + ) - def test_GIVEN_xml_already_containing_two_ioc_to_add_WHEN_checking_prerequisites_THEN_error(self): + def test_GIVEN_xml_already_containing_two_ioc_to_add_WHEN_checking_prerequisites_THEN_error( + self, + ): ioc_to_add = "TO_ADD" xml = create_xml_with_iocs([ioc_to_add, "ANOTHER_IOC", ioc_to_add]) adder = AddToBaseIOCs(ioc_to_add, "", "") - self._assert_prerequiste_fails(adder, xml, ALREADY_CONTAINS.format(FILE_TO_CHECK_STR, ioc_to_add)) + self._assert_prerequiste_fails( + adder, xml, ALREADY_CONTAINS.format(FILE_TO_CHECK_STR, ioc_to_add) + ) def test_GIVEN_xml_containing_no_ioc_to_add_after_WHEN_checking_prerequisites_THEN_error(self): after_ioc = "AFTER_THIS" xml = create_xml_with_iocs(["ANOTHER_IOC", "SECOND_IOC"]) adder = AddToBaseIOCs("", after_ioc, "") - self._assert_prerequiste_fails(adder, xml, ADD_AFTER_MISSING.format(FILE_TO_CHECK_STR, 0, after_ioc)) + self._assert_prerequiste_fails( + adder, xml, ADD_AFTER_MISSING.format(FILE_TO_CHECK_STR, 0, after_ioc) + ) def test_GIVEN_xml_containing_two_ioc_to_add_after_WHEN_checking_prerequisites_THEN_error(self): after_ioc = "AFTER_THIS" xml = create_xml_with_iocs([after_ioc, "ANOTHER_IOC", after_ioc]) adder = AddToBaseIOCs("", after_ioc, "") - self._assert_prerequiste_fails(adder, xml, ADD_AFTER_MISSING.format(FILE_TO_CHECK_STR, 2, after_ioc)) + self._assert_prerequiste_fails( + adder, xml, ADD_AFTER_MISSING.format(FILE_TO_CHECK_STR, 2, after_ioc) + ) - def test_GIVEN_xml_containing_ioc_to_add_after_and_no_ioc_to_add_WHEN_checking_prerequisites_THEN_passes(self): + def test_GIVEN_xml_containing_ioc_to_add_after_and_no_ioc_to_add_WHEN_checking_prerequisites_THEN_passes( + self, + ): ioc_to_add = "TO_ADD" after_ioc = "AFTER_THIS" xml = create_xml_with_iocs(["ANOTHER_IOC", after_ioc, "SECOND_IOC"]) @@ -118,8 +138,12 @@ def test_GIVEN_xml_containing_no_ioc_to_add_after_WHEN_add_ioc_THEN_error(self): result = adder._add_ioc(xml, self.logger) assert_that(result, is_(xml), "xml changed despite error") - assert_that(self.logger.log_err, has_item("Could not find {0} ioc in file so no {1} ioc added.".format( - after_ioc, ioc_to_add))) + assert_that( + self.logger.log_err, + has_item( + "Could not find {0} ioc in file so no {1} ioc added.".format(after_ioc, ioc_to_add) + ), + ) def test_GIVEN_xml_containing_ioc_to_add_after_WHEN_add_ioc_THEN_ioc_added(self): ioc_to_add = "TO_ADD" @@ -135,8 +159,11 @@ def test_GIVEN_xml_containing_ioc_to_add_after_WHEN_add_ioc_THEN_ioc_added(self) assert_that(len(iocs), is_(3), "ioc added") assert_that(iocs[1].getAttribute("name"), is_(ioc_to_add), "correctly named ioc added") - [assert_that(iocs[1].hasAttribute(attr), "correct xml added") for attr in ["restart", "autostart", "simlevel"]] + [ + assert_that(iocs[1].hasAttribute(attr), "correct xml added") + for attr in ["restart", "autostart", "simlevel"] + ] -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_globals_macro_changing.py b/test/test_globals_macro_changing.py index 080e6af..fbd84a7 100644 --- a/test/test_globals_macro_changing.py +++ b/test/test_globals_macro_changing.py @@ -1,14 +1,14 @@ import unittest + from hamcrest import assert_that + from src.common_upgrades.change_macro_in_globals import ChangeMacroInGlobals -from test.mother import LoggingStub, FileAccessStub, EXAMPLE_GLOBALS_FILE -import os -from src.common_upgrades.utils.macro import Macro from src.common_upgrades.utils.constants import GLOBALS_FILENAME +from src.common_upgrades.utils.macro import Macro +from test.mother import EXAMPLE_GLOBALS_FILE, FileAccessStub, LoggingStub class TestFindingIOC(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.file_access.existing_files = {GLOBALS_FILENAME: GLOBALS_FILENAME} @@ -22,7 +22,9 @@ def test_that_WHEN_asked_to_load_globals_file_THEN_the_default_globals_file_is_l assert_that(result, reference) - def test_that_GIVEN_globals_file_with_no_requested_iocs_WHEN_filtering_THEN_no_iocs_are_returned(self): + def test_that_GIVEN_globals_file_with_no_requested_iocs_WHEN_filtering_THEN_no_iocs_are_returned( + self, + ): ioc_to_change = "CHANGE_ME" matching_indices = [] @@ -31,7 +33,9 @@ def test_that_GIVEN_globals_file_with_no_requested_iocs_WHEN_filtering_THEN_no_i self.assertEqual(len(matching_indices), 0) - def test_that_GIVEN_globals_file_with_requested_iocs_WHEN_filtering_THEN_the_expected_ioc_is_returned(self): + def test_that_GIVEN_globals_file_with_requested_iocs_WHEN_filtering_THEN_the_expected_ioc_is_returned( + self, + ): ioc_to_change = "BINS" matching_indices = [] @@ -45,7 +49,6 @@ def test_that_GIVEN_globals_file_with_requested_iocs_WHEN_filtering_THEN_the_exp class TestChangingMacro(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.file_access.existing_files = {GLOBALS_FILENAME: GLOBALS_FILENAME} @@ -54,35 +57,36 @@ def setUp(self): def test_that_GIVEN_globals_file_with_old_macro_THEN_all_old_macros_are_changed(self): ioc_to_change = "GALIL" - macros_to_change = [ - (Macro("CHANGEME"), Macro("CHANGED")) - ] + macros_to_change = [(Macro("CHANGEME"), Macro("CHANGED"))] self.macro_changer.change_macros(ioc_to_change, macros_to_change) - testfile = EXAMPLE_GLOBALS_FILE.replace("CHANGEME", - "CHANGED") + testfile = EXAMPLE_GLOBALS_FILE.replace("CHANGEME", "CHANGED") self.assertEqual(self.file_access.write_file_contents, testfile) self.assertEqual(self.file_access.write_filename, GLOBALS_FILENAME) - def test_that_GIVEN_two_macros_with_only_in_the_globals_file_THEN_only_the_macro_in_the_globals_file_is_changed(self): + def test_that_GIVEN_two_macros_with_only_in_the_globals_file_THEN_only_the_macro_in_the_globals_file_is_changed( + self, + ): ioc_to_change = "GALIL" - macros_to_change = [(Macro("DONTCHANGE"), Macro("CHANGED0")), - (Macro("CHANGEME"), Macro("CHANGED1"))] + macros_to_change = [ + (Macro("DONTCHANGE"), Macro("CHANGED0")), + (Macro("CHANGEME"), Macro("CHANGED1")), + ] self.macro_changer.change_macros(ioc_to_change, macros_to_change) self.assertEqual(self.file_access.write_filename, GLOBALS_FILENAME) - self.assertTrue('CHANGED1' in self.file_access.write_file_contents) - self.assertFalse('CHANGED0' in self.file_access.write_file_contents) + self.assertTrue("CHANGED1" in self.file_access.write_file_contents) + self.assertFalse("CHANGED0" in self.file_access.write_file_contents) - def test_GIVEN_macro_to_change_with_name_and_value_THEN_the_only_macro_matching_both_the_name_and_value_are_changed(self): + def test_GIVEN_macro_to_change_with_name_and_value_THEN_the_only_macro_matching_both_the_name_and_value_are_changed( + self, + ): ioc_to_change = "GALIL" - macros_to_change = [ - (Macro("CHANGEME", "01"), Macro("CHANGED", "001")) - ] + macros_to_change = [(Macro("CHANGEME", "01"), Macro("CHANGED", "001"))] self.macro_changer.change_macros(ioc_to_change, macros_to_change) @@ -94,9 +98,7 @@ def test_GIVEN_macro_to_change_with_name_and_value_THEN_the_only_macro_matching_ def test_that_GIVEN_macro_value_to_change_THEN_the_only_macro_value_is_changed(self): ioc_to_change = "GALIL" - macros_to_change = [ - (Macro("CHANGEME", "01"), Macro("CHANGEME", "001")) - ] + macros_to_change = [(Macro("CHANGEME", "01"), Macro("CHANGEME", "001"))] self.macro_changer.change_macros(ioc_to_change, macros_to_change) @@ -108,14 +110,15 @@ def test_that_GIVEN_macro_value_to_change_THEN_the_only_macro_value_is_changed(s class TestFilteringIOCs(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.file_access.existing_files = {GLOBALS_FILENAME: GLOBALS_FILENAME} self.logger = LoggingStub() self.macro_changer = ChangeMacroInGlobals(self.file_access, self.logger) - def test_GIVEN_globals_file_with_numbered_iocs_requested_WHEN_filtering_THEN_expected_iocs_returned(self): + def test_GIVEN_globals_file_with_numbered_iocs_requested_WHEN_filtering_THEN_expected_iocs_returned( + self, + ): root_ioc_name = "GALIL" ioc_to_change = root_ioc_name + "_03" @@ -128,7 +131,9 @@ def test_GIVEN_globals_file_with_numbered_iocs_requested_WHEN_filtering_THEN_exp self.assertTrue(all(ioc_to_change in x for x in matched_lines)) self.assertNotEqual(len(matching_indices), 0) - def test_GIVEN_globals_file_with_ioc_containing_requested_WHEN_filtering_THEN_nothing_returned(self): + def test_GIVEN_globals_file_with_ioc_containing_requested_WHEN_filtering_THEN_nothing_returned( + self, + ): root_ioc_name = "GALIL" ioc_name = "PRE-{}-POST".format(root_ioc_name) @@ -142,24 +147,27 @@ def test_GIVEN_globals_file_with_ioc_containing_requested_WHEN_filtering_THEN_no class TestChangingIOCName(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.file_access.existing_files = {GLOBALS_FILENAME: GLOBALS_FILENAME} self.logger = LoggingStub() self.macro_changer = ChangeMacroInGlobals(self.file_access, self.logger) - def test_GIVEN_IOC_name_in_globals_file_WHEN_name_changed_THEN_all_instances_of_IOC_changed(self): + def test_GIVEN_IOC_name_in_globals_file_WHEN_name_changed_THEN_all_instances_of_IOC_changed( + self, + ): ioc_to_change = "GALIL" new_ioc_name = "CHANGED" self.macro_changer.change_ioc_name(ioc_to_change, new_ioc_name) self.assertEqual(self.file_access.write_filename, GLOBALS_FILENAME) - self.assertTrue('CHANGED' in self.file_access.write_file_contents) - self.assertFalse('GALIL' in self.file_access.write_file_contents) + self.assertTrue("CHANGED" in self.file_access.write_file_contents) + self.assertFalse("GALIL" in self.file_access.write_file_contents) - def test_GIVEN_ioc_in_globals_file_WHEN_name_changed_THEN_all_macro_values_remain_the_same(self): + def test_GIVEN_ioc_in_globals_file_WHEN_name_changed_THEN_all_macro_values_remain_the_same( + self, + ): ioc_to_change = "GALIL" new_ioc_name = "CHANGED" @@ -171,16 +179,18 @@ def test_GIVEN_ioc_in_globals_file_WHEN_name_changed_THEN_all_macro_values_remai self.assertEqual(self.file_access.write_file_contents, testfile) self.assertEqual(self.file_access.write_filename, GLOBALS_FILENAME) - def test_GIVEN_different_iocs_in_globals_file_WHEN_ioc_name_changed_THEN_only_the_desired_name_changed(self): + def test_GIVEN_different_iocs_in_globals_file_WHEN_ioc_name_changed_THEN_only_the_desired_name_changed( + self, + ): ioc_to_change = "GALIL" new_ioc_name = "CHANGED" self.macro_changer.change_ioc_name(ioc_to_change, new_ioc_name) self.assertEqual(self.file_access.write_filename, GLOBALS_FILENAME) - self.assertTrue('CHANGED' in self.file_access.write_file_contents) - self.assertTrue('BINS' in self.file_access.write_file_contents) + self.assertTrue("CHANGED" in self.file_access.write_file_contents) + self.assertTrue("BINS" in self.file_access.write_file_contents) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_sql_utils.py b/test/test_sql_utils.py index b02495b..9e3a27c 100644 --- a/test/test_sql_utils.py +++ b/test/test_sql_utils.py @@ -1,32 +1,38 @@ import unittest -from mock import patch, MagicMock -from src.common_upgrades.sql_utilities import SqlConnection, run_sql import mysql.connector +from mock import MagicMock, patch + +from src.common_upgrades.sql_utilities import SqlConnection, run_sql + class TestSQLUtils(unittest.TestCase): def setUp(self): SqlConnection._connection = None - @patch('src.common_upgrades.sql_utilities.getpass') - @patch('src.common_upgrades.sql_utilities.mysql.connector', autospec=mysql.connector) - def test_GIVEN_no_connection_WHEN_connection_created_THEN_no_password_prompted(self, mysql, getpass): + @patch("src.common_upgrades.sql_utilities.getpass") + @patch("src.common_upgrades.sql_utilities.mysql.connector", autospec=mysql.connector) + def test_GIVEN_no_connection_WHEN_connection_created_THEN_no_password_prompted( + self, mysql, getpass + ): with SqlConnection() as s: pass mysql.assert_not_called() getpass.assert_not_called() - @patch('src.common_upgrades.sql_utilities.getpass') - @patch('src.common_upgrades.sql_utilities.mysql.connector', autospec=mysql.connector) + @patch("src.common_upgrades.sql_utilities.getpass") + @patch("src.common_upgrades.sql_utilities.mysql.connector", autospec=mysql.connector) def test_GIVEN_no_connection_WHEN_run_sql_called_THEN_password_prompted(self, mysql, getpass): with SqlConnection() as s: run_sql(MagicMock(), MagicMock()) getpass.assert_called_once() mysql.connect.assert_called_once() - @patch('src.common_upgrades.sql_utilities.getpass') - @patch('src.common_upgrades.sql_utilities.mysql.connector', autospec=mysql.connector) - def test_GIVEN_a_pre_existing_connection_WHEN_run_sql_called_THEN_password_not_prompted(self, mysql, getpass): + @patch("src.common_upgrades.sql_utilities.getpass") + @patch("src.common_upgrades.sql_utilities.mysql.connector", autospec=mysql.connector) + def test_GIVEN_a_pre_existing_connection_WHEN_run_sql_called_THEN_password_not_prompted( + self, mysql, getpass + ): with SqlConnection() as s: run_sql(MagicMock(), MagicMock()) @@ -38,8 +44,8 @@ def test_GIVEN_a_pre_existing_connection_WHEN_run_sql_called_THEN_password_not_p getpass.assert_not_called() mysql.connect.assert_not_called() - @patch('src.common_upgrades.sql_utilities.getpass') - @patch('src.common_upgrades.sql_utilities.mysql.connector', autospec=mysql.connector) + @patch("src.common_upgrades.sql_utilities.getpass") + @patch("src.common_upgrades.sql_utilities.mysql.connector", autospec=mysql.connector) def test_WHEN_run_sql_called_THEN_changes_committed_and_cursor_closed(self, mysql, getpass): with SqlConnection() as s: run_sql(MagicMock(), MagicMock()) @@ -47,11 +53,13 @@ def test_WHEN_run_sql_called_THEN_changes_committed_and_cursor_closed(self, mysq SqlConnection.get_session(MagicMock()).commit.assert_called() SqlConnection.get_session(MagicMock()).cursor().close.assert_called() - @patch('src.common_upgrades.sql_utilities.getpass') - @patch('src.common_upgrades.sql_utilities.mysql.connector', autospec=mysql.connector) + @patch("src.common_upgrades.sql_utilities.getpass") + @patch("src.common_upgrades.sql_utilities.mysql.connector", autospec=mysql.connector) def test_WHEN_run_sql_called_THEN_sql_executed(self, mysql, getpass): with SqlConnection() as s: my_SQL_string = "TEST SQL" run_sql(MagicMock(), my_SQL_string) - SqlConnection.get_session(MagicMock()).cursor().execute.assert_called_with(my_SQL_string) + SqlConnection.get_session(MagicMock()).cursor().execute.assert_called_with( + my_SQL_string + ) diff --git a/test/test_upgrade_base.py b/test/test_upgrade_base.py index dab83f2..a1ba123 100644 --- a/test/test_upgrade_base.py +++ b/test/test_upgrade_base.py @@ -1,14 +1,14 @@ import unittest + from hamcrest import * from mock import MagicMock as Mock +from mother import FileAccessStub, LoggingStub from src.upgrade import Upgrade, UpgradeError from src.upgrade_step import UpgradeStep -from mother import LoggingStub, FileAccessStub class TestUpgradeBase(unittest.TestCase): - @unittest.mock.patch("git.Repo", autospec=True) def setUp(self, repo): self.file_access = FileAccessStub() @@ -21,7 +21,6 @@ def upgrade(self, upgrade_steps=None): upgrade_steps = [(self.first_version, None)] return Upgrade(self.file_access, self.logger, upgrade_steps, self.git_repo) - def test_GIVEN_config_contains_no_version_number_WHEN_load_THEN_version_number_added(self): self.file_access.open_file = Mock(side_effect=IOError("No configs Exist")) @@ -30,8 +29,9 @@ def test_GIVEN_config_contains_no_version_number_WHEN_load_THEN_version_number_a assert_that(result, is_(self.first_version), "Version number") assert_that(self.file_access.wrote_version, is_(self.first_version)) - - def test_GIVEN_config_contains_known_version_number_WHEN_load_THEN_version_number_returned_and_not_written(self): + def test_GIVEN_config_contains_known_version_number_WHEN_load_THEN_version_number_returned_and_not_written( + self, + ): expected_version = self.first_version self.file_access.open_file = Mock(return_value=[expected_version]) @@ -41,17 +41,21 @@ def test_GIVEN_config_contains_known_version_number_WHEN_load_THEN_version_numbe assert_that(self.file_access.wrote_version, none()) def test_GIVEN_config_contains_unknown_version_number_WHEN_upgrade_THEN_error(self): - unknown_version = "unknown" self.file_access.open_file = Mock(return_value=[unknown_version]) result = self.upgrade().upgrade() assert_that(result, is_not(0), "Success") - assert_that(self.logger.log_err, contains_exactly("Unknown version number {0}".format(unknown_version))) + assert_that( + self.logger.log_err, + contains_exactly("Unknown version number {0}".format(unknown_version)), + ) self.git_repo.index.commit.assert_not_called() - def test_GIVEN_config_contains_latest_version_number_WHEN_load_THEN_program_exits_successfully(self): + def test_GIVEN_config_contains_latest_version_number_WHEN_load_THEN_program_exits_successfully( + self, + ): expected_version = "3.3.0" self.file_access.open_file = Mock(return_value=[expected_version]) upgrade_steps = [(expected_version, None)] @@ -59,9 +63,13 @@ def test_GIVEN_config_contains_latest_version_number_WHEN_load_THEN_program_exit result = self.upgrade(upgrade_steps).upgrade() assert_that(result, is_(0), "Success exit") - assert_that(self.logger.log, has_item("Current config is on latest version, no upgrade needed")) + assert_that( + self.logger.log, has_item("Current config is on latest version, no upgrade needed") + ) - def test_GIVEN_config_contains_older_version_number_WHEN_upgrade_THEN_upgrade_done_and_program_exits_successfully(self): + def test_GIVEN_config_contains_older_version_number_WHEN_upgrade_THEN_upgrade_done_and_program_exits_successfully( + self, + ): original_version = "3.2.1" final_version = "3.2.3" @@ -69,20 +77,27 @@ def test_GIVEN_config_contains_older_version_number_WHEN_upgrade_THEN_upgrade_do upgrade_step = Mock(UpgradeStep) upgrade_step.perform = Mock(return_value=0) - upgrade_steps = [(original_version, upgrade_step), - (final_version, None)] + upgrade_steps = [(original_version, upgrade_step), (final_version, None)] result = self.upgrade(upgrade_steps).upgrade() assert_that(result, is_(0), "Success exit") upgrade_step.perform.assert_called_once() - assert_that(self.logger.log, has_item("Finished upgrade. Now on version {0}".format(final_version))) - assert_that(self.file_access.wrote_version, is_(final_version), "Version written to file at the end") - expected_commit_calls = [unittest.mock.call(f'IBEX Upgrade from {original_version}'), - unittest.mock.call(f'IBEX Upgrade to {final_version}')] + assert_that( + self.logger.log, has_item("Finished upgrade. Now on version {0}".format(final_version)) + ) + assert_that( + self.file_access.wrote_version, is_(final_version), "Version written to file at the end" + ) + expected_commit_calls = [ + unittest.mock.call(f"IBEX Upgrade from {original_version}"), + unittest.mock.call(f"IBEX Upgrade to {final_version}"), + ] self.git_repo.index.commit.assert_has_calls(expected_commit_calls, any_order=False) - def test_GIVEN_config_contains_older_version_number_but_not_earliest_and_multiple_steps_WHEN_upgrade_THEN_all_upgrade_steps_equal_to_or_later_than_current_steps_are_done(self): + def test_GIVEN_config_contains_older_version_number_but_not_earliest_and_multiple_steps_WHEN_upgrade_THEN_all_upgrade_steps_equal_to_or_later_than_current_steps_are_done( + self, + ): original_version = "3.2.1" final_version = "3.2.3" @@ -103,7 +118,8 @@ def test_GIVEN_config_contains_older_version_number_but_not_earliest_and_multipl (original_version, upgrade_step_to_do_1), (original_version, upgrade_step_to_do_2), (original_version, upgrade_step_to_do_3), - (final_version, None)] + (final_version, None), + ] result = self.upgrade(upgrade_steps).upgrade() @@ -112,7 +128,9 @@ def test_GIVEN_config_contains_older_version_number_but_not_earliest_and_multipl upgrade_step_to_do_1.perform.assert_called_once() upgrade_step_to_do_2.perform.assert_called_once() upgrade_step_to_do_3.perform.assert_called_once() - assert_that(self.logger.log, has_item("Finished upgrade. Now on version {0}".format(final_version))) + assert_that( + self.logger.log, has_item("Finished upgrade. Now on version {0}".format(final_version)) + ) def test_GIVEN_empty_upgrade_steps_WHEN_init_THEN_error(self): try: @@ -137,16 +155,16 @@ def test_GIVEN_upgrade_step_failes_WHEN_upgrade_THEN_fail(self): upgrade_step = Mock(UpgradeStep) upgrade_step.perform = Mock(return_value=expect_error_code) - upgrade_steps = [(original_version, upgrade_step), - (final_version, None)] + upgrade_steps = [(original_version, upgrade_step), (final_version, None)] result = self.upgrade(upgrade_steps).upgrade() assert_that(result, is_(expect_error_code), "Fail exit") def test_GIVEN_version_number_THEN_upgrade_check_works(self): - import upgrade, check_version - + import check_version + import upgrade + result_not_match = check_version.compare_version_number(upgrade.UPGRADE_STEPS[-2][0]) result_match = check_version.compare_version_number(upgrade.UPGRADE_STEPS[-1][0]) @@ -154,5 +172,5 @@ def test_GIVEN_version_number_THEN_upgrade_check_works(self): assert_that(result_match, is_(0), "Did not pass with correct version numbers") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_upgrade_step_check_init_inst.py b/test/test_upgrade_step_check_init_inst.py index 256c0be..3257b85 100644 --- a/test/test_upgrade_step_check_init_inst.py +++ b/test/test_upgrade_step_check_init_inst.py @@ -1,21 +1,22 @@ +import sys import unittest + from hamcrest import assert_that, equal_to, is_not -from mock import MagicMock, patch, mock_open -from mother import LoggingStub, FileAccessStub +from mock import mock_open, patch +from mother import FileAccessStub, LoggingStub + from src.upgrade_step_check_init_inst import UpgradeStepCheckInitInst -import os -import sys module_ = "builtins" -module_ = module_ if module_ in sys.modules else 'builtins' +module_ = module_ if module_ in sys.modules else "builtins" try: import unittest.mock as mock -except (ImportError,) as e: - import mock +except (ImportError,): + pass -class TestUpgradeStepCheckInitInst(unittest.TestCase): +class TestUpgradeStepCheckInitInst(unittest.TestCase): def setUp(self): self.file_access = FileAccessStub() self.upgrade_step = UpgradeStepCheckInitInst() @@ -32,85 +33,141 @@ def test_GIVEN_no_file_called_init_inst_WHEN_search_files_THEN_zero_returned(sel file_names = ["init", "my_init", "another_file"] root = "myfolder" # Act and Assert - assert_that(self.upgrade_step.search_files(file_names, root, self.file_access), - equal_to(0), "File starting with init_inst is not in filenames, so no pre or post cmd would be found by genie") - - @patch('builtins.open', mock_open(read_data=""), create=True) - def test_GIVEN_file_with_name_none_containing_pre_post_cmd_WHEN_search_files_THEN_zero_returned(self): + assert_that( + self.upgrade_step.search_files(file_names, root, self.file_access), + equal_to(0), + "File starting with init_inst is not in filenames, so no pre or post cmd would be found by genie", + ) + + @patch("builtins.open", mock_open(read_data=""), create=True) + def test_GIVEN_file_with_name_none_containing_pre_post_cmd_WHEN_search_files_THEN_zero_returned( + self, + ): # Arrange file_names = ["init", "init_larmor", "another_file"] root = "myfolder" # Act and Assert - assert_that(self.upgrade_step.search_files(file_names, root, self.file_access), - equal_to(0), "pre or post cmd not in init_larmor file, therefore this is ok") + assert_that( + self.upgrade_step.search_files(file_names, root, self.file_access), + equal_to(0), + "pre or post cmd not in init_larmor file, therefore this is ok", + ) - @patch('builtins.open', mock_open(read_data="precmd"), create=True) + @patch("builtins.open", mock_open(read_data="precmd"), create=True) def test_GIVEN_file_with_name_containing_precmd_WHEN_search_files_THEN_error_returned(self): # Arrange file_names = ["init", "init_zoom", "another_file"] root = "myfolder" # Act and Assert - assert_that(self.upgrade_step.search_files(file_names, root, self.file_access), - is_not(equal_to(0)), "pre cmd in init_zoom file, therefore error message should be returned") + assert_that( + self.upgrade_step.search_files(file_names, root, self.file_access), + is_not(equal_to(0)), + "pre cmd in init_zoom file, therefore error message should be returned", + ) - @patch('builtins.open', mock_open(read_data="postcmd")) + @patch("builtins.open", mock_open(read_data="postcmd")) def test_GIVEN_file_with_name_containing_postcmd_WHEN_search_files_THEN_error_returned(self): # Arrange file_names = ["init", "init_inst", "another_file"] root = "myfolder" # Act and Assert - assert_that(self.upgrade_step.search_files(file_names, root, self.file_access), - is_not(equal_to(0)), "postcmd in init_inst file, therefore error message should be returned") + assert_that( + self.upgrade_step.search_files(file_names, root, self.file_access), + is_not(equal_to(0)), + "postcmd in init_inst file, therefore error message should be returned", + ) - @patch('builtins.open', mock_open(read_data="postcmd precmd")) - def test_GIVEN_file_with_name_containing_pre_and_post_cmd_WHEN_search_files_THEN_error_returned(self): + @patch("builtins.open", mock_open(read_data="postcmd precmd")) + def test_GIVEN_file_with_name_containing_pre_and_post_cmd_WHEN_search_files_THEN_error_returned( + self, + ): # Arrange file_names = ["init", "init_iris", "another_file"] root = "myfolder" # Act and Assert - assert_that(self.upgrade_step.search_files(file_names, root, self.file_access), - is_not(equal_to(0)), "pre and post cmd in init_iris file, therefore error message should be returned") + assert_that( + self.upgrade_step.search_files(file_names, root, self.file_access), + is_not(equal_to(0)), + "pre and post cmd in init_iris file, therefore error message should be returned", + ) - def test_GIVEN_directory_structure_and_no_cmd_WHEN_search_folders_THEN_files_and_folders_walked_and_zero_returned(self): + def test_GIVEN_directory_structure_and_no_cmd_WHEN_search_folders_THEN_files_and_folders_walked_and_zero_returned( + self, + ): # Arrange - file_search_returns = [0, 0, 0, 0] # One return for each of file1, 2, 3 and 4 - with patch('os.walk', return_value=self.directory_structure) as mocked_walk,\ - patch('src.upgrade_step_check_init_inst.UpgradeStepCheckInitInst.search_files', side_effect=file_search_returns) as mocked_search_files: + file_search_returns = [0, 0, 0, 0] # One return for each of file1, 2, 3 and 4 + with patch("os.walk", return_value=self.directory_structure) as mocked_walk, patch( + "src.upgrade_step_check_init_inst.UpgradeStepCheckInitInst.search_files", + side_effect=file_search_returns, + ) as mocked_search_files: # Act search_return = self.upgrade_step.search_folder("any", self.file_access) # Assert - assert_that(mocked_search_files.call_count, equal_to(4), "Four files to search, each should be called") + assert_that( + mocked_search_files.call_count, + equal_to(4), + "Four files to search, each should be called", + ) assert_that(search_return, equal_to(0), "Should pass successfully") - - def test_GIVEN_directory_structure_and_file_at_top_level_contains_precmd_WHEN_search_folders_THEN_error_returned(self): + + def test_GIVEN_directory_structure_and_file_at_top_level_contains_precmd_WHEN_search_folders_THEN_error_returned( + self, + ): # Arrange - file_search_returns = ["Error precmd", 0, 0, 0] # One return for each of file1, 2, 3 and 4 - with patch('os.walk', return_value=self.directory_structure) as mocked_walk,\ - patch('src.upgrade_step_check_init_inst.UpgradeStepCheckInitInst.search_files', side_effect=file_search_returns) as mocked_search_files: + file_search_returns = ["Error precmd", 0, 0, 0] # One return for each of file1, 2, 3 and 4 + with patch("os.walk", return_value=self.directory_structure) as mocked_walk, patch( + "src.upgrade_step_check_init_inst.UpgradeStepCheckInitInst.search_files", + side_effect=file_search_returns, + ) as mocked_search_files: # Act search_return = self.upgrade_step.search_folder("any", self.file_access) # Assert assert_that(mocked_search_files.call_count, equal_to(4), "Should search all files") - assert_that(search_return, equal_to(file_search_returns[0]+"\n"), "Return error from searched file") + assert_that( + search_return, + equal_to(file_search_returns[0] + "\n"), + "Return error from searched file", + ) - def test_GIVEN_directory_structure_and_file_at_second_level_contains_postcmd_WHEN_search_folders_THEN_error_returned(self): + def test_GIVEN_directory_structure_and_file_at_second_level_contains_postcmd_WHEN_search_folders_THEN_error_returned( + self, + ): # Arrange - file_search_returns = [0, 0, "Error postcmd", 0] # One return for each of file1, 2, 3 and 4 - with patch('os.walk', return_value=self.directory_structure) as mocked_walk,\ - patch('src.upgrade_step_check_init_inst.UpgradeStepCheckInitInst.search_files', side_effect=file_search_returns) as mocked_search_files: + file_search_returns = [0, 0, "Error postcmd", 0] # One return for each of file1, 2, 3 and 4 + with patch("os.walk", return_value=self.directory_structure) as mocked_walk, patch( + "src.upgrade_step_check_init_inst.UpgradeStepCheckInitInst.search_files", + side_effect=file_search_returns, + ) as mocked_search_files: # Act search_return = self.upgrade_step.search_folder("any", self.file_access) # Assert assert_that(mocked_search_files.call_count, equal_to(4), "Should search all files") - assert_that(search_return, equal_to(file_search_returns[2]+"\n"), "Return error from searched file") + assert_that( + search_return, + equal_to(file_search_returns[2] + "\n"), + "Return error from searched file", + ) - def test_GIVEN_directory_structure_and_two_files_contain_cmd_WHEN_search_folders_THEN_error_returned(self): + def test_GIVEN_directory_structure_and_two_files_contain_cmd_WHEN_search_folders_THEN_error_returned( + self, + ): # Arrange - file_search_returns = ["Error precmd", 0, "Error postcmd", 0] # One return for each of file1, 2, 3 and 4 - with patch('os.walk', return_value=self.directory_structure) as mocked_walk,\ - patch('src.upgrade_step_check_init_inst.UpgradeStepCheckInitInst.search_files', side_effect=file_search_returns) as mocked_search_files: + file_search_returns = [ + "Error precmd", + 0, + "Error postcmd", + 0, + ] # One return for each of file1, 2, 3 and 4 + with patch("os.walk", return_value=self.directory_structure) as mocked_walk, patch( + "src.upgrade_step_check_init_inst.UpgradeStepCheckInitInst.search_files", + side_effect=file_search_returns, + ) as mocked_search_files: # Act search_return = self.upgrade_step.search_folder("any", self.file_access) # Assert assert_that(mocked_search_files.call_count, equal_to(4), "Should search all files") - assert_that(search_return, equal_to(file_search_returns[0]+"\n"+file_search_returns[2]+"\n"), "Return error from searched file") + assert_that( + search_return, + equal_to(file_search_returns[0] + "\n" + file_search_returns[2] + "\n"), + "Return error from searched file", + ) diff --git a/test/test_upgrade_step_from_10p0p0.py b/test/test_upgrade_step_from_10p0p0.py index 7c56dfe..4d098ea 100644 --- a/test/test_upgrade_step_from_10p0p0.py +++ b/test/test_upgrade_step_from_10p0p0.py @@ -1,6 +1,8 @@ import os import unittest -from mother import LoggingStub, FileAccessStub + +from mother import LoggingStub + from src.file_access import FileAccess from src.upgrade_step_from_10p0p0 import RemoveReflDeviceScreen @@ -42,7 +44,6 @@ class TestRemoveReflDeviceScreen(unittest.TestCase): - def setUp(self): self.upgrade_step = RemoveReflDeviceScreen() self.logger = LoggingStub() @@ -52,12 +53,16 @@ def test_GIVEN_refl_device_screen_in_config_WHEN_upgrade_performed_THEN_screen_r # Given self.file_access.create_directories(SCREEN_FILE_PATH) self.file_access.write_file(SCREEN_FILE_PATH, SCREENS_FILE, file_full=True) - self.assertTrue(self.file_access.file_contains(SCREEN_FILE_PATH, "Reflectometry OPI")) + self.assertTrue( + self.file_access.file_contains(SCREEN_FILE_PATH, "Reflectometry OPI") + ) self.assertTrue(self.file_access.exists(SCREEN_FILE_PATH)) # When self.upgrade_step.perform(self.file_access, self.logger) # Then - self.assertFalse(self.file_access.file_contains(SCREEN_FILE_PATH, "Reflectometry OPI")) + self.assertFalse( + self.file_access.file_contains(SCREEN_FILE_PATH, "Reflectometry OPI") + ) # Cleanup created files self.file_access.remove_file(os.path.join(CONFIG_ROOT, SCREEN_FILE_PATH)) @@ -74,5 +79,5 @@ def test_GIVEN_no_device_screen_in_config_THEN_nothing_happens_without_errors(se self.upgrade_step.perform(self.file_access, self.logger) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_upgrade_step_from_12p0p0.py b/test/test_upgrade_step_from_12p0p0.py index ab67e38..37c7a39 100644 --- a/test/test_upgrade_step_from_12p0p0.py +++ b/test/test_upgrade_step_from_12p0p0.py @@ -1,86 +1,118 @@ import unittest -import mock +import mock from mother import FileAccessStub, LoggingStub + from src.upgrade_step_from_12p0p0 import UpgradeJawsForPositionAutosave class TestUpgradeJawsForPositionAutosave(unittest.TestCase): - def setUp(self): self.logger = LoggingStub() self.file_access = FileAccessStub() self.upgrade_step = UpgradeJawsForPositionAutosave() - - - def _perform(self, substitution_files: tuple[str], batch_files: tuple[str], matches: list[bool], batch_files_contents: list[list[str]]): + + def _perform( + self, + substitution_files: tuple[str], + batch_files: tuple[str], + matches: list[bool], + batch_files_contents: list[list[str]], + ): self.file_access.get_file_paths = mock.Mock(side_effect=[substitution_files, batch_files]) self.file_access.file_contains = mock.Mock(side_effect=matches) self.file_access.open_file = mock.Mock(side_effect=batch_files_contents) self.file_access.write_file = mock.Mock() return self.upgrade_step.perform(self.file_access, self.logger) - def test_GIVEN_files_with_correct_db_and_no_macros_WHEN_upgrade_THEN_macros_added_correctly(self): + def test_GIVEN_files_with_correct_db_and_no_macros_WHEN_upgrade_THEN_macros_added_correctly( + self, + ): substitution_files = ("jaws.substitutions", "name.substitutions") batch_files = ("jaws.cmd", "other.cmd") matches = [True, True] - batch_files_contents= [ - ["""# Comment""", # Unrelated text. - """dbLoadRecords("\\jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,")""", # Trailing comma. - """dbLoadRecords("jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:")"""], # Local db file path. - - ["""dbLoadRecordsList("\\name.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:")""", # Alternate load function. - """dbLoadRecords("/not_name.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:")""", # Partial match. - """"""]] # White space. - - self.assertEqual(self._perform(substitution_files, batch_files, matches, batch_files_contents), 0) - - self.file_access.write_file.assert_has_calls([ - mock.call( - "jaws.cmd", - ["""# Comment""", - """dbLoadRecords("\\jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)")""", - """dbLoadRecords("jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)")"""] - ), - mock.call( - "other.cmd", - ["""dbLoadRecordsList("\\name.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)")""", - """dbLoadRecords("/not_name.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:")""", - """"""] - )]) + batch_files_contents = [ + [ + """# Comment""", # Unrelated text. + """dbLoadRecords("\\jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,")""", # Trailing comma. + """dbLoadRecords("jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:")""", + ], # Local db file path. + [ + """dbLoadRecordsList("\\name.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:")""", # Alternate load function. + """dbLoadRecords("/not_name.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:")""", # Partial match. + """""", + ], + ] # White space. + + self.assertEqual( + self._perform(substitution_files, batch_files, matches, batch_files_contents), 0 + ) + + self.file_access.write_file.assert_has_calls( + [ + mock.call( + "jaws.cmd", + [ + """# Comment""", + """dbLoadRecords("\\jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)")""", + """dbLoadRecords("jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)")""", + ], + ), + mock.call( + "other.cmd", + [ + """dbLoadRecordsList("\\name.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)")""", + """dbLoadRecords("/not_name.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:")""", + """""", + ], + ), + ] + ) def test_GIVEN_file_with_correct_db_and_macros_WHEN_upgrade_THEN_no_writes(self): substitution_files = ("jaws.substitutions",) batch_files = ("jaws.cmd",) matches = [True] - batch_files_contents= [ - ["""dbLoadRecords("/jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)")"""] + batch_files_contents = [ + [ + """dbLoadRecords("/jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:,IFINIT_FROM_AS=$(IFINIT_JAWS_FROM_AS=#),IFNOTINIT_FROM_AS=$(IFNOTINIT_JAWS_FROM_AS=)")""" + ] ] - self.assertEqual(self._perform(substitution_files, batch_files, matches, batch_files_contents), 0) - + self.assertEqual( + self._perform(substitution_files, batch_files, matches, batch_files_contents), 0 + ) + self.file_access.write_file.assert_not_called() def test_GIVEN_file_without_db_WHEN_upgrade_THEN_no_writes(self): substitution_files = ("jaws.substitutions",) batch_files = ("jaws.cmd",) matches = [False] - batch_files_contents= [ + batch_files_contents = [ ["""dbLoadRecords("/other.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:")"""] ] - self.assertEqual(self._perform(substitution_files, batch_files, matches, batch_files_contents), 0) - + self.assertEqual( + self._perform(substitution_files, batch_files, matches, batch_files_contents), 0 + ) + self.file_access.write_file.assert_not_called() - def test_GIVEN_file_with_a_match_but_no_changes_WHEN_upgrade_THEN_no_write_and_step_failed(self): + def test_GIVEN_file_with_a_match_but_no_changes_WHEN_upgrade_THEN_no_write_and_step_failed( + self, + ): substitution_files = ("jaws.substitutions",) batch_files = ("jaws.cmd",) matches = [True] - batch_files_contents= [ - ["""dbLoadRecords("/jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:") # Load with a comment after."""] + batch_files_contents = [ + [ + """dbLoadRecords("/jaws.db","P=$(MYPVPREFIX)MOT:,MACRO=MACRO:") # Load with a comment after.""" + ] ] - self.assertNotEqual(self._perform(substitution_files, batch_files, matches, batch_files_contents), 0) - + self.assertNotEqual( + self._perform(substitution_files, batch_files, matches, batch_files_contents), 0 + ) + self.file_access.write_file.assert_not_called() diff --git a/test/test_upgrade_step_from_7p2p0.py b/test/test_upgrade_step_from_7p2p0.py index bcd6019..95a1801 100644 --- a/test/test_upgrade_step_from_7p2p0.py +++ b/test/test_upgrade_step_from_7p2p0.py @@ -1,33 +1,37 @@ -import os import unittest + from mock import MagicMock as Mock -from test.test_utils import test_changing_synoptics_and_blocks, test_action_does_not_write -from mother import LoggingStub, FileAccessStub +from mother import FileAccessStub, LoggingStub + from src.upgrade_step_from_7p2p0 import IgnoreRcpttSynoptics, UpgradeMotionSetPoints -from functools import partial +from test.test_utils import test_action_does_not_write, test_changing_synoptics_and_blocks class TestIgnoreRcpttSynoptics(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.upgrade_step = IgnoreRcpttSynoptics() self.logger = LoggingStub() - def test_GIVEN_existing_file_with_no_existing_rpctt_WHEN_writing_content_THEN_append_content(self): + def test_GIVEN_existing_file_with_no_existing_rpctt_WHEN_writing_content_THEN_append_content( + self, + ): self.file_access.exists = Mock(return_value=True) self.file_access.write_file = Mock() self.file_access.line_exists = Mock(return_value=False) self.upgrade_step.perform(self.file_access, self.logger) - self.file_access.write_file.assert_called_once_with(self.upgrade_step.file_name, ["rcptt_*"], "a") + self.file_access.write_file.assert_called_once_with( + self.upgrade_step.file_name, ["rcptt_*"], "a" + ) def test_GIVEN_no_file_WHEN_writing_content_THEN_write_to_new_file(self): self.file_access.exists = Mock(return_value=False) self.file_access.write_file = Mock() # self.file_access.line_exists = Mock(return_value=False) self.upgrade_step.perform(self.file_access, self.logger) - self.file_access.write_file.assert_called_once_with(self.upgrade_step.file_name, - self.upgrade_step.text_content, "w") + self.file_access.write_file.assert_called_once_with( + self.upgrade_step.file_name, self.upgrade_step.text_content, "w" + ) def test_GVEN_existing_file_with_rpctt_WHEN_writing_content_THEN_do_nothing(self): self.file_access.exists = Mock(return_value=True) @@ -38,7 +42,6 @@ def test_GVEN_existing_file_with_rpctt_WHEN_writing_content_THEN_do_nothing(self class TestUpgradeMotionSetPoints(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.upgrade_step = UpgradeMotionSetPoints() @@ -51,7 +54,9 @@ def test_GIVEN_coord_1_WHEN_step_performed_THEN_convert_to_coord_0(self): def action(): self.upgrade_step.perform(self.file_access, self.logger) - test_changing_synoptics_and_blocks(self.file_access, action, starting_blocks, expected_blocks) + test_changing_synoptics_and_blocks( + self.file_access, action, starting_blocks, expected_blocks + ) def test_GIVEN_coord_2_WHEN_step_performed_THEN_convert_to_coord_1(self): starting_blocks = [("BLOCK_NAME", "COORD2:SOMETHING"), ("BLOCK_NAME_2", "COORD2")] @@ -60,33 +65,57 @@ def test_GIVEN_coord_2_WHEN_step_performed_THEN_convert_to_coord_1(self): def action(): self.upgrade_step.perform(self.file_access, self.logger) - test_changing_synoptics_and_blocks(self.file_access, action, starting_blocks, expected_blocks) + test_changing_synoptics_and_blocks( + self.file_access, action, starting_blocks, expected_blocks + ) def test_GIVEN_coord_2_renamed_PVs_WHEN_step_performed_THEN_convert_to_coord_1(self): - starting_blocks = [("BLOCK_NAME", "COORD2:NO_OFFSET"), ("BLOCK_NAME_2", "COORD2:RBV:OFFSET"), - ("BLOCK_NAME_3", "COORD2:LOOKUP:SET:RBV")] - expected_blocks = [("BLOCK_NAME", "COORD1:NO_OFF"), ("BLOCK_NAME_2", "COORD1:RBV:OFF"), - ("BLOCK_NAME_3", "COORD1:SET:RBV")] + starting_blocks = [ + ("BLOCK_NAME", "COORD2:NO_OFFSET"), + ("BLOCK_NAME_2", "COORD2:RBV:OFFSET"), + ("BLOCK_NAME_3", "COORD2:LOOKUP:SET:RBV"), + ] + expected_blocks = [ + ("BLOCK_NAME", "COORD1:NO_OFF"), + ("BLOCK_NAME_2", "COORD1:RBV:OFF"), + ("BLOCK_NAME_3", "COORD1:SET:RBV"), + ] def action(): self.upgrade_step.perform(self.file_access, self.logger) - test_changing_synoptics_and_blocks(self.file_access, action, starting_blocks, expected_blocks) + test_changing_synoptics_and_blocks( + self.file_access, action, starting_blocks, expected_blocks + ) def test_GIVEN_coord_1_renamed_PVs_WHEN_step_performed_THEN_convert_to_coord_0(self): - starting_blocks = [("BLOCK_NAME", "COORD1:NO_OFFSET"), ("BLOCK_NAME_2", "COORD1:RBV:OFFSET"), - ("BLOCK_NAME_3", "COORD1:LOOKUP:SET:RBV")] - expected_blocks = [("BLOCK_NAME", "COORD0:NO_OFF"), ("BLOCK_NAME_2", "COORD0:RBV:OFF"), - ("BLOCK_NAME_3", "COORD0:SET:RBV")] + starting_blocks = [ + ("BLOCK_NAME", "COORD1:NO_OFFSET"), + ("BLOCK_NAME_2", "COORD1:RBV:OFFSET"), + ("BLOCK_NAME_3", "COORD1:LOOKUP:SET:RBV"), + ] + expected_blocks = [ + ("BLOCK_NAME", "COORD0:NO_OFF"), + ("BLOCK_NAME_2", "COORD0:RBV:OFF"), + ("BLOCK_NAME_3", "COORD0:SET:RBV"), + ] def action(): self.upgrade_step.perform(self.file_access, self.logger) - test_changing_synoptics_and_blocks(self.file_access, action, starting_blocks, expected_blocks) + test_changing_synoptics_and_blocks( + self.file_access, action, starting_blocks, expected_blocks + ) - def test_GIVEN_config_contains_underlying_mtr_WHEN_step_performed_THEN_error_code_returned_and_(self): - starting_blocks = [("BLOCK_NAME", "COORD1:NO_OFFSET"), ("BLOCK_NAME_2", "COORD1:RBV:OFFSET"), - ("BLOCK_NAME_3", "COORD1:LOOKUP:SET:RBV"), ("BLOCK_NAME_4", "COORD1:MTR")] + def test_GIVEN_config_contains_underlying_mtr_WHEN_step_performed_THEN_error_code_returned_and_( + self, + ): + starting_blocks = [ + ("BLOCK_NAME", "COORD1:NO_OFFSET"), + ("BLOCK_NAME_2", "COORD1:RBV:OFFSET"), + ("BLOCK_NAME_3", "COORD1:LOOKUP:SET:RBV"), + ("BLOCK_NAME_4", "COORD1:MTR"), + ] def action(): self.upgrade_step.perform(self.file_access, self.logger) @@ -94,5 +123,5 @@ def action(): test_action_does_not_write(self.file_access, action, starting_blocks) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/test/test_utils.py b/test/test_utils.py index a2d73dc..69a0824 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -1,4 +1,5 @@ from hamcrest import * + from src.common_upgrades.utils.constants import BLOCK_FILE BLOCK_NAMESPACE = "http://epics.isis.rl.ac.uk/schema/blocks/1.0" @@ -57,6 +58,7 @@ def open_file(filename): return create_pv_xml(SYNOPTIC_FILE_XML, SYNOPTIC_XML, starting_blocks) elif filename == BLOCK_FILE: return create_pv_xml(BLOCK_FILE_XML, BLOCK_XML, starting_blocks) + file_access.open_file = open_file diff --git a/test/test_xml_macro_changer.py b/test/test_xml_macro_changer.py index 37c01ad..cd1bfa9 100644 --- a/test/test_xml_macro_changer.py +++ b/test/test_xml_macro_changer.py @@ -1,13 +1,18 @@ import unittest -from hamcrest import * +import xml.etree.ElementTree as ET from functools import partial -from src.common_upgrades.change_macros_in_xml import ChangeMacrosInXML, change_macro_name, change_macro_value -from src.common_upgrades.utils.macro import Macro -from test.mother import LoggingStub, FileAccessStub, create_xml_with_iocs from xml.dom import minidom + +from hamcrest import * from mock import MagicMock as Mock -import xml.etree.ElementTree as ET +from src.common_upgrades.change_macros_in_xml import ( + ChangeMacrosInXML, + change_macro_name, + change_macro_value, +) +from src.common_upgrades.utils.macro import Macro +from test.mother import FileAccessStub, LoggingStub, create_xml_with_iocs NAMESPACE = "http://epics.isis.rl.ac.uk/schema/iocs/1.0" @@ -65,7 +70,9 @@ def generate_many_iocs(configs): def create_galil_ioc(galil_num, macros): - macro_xml = "".join([MACRO_XML.format(name=name, value=value) for name, value in macros.items()]) + macro_xml = "".join( + [MACRO_XML.format(name=name, value=value) for name, value in macros.items()] + ) return IOC_XML.format(name="GALIL_{:0>2}".format(galil_num), macros=macro_xml) @@ -119,11 +126,16 @@ def test_that_GIVEN_xml_with_requested_iocs_WHEN_filtering_THEN_expected_ioc_ret assert_that(len(result), is_(1)) assert_that(result[0].getAttribute("name"), is_(ioc_to_change)) - def test_that_GIVEN_one_xml_with_requested_iocs_and_one_without_WHEN_filtering_THEN_only_expected_ioc_returned(self): + def test_that_GIVEN_one_xml_with_requested_iocs_and_one_without_WHEN_filtering_THEN_only_expected_ioc_returned( + self, + ): # Given: ioc_to_change = "CHANGE_ME" good_config = "CONFIG_1" - configs = {good_config: [ioc_to_change, "ANOTHER_ONE"], "CONFIG_2": ["DONT_CHANGE", "ANOTHER_ONE"]} + configs = { + good_config: [ioc_to_change, "ANOTHER_ONE"], + "CONFIG_2": ["DONT_CHANGE", "ANOTHER_ONE"], + } # When: self.macro_changer._ioc_file_generator = partial(generate_many_iocs, configs) @@ -152,7 +164,9 @@ def test_that_GIVEN_xml_with_numbered_ioc_WHEN_filtering_THEN_expected_ioc_retur assert_that(len(result), is_(1)) assert_that(result[0].getAttribute("name"), is_(ioc_name)) - def test_that_GIVEN_xml_with_ioc_containing_requested_WHEN_filtering_THEN_nothing_returned(self): + def test_that_GIVEN_xml_with_ioc_containing_requested_WHEN_filtering_THEN_nothing_returned( + self, + ): # Given: root_ioc_name = "CHANGE_ME" ioc_name = "PRE-{}-POST".format(root_ioc_name) @@ -170,7 +184,6 @@ def test_that_GIVEN_xml_with_ioc_containing_requested_WHEN_filtering_THEN_nothin class TestChangMacroName(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.logger = LoggingStub() @@ -211,7 +224,7 @@ def test_that_GIVEN_xml_with_macro_matching_a_REGEX_THEN_macros_are_updated(self test_macro_xml_string = MACRO_XML.format(name="GALILADDR01", value="None") test_macro_xml = minidom.parseString(test_macro_xml_string) macro_node = test_macro_xml.getElementsByTagName("macro")[0] - old_macro = Macro(r'GALILADDR([\d]{2})') + old_macro = Macro(r"GALILADDR([\d]{2})") new_macro = Macro("GALILADDR") # When: @@ -223,7 +236,6 @@ def test_that_GIVEN_xml_with_macro_matching_a_REGEX_THEN_macros_are_updated(self class TestChangMacroValue(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.logger = LoggingStub() @@ -278,23 +290,24 @@ def test_that_GIVEN_new_macro_without_a_value_THEN_macro_values_are_not_updated( class TestMacroChangesWithMultipleInputs(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.logger = LoggingStub() self.macro_changer = ChangeMacrosInXML(self.file_access, self.logger) - def test_that_GIVEN_xml_with_single_macro_WHEN_calling_change_macros_THEN_the_single_macro_is_updated(self): + def test_that_GIVEN_xml_with_single_macro_WHEN_calling_change_macros_THEN_the_single_macro_is_updated( + self, + ): # Given: xml = IOC_FILE_XML.format(iocs=create_galil_ioc(1, {"GALILADDRXX": ""})) ioc_name = "GALIL" - macro_to_change = [ - (Macro("GALILADDRXX"), Macro("GALILADDR")) - ] + macro_to_change = [(Macro("GALILADDRXX"), Macro("GALILADDR"))] self.file_access.open_file = Mock(return_value=xml) self.file_access.write_file = Mock() - self.file_access.get_config_files = Mock(return_value=[("file1.xml", self.file_access.open_xml_file(None))]) + self.file_access.get_config_files = Mock( + return_value=[("file1.xml", self.file_access.open_xml_file(None))] + ) # When: self.macro_changer.change_macros(ioc_name, macro_to_change) @@ -310,20 +323,22 @@ def test_that_GIVEN_xml_with_multiple_macros_THEN_only_value_of_named_macro_is_c # Given: xml = IOC_FILE_XML.format(iocs=create_galil_ioc(1, {"GALILADDR": "0", "MTRCTRL": "0"})) ioc_name = "GALIL" - macro_to_change = [ - (Macro("GALILADDR"), Macro("GALILADDR", "1")) - ] + macro_to_change = [(Macro("GALILADDR"), Macro("GALILADDR", "1"))] self.file_access.open_file = Mock(return_value=xml) self.file_access.write_file = Mock() - self.file_access.get_config_files = Mock(return_value=[("file1.xml", self.file_access.open_xml_file(None))]) + self.file_access.get_config_files = Mock( + return_value=[("file1.xml", self.file_access.open_xml_file(None))] + ) # When: self.macro_changer.change_macros(ioc_name, macro_to_change) # Then: written_xml = ET.fromstring(self.file_access.write_file_contents) - result_galiladdr = written_xml.findall(".//ns:macros/*[@name='GALILADDR']", {"ns": NAMESPACE}) + result_galiladdr = written_xml.findall( + ".//ns:macros/*[@name='GALILADDR']", {"ns": NAMESPACE} + ) result_mtrctrl = written_xml.findall(".//ns:macros/*[@name='MTRCTRL']", {"ns": NAMESPACE}) assert_that(result_galiladdr, has_length(1), "changed macro count") @@ -334,27 +349,30 @@ def test_that_GIVEN_xml_with_multiple_macros_THEN_only_value_of_named_macro_is_c assert_that(result_mtrctrl[0].get("name"), is_("MTRCTRL")) assert_that(result_mtrctrl[0].get("value"), is_("0")) - def test_that_GIVEN_xml_with_multiple_old_ioc_macros_THEN_all_macros_are_updated(self): # Given: xml = IOC_FILE_XML.format(iocs=create_galil_ioc(1, {"GALILADDRXX": "", "MTRCTRLXX": ""})) ioc_name = "GALIL" macros_to_change = [ (Macro("GALILADDRXX", ""), Macro("GALILADDR", "1")), - (Macro("MTRCTRLXX", ""), Macro("MTRCTRL", "1")) + (Macro("MTRCTRLXX", ""), Macro("MTRCTRL", "1")), ] self.file_access.write_file_contents = xml self.file_access.open_file = Mock(return_value=self.file_access.write_file_contents) self.file_access.write_file = Mock() - self.file_access.get_config_files = Mock(return_value=[("file1.xml", self.file_access.open_xml_file(None))]) + self.file_access.get_config_files = Mock( + return_value=[("file1.xml", self.file_access.open_xml_file(None))] + ) # When: self.macro_changer.change_macros(ioc_name, macros_to_change) # Then: written_xml = ET.fromstring(self.file_access.write_file_contents) - result_galiladdr = written_xml.findall(".//ns:macros/*[@name='GALILADDR']", {"ns": NAMESPACE}) + result_galiladdr = written_xml.findall( + ".//ns:macros/*[@name='GALILADDR']", {"ns": NAMESPACE} + ) result_mtrctrl = written_xml.findall(".//ns:macros/*[@name='MTRCTRL']", {"ns": NAMESPACE}) assert_that(result_mtrctrl, has_length(1), "changed macro count") @@ -367,15 +385,14 @@ def test_that_GIVEN_xml_with_multiple_old_ioc_macros_THEN_all_macros_are_updated class TestChangeIOCName(unittest.TestCase): - def setUp(self): self.file_access = FileAccessStub() self.logger = LoggingStub() self.macro_changer = ChangeMacrosInXML(self.file_access, self.logger) def create_synoptic_file_with_multiple_IOCs(self, iocs): - """ - Mocks out a synoptic file with multiple IOCs in it. + """Mocks out a synoptic file with multiple IOCs in it. + Args: iocs: List of strings with the IOC names in it @@ -383,7 +400,6 @@ def create_synoptic_file_with_multiple_IOCs(self, iocs): formatted_synoptic_file: A mock XML document containing a sample synoptic """ - synoptics = "".join([SYOPTIC_XML.format(ioc) for ioc in iocs]) formatted_synoptic_file = SYNOPTIC_FILE_XML.format(synoptics=synoptics) @@ -396,7 +412,9 @@ def test_GIVEN_an_ioc_name_WHEN_IOC_change_asked_THEN_ioc_name_is_changed(self): self.file_access.open_file = Mock(return_value=xml) self.file_access.write_file = Mock() - self.file_access.get_config_files = Mock(return_value=[("file1.xml", self.file_access.open_xml_file(None))]) + self.file_access.get_config_files = Mock( + return_value=[("file1.xml", self.file_access.open_xml_file(None))] + ) # When: self.macro_changer.change_ioc_name("GALIL", "CHANGED") @@ -409,21 +427,29 @@ def test_GIVEN_an_ioc_name_WHEN_IOC_change_asked_THEN_ioc_name_is_changed(self): assert_that(iocs[0].get("name"), is_("CHANGED_{:02}".format(ioc_suffix_digit))) - def test_GIVEN_more_than_one_IOC_in_config_WHEN_its_name_is_changed_THEN_IOC_suffix_digits_are_preserved(self): + def test_GIVEN_more_than_one_IOC_in_config_WHEN_its_name_is_changed_THEN_IOC_suffix_digits_are_preserved( + self, + ): # Given: number_of_repeated_iocs = 3 ioc_to_change = "CHANGEME" new_ioc_name = "CHANGED" - ioc_names = ["{}_{:02d}".format(ioc_to_change, i) for i in range(1, number_of_repeated_iocs + 1)] - new_ioc_names = ["{}_{:02d}".format(new_ioc_name, i) for i in range(1, number_of_repeated_iocs + 1)] + ioc_names = [ + "{}_{:02d}".format(ioc_to_change, i) for i in range(1, number_of_repeated_iocs + 1) + ] + new_ioc_names = [ + "{}_{:02d}".format(new_ioc_name, i) for i in range(1, number_of_repeated_iocs + 1) + ] xml_contents = create_xml_with_iocs(ioc_names).toxml() self.file_access.open_file = Mock(return_value=xml_contents) self.file_access.write_file = Mock() - self.file_access.get_config_files = Mock(return_value=[("file1.xml", self.file_access.open_xml_file(None))]) + self.file_access.get_config_files = Mock( + return_value=[("file1.xml", self.file_access.open_xml_file(None))] + ) # When: self.macro_changer.change_ioc_name(ioc_to_change, new_ioc_name) @@ -436,7 +462,9 @@ def test_GIVEN_more_than_one_IOC_in_config_WHEN_its_name_is_changed_THEN_IOC_suf for repeated_ioc_index, ioc in enumerate(iocs): assert_that(ioc.get("name"), is_(new_ioc_names[repeated_ioc_index])) - def test_GIVEN_multiple_different_IOCs_in_configuration_WHEN_ones_name_is_changed_THEN_only_that_ioc_changes(self): + def test_GIVEN_multiple_different_IOCs_in_configuration_WHEN_ones_name_is_changed_THEN_only_that_ioc_changes( + self, + ): # Given: number_of_repeated_iocs = 3 @@ -461,7 +489,9 @@ def test_GIVEN_multiple_different_IOCs_in_configuration_WHEN_ones_name_is_change self.file_access.open_file = Mock(return_value=xml_contents) self.file_access.write_file = Mock() - self.file_access.get_config_files = Mock(return_value=[("file1.xml", self.file_access.open_xml_file(None))]) + self.file_access.get_config_files = Mock( + return_value=[("file1.xml", self.file_access.open_xml_file(None))] + ) # When: self.macro_changer.change_ioc_name(ioc_to_change, new_ioc_name) @@ -474,7 +504,9 @@ def test_GIVEN_multiple_different_IOCs_in_configuration_WHEN_ones_name_is_change for i, ioc in enumerate(iocs): assert_that(ioc.get("name"), is_(new_ioc_names[i])) - def test_GIVEN_synoptic_xml_file_WHEN_IOC_name_changed_THEN_only_the_ioc_synoptics_are_changed(self): + def test_GIVEN_synoptic_xml_file_WHEN_IOC_name_changed_THEN_only_the_ioc_synoptics_are_changed( + self, + ): # Given: ioc_to_change = "CHANGEME" new_ioc_name = "CHANGED" @@ -482,7 +514,9 @@ def test_GIVEN_synoptic_xml_file_WHEN_IOC_name_changed_THEN_only_the_ioc_synopti suffix = "_01" - synoptic_file = self.create_synoptic_file_with_multiple_IOCs([ioc_to_change+suffix, unchanged_ioc+suffix]) + synoptic_file = self.create_synoptic_file_with_multiple_IOCs( + [ioc_to_change + suffix, unchanged_ioc + suffix] + ) self.file_access.open_file = Mock(return_value=synoptic_file) self.file_access.is_dir = Mock(return_value=True) @@ -516,14 +550,18 @@ def test_GIVEN_one_ioc_THEN_add_macro(self): self.file_access.open_file = Mock(return_value=xml) self.file_access.write_file = Mock() - self.file_access.get_config_files = Mock(return_value=[("file1.xml", self.file_access.open_xml_file(None))]) + self.file_access.get_config_files = Mock( + return_value=[("file1.xml", self.file_access.open_xml_file(None))] + ) # When: self.macro_changer.add_macro(ioc_name, macro_to_add, pattern, description, default) # Then: written_xml = ET.fromstring(self.file_access.write_file_contents) - result_galiladdr = written_xml.findall(".//ns:macros/*[@name='GALILADDR']", {"ns": NAMESPACE}) + result_galiladdr = written_xml.findall( + ".//ns:macros/*[@name='GALILADDR']", {"ns": NAMESPACE} + ) result_mtrctrl = written_xml.findall(".//ns:macros/*[@name='MTRCTRL']", {"ns": NAMESPACE}) result_test = written_xml.findall(".//ns:macros/*[@name='TEST']", {"ns": NAMESPACE}) @@ -541,7 +579,9 @@ def test_GIVEN_one_ioc_THEN_add_macro(self): def test_GIVEN_one_ioc_that_already_has_macro_THEN_dont_add_macro(self): # Given: - xml = IOC_FILE_XML.format(iocs=create_galil_ioc(1, {"GALILADDR": "0", "MTRCTRL": "0", "TEST": "0"})) + xml = IOC_FILE_XML.format( + iocs=create_galil_ioc(1, {"GALILADDR": "0", "MTRCTRL": "0", "TEST": "0"}) + ) ioc_name = "GALIL" macro_to_add = Macro("TEST", "1") pattern = "^(0|1)$" @@ -550,14 +590,18 @@ def test_GIVEN_one_ioc_that_already_has_macro_THEN_dont_add_macro(self): self.file_access.open_file = Mock(return_value=xml) self.file_access.write_file = Mock() - self.file_access.get_config_files = Mock(return_value=[("file1.xml", self.file_access.open_xml_file(None))]) + self.file_access.get_config_files = Mock( + return_value=[("file1.xml", self.file_access.open_xml_file(None))] + ) # When: self.macro_changer.add_macro(ioc_name, macro_to_add, pattern, description, default) # Then: written_xml = ET.fromstring(self.file_access.write_file_contents) - result_galiladdr = written_xml.findall(".//ns:macros/*[@name='GALILADDR']", {"ns": NAMESPACE}) + result_galiladdr = written_xml.findall( + ".//ns:macros/*[@name='GALILADDR']", {"ns": NAMESPACE} + ) result_mtrctrl = written_xml.findall(".//ns:macros/*[@name='MTRCTRL']", {"ns": NAMESPACE}) result_test = written_xml.findall(".//ns:macros/*[@name='TEST']", {"ns": NAMESPACE}) @@ -574,6 +618,5 @@ def test_GIVEN_one_ioc_that_already_has_macro_THEN_dont_add_macro(self): assert_that(result_test[0].get("value"), is_("0")) -if __name__ == '__main__': - +if __name__ == "__main__": unittest.main() diff --git a/test/test_xml_pv_changer.py b/test/test_xml_pv_changer.py index 6fee5e7..23ef2f2 100644 --- a/test/test_xml_pv_changer.py +++ b/test/test_xml_pv_changer.py @@ -1,7 +1,9 @@ import unittest + from hamcrest import * + from src.common_upgrades.change_pvs_in_xml import ChangePVsInXML -from test.mother import LoggingStub, FileAccessStub +from test.mother import FileAccessStub, LoggingStub from test.test_utils import create_xml_with_starting_blocks, test_changing_synoptics_and_blocks @@ -15,27 +17,48 @@ def action(): pv_changer = ChangePVsInXML(self.file_access, self.logger) pv_changer.change_pv_name(pv_to_change, new_pv) - test_changing_synoptics_and_blocks(self.file_access, action, starting_blocks, expected_blocks) + test_changing_synoptics_and_blocks( + self.file_access, action, starting_blocks, expected_blocks + ) - def test_GIVEN_multiple_different_blocks_in_configuration_WHEN_ones_pv_is_changed_THEN_only_that_block_changes(self): - self._test_changing_pv([("BLOCKNAME", "BLOCK_PV"), ("BLOCKNAME_2", "CHANGEME")], "CHANGEME", "CHANGED", - [("BLOCKNAME", "BLOCK_PV"), ("BLOCKNAME_2", "CHANGED")]) + def test_GIVEN_multiple_different_blocks_in_configuration_WHEN_ones_pv_is_changed_THEN_only_that_block_changes( + self, + ): + self._test_changing_pv( + [("BLOCKNAME", "BLOCK_PV"), ("BLOCKNAME_2", "CHANGEME")], + "CHANGEME", + "CHANGED", + [("BLOCKNAME", "BLOCK_PV"), ("BLOCKNAME_2", "CHANGED")], + ) def test_GIVEN_block_with_part_of_PV_WHEN_pv_is_changed_THEN_only_part_of_PV_changes(self): - self._test_changing_pv([("BLOCKNAME", "CHANGEME:BUT:NOT:ME")], "CHANGEME", "CHANGED", - [("BLOCKNAME", "CHANGED:BUT:NOT:ME")]) + self._test_changing_pv( + [("BLOCKNAME", "CHANGEME:BUT:NOT:ME")], + "CHANGEME", + "CHANGED", + [("BLOCKNAME", "CHANGED:BUT:NOT:ME")], + ) def test_GIVEN_two_blocks_that_need_changing_WHEN_pv_is_changed_THEN_both_change(self): - self._test_changing_pv([("BLOCKNAME", "CHANGEME:BUT:NOT:ME"), ("BLOCKNAME_1", "ALSO:CHANGEME:BUT:NOT:ME")], - "CHANGEME", "CHANGED", - [("BLOCKNAME", "CHANGED:BUT:NOT:ME"), ("BLOCKNAME_1", "ALSO:CHANGED:BUT:NOT:ME")]) + self._test_changing_pv( + [("BLOCKNAME", "CHANGEME:BUT:NOT:ME"), ("BLOCKNAME_1", "ALSO:CHANGEME:BUT:NOT:ME")], + "CHANGEME", + "CHANGED", + [("BLOCKNAME", "CHANGED:BUT:NOT:ME"), ("BLOCKNAME_1", "ALSO:CHANGED:BUT:NOT:ME")], + ) def test_GIVEN_block_with_name_that_could_be_changed_WHEN_pv_is_changed_THEN_name_is_not(self): - self._test_changing_pv([("CHANGEME", "BLAH")], "CHANGEME", "CHANGED", - [("CHANGEME", "BLAH")]) - - def GIVEN_two_blocks_with_pvs_that_obey_filter_WHEN_pv_counted_THEN_returns_two_and_xml_unchanged(self): - starting_blocks = [("BLOCKNAME", "CHANGEME:BUT:NOT:ME"), ("BLOCKNAME_1", "ALSO:CHANGEME:BUT:NOT:ME")] + self._test_changing_pv( + [("CHANGEME", "BLAH")], "CHANGEME", "CHANGED", [("CHANGEME", "BLAH")] + ) + + def GIVEN_two_blocks_with_pvs_that_obey_filter_WHEN_pv_counted_THEN_returns_two_and_xml_unchanged( + self, + ): + starting_blocks = [ + ("BLOCKNAME", "CHANGEME:BUT:NOT:ME"), + ("BLOCKNAME_1", "ALSO:CHANGEME:BUT:NOT:ME"), + ] create_xml_with_starting_blocks(self.file_access, starting_blocks) pv_changer = ChangePVsInXML(self.file_access, self.logger) @@ -44,7 +67,9 @@ def GIVEN_two_blocks_with_pvs_that_obey_filter_WHEN_pv_counted_THEN_returns_two_ assert_that(number_of_pvs, is_(2)) self.file_access.write_file.assert_not_called() - def GIVEN_block_with_name_that_obeys_filter_WHEN_pv_counted_THEN_returns_zero_and_xml_unchanged(self): + def GIVEN_block_with_name_that_obeys_filter_WHEN_pv_counted_THEN_returns_zero_and_xml_unchanged( + self, + ): starting_blocks = [("CHANGEME", "BLAH")] create_xml_with_starting_blocks(self.file_access, starting_blocks) @@ -55,5 +80,5 @@ def GIVEN_block_with_name_that_obeys_filter_WHEN_pv_counted_THEN_returns_zero_an self.file_access.write_file.assert_not_called() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/upgrade.py b/upgrade.py index f518ecf..945adc3 100644 --- a/upgrade.py +++ b/upgrade.py @@ -5,8 +5,13 @@ from src.git_utils import RepoFactory from src.local_logger import LocalLogger from src.upgrade import Upgrade +from src.upgrade_step_add_meta_tag import UpgradeStepAddMetaXmlElement from src.upgrade_step_from_6p0p0 import SetDanfysikDisableAutoonoffMacros -from src.upgrade_step_from_7p2p0 import IgnoreRcpttSynoptics, UpgradeMotionSetPoints, ChangeReflOPITarget +from src.upgrade_step_from_7p2p0 import ( + ChangeReflOPITarget, + IgnoreRcpttSynoptics, + UpgradeMotionSetPoints, +) from src.upgrade_step_from_7p4p0 import SetISOBUSForILM200 from src.upgrade_step_from_9p0p0 import ChangeLETCollimatorCmd from src.upgrade_step_from_10p0p0 import RemoveReflDeviceScreen @@ -16,8 +21,6 @@ from src.upgrade_step_from_12p0p2 import UpgradeFrom12p0p2 from src.upgrade_step_from_12p0p3 import UpgradeFrom12p0p3 from src.upgrade_step_noop import UpgradeStepNoOp -from src.upgrade_step_add_meta_tag import UpgradeStepAddMetaXmlElement - # A list of upgrade step tuples tuple is name of version to apply the upgrade to and upgrade class. # The last step should have an upgrade class of None (this is how it knows it has reached the end) @@ -41,7 +44,10 @@ ("7.0.0", UpgradeStepNoOp()), ("7.1.0", UpgradeStepNoOp()), ("7.2.0", IgnoreRcpttSynoptics()), - ("7.2.1", UpgradeStepNoOp()), # This is in the correct order as 7.2.1 happened before the upgrade of the motion setpoints + ( + "7.2.1", + UpgradeStepNoOp(), + ), # This is in the correct order as 7.2.1 happened before the upgrade of the motion setpoints ("7.2.0.1", UpgradeMotionSetPoints()), ("7.2.0.2", UpgradeStepNoOp()), ("7.2.1.1", ChangeReflOPITarget()), @@ -76,5 +82,7 @@ file_access = FileAccess(logger, config_root) git_repo = RepoFactory.get_repo(config_root) - upgrade = Upgrade(file_access=file_access, logger=logger, upgrade_steps=UPGRADE_STEPS, git_repo=git_repo) + upgrade = Upgrade( + file_access=file_access, logger=logger, upgrade_steps=UPGRADE_STEPS, git_repo=git_repo + ) sys.exit(upgrade.upgrade())