Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor rule tree #416

Merged
merged 14 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 73 additions & 53 deletions logprep/filter/expression/filter_expression.py
ekneg54 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import re
from abc import ABC, abstractmethod
from itertools import chain, zip_longest
from typing import List, Any
from typing import List, Any, Tuple


class FilterExpressionError(BaseException):
Expand All @@ -17,6 +17,25 @@ class KeyDoesNotExistError(FilterExpressionError):
class FilterExpression(ABC):
"""Base class for all filter expressions used for matching rules."""

__slots__ = ["children"]

children: Tuple["FilterExpression"]

def __init__(self, *children: "FilterExpression"):
"""Initializes children for filter expression.

Filter expressions can contain multiple child filter expressions,
e.g. a 'Not' expression could contain a child that gets negated,
or an 'And' expression could contain multiple children that must all match.

Parameters
----------
children : FilterExpression
Child expressions of this expression.

"""
self.children = children

def matches(self, document: dict) -> bool:
"""Receives a document and returns True if it is matched by the expression.

Expand Down Expand Up @@ -62,10 +81,9 @@ def does_match(self, document: dict) -> bool:

"""

# Return the value for the given key from
# the document.
@staticmethod
def _get_value(key: List[str], document: dict) -> Any:
"""Return the value for the given key from the document."""
if not key:
raise KeyDoesNotExistError

Expand All @@ -83,28 +101,12 @@ def __eq__(self, other):
return False
return True

@staticmethod
def as_dotted_string(key_list: List[str]) -> str:
"""Converts list of keys to dotted string.

Parameters
----------
key_list : List[str]
List of keys.

Returns
-------
str
Returns dotted string.

"""
return ".".join([str(i) for i in key_list])


class Always(FilterExpression):
"""Filter expression that can be set to match always or never."""

def __init__(self, value: Any):
super().__init__()
self._value = value

def __repr__(self):
Expand All @@ -120,21 +122,18 @@ class Not(FilterExpression):
"""Filter expression that negates a match."""

def __init__(self, expression: FilterExpression):
self.expression = expression
super().__init__(expression)

def __repr__(self) -> str:
return f"NOT ({str(self.expression)})"
return f"NOT ({repr(self.children[0])})"

def does_match(self, document: dict) -> bool:
return not self.expression.matches(document)
return not self.children[0].matches(document)


class CompoundFilterExpression(FilterExpression):
"""Base class of filter expressions that combine other filter expressions."""

def __init__(self, *args: FilterExpression):
self.expressions = args

def does_match(self, document: dict):
raise NotImplementedError

Expand All @@ -143,31 +142,58 @@ class And(CompoundFilterExpression):
"""Compound filter expression that is a logical conjunction."""

def __repr__(self) -> str:
return f'({" AND ".join([str(exp) for exp in self.expressions])})'
return f'({" AND ".join([str(exp) for exp in self.children])})'

def does_match(self, document: dict) -> bool:
return all((expression.matches(document) for expression in self.expressions))
return all((expression.matches(document) for expression in self.children))


class Or(CompoundFilterExpression):
"""Compound filter expression that is a logical disjunction."""

def __repr__(self) -> str:
return f'({" OR ".join([str(exp) for exp in self.expressions])})'
return f'({" OR ".join([str(exp) for exp in self.children])})'

def does_match(self, document: dict) -> bool:
return any((expression.matches(document) for expression in self.expressions))
return any((expression.matches(document) for expression in self.children))


class KeyValueBasedFilterExpression(FilterExpression):
class KeyBasedFilterExpression(FilterExpression):
"""Base class of filter expressions that match a certain value on a given key."""

def __init__(self, key: List[str], expected_value: Any):
def __init__(self, key: List[str]):
super().__init__()
self.key = key
self._key_as_dotted_string = ".".join([str(i) for i in self.key])

def __repr__(self) -> str:
return f"{self.key_as_dotted_string}"

def does_match(self, document):
raise NotImplementedError

@property
def key_as_dotted_string(self) -> str:
"""Converts key of expression to dotted string.

Returns
-------
str
Returns dotted string.

"""
return self._key_as_dotted_string


class KeyValueBasedFilterExpression(KeyBasedFilterExpression):
"""Base class of filter expressions that match a certain value on a given key."""

def __init__(self, key: List[str], expected_value: Any):
super().__init__(key)
self._expected_value = expected_value

def __repr__(self) -> str:
return f"{self.as_dotted_string(self.key)}:{str(self._expected_value)}"
return f"{self.key_as_dotted_string}:{str(self._expected_value)}"

def does_match(self, document):
raise NotImplementedError
Expand All @@ -184,7 +210,7 @@ def does_match(self, document: dict) -> bool:
return str(value) == self._expected_value

def __repr__(self) -> str:
return f'{self.as_dotted_string(self.key)}:"{str(self._expected_value)}"'
return f'{self.key_as_dotted_string}:"{str(self._expected_value)}"'


class WildcardStringFilterExpression(KeyValueBasedFilterExpression):
Expand Down Expand Up @@ -236,7 +262,7 @@ def _replace_wildcard(expected, matches, symbol, wildcard):
return "".join([x for x in chain.from_iterable(zip_longest(split, matches)) if x])

def __repr__(self) -> str:
return f'{self.as_dotted_string(self.key)}:"{self._expected_value}"'
return f'{self.key_as_dotted_string}:"{self._expected_value}"'


class SigmaFilterExpression(WildcardStringFilterExpression):
Expand All @@ -263,16 +289,16 @@ def does_match(self, document: dict) -> bool:
return value == self._expected_value


class RangeBasedFilterExpression(FilterExpression):
class RangeBasedFilterExpression(KeyBasedFilterExpression):
"""Base class of filter expressions that match for a range of values."""

def __init__(self, key: List[str], lower_bound: float, upper_bound: float):
self.key = key
super().__init__(key)
self._lower_bound = lower_bound
self._upper_bound = upper_bound

def __repr__(self) -> str:
return f"{self.as_dotted_string(self.key)}:[{self._lower_bound} TO {self._upper_bound}]"
return f"{self.key_as_dotted_string}:[{self._lower_bound} TO {self._upper_bound}]"

def does_match(self, document: dict):
raise NotImplementedError
Expand All @@ -296,19 +322,19 @@ def does_match(self, document: dict) -> bool:
return self._lower_bound <= value <= self._upper_bound


class RegExFilterExpression(FilterExpression):
class RegExFilterExpression(KeyBasedFilterExpression):
"""Filter expression that matches a value using regex."""

match_escaping_pattern = re.compile(r".*?(?P<escaping>\\*)\$$")
match_parts_pattern = re.compile(r"^(?P<flag>\(\?\w\))?(?P<start>\^)?(?P<pattern>.*)")

def __init__(self, key: List[str], regex: str):
self.key = key
super().__init__(key)
self._regex = self._normalize_regex(regex)
self._matcher = re.compile(self._regex)

def __repr__(self) -> str:
return f"{self.as_dotted_string(self.key)}:/{self._regex.strip('^$')}/"
return f"{self.key_as_dotted_string}:/{self._regex.strip('^$')}/"

@staticmethod
def _normalize_regex(regex: str) -> str:
Expand All @@ -331,22 +357,19 @@ def does_match(self, document: dict) -> bool:
return self._matcher.match(str(value)) is not None


class Exists(FilterExpression):
class Exists(KeyBasedFilterExpression):
"""Filter expression that returns true if a given field exists."""

def __init__(self, value: list):
self.split_field = value

def __repr__(self) -> str:
return f"{self.as_dotted_string(self.split_field)}: *"
return f"{self.key_as_dotted_string}: *"

def does_match(self, document: dict) -> bool:
if not self.split_field:
if not self.key:
return False

try:
current = document
for sub_field in self.split_field:
for sub_field in self.key:
if (
sub_field not in current.keys()
): # .keys() is important as it is used to "check" for dict
Expand All @@ -361,14 +384,11 @@ def does_match(self, document: dict) -> bool:
return True


class Null(FilterExpression):
class Null(KeyBasedFilterExpression):
"""Filter expression that returns true if a given field is set to null."""

def __init__(self, key: List[str]):
self.key = key

def __repr__(self) -> str:
return f"{self.as_dotted_string(self.key)}:{None}"
return f"{self.key_as_dotted_string}:{None}"

def does_match(self, document: dict) -> bool:
value = self._get_value(self.key, document)
Expand Down
73 changes: 73 additions & 0 deletions logprep/framework/rule_tree/demorgan_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Module implements functionality to apply De Morgan's law on rule filter expressions"""
from logprep.filter.expression.filter_expression import (
Not,
And,
Or,
FilterExpression,
CompoundFilterExpression,
)


class DeMorganResolverException(Exception):
    """Signals that the De Morgan resolver could not process an expression."""


class DeMorganResolver:
    """Applies De Morgan's law to rule filter expressions.

    A negated compound expression is replaced by the dual compound of its negated children,
    i.e. ``NOT (a OR b)`` becomes ``(NOT a AND NOT b)`` and ``NOT (a AND b)`` becomes
    ``(NOT a OR NOT b)``.
    """

    def resolve(self, expression: FilterExpression) -> FilterExpression:
        """Resolve NOT-expressions in the given filter expression.

        The expression is dispatched by type: NOT-expressions and compound expressions are
        processed, every other expression is handed back as it is.

        Parameters
        ----------
        expression: FilterExpression
            Filter expression that should be resolved.

        Returns
        -------
        result: FilterExpression
            Filter expression obtained by resolving the NOT-expressions of the given filter
            expression according to De Morgan's law.

        """
        if isinstance(expression, Not):
            resolved = self._resolve_not_expression(expression)
        elif isinstance(expression, CompoundFilterExpression):
            resolved = self._resolve_compound_expression(expression)
        else:
            resolved = expression
        return resolved

    def _resolve_not_expression(self, not_expression: Not) -> FilterExpression:
        """Push the negation of a NOT-expression down into its compound child.

        A NOT-expression whose child is not a compound expression is returned unchanged.

        Raises
        ------
        DeMorganResolverException
            If the given expression is not a NOT-expression or if its compound child is
            neither an AND- nor an OR-expression.

        """
        if not isinstance(not_expression, Not):
            raise DeMorganResolverException(
                f'Can\'t resolve expression "{not_expression}", since it\'s not of the type "NOT."'
            )

        negated = not_expression.children[0]
        if not isinstance(negated, CompoundFilterExpression):
            return not_expression

        # De Morgan: the negation of a compound is the dual compound of the negated children.
        if isinstance(negated, Or):
            dual_type = And
        elif isinstance(negated, And):
            dual_type = Or
        else:
            raise DeMorganResolverException(
                f'Could not resolve expression "{not_expression}", '
                f'since its child is neither of the type "AND" nor "OR".'
            )

        dual_expression = dual_type(*[Not(child) for child in negated.children])
        # The freshly negated children may themselves wrap compounds, so resolve them as well.
        return self._resolve_compound_expression(dual_expression)

    def _resolve_compound_expression(
        self, compound_expression: CompoundFilterExpression
    ) -> CompoundFilterExpression:
        """Resolve every child of a compound expression.

        The children of the given expression are replaced in place and the very same
        expression object is returned.
        """
        resolved_children = tuple(map(self.resolve, compound_expression.children))
        compound_expression.children = resolved_children
        return compound_expression
29 changes: 9 additions & 20 deletions logprep/framework/rule_tree/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
class Node:
ekneg54 marked this conversation as resolved.
Show resolved Hide resolved
"""Tree node for rule tree model."""

def __init__(self, expression: FilterExpression):
__slots__ = ("_expression", "_children", "matching_rules")

_expression: FilterExpression
_children: list
ekneg54 marked this conversation as resolved.
Show resolved Hide resolved
matching_rules: list

def __init__(self, expression: Optional[FilterExpression]):
"""Node initialization function.

Initializes a new node with a given expression and empty lists of children and matching
Expand Down Expand Up @@ -63,25 +69,6 @@ def add_child(self, node: "Node"):
"""
self._children.append(node)

def has_child_with_expression(self, expression: FilterExpression) -> Optional["Node"]:
"""Check if node has child with given expression.

This function checks if a node has a child with the given filter expression.
It is used to iterate through a tree in the process of adding a new rule to a tree.

Parameters
----------
expression: FilterExpression
Filter expression to check for.

Returns
-------
has_child: bool
Decision if the node has a child with the given expression.

"""
return self.get_child_with_expression(expression)

def get_child_with_expression(self, expression: FilterExpression) -> Optional["Node"]:
"""Get child of node with given expression.

Expand All @@ -107,8 +94,10 @@ def get_child_with_expression(self, expression: FilterExpression) -> Optional["N

@property
def expression(self) -> FilterExpression:
"""Filter expression of the node."""
return self._expression

@property
def children(self) -> List["Node"]:
"""Children of the node."""
return self._children
Loading
Loading