From d8e8ddd3cfe92400b98436820d56903d34491022 Mon Sep 17 00:00:00 2001 From: Cristian Le Date: Fri, 25 Aug 2023 09:37:00 +0200 Subject: [PATCH 1/6] Add type-hints Signed-off-by: Cristian Le --- fmf/base.py | 105 ++++++++++++++++++++++++++++++++++--------------- fmf/context.py | 100 ++++++++++++++++++++++++---------------------- fmf/utils.py | 16 ++++++-- 3 files changed, 139 insertions(+), 82 deletions(-) diff --git a/fmf/base.py b/fmf/base.py index c9c7a454..abf0b769 100644 --- a/fmf/base.py +++ b/fmf/base.py @@ -1,11 +1,16 @@ """ Base Metadata Classes """ +from __future__ import annotations + import copy import os import re import subprocess +from collections.abc import Iterator, Mapping from io import open from pprint import pformat as pretty +# TODO: py3.10: typing.Optional, typing.Union -> '|' operator +from typing import Any, Optional, TypeAlias, Union import jsonschema from ruamel.yaml import YAML @@ -24,6 +29,17 @@ MAIN = "main" + SUFFIX IGNORED_DIRECTORIES = ['/dev', '/proc', '/sys'] +# TypeHints +RawDataType: TypeAlias = Union[None, int, float, str, bool] +ListDataType: TypeAlias = list[Union[RawDataType, 'ListDataType', 'DictDataType']] +DictDataType: TypeAlias = dict[str, Union[RawDataType, ListDataType, 'DictDataType']] +# Equivalent to: +# JSON: TypeAlias = dict[str, "JSON"] | list["JSON"] | str | int | float | bool | None +DataType: TypeAlias = Union[RawDataType, ListDataType, DictDataType] +TreeData: TypeAlias = dict[str, DataType] +TreeDataPath: TypeAlias = Union[TreeData, str] # Either TreeData or path +JsonSchema: TypeAlias = Mapping[str, Any] + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Metadata @@ -31,8 +47,22 @@ class Tree: """ Metadata Tree """ - - def __init__(self, data, name=None, parent=None): + parent: Optional[Tree] + children: dict[str, Tree] + data: TreeData + sources: list[str] + root: Optional[str] + version: int + original_data: TreeData + _commit: Optional[Union[str, bool]] + _raw_data: TreeData + _updated: bool + _directives: TreeData + _symlinkdirs: list[str] + + def __init__(self, data: TreeDataPath, + name: Optional[str] = None, + parent: Optional[Tree] = None): """ Initialize metadata tree from directory path or data dictionary @@ -95,7 +125,7 @@ def __init__(self, data, name=None, parent=None): log.debug("New tree '{0}' created.".format(self)) @property - def commit(self): + def commit(self) -> Union[str, bool]: """ Commit hash if tree grows under a git repo, False otherwise @@ -124,7 +154,7 @@ def __str__(self): """ Use tree name as identifier """ return self.name - def _initialize(self, path): + def _initialize(self, path: str) -> None: """ Find metadata tree root, detect format version """ # Find the tree root root = os.path.abspath(path) @@ -150,7 +180,8 @@ def _initialize(self, path): except ValueError: raise utils.FormatError("Invalid version format") - def _merge_plus(self, data, key, value, prepend=False): + def _merge_plus(self, data: TreeData, key: str, + value: DataType, prepend: bool = False) -> None: """ Handle extending attributes using the '+' suffix """ # Nothing to do if key not in parent if key not in data: @@ -171,7 +202,7 @@ def _merge_plus(self, data, key, value, prepend=False): "MergeError: Key '{0}' in {1} ({2}).".format( key, self.name, str(error))) - def _merge_minus(self, data, key, value): + def _merge_minus(self, data: TreeData, key: str, value: DataType) -> None: """ Handle reducing attributes using the '-' suffix """ # Cannot reduce attribute if key is not present in parent if key not in data: @@ -197,7 +228,7 @@ def _merge_minus(self, data, key, value): "MergeError: Key '{0}' in {1} (wrong type).".format( key, self.name)) - def _merge_special(self, data, source): + def _merge_special(self, data: TreeData, source: TreeData) -> None: """ Merge source dict into data, handle special suffixes """ for key, value in sorted(source.items()): # Handle special attribute merging @@ -211,10 +242,10 @@ def _merge_special(self, data, source): else: data[key] = value - def _process_directives(self, directives): + def _process_directives(self, directives: TreeData) -> None: """ Check and process special fmf directives """ - def check(value, type_, name=None): + def check(value: DataType, type_: type, name: Optional[str] = None): """ Check for correct type """ if not isinstance(value, type_): name = f" '{name}'" if name else "" @@ -239,7 +270,7 @@ def check(value, type_, name=None): self._directives.update(directives) @staticmethod - def init(path): + def init(path: str) -> str: """ Create metadata tree root under given path """ root = os.path.abspath(os.path.join(path, ".fmf")) if os.path.exists(root): @@ -254,7 +285,7 @@ def init(path): root, error)) return root - def merge(self, parent=None): + def merge(self, parent: Optional[Tree] = None) -> None: """ Merge parent data """ # Check parent, append source files if parent is None: @@ -270,7 +301,7 @@ def merge(self, parent=None): self._merge_special(data, self.data) self.data = data - def inherit(self): + def inherit(self) -> None: """ Apply inheritance """ # Preserve original data and merge parent # (original data needed for custom inheritance extensions) @@ -282,7 +313,7 @@ def inherit(self): for child in self.children.values(): child.inherit() - def update(self, data): + def update(self, data: Optional[TreeData]) -> None: """ Update metadata, handle virtual hierarchy """ # Make a note that the data dictionary has been updated # None is handled in the same way as an empty dictionary @@ -320,7 +351,10 @@ def update(self, data): log.debug("Data for '{0}' updated.".format(self)) log.data(pretty(self.data)) - def adjust(self, context, key='adjust', undecided='skip'): + def adjust(self, + context: fmf.context.Context, + key: str = 'adjust', + undecided: str = 'skip') -> None: """ Adjust tree data based on provided context and rules @@ -402,7 +436,8 @@ class describing the environment context. By default, the key for child in self.children.values(): child.adjust(context, key, undecided) - def get(self, name=None, default=None): + def get(self, name: Optional[Union[list[str], str]] + = None, default: DataType = None) -> DataType: """ Get attribute value or return default @@ -431,7 +466,8 @@ def get(self, name=None, default=None): return default return data - def child(self, name, data, source=None): + def child(self, name: str, data: Optional[TreeDataPath], + source: Optional[str] = None) -> None: """ Create or update child with given data """ try: # Update data from a dictionary (handle empty nodes) @@ -447,7 +483,7 @@ def child(self, name, data, source=None): self.children[name].sources.append(source) self.children[name]._raw_data = copy.deepcopy(data) - def grow(self, path): + def grow(self, path: str) -> None: """ Grow the metadata tree for the given directory path @@ -527,7 +563,7 @@ def grow(self, path): del self.children[name] log.debug("Empty tree '{0}' removed.".format(child.name)) - def climb(self, whole=False): + def climb(self, whole: bool = False) -> Iterator[Tree]: """ Climb through the tree (iterate leaf/all nodes) """ if whole or not self.children: yield self @@ -535,15 +571,16 @@ def climb(self, whole=False): for node in child.climb(whole): yield node - def find(self, name): + def find(self, name: str) -> Optional[Tree]: """ Find node with given name """ for node in self.climb(whole=True): if node.name == name: return node return None - def prune(self, whole=False, keys=None, names=None, filters=None, - conditions=None, sources=None): + def prune(self, whole: bool = False, keys: Optional[list[str]] = None, + names: Optional[list[str]] = None, filters: Optional[list[str]] = None, + conditions: Optional[list[str]] = None, sources: Optional[list[str]] = None): """ Filter tree nodes based on given criteria """ keys = keys or [] names = names or [] @@ -579,16 +616,20 @@ def prune(self, whole=False, keys=None, names=None, filters=None, # All criteria met, thus yield the node yield node - def show(self, brief=False, formatting=None, values=None): + def show( + self, + brief: bool = False, + formatting: Optional[str] = None, + values: Optional[list] = None) -> str: """ Show metadata """ values = values or [] # Custom formatting if formatting is not None: formatting = re.sub("\\\\n", "\n", formatting) - name = self.name # noqa: F841 - data = self.data # noqa: F841 - root = self.root # noqa: F841 + name = self.name # noqa: F841 + data = self.data # noqa: F841 + root = self.root # noqa: F841 sources = self.sources # noqa: F841 evaluated = [] for value in values: @@ -609,11 +650,10 @@ def show(self, brief=False, formatting=None, values=None): output += utils.listed(value) else: output += pretty(value) - output return output + "\n" @staticmethod - def node(reference): + def node(reference: TreeData) -> Tree: """ Return Tree node referenced by the fmf identifier @@ -648,7 +688,7 @@ def node(reference): "No tree node found for '{0}' reference".format(reference)) return found_node - def copy(self): + def copy(self) -> Tree: """ Create and return a deep copy of the node and its subtree @@ -663,7 +703,8 @@ def copy(self): self.parent = duplicate.parent = original_parent return duplicate - def validate(self, schema, schema_store=None): + def validate(self, schema: JsonSchema, + schema_store: Optional[dict] = None) -> utils.JsonSchemaValidationResult: """ Validate node data with given JSON Schema and schema references. @@ -705,7 +746,7 @@ def validate(self, schema, schema_store=None): raise utils.JsonSchemaError( f'Errors found in provided schema: {error}') - def _locate_raw_data(self): + def _locate_raw_data(self) -> tuple[TreeData, TreeData, str]: """ Detect location of raw data from which the node has been created @@ -752,7 +793,7 @@ def _locate_raw_data(self): # The full raw data were read from the last source return node_data, full_data, node.sources[-1] - def __enter__(self): + def __enter__(self) -> TreeData: """ Experimental: Modify metadata and store changes to disk @@ -783,7 +824,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): with open(source, "w", encoding='utf-8') as file: file.write(dict_to_yaml(full_data)) - def __getitem__(self, key): + def __getitem__(self, key: str) -> Union[DataType, Tree]: """ Dictionary method to get child node or data item diff --git a/fmf/context.py b/fmf/context.py index 66001e76..636b0c7e 100644 --- a/fmf/context.py +++ b/fmf/context.py @@ -16,7 +16,15 @@ See https://fmf.readthedocs.io/en/latest/modules.html#fmf.Tree.adjust """ +from __future__ import annotations + import re +from collections.abc import Callable +# TODO: py3.10: typing.Optional, typing.Union -> '|' operator +from typing import Optional, TypeAlias, Union + +# TypeHints +ExpressionType: TypeAlias = tuple[Optional[str], Union[str, bool], Optional[list[str]]] class CannotDecide(Exception): @@ -34,7 +42,7 @@ class InvalidContext(Exception): class ContextValue: """ Value for dimension """ - def __init__(self, origin): + def __init__(self, origin: Union[str, tuple[str, ...]]): """ ContextValue("foo-1.2.3") ContextValue(["foo", "1", "2", "3"]) @@ -44,13 +52,13 @@ def __init__(self, origin): else: self._to_compare = self._split_to_version(origin) - def __eq__(self, other): + def __eq__(self, other: ContextValue): if isinstance(other, self.__class__): return self._to_compare == other._to_compare else: return False - def __ne__(self, other): + def __ne__(self, other: ContextValue): return not self.__eq__(other) def __str__(self): @@ -59,7 +67,11 @@ def __str__(self): def __repr__(self): return "{}({})".format(self.__class__.__name__, repr(self._to_compare)) - def version_cmp(self, other, minor_mode=False, ordered=True): + def version_cmp( + self, + other: ContextValue, + minor_mode: bool = False, + ordered: bool = True) -> int: """ Comparing two ContextValue objects @@ -138,7 +150,7 @@ def version_cmp(self, other, minor_mode=False, ordered=True): return -1 # other is larger (more pars) @staticmethod - def compare(first, second): + def compare(first: str, second: str): """ compare two version parts """ # Ideally use `from packaging import version` but we need older # python support too so very rough @@ -155,7 +167,7 @@ def compare(first, second): (first_version < second_version)) @staticmethod - def _split_to_version(text): + def _split_to_version(text: str) -> tuple[str, ...]: """ Try to split text into name + version parts @@ -173,7 +185,6 @@ def _split_to_version(text): :param text: original value :return: tuple of name followed by version parts - :rtype: tuple """ return tuple(re.split(r":|-|\.", text)) @@ -183,119 +194,121 @@ def __hash__(self): class Context: """ Represents https://fmf.readthedocs.io/en/latest/context.html """ + # Operators' definitions - def _op_defined(self, dimension_name, values): + def _op_defined(self, dimension_name: str, values: list[ContextValue]): """ 'is defined' operator """ return dimension_name in self._dimensions - def _op_not_defined(self, dimension_name, values): + def _op_not_defined(self, dimension_name: str, values: list[ContextValue]): """ 'is not defined' operator """ return dimension_name not in self._dimensions - def _op_eq(self, dimension_name, values): + def _op_eq(self, dimension_name: str, values: list[ContextValue]): """ '=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=False) == 0 return self._op_core(dimension_name, values, comparator) - def _op_not_eq(self, dimension_name, values): + def _op_not_eq(self, dimension_name: str, values: list[ContextValue]): """ '!=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=False) != 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_eq(self, dimension_name, values): + def _op_minor_eq(self, dimension_name: str, values: list[ContextValue]): """ '~=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=False) == 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_not_eq(self, dimension_name, values): + def _op_minor_not_eq(self, dimension_name: str, values: list[ContextValue]): """ '~!=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=False) != 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_less_or_eq(self, dimension_name, values): + def _op_minor_less_or_eq(self, dimension_name: str, values: list[ContextValue]): """ '~<=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=True) <= 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_less(self, dimension_name, values): + def _op_minor_less(self, dimension_name: str, values: list[ContextValue]): """ '~<' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=True) < 0 return self._op_core(dimension_name, values, comparator) - def _op_less(self, dimension_name, values): + def _op_less(self, dimension_name: str, values: list[ContextValue]): """ '<' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=True) < 0 return self._op_core(dimension_name, values, comparator) - def _op_less_or_equal(self, dimension_name, values): + def _op_less_or_equal(self, dimension_name: str, values: list[ContextValue]): """ '<=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=True) <= 0 return self._op_core(dimension_name, values, comparator) - def _op_greater_or_equal(self, dimension_name, values): + def _op_greater_or_equal(self, dimension_name: str, values: list[ContextValue]): """ '>=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=True) >= 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_greater_or_equal(self, dimension_name, values): + def _op_minor_greater_or_equal(self, dimension_name: str, values: list[ContextValue]): """ '~>=' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=True) >= 0 return self._op_core(dimension_name, values, comparator) - def _op_greater(self, dimension_name, values): + def _op_greater(self, dimension_name: str, values: list[ContextValue]): """ '>' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp(it_val, ordered=True) > 0 return self._op_core(dimension_name, values, comparator) - def _op_minor_greater(self, dimension_name, values): + def _op_minor_greater(self, dimension_name: str, values: list[ContextValue]): """ '~>' operator """ - def comparator(dimension_value, it_val): + def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return dimension_value.version_cmp( it_val, minor_mode=True, ordered=True) > 0 return self._op_core(dimension_name, values, comparator) - def _op_core(self, dimension_name, values, comparator): + def _op_core(self, dimension_name: str, values: list[ContextValue], + comparator: Callable[[ContextValue, ContextValue], bool]): """ Evaluate value from dimension vs target values combination @@ -362,6 +375,7 @@ def _op_core(self, dimension_name, values, comparator): # To split by 'or' operator re_or_split = re.compile(r'\bor\b') + _dimensions: dict[str] def __init__(self, *args, **kwargs): """ @@ -394,7 +408,7 @@ def __init__(self, *args, **kwargs): ) @staticmethod - def parse_rule(rule): + def parse_rule(rule: Union[str, bool]) -> list[list[ExpressionType]]: """ Parses rule into expressions @@ -408,7 +422,6 @@ def parse_rule(rule): expr_6 and expr_7 is returned as [[expr_6, expr_7]] :param rule: rule to parse - :type rule: str | bool :return: nested list of expressions from the rule :raises InvalidRule: Syntax error in the rule """ @@ -437,19 +450,18 @@ def parse_rule(rule): return parsed_rule @staticmethod - def parse_value(value): + def parse_value(value: str) -> ContextValue: """ Single place to convert to ContextValue """ return ContextValue(str(value)) @staticmethod - def split_rule_to_groups(rule): + def split_rule_to_groups(rule: str) -> list[list[str]]: """ Split rule into nested lists, no real parsing expr0 and expr1 or expr2 is split into [[expr0, expr1], [expr2]] :param rule: rule to split - :type rule: str :raises InvalidRule: Syntax error in the rule """ rule_parts = [] @@ -467,7 +479,7 @@ def split_rule_to_groups(rule): return rule_parts @staticmethod - def split_expression(expression): + def split_expression(expression: str) -> ExpressionType: """ Split expression to dimension name, operator and values @@ -475,10 +487,8 @@ def split_expression(expression): of the list of values. :param expression: expression to split - :type expression: str :raises InvalidRule: When expression cannot be split, e.g. syntax error :return: tuple(dimension name, operator, list of values) - :rtype: tuple(str|None, str|bool, list|None) """ # true/false match = Context.re_boolean.match(expression) @@ -500,7 +510,7 @@ def split_expression(expression): return (match.group(1), match.group(2), None) raise InvalidRule("Cannot parse expression '{}'.".format(expression)) - def matches(self, rule): + def matches(self, rule: Union[str, bool]) -> bool: """ Does the rule match the current Context? @@ -512,8 +522,6 @@ def matches(self, rule): CannotDecide or False == False or CannotDecide == CannotDecide :param rule: Single rule to decide - :type rule: str | bool - :rtype: bool :raises CannotDecide: Impossible to decide the rule wrt current Context, e.g. dimension is missing :raises InvalidRule: Syntax error in the rule @@ -570,7 +578,7 @@ def matches(self, rule): else: raise CannotDecide() # It's up to callee how to treat this - def evaluate(self, expression): + def evaluate(self, expression: ExpressionType) -> bool: dimension_name, operator, values = expression if isinstance(operator, bool): return operator diff --git a/fmf/utils.py b/fmf/utils.py index a6e53730..204794e8 100644 --- a/fmf/utils.py +++ b/fmf/utils.py @@ -1,5 +1,7 @@ """ Logging, config, constants & utilities """ +from __future__ import annotations + import copy import logging import os @@ -10,7 +12,8 @@ import time import warnings from io import StringIO -from typing import Any, List, NamedTuple +# TODO: py3.10: typing.Optional, typing.Union -> '|' operator +from typing import Any, NamedTuple, Optional from filelock import FileLock, Timeout from ruamel.yaml import YAML, scalarstring @@ -89,6 +92,7 @@ class ReferenceError(GeneralError): class FetchError(GeneralError): """ Fatal error in helper command while fetching """ + # Keep previously used format of the message def __str__(self): @@ -194,7 +198,8 @@ def info(message, newline=True): # Filtering # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def evaluate(expression, data, _node=None): +def evaluate(expression: str, data: fmf.base.TreeData, + _node: Optional[fmf.base.Tree] = None) -> Any: """ Evaluate arbitrary Python expression against given data @@ -210,7 +215,8 @@ def evaluate(expression, data, _node=None): raise FilterError("Internal key is not defined: {}".format(error)) -def filter(filter, data, sensitive=True, regexp=False): +def filter(filter: str, data: fmf.base.TreeData, + sensitive: bool = True, regexp: bool = False) -> bool: """ Return true if provided filter matches given dictionary of values @@ -323,6 +329,7 @@ def check_clause(clause): return any([check_clause(clause) for clause in re.split(r"\s*\|\s*", filter)]) + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Logging # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -620,6 +627,7 @@ def invalidate_cache(): if issues: # pragma: no cover raise GeneralError("\n".join(issues)) + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Fetch Tree from the Remote Repository # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -852,4 +860,4 @@ class JsonSchemaValidationResult(NamedTuple): """ Represents JSON Schema validation result """ result: bool - errors: List[Any] + errors: list[Any] From 2a9fc87016f0109239226886cfe17bcd56e642ca Mon Sep 17 00:00:00 2001 From: Cristian Le Date: Wed, 9 Aug 2023 18:31:32 +0200 Subject: [PATCH 2/6] Add more dict-like interface to `Tree` Signed-off-by: Cristian Le --- fmf/base.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/fmf/base.py b/fmf/base.py index abf0b769..9d436a35 100644 --- a/fmf/base.py +++ b/fmf/base.py @@ -45,7 +45,7 @@ # Metadata # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -class Tree: +class Tree(Mapping[str, Union['Tree', DataType]]): """ Metadata Tree """ parent: Optional[Tree] children: dict[str, Tree] @@ -71,7 +71,7 @@ def __init__(self, data: TreeDataPath, """ # Bail out if no data and no parent given - if not data and not parent: + if not data and parent is None: raise utils.GeneralError( "No data or parent provided to initialize the tree.") @@ -835,3 +835,18 @@ def __getitem__(self, key: str) -> Union[DataType, Tree]: return self.children[key[1:]] else: return self.data[key] + + def __len__(self) -> int: + return len(self.children) + len(self.data) + + def __iter__(self) -> Iterator[str]: + for c in self.children: + yield f"/{c}" + for d in self.data: + yield d + + def __contains__(self, item: str): + if item.startswith("/"): + return item[1:] in self.children + else: + return item in self.data From ec52f6e2ecf4c9bc6ab3aee0feb54e0fcbb02296 Mon Sep 17 00:00:00 2001 From: Cristian Le Date: Thu, 10 Aug 2023 11:54:36 +0200 Subject: [PATCH 3/6] Add mypy pre-commit Signed-off-by: Cristian Le --- .pre-commit-config.yaml | 6 ++++++ fmf/base.py | 8 +++++++- fmf/context.py | 8 +++++++- pyproject.toml | 8 ++++++++ setup.py | 1 + 5 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 pyproject.toml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1ec413a8..55efc3cb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -38,3 +38,9 @@ repos: - id: end-of-file-fixer - id: mixed-line-ending - id: trailing-whitespace + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: "v1.4.1" + hooks: + - id: mypy + files: ^(fmf) diff --git a/fmf/base.py b/fmf/base.py index 9d436a35..94b9ad6a 100644 --- a/fmf/base.py +++ b/fmf/base.py @@ -6,11 +6,17 @@ import os import re import subprocess +import sys from collections.abc import Iterator, Mapping from io import open from pprint import pformat as pretty # TODO: py3.10: typing.Optional, typing.Union -> '|' operator -from typing import Any, Optional, TypeAlias, Union +from typing import Any, Optional, Union + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias import jsonschema from ruamel.yaml import YAML diff --git a/fmf/context.py b/fmf/context.py index 636b0c7e..73b02f0a 100644 --- a/fmf/context.py +++ b/fmf/context.py @@ -19,9 +19,15 @@ from __future__ import annotations import re +import sys from collections.abc import Callable # TODO: py3.10: typing.Optional, typing.Union -> '|' operator -from typing import Optional, TypeAlias, Union +from typing import Optional, Union + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias # TypeHints ExpressionType: TypeAlias = tuple[Optional[str], Union[str, bool], Optional[list[str]]] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..5d241552 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,8 @@ +[tool.mypy] +strict = true +files = ["fmf"] +python_version = "3.9" +warn_unused_configs = true +show_error_codes = true +disallow_untyped_defs = false +follow_imports = "normal" diff --git a/setup.py b/setup.py index fac0b39d..820c803d 100755 --- a/setup.py +++ b/setup.py @@ -24,6 +24,7 @@ 'ruamel.yaml', 'filelock', 'jsonschema', + 'typing-extensions ; python_version<"3.10"', ] extras_require = { 'docs': ['sphinx>=3', 'sphinx_rtd_theme'], From 2b76e3a24b85e156199da2d9285252dda12c96ca Mon Sep 17 00:00:00 2001 From: Cristian Le Date: Fri, 25 Aug 2023 09:57:08 +0200 Subject: [PATCH 4/6] More type-hints Signed-off-by: Cristian Le --- .pre-commit-config.yaml | 2 + fmf/base.py | 168 +++++++++++++++++++++++----------------- fmf/cli.py | 24 +++--- fmf/context.py | 68 ++++++++-------- fmf/utils.py | 164 ++++++++++++++++++++++++--------------- 5 files changed, 249 insertions(+), 177 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 55efc3cb..ed530795 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -44,3 +44,5 @@ repos: hooks: - id: mypy files: ^(fmf) + additional_dependencies: + - types-jsonschema diff --git a/fmf/base.py b/fmf/base.py index 94b9ad6a..cf14d426 100644 --- a/fmf/base.py +++ b/fmf/base.py @@ -51,7 +51,9 @@ # Metadata # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -class Tree(Mapping[str, Union['Tree', DataType]]): +# Cannot specify class Tree(Mapping[str, Tree | DataType]]): +# This has a different .get method interface incompatible with mypy +class Tree: """ Metadata Tree """ parent: Optional[Tree] children: dict[str, Tree] @@ -60,13 +62,14 @@ class Tree(Mapping[str, Union['Tree', DataType]]): root: Optional[str] version: int original_data: TreeData + name: str _commit: Optional[Union[str, bool]] _raw_data: TreeData _updated: bool _directives: TreeData _symlinkdirs: list[str] - def __init__(self, data: TreeDataPath, + def __init__(self, data: Optional[TreeDataPath], name: Optional[str] = None, parent: Optional[Tree] = None): """ @@ -109,11 +112,13 @@ def __init__(self, data: TreeDataPath, if self.parent is None: self.name = "/" if not isinstance(data, dict): + assert data is not None self._initialize(path=data) data = self.root # Handle child node creation else: self.root = self.parent.root + assert name is not None self.name = os.path.join(self.parent.name, name) # Update data from a dictionary (handle empty nodes) @@ -152,9 +157,10 @@ def commit(self) -> Union[str, bool]: output, _ = utils.run( ['git', 'rev-parse', '--verify', 'HEAD'], cwd=self.root) self._commit = output.strip() + return self._commit except subprocess.CalledProcessError: self._commit = False - return self._commit + return self._commit def __str__(self): """ Use tree name as identifier """ @@ -182,27 +188,30 @@ def _initialize(self, path: str) -> None: log.info("Format version detected: {0}".format(self.version)) except IOError as error: raise utils.FormatError( - "Unable to detect format version: {0}".format(error)) + f"Unable to detect format version: {error}") except ValueError: raise utils.FormatError("Invalid version format") def _merge_plus(self, data: TreeData, key: str, value: DataType, prepend: bool = False) -> None: """ Handle extending attributes using the '+' suffix """ - # Nothing to do if key not in parent - if key not in data: - data[key] = value - return - # Use the special merge for merging dictionaries - if type(data[key]) == type(value) == dict: - self._merge_special(data[key], value) - return - # Attempt to apply the plus operator try: + # Nothing to do if key not in parent + if key not in data: + data[key] = value + return + # Use the special merge for merging dictionaries + data_val = data[key] + if isinstance(data_val, dict) and isinstance(value, (dict, Mapping)): + self._merge_special(data_val, value) + data[key] = data_val + return + # Attempt to apply the plus operator if prepend: - data[key] = value + data[key] + data_val = value + data_val # type: ignore else: - data[key] = data[key] + value + data_val = data_val + value # type: ignore + data[key] = data_val except TypeError as error: raise utils.MergeError( "MergeError: Key '{0}' in {1} ({2}).".format( @@ -210,29 +219,35 @@ def _merge_plus(self, data: TreeData, key: str, def _merge_minus(self, data: TreeData, key: str, value: DataType) -> None: """ Handle reducing attributes using the '-' suffix """ - # Cannot reduce attribute if key is not present in parent - if key not in data: - data[key] = value - raise utils.MergeError( - "MergeError: Key '{0}' in {1} (not inherited).".format( - key, self.name)) - # Subtract numbers - if type(data[key]) == type(value) in [int, float]: - data[key] = data[key] - value - # Replace matching regular expression with empty string - elif type(data[key]) == type(value) == type(""): - data[key] = re.sub(value, '', data[key]) - # Remove given values from the parent list - elif type(data[key]) == type(value) == list: - data[key] = [item for item in data[key] if item not in value] - # Remove given key from the parent dictionary - elif isinstance(data[key], dict) and isinstance(value, list): - for item in value: - data[key].pop(item, None) - else: + try: + # Cannot reduce attribute if key is not present in parent + if key not in data: + data[key] = value + raise utils.MergeError( + "MergeError: Key '{0}' in {1} (not inherited).".format( + key, self.name)) + # Subtract numbers + data_val = data[key] + if type(data_val) == type(value) in [int, float]: + data_val -= value # type: ignore + # Replace matching regular expression with empty string + elif isinstance(data_val, str) and isinstance(value, str): + data_val = re.sub(value, '', data_val) + # Remove given values from the parent list + elif isinstance(data_val, list) and isinstance(value, list): + data_val = [item for item in data_val if item not in value] + # Remove given key from the parent dictionary + elif isinstance(data_val, dict) and isinstance(value, list): + for item in value: + assert isinstance(item, str) + data_val.pop(item, None) + else: + raise TypeError(f"Incompatible types: {type(data_val)} - {type(value)}") + data[key] = data_val + except TypeError as err: raise utils.MergeError( "MergeError: Key '{0}' in {1} (wrong type).".format( - key, self.name)) + key, self.name)) from err def _merge_special(self, data: TreeData, source: TreeData) -> None: """ Merge source dict into data, handle special suffixes """ @@ -251,7 +266,7 @@ def _merge_special(self, data: TreeData, source: TreeData) -> None: def _process_directives(self, directives: TreeData) -> None: """ Check and process special fmf directives """ - def check(value: DataType, type_: type, name: Optional[str] = None): + def check(value: DataType, type_: type, name: Optional[str] = None) -> None: """ Check for correct type """ if not isinstance(value, type_): name = f" '{name}'" if name else "" @@ -331,7 +346,7 @@ def update(self, data: Optional[TreeData]) -> None: # Handle fmf directives first try: directives = data.pop("/") - self._process_directives(directives) + self._process_directives(directives) # type: ignore except KeyError: pass @@ -350,6 +365,7 @@ def update(self, data: Optional[TreeData]) -> None: name = match.groups()[0] value = {match.groups()[1]: value} # Update existing child or create a new one + assert isinstance(value, dict) or isinstance(value, str) or value is None self.child(name, value) # Update regular attributes else: @@ -385,7 +401,7 @@ class describing the environment context. By default, the key try: rules = copy.deepcopy(self.data[key]) log.debug("Applying adjust rules for '{}'.".format(self)) - log.data(rules) + log.data(str(rules)) if isinstance(rules, dict): rules = [rules] if not isinstance(rules, list): @@ -409,6 +425,7 @@ class describing the environment context. By default, the key except KeyError: condition = True + assert isinstance(condition, str) or isinstance(condition, bool) # The optional 'continue' key should be a bool continue_ = rule.pop('continue', True) if not isinstance(continue_, bool): @@ -442,8 +459,8 @@ class describing the environment context. By default, the key for child in self.children.values(): child.adjust(context, key, undecided) - def get(self, name: Optional[Union[list[str], str]] - = None, default: DataType = None) -> DataType: + def get(self, name: Optional[Union[list[str], str]] = None, + default: DataType = None) -> DataType: """ Get attribute value or return default @@ -467,7 +484,7 @@ def get(self, name: Optional[Union[list[str], str]] data = self.data try: for key in name: - data = data[key] + data = data[key] # type: ignore except KeyError: return default return data @@ -487,7 +504,11 @@ def child(self, name: str, data: Optional[TreeDataPath], # Save source file if source is not None: self.children[name].sources.append(source) - self.children[name]._raw_data = copy.deepcopy(data) + if data is None: + self.children[name]._raw_data = {} + else: + assert isinstance(data, dict) + self.children[name]._raw_data = copy.deepcopy(data) def grow(self, path: str) -> None: """ @@ -584,9 +605,12 @@ def find(self, name: str) -> Optional[Tree]: return node return None - def prune(self, whole: bool = False, keys: Optional[list[str]] = None, - names: Optional[list[str]] = None, filters: Optional[list[str]] = None, - conditions: Optional[list[str]] = None, sources: Optional[list[str]] = None): + def prune(self, whole: bool = False, + keys: Optional[list[str]] = None, + names: Optional[list[str]] = None, + filters: Optional[list[str]] = None, + conditions: Optional[list[str]] = None, + sources: Optional[list[str]] = None) -> Iterator[Tree]: """ Filter tree nodes based on given criteria """ keys = keys or [] names = names or [] @@ -594,8 +618,9 @@ def prune(self, whole: bool = False, keys: Optional[list[str]] = None, conditions = conditions or [] # Expand paths to absolute + sources_set = set() if sources: - sources = {os.path.abspath(src) for src in sources} + sources_set = {os.path.abspath(src) for src in sources} for node in self.climb(whole): # Select only nodes with key content @@ -606,7 +631,7 @@ def prune(self, whole: bool = False, keys: Optional[list[str]] = None, [re.search(name, node.name) for name in names]): continue # Select nodes defined by any of the source files - if sources and not sources.intersection(node.sources): + if sources_set and not sources_set.intersection(node.sources): continue # Apply filters and conditions if given try: @@ -626,7 +651,7 @@ def show( self, brief: bool = False, formatting: Optional[str] = None, - values: Optional[list] = None) -> str: + values: Optional[list[str]] = None) -> str: """ Show metadata """ values = values or [] @@ -638,8 +663,8 @@ def show( root = self.root # noqa: F841 sources = self.sources # noqa: F841 evaluated = [] - for value in values: - evaluated.append(eval(value)) + for str_v in values: + evaluated.append(eval(str_v)) return formatting.format(*evaluated) # Show the name @@ -647,15 +672,14 @@ def show( if brief or not self.data: return output + "\n" # List available attributes - for key, value in sorted(self.data.items()): + for key, val in sorted(self.data.items()): output += "\n{0}: ".format(utils.color(key, 'green')) - if isinstance(value, type("")): - output += value.rstrip("\n") - elif isinstance(value, list) and all( - [isinstance(item, type("")) for item in value]): - output += utils.listed(value) + if isinstance(val, str): + output += val.rstrip("\n") + elif isinstance(val, list) and all(isinstance(item, str) for item in val): + output += utils.listed(val) # type: ignore else: - output += pretty(value) + output += pretty(val) return output + "\n" @staticmethod @@ -678,20 +702,21 @@ def node(reference: TreeData) -> Tree: # Fetch remote git repository if 'url' in reference: tree = utils.fetch_tree( - reference.get('url'), - reference.get('ref'), - reference.get('path', '.').lstrip('/')) + str(reference.get('url')), + reference.get('ref'), # type: ignore + str(reference.get('path', '.')).lstrip('/')) # Use local files else: - root = reference.get('path', '.') + root = str(reference.get('path', '.')) if not root.startswith('/') and root != '.': raise utils.ReferenceError( 'Relative path "%s" specified.' % root) tree = Tree(root) - found_node = tree.find(reference.get('name', '/')) + found_node = tree.find(str(reference.get('name', '/'))) if found_node is None: raise utils.ReferenceError( "No tree node found for '{0}' reference".format(reference)) + assert isinstance(found_node, Tree) return found_node def copy(self) -> Tree: @@ -709,8 +734,10 @@ def copy(self) -> Tree: self.parent = duplicate.parent = original_parent return duplicate - def validate(self, schema: JsonSchema, - schema_store: Optional[dict] = None) -> utils.JsonSchemaValidationResult: + def validate(self, + schema: JsonSchema, + schema_store: Optional[dict[str, + Any]] = None) -> utils.JsonSchemaValidationResult: """ Validate node data with given JSON Schema and schema references. @@ -768,7 +795,7 @@ def _locate_raw_data(self) -> tuple[TreeData, TreeData, str]: """ # List of node names in the virtual hierarchy - hierarchy = list() + hierarchy: list[str] = [] # Find the closest parent with raw data defined node = self @@ -790,11 +817,12 @@ def _locate_raw_data(self) -> tuple[TreeData, TreeData, str]: for key in hierarchy: # Create a virtual hierarchy level if missing if key not in node_data: - node_data[key] = dict() + node_data[key] = {} # Initialize as an empty dict if leaf node is empty if node_data[key] is None: - node_data[key] = dict() - node_data = node_data[key] + node_data[key] = {} + assert isinstance(node_data, dict) + node_data = node_data[key] # type: ignore # The full raw data were read from the last source return node_data, full_data, node.sources[-1] @@ -851,7 +879,7 @@ def __iter__(self) -> Iterator[str]: for d in self.data: yield d - def __contains__(self, item: str): + def __contains__(self, item: str) -> bool: if item.startswith("/"): return item[1:] in self.children else: diff --git a/fmf/cli.py b/fmf/cli.py index 7fd1804d..b6611739 100644 --- a/fmf/cli.py +++ b/fmf/cli.py @@ -21,6 +21,7 @@ import os.path import shlex import sys +from typing import Optional import fmf import fmf.utils as utils @@ -32,8 +33,9 @@ class Parser: """ Command line options parser """ + arguments: list[str] - def __init__(self, arguments=None, path=None): + def __init__(self, arguments: Optional[list[str]] = None, path: Optional[str] = None): """ Prepare the parser. """ # Change current working directory (used for testing) if path is not None: @@ -69,7 +71,7 @@ def __init__(self, arguments=None, path=None): self.output = "" getattr(self, "command_" + self.command)() - def options_select(self): + def options_select(self) -> None: """ Select by name, filter """ group = self.parser.add_argument_group("Select") group.add_argument( @@ -92,7 +94,7 @@ def options_select(self): "--whole", dest="whole", action="store_true", help="Consider the whole tree (leaves only by default)") - def options_formatting(self): + def options_formatting(self) -> None: """ Formating options """ group = self.parser.add_argument_group("Format") group.add_argument( @@ -102,7 +104,7 @@ def options_formatting(self): "--value", dest="values", action="append", default=[], help="Values for the custom formatting string") - def options_utils(self): + def options_utils(self) -> None: """ Utilities """ group = self.parser.add_argument_group("Utils") group.add_argument( @@ -115,7 +117,7 @@ def options_utils(self): "--debug", action="store_true", help="Turn on debugging output, do not catch exceptions") - def command_ls(self): + def command_ls(self) -> None: """ List names """ self.parser = argparse.ArgumentParser( description="List names of available objects") @@ -124,13 +126,13 @@ def command_ls(self): self.options = self.parser.parse_args(self.arguments[2:]) self.show(brief=True) - def command_clean(self): + def command_clean(self) -> None: """ Clean cache """ self.parser = argparse.ArgumentParser( description="Remove cache directory and its content") self.clean() - def command_show(self): + def command_show(self) -> None: """ Show metadata """ self.parser = argparse.ArgumentParser( description="Show metadata of available objects") @@ -140,7 +142,7 @@ def command_show(self): self.options = self.parser.parse_args(self.arguments[2:]) self.show(brief=False) - def command_init(self): + def command_init(self) -> None: """ Initialize tree """ self.parser = argparse.ArgumentParser( description="Initialize a new metadata tree") @@ -151,7 +153,7 @@ def command_init(self): root = fmf.Tree.init(path) print("Metadata tree '{0}' successfully initialized.".format(root)) - def show(self, brief=False): + def show(self, brief: bool = False) -> None: """ Show metadata for each path given """ output = [] for path in self.options.paths or ["."]: @@ -190,7 +192,7 @@ def show(self, brief=False): utils.listed(len(output), "object"))) self.output = joined - def clean(self): + def clean(self) -> None: """ Remove cache directory """ try: cache = utils.get_cache_directory(create=False) @@ -205,7 +207,7 @@ def clean(self): # Main # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def main(arguments=None, path=None): +def main(arguments: Optional[list[str]] = None, path: Optional[str] = None) -> str: """ Parse options, do what is requested """ parser = Parser(arguments, path) return parser.output diff --git a/fmf/context.py b/fmf/context.py index 73b02f0a..c8015c98 100644 --- a/fmf/context.py +++ b/fmf/context.py @@ -22,7 +22,7 @@ import sys from collections.abc import Callable # TODO: py3.10: typing.Optional, typing.Union -> '|' operator -from typing import Optional, Union +from typing import Any, Optional, Union if sys.version_info >= (3, 10): from typing import TypeAlias @@ -30,7 +30,8 @@ from typing_extensions import TypeAlias # TypeHints -ExpressionType: TypeAlias = tuple[Optional[str], Union[str, bool], Optional[list[str]]] +ExpressionType: TypeAlias = tuple[Optional[str], Union[str, bool], Optional[list['ContextValue']]] +ExpressionType_raw: TypeAlias = tuple[Optional[str], Union[str, bool], Optional[list[str]]] class CannotDecide(Exception): @@ -58,19 +59,19 @@ def __init__(self, origin: Union[str, tuple[str, ...]]): else: self._to_compare = self._split_to_version(origin) - def __eq__(self, other: ContextValue): + def __eq__(self, other: object) -> bool: if isinstance(other, self.__class__): return self._to_compare == other._to_compare else: return False - def __ne__(self, other: ContextValue): + def __ne__(self, other: object) -> bool: return not self.__eq__(other) - def __str__(self): + def __str__(self) -> str: return str(self._to_compare) - def __repr__(self): + def __repr__(self) -> str: return "{}({})".format(self.__class__.__name__, repr(self._to_compare)) def version_cmp( @@ -156,7 +157,7 @@ def version_cmp( return -1 # other is larger (more pars) @staticmethod - def compare(first: str, second: str): + def compare(first: str, second: str) -> int: """ compare two version parts """ # Ideally use `from packaging import version` but we need older # python support too so very rough @@ -164,13 +165,12 @@ def compare(first: str, second: str): # convert to int first_version = int(first) second_version = int(second) + return ( + (first_version > second_version) - + (first_version < second_version)) except ValueError: # fallback to compare as strings - first_version = first - second_version = second - return ( - (first_version > second_version) - - (first_version < second_version)) + return (first > second) - (first < second) @staticmethod def _split_to_version(text: str) -> tuple[str, ...]: @@ -203,15 +203,15 @@ class Context: # Operators' definitions - def _op_defined(self, dimension_name: str, values: list[ContextValue]): + def _op_defined(self, dimension_name: str, values: Any) -> bool: """ 'is defined' operator """ return dimension_name in self._dimensions - def _op_not_defined(self, dimension_name: str, values: list[ContextValue]): + def _op_not_defined(self, dimension_name: str, values: Any) -> bool: """ 'is not defined' operator """ return dimension_name not in self._dimensions - def _op_eq(self, dimension_name: str, values: list[ContextValue]): + def _op_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '=' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -219,7 +219,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_not_eq(self, dimension_name: str, values: list[ContextValue]): + def _op_not_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '!=' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -227,7 +227,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_minor_eq(self, dimension_name: str, values: list[ContextValue]): + def _op_minor_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~=' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -236,7 +236,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_minor_not_eq(self, dimension_name: str, values: list[ContextValue]): + def _op_minor_not_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~!=' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -245,7 +245,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_minor_less_or_eq(self, dimension_name: str, values: list[ContextValue]): + def _op_minor_less_or_eq(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~<=' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -254,7 +254,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_minor_less(self, dimension_name: str, values: list[ContextValue]): + def _op_minor_less(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~<' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -263,7 +263,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_less(self, dimension_name: str, values: list[ContextValue]): + def _op_less(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '<' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -271,7 +271,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_less_or_equal(self, dimension_name: str, values: list[ContextValue]): + def _op_less_or_equal(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '<=' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -279,7 +279,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_greater_or_equal(self, dimension_name: str, values: list[ContextValue]): + def _op_greater_or_equal(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '>=' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -287,7 +287,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_minor_greater_or_equal(self, dimension_name: str, values: list[ContextValue]): + def _op_minor_greater_or_equal(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~>=' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -296,7 +296,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_greater(self, dimension_name: str, values: list[ContextValue]): + def _op_greater(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '>' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -304,7 +304,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) - def _op_minor_greater(self, dimension_name: str, values: list[ContextValue]): + def _op_minor_greater(self, dimension_name: str, values: list[ContextValue]) -> bool: """ '~>' operator """ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: @@ -314,7 +314,7 @@ def comparator(dimension_value: ContextValue, it_val: ContextValue) -> bool: return self._op_core(dimension_name, values, comparator) def _op_core(self, dimension_name: str, values: list[ContextValue], - comparator: Callable[[ContextValue, ContextValue], bool]): + comparator: Callable[[ContextValue, ContextValue], bool]) -> bool: """ Evaluate value from dimension vs target values combination @@ -381,7 +381,7 @@ def _op_core(self, dimension_name: str, values: list[ContextValue], # To split by 'or' operator re_or_split = re.compile(r'\bor\b') - _dimensions: dict[str] + _dimensions: dict[str, set[ContextValue]] def __init__(self, *args, **kwargs): """ @@ -404,10 +404,13 @@ def __init__(self, *args, **kwargs): for dim, op, values in definition[0]: if op != "==": raise InvalidContext() + assert dim is not None + assert values is not None self._dimensions[dim] = set(values) # Initialized with dimension=value(s) for dimension_name, values in kwargs.items(): if not isinstance(values, list): + assert values is not None values = [values] self._dimensions[dimension_name] = set( [self.parse_value(val) for val in values] @@ -456,7 +459,7 @@ def parse_rule(rule: Union[str, bool]) -> list[list[ExpressionType]]: return parsed_rule @staticmethod - def parse_value(value: str) -> ContextValue: + def parse_value(value: Any) -> ContextValue: """ Single place to convert to ContextValue """ return ContextValue(str(value)) @@ -485,7 +488,7 @@ def split_rule_to_groups(rule: str) -> list[list[str]]: return rule_parts @staticmethod - def split_expression(expression: str) -> ExpressionType: + def split_expression(expression: str) -> ExpressionType_raw: """ Split expression to dimension name, operator and values @@ -501,9 +504,9 @@ def split_expression(expression: str) -> ExpressionType: if match: # convert to bool and return expression tuple if match.group(1)[0].lower() == 't': - return (None, True, None) + return None, True, None else: - return (None, False, None) + return None, False, None # Triple expressions match = Context.re_expression_triple.match(expression) if match: @@ -588,4 +591,5 @@ def evaluate(self, expression: ExpressionType) -> bool: dimension_name, operator, values = expression if isinstance(operator, bool): return operator + assert dimension_name is not None return self.operator_map[operator](self, dimension_name, values) diff --git a/fmf/utils.py b/fmf/utils.py index 204794e8..e68b3e47 100644 --- a/fmf/utils.py +++ b/fmf/utils.py @@ -11,9 +11,11 @@ import sys import time import warnings +from collections.abc import Callable from io import StringIO +from logging import Logger as _Logger # TODO: py3.10: typing.Optional, typing.Union -> '|' operator -from typing import Any, NamedTuple, Optional +from typing import Any, NamedTuple, Optional, Union from filelock import FileLock, Timeout from ruamel.yaml import YAML, scalarstring @@ -103,11 +105,23 @@ class JsonSchemaError(GeneralError): """ Invalid JSON Schema """ +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Type hints +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class Logger(_Logger): + DATA: int + CACHE: int + ALL: int + cache: Callable[[str], None] + data: Callable[[str], None] + all: Callable[[str], None] + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Utils # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def pluralize(singular=None): +def pluralize(singular: str) -> str: """ Naively pluralize words """ if singular.endswith("y") and not singular.endswith("ay"): plural = singular[:-1] + "ies" @@ -118,7 +132,11 @@ def pluralize(singular=None): return plural -def listed(items, singular=None, plural=None, max=None, quote="", join="and"): +def listed(items: Union[int, list[Union[int, str]]], + singular: Optional[str] = None, + plural: Optional[str] = None, + max: Optional[int] = None, + quote: str = "", join: str = "and") -> str: """ Convert an iterable into a nice, human readable list or description:: @@ -137,7 +155,7 @@ def listed(items, singular=None, plural=None, max=None, quote="", join="and"): """ # Convert items to list if necessary - items = range(items) if isinstance(items, int) else list(items) + items = list(range(items)) if isinstance(items, int) else list(items) more = " more" # Description mode expected when singular provided but no maximum set if singular is not None and max is None: @@ -147,29 +165,30 @@ def listed(items, singular=None, plural=None, max=None, quote="", join="and"): if singular is not None and plural is None: plural = pluralize(singular) # Convert to strings and optionally quote each item - items = ["{0}{1}{0}".format(quote, item) for item in items] + items_str = ["{0}{1}{0}".format(quote, item) for item in items] # Select the maximum of items and describe the rest if max provided if max is not None: # Special case when the list is empty (0 items) - if max == 0 and len(items) == 0: - return "0 {0}".format(plural) + if max == 0 and len(items_str) == 0: + return f"0 {plural}" # Cut the list if maximum exceeded - if len(items) > max: - rest = len(items[max:]) - items = items[:max] + if len(items_str) > max: + rest = len(items_str[max:]) + items_str = items_str[:max] if singular is not None: more += " {0}".format(singular if rest == 1 else plural) - items.append("{0}{1}".format(rest, more)) + items_str.append("{0}{1}".format(rest, more)) # For two and more items use 'and' instead of the last comma - if len(items) < 2: - return "".join(items) + if len(items_str) < 2: + return "".join(items_str) else: - return ", ".join(items[0:-2] + [' {} '.format(join).join(items[-2:])]) + return ", ".join(items_str[0:-2] + [f" {join} ".join(items_str[-2:])]) -def split(values, separator=re.compile("[ ,]+")): +def split(values: Union[str, list[str]], separator: re.Pattern[str] + = re.compile("[ ,]+")) -> list[str]: """ Convert space-or-comma-separated values into a single list @@ -189,7 +208,7 @@ def split(values, separator=re.compile("[ ,]+")): return sum([separator.split(value) for value in values], []) -def info(message, newline=True): +def info(message: str, newline: bool = True) -> None: """ Log provided info message to the standard error output """ sys.stderr.write(message + ("\n" if newline else "")) @@ -244,14 +263,14 @@ def filter(filter: str, data: fmf.base.TreeData, True, regular expressions can be used in the filter values as well. """ - def match_value(pattern, text): + def match_value(pattern: str, text: str) -> bool: """ Match value against data (simple or regexp) """ if regexp: - return re.match("^{0}$".format(pattern), text) + return bool(re.match("^{0}$".format(pattern), text)) else: return pattern == text - def check_value(dimension, value): + def check_value(dimension: str, value: str) -> bool: """ Check whether the value matches data """ # E.g. value = 'A, B' or value = "C" or value = "-D" # If there are multiple values, at least one must match @@ -260,7 +279,10 @@ def check_value(dimension, value): if atom.startswith("-"): atom = atom[1:] # Check each value for given dimension - for dato in data[dimension]: + dim_data = data_copy[dimension] + assert isinstance(dim_data, list) + for dato in dim_data: + assert isinstance(dato, str) if match_value(atom, dato): break # Pattern not found ---> good @@ -269,27 +291,30 @@ def check_value(dimension, value): # Handle positive values (return True upon first successful match) else: # Check each value for given dimension - for dato in data[dimension]: + dim_data = data_copy[dimension] + assert isinstance(dim_data, list) + for dato in dim_data: + assert isinstance(dato, str) if match_value(atom, dato): # Pattern found ---> good return True # No value matched the data return False - def check_dimension(dimension, values): + def check_dimension(dimension: str, values: list[str]) -> bool: """ Check whether all values for given dimension match data """ # E.g. dimension = 'tag', values = ['A, B', 'C', '-D'] # Raise exception upon unknown dimension - if dimension not in data: + if dimension not in data_copy: raise FilterError("Invalid filter '{0}'".format(dimension)) # Every value must match at least one value for data - return all([check_value(dimension, value) for value in values]) + return all(check_value(dimension, value) for value in values) - def check_clause(clause): + def check_clause(clause: str) -> bool: """ Split into literals and check whether all match """ # E.g. clause = 'tag: A, B & tag: C & tag: -D' # Split into individual literals by dimension - literals = dict() + literals: dict[str, list[str]] = {} for literal in re.split(r"\s*&\s*", clause): # E.g. literal = 'tag: A, B' # Make sure the literal matches dimension:value format @@ -311,19 +336,21 @@ def check_clause(clause): raise FilterError("Invalid data type '{0}'".format(type(data))) # Make sure that data dictionary contains lists of strings - data = copy.deepcopy(data) + data_copy = copy.deepcopy(data) for key in data: - if isinstance(data[key], list): - data[key] = [str(item) for item in data[key]] + data_val = data_copy[key] + if isinstance(data_val, list): + data_copy[key] = [str(item) for item in data_val] else: - data[key] = [str(data[key])] + data_copy[key] = [str(data_val)] # Turn all data into lowercase if sensitivity is off if not sensitive: filter = filter.lower() - lowered = dict() - for key, values in data.items(): - lowered[key.lower()] = [value.lower() for value in values] - data = lowered + lowered: fmf.base.TreeData = {} + for key, values in data_copy.items(): + assert isinstance(values, list) and all(isinstance(value, str) for value in values) + lowered[key.lower()] = [value.lower() for value in values] # type: ignore + data_copy = lowered # At least one clause must be true return any([check_clause(clause) @@ -363,9 +390,9 @@ class Logging: _level = LOG_WARN # Already initialized loggers by their name - _loggers = dict() + _loggers: dict[str, Logger] = {} - def __init__(self, name='fmf'): + def __init__(self, name: str = 'fmf'): # Use existing logger if already initialized try: self.logger = Logging._loggers[name] @@ -401,7 +428,7 @@ def format(self, record): return u"{0} {1}".format(level, record.getMessage()) @staticmethod - def _create_logger(name='fmf', level=None): + def _create_logger(name: str = 'fmf', level: Optional[str] = None) -> Logger: """ Create fmf logger """ # Create logger, handler and formatter logger = logging.getLogger(name) @@ -409,18 +436,18 @@ def _create_logger(name='fmf', level=None): handler.setFormatter(Logging.ColoredFormatter()) logger.addHandler(handler) # Save log levels in the logger itself (backward compatibility) - for level in Logging.LEVELS: - setattr(logger, level, getattr(logging, level)) + for lev in Logging.LEVELS: + setattr(logger, lev, getattr(logging, lev)) # Additional logging constants and methods for cache and xmlrpc - logger.DATA = LOG_DATA - logger.CACHE = LOG_CACHE - logger.ALL = LOG_ALL - logger.cache = lambda message: logger.log(LOG_CACHE, message) # NOQA - logger.data = lambda message: logger.log(LOG_DATA, message) # NOQA - logger.all = lambda message: logger.log(LOG_ALL, message) # NOQA - return logger - - def set(self, level=None): + logger.DATA = LOG_DATA # type: ignore + logger.CACHE = LOG_CACHE # type: ignore + logger.ALL = LOG_ALL # type: ignore + logger.cache = lambda message: logger.log(LOG_CACHE, message) # type: ignore # NOQA + logger.data = lambda message: logger.log(LOG_DATA, message) # type: ignore # NOQA + logger.all = lambda message: logger.log(LOG_ALL, message) # type: ignore # NOQA + return logger # type: ignore + + def set(self, level: Optional[int] = None) -> None: """ Set the default log level @@ -445,7 +472,7 @@ def set(self, level=None): Logging._level = logging.WARN self.logger.setLevel(Logging._level) - def get(self): + def get(self) -> int: """ Get the current log level """ return self.logger.level @@ -454,7 +481,9 @@ def get(self): # Coloring # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def color(text, color=None, background=None, light=False, enabled="auto"): +def color(text: str, color: Optional[str] = None, + background: Optional[str] = None, + light: bool = False, enabled: Union[str, bool] = "auto") -> str: """ Return text in desired color if coloring enabled @@ -474,9 +503,8 @@ def color(text, color=None, background=None, light=False, enabled="auto"): color = color[5:] color = color and ";{0}".format(colors[color]) or "" background = background and ";{0}".format(colors[background] + 10) or "" - light = light and 1 or 0 # Starting and finishing sequence - start = "\033[{0}{1}{2}m".format(light, color, background) + start = "\033[{0}{1}{2}m".format(int(light), color, background) finish = "\033[1;m" return "".join([start, text, finish]) @@ -485,7 +513,7 @@ class Coloring: """ Coloring configuration """ # Default color mode is auto-detected from the terminal presence - _mode = None + _mode: Optional[int] = None MODES = ["COLOR_OFF", "COLOR_ON", "COLOR_AUTO"] # We need only a single config instance _instance = None @@ -496,7 +524,7 @@ def __new__(cls, *args, **kwargs): cls._instance = super(Coloring, cls).__new__(cls, *args, **kwargs) return cls._instance - def __init__(self, mode=None): + def __init__(self, mode: Optional[int] = None): """ Initialize the coloring mode """ # Nothing to do if already initialized if self._mode is not None: @@ -504,7 +532,7 @@ def __init__(self, mode=None): # Set the mode self.set(mode) - def set(self, mode=None): + def set(self, mode: Optional[int] = None) -> None: """ Set the coloring mode @@ -537,11 +565,12 @@ def set(self, mode=None): "enabled" if self.enabled() else "disabled", self.MODES[self._mode])) - def get(self): + def get(self) -> int: """ Get the current color mode """ + assert self._mode is not None return self._mode - def enabled(self): + def enabled(self) -> bool: """ True if coloring is currently enabled """ # In auto-detection mode color enabled when terminal attached if self._mode == COLOR_AUTO: @@ -553,7 +582,7 @@ def enabled(self): # Cache directory # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def get_cache_directory(create=True): +def get_cache_directory(create: bool = True) -> str: """ Return cache directory, created by this call if necessary @@ -593,7 +622,7 @@ def set_cache_expiration(seconds): CACHE_EXPIRATION = int(seconds) -def clean_cache_directory(): +def clean_cache_directory() -> None: """ Delete used cache directory if it exists """ cache = get_cache_directory(create=False) if os.path.isdir(cache): @@ -633,7 +662,7 @@ def invalidate_cache(): # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def fetch_tree(url, ref=None, path='.'): +def fetch_tree(url: str, ref: Optional[str] = None, path: str = '.') -> fmf.base.Tree: """ Get initialized Tree from a remote git repository @@ -686,7 +715,7 @@ def fetch(url, ref=None, destination=None, env=None): # Fetch Remote Repository # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def default_branch(repository, remote="origin"): +def default_branch(repository: str, remote: str = "origin") -> str: """ Detect default branch from given local git repository """ head = os.path.join(repository, f".git/refs/remotes/{remote}/HEAD") # Make sure the HEAD reference is available @@ -697,7 +726,10 @@ def default_branch(repository, remote="origin"): return ref.read().strip().split('/')[-1] -def fetch_repo(url, ref=None, destination=None, env=None): +def fetch_repo(url: str, + ref: Optional[str] = None, + destination: Optional[str] = None, + env: Optional[dict[str, str]] = None) -> str: """ Fetch remote git repository and return local directory @@ -788,7 +820,9 @@ def fetch_repo(url, ref=None, destination=None, env=None): # Run command # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def run(command, cwd=None, check_exit_code=True, env=None): +def run(command: Union[str, list[str]], cwd: Optional[str] = None, + check_exit_code: bool = True, + env: Optional[dict[str, str]] = None) -> tuple[str, str]: """ Run command and return a (stdout, stderr) tuple @@ -825,7 +859,9 @@ def run(command, cwd=None, check_exit_code=True, env=None): # Convert dict to yaml # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -def dict_to_yaml(data, width=None, sort=False): +def dict_to_yaml(data: fmf.base.TreeData, + width: Optional[int] = None, + sort: bool = False) -> str: """ Convert dictionary into yaml """ output = StringIO() From c1eebc1739000a7cf7915df115b845f0b3e039d1 Mon Sep 17 00:00:00 2001 From: Cristian Le Date: Fri, 11 Aug 2023 18:16:40 +0200 Subject: [PATCH 5/6] Add typing-extensions to spec file and drop epel8 Signed-off-by: Cristian Le --- .packit.yaml | 3 --- fmf.spec | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.packit.yaml b/.packit.yaml index 00ab97ec..934640a9 100644 --- a/.packit.yaml +++ b/.packit.yaml @@ -20,14 +20,12 @@ jobs: trigger: pull_request targets: - fedora-all - - epel-8 - epel-9 - job: tests trigger: pull_request targets: - fedora-all - - epel-8 - epel-9 - job: copr_build @@ -35,7 +33,6 @@ jobs: branch: main targets: - fedora-all - - epel-8 - epel-9 list_on_homepage: True preserve_project: True diff --git a/fmf.spec b/fmf.spec index 8bfdcaf6..91527302 100644 --- a/fmf.spec +++ b/fmf.spec @@ -31,6 +31,7 @@ BuildRequires: python%{python3_pkgversion}-pytest BuildRequires: python%{python3_pkgversion}-ruamel-yaml BuildRequires: python%{python3_pkgversion}-filelock BuildRequires: python%{python3_pkgversion}-jsonschema +BuildRequires: python%{python3_pkgversion}-typing-extensions BuildRequires: git-core %{?python_provide:%python_provide python%{python3_pkgversion}-%{name}} Requires: git-core From 7cf428c1c6b4e3bec69b09051e45a911ece0b0e6 Mon Sep 17 00:00:00 2001 From: Cristian Le Date: Fri, 11 Aug 2023 19:38:08 +0200 Subject: [PATCH 6/6] Add py.typed Signed-off-by: Cristian Le --- fmf/py.typed | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 fmf/py.typed diff --git a/fmf/py.typed b/fmf/py.typed new file mode 100644 index 00000000..e69de29b