Skip to content

Commit

Permalink
♻️ Update MelusineRegex tutorial
Browse files Browse the repository at this point in the history
  • Loading branch information
HugoPerrier committed Dec 5, 2024
1 parent 18ab6e6 commit cc197e3
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 63 deletions.
3 changes: 3 additions & 0 deletions melusine/conf/pipelines/preprocessing_pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ preprocessing_pipeline:
- class_name: ContentTagger
config_key: content_tagger
module: melusine.processors
- class_name: RefinedTagger
config_key: refined_tagger
module: melusine.processors
- class_name: TransferredEmailProcessor
config_key: transferred_email_processor
module: melusine.processors
Expand Down
2 changes: 2 additions & 0 deletions melusine/conf/processors/refined_tagger.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Top-level key must match the `config_key: refined_tagger` entry of the preprocessing pipeline
refined_tagger:
  default_tag: BODY
56 changes: 42 additions & 14 deletions melusine/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import re
from datetime import datetime
from typing import Iterable, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple

from melusine import config

Expand All @@ -20,6 +20,8 @@ class Message:

DEFAULT_STR_LINE_LENGTH = 120
DEFAULT_STR_TAG_NAME_LENGTH = 22
MAIN_TAG_TYPE = "refined_tag"
FALLBACK_TAG_TYPE = "base_tag"

def __init__(
self,
Expand All @@ -29,7 +31,7 @@ def __init__(
date: Optional[datetime] = None,
text_from: str = "",
text_to: Optional[str] = None,
tags: Optional[List[Tuple[str, str]]] = None,
tags: Optional[List[Dict[str, Any]]] = None,
):
"""
Attributes initialization.
Expand Down Expand Up @@ -84,8 +86,11 @@ def str_line_length(self) -> int:
return config["message"].get("str_line_length", self.DEFAULT_STR_LINE_LENGTH)

def extract_parts(
self, target_tags: Optional[Iterable[str]] = None, stop_at: Optional[Iterable[str]] = None
) -> List[Tuple[str, str]]:
self,
target_tags: Optional[Iterable[str]] = None,
stop_at: Optional[Iterable[str]] = None,
tag_type: str = MAIN_TAG_TYPE,
) -> List[Dict[str, Any]]:
"""
Function to extract target tags from the message.
Expand All @@ -95,17 +100,23 @@ def extract_parts(
Tags to be extracted.
stop_at:
Tags for which extraction should stop.
tag_type:
Type of tags to consider.
Returns
-------
_: List[Tuple[str, str]]
List of extracted tags.
_: List of extracted tags.
"""
if not self.tags:
return []

# List of tags in the message
tag_name_list: List[str] = [x[0] for x in self.tags]
try:
tag_name_list: List[str] = [x[tag_type] for x in self.tags]
# If tag_type is not available, fall back on base_tag
except KeyError:
tag_type = self.FALLBACK_TAG_TYPE
tag_name_list: List[str] = [x[tag_type] for x in self.tags]

if target_tags is None:
target_tags = tag_name_list
Expand All @@ -122,29 +133,34 @@ def extract_parts(
else:
effective_tags = self.tags

return [x for x in effective_tags if x[0] in target_tags]
return [x for x in effective_tags if x[tag_type] in target_tags]

def extract_last_body(
self, target_tags: Iterable[str] = ("BODY",), stop_at: Iterable[str] = ("GREETINGS",)
) -> List[Tuple[str, str]]:
self,
target_tags: Iterable[str] = ("BODY",),
stop_at: Iterable[str] = ("GREETINGS",),
tag_type: str = MAIN_TAG_TYPE
) -> List[Dict[str, Any]]:
"""
Extract the BODY parts of the last message in the email.
Parameters
----------
target_tags: Iterable[str]
stop_at: Iterable[str]
tag_type: Type of tags to consider.
Returns
-------
_: List[Tuple[str, str]]
"""
return self.extract_parts(target_tags=target_tags, stop_at=stop_at)
return self.extract_parts(target_tags=target_tags, stop_at=stop_at, tag_type=tag_type)

def has_tags(
self,
target_tags: Iterable[str] = ("BODY",),
stop_at: Optional[Iterable[str]] = None,
tag_type: str = MAIN_TAG_TYPE,
) -> bool:
"""
Function to check if input tags are present in the message.
Expand All @@ -155,6 +171,8 @@ def has_tags(
Tags of interest.
stop_at:
Tags for which extraction should stop.
tag_type:
Type of tags to consider.
Returns
-------
Expand All @@ -168,7 +186,12 @@ def has_tags(
stop_at = set()

found: bool = False
for tag, _ in self.tags:
for tag_data in self.tags:
try:
tag = tag_data[tag_type]
except KeyError:
tag = tag_data[self.FALLBACK_TAG_TYPE]

# Check if tag in tags of interest
if tag in target_tags:
found = True
Expand All @@ -180,7 +203,7 @@ def has_tags(

return found

def format_tags(self) -> str:
def format_tags(self, tag_type: str = MAIN_TAG_TYPE) -> str:
"""
Create a pretty formatted representation of text and their associated tags.
Expand All @@ -192,7 +215,12 @@ def format_tags(self) -> str:
else:
tag_text_length = self.str_line_length - self.str_tag_name_length
text = ""
for tag_name, tag_text in self.tags:
for tag_data in self.tags:
try:
tag_name = tag_data[tag_type]
except KeyError:
tag_name = tag_data[self.FALLBACK_TAG_TYPE]
tag_text = tag_data["base_text"]
text += tag_text.ljust(tag_text_length, ".") + tag_name.rjust(self.str_tag_name_length, ".") + "\n"

return text.strip()
Expand Down
124 changes: 75 additions & 49 deletions melusine/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,6 @@ def __init__(
default_tag: str = "BODY",
valid_part_regex: str = r"[a-z0-9?]",
default_regex_flag: int = re.IGNORECASE,
post_process: bool = True,
text_attribute: str = "text",
):
"""
Expand Down Expand Up @@ -784,9 +783,6 @@ def __init__(
# Set text attribute
self.text_attribute = text_attribute

# Activate post-processing
self.post_process = post_process

# Pattern to split text into sentences (=parts)
self.split_pattern = self.compile_split_pattern()

Expand Down Expand Up @@ -933,8 +929,6 @@ def compile_tag_regex(self, tag: str) -> re.Pattern:
regex = re.compile(regex, flags=self.default_regex_flag)
except re.error:
raise ValueError(f"Invalid regex for tag {tag}:\n{regex}")
elif isinstance(regex, re.Pattern):
pass
else:
raise ValueError(
f"Tag {tag} does not return any of the supported types : "
Expand Down Expand Up @@ -965,10 +959,6 @@ def tag_text(self, text: str) -> list[tuple[str, str]]:
for part in parts:
tags.append(self.tag_part(part))

# Post process tags
if self.post_process:
tags = self.post_process_tags(tags)

return tags

def split_text(self, text: str) -> list[str]:
Expand Down Expand Up @@ -1045,7 +1035,7 @@ def clean_up_after_split(parts: list[str | None]) -> list[str]:

return clean_parts

def tag_part(self, part: str) -> tuple[str, str]:
def tag_part(self, part: str) -> dict[str, Any]:
"""
Method to apply tagging on a text chunk (sentence/part).
Expand All @@ -1056,20 +1046,39 @@ def tag_part(self, part: str) -> tuple[str, str]:
Returns
-------
match_tag: str
Output tag
part: str
Original text
_: tag data such as text, base_tag_list or base_tag
"""
match_tag = self.default_tag

match_tag_list = []
for tag, regex in self.regex_dict.items():
match = regex.match(part)
if match:
match_tag = tag
break
match_tag_list.append(tag)

if not match_tag_list:
match_tag_list.append(self.default_tag)

return match_tag, part
return {
"base_text": part,
"base_tag_list": match_tag_list,
"base_tag": self.get_base_tag(match_tag_list),
}

def get_base_tag(self, match_tag_list: list[str]) -> str:
    """
    Select the base tag among the tags matched on a text part.

    Candidates are scanned in the order of the ``tag_list`` attribute, so the
    first entry of ``tag_list`` that was matched wins.

    Parameters
    ----------
    match_tag_list: List of tags found in the text.

    Returns
    -------
    _: First tag of ``self.tag_list`` present in ``match_tag_list``,
        or ``self.default_tag`` when none of them is present.
    """
    # Build the lookup once instead of re-scanning the list for every candidate tag
    matched_tags = set(match_tag_list)
    return next((tag for tag in self.tag_list if tag in matched_tags), self.default_tag)

@staticmethod
def word_block(n_words: int, word_character_only: bool = False) -> str:
Expand Down Expand Up @@ -1153,21 +1162,6 @@ def find_matching_regex_patterns(self, part: str, regex: TagPattern) -> list[str

return matching_regex_list

@abstractmethod
def post_process_tags(self, tags: list[tuple[str, str]]) -> list[tuple[str, str]]:
"""
Method to apply tagging rules posterior to the standard regex tagging.
Parameters
----------
tags: list[tuple[str, str]]
Original tags
Returns
-------
_: list[tuple[str, str]]
Post-processed tags
"""


class ContentTagger(BaseContentTagger):
Expand Down Expand Up @@ -1554,39 +1548,70 @@ def SIGNATURE(self) -> str | list[str] | re.Pattern:
r"^[A-Za-z]+(?: [A-Za-z]+)*, le \d{1,2} [A-Za-z]+ \d{4}.{,3}$",
]

def post_process_tags(self, tags: list[tuple[str, str]]) -> list[tuple[str, str]]:

class RefinedTagger(MelusineTransformer):
BASE_TAG_KEY = "base_tag"
REFINED_TAG_KEY = "refined_tag"

def __init__(
    self,
    input_columns: str = "messages",
    output_columns: str = "messages",
    default_tag: str = "BODY",
):
    """
    Store the tagger settings and register the message post-processing function.

    Parameters
    ----------
    input_columns: str
        Input columns for the transform operation
    output_columns: str
        Outputs columns for the transform operation
    default_tag: str
        Default tag to apply to untagged text
    """
    self.default_tag = default_tag

    # The parent transformer is given the method applied to the input column
    super().__init__(
        func=self.post_process_messages,
        input_columns=input_columns,
        output_columns=output_columns,
    )

def post_process_messages(self, messages: list[Message]) -> list[Message]:
    """
    Apply the tagging rules posterior to the standard regex tagging.

    The tags of each message are replaced by the output of ``post_process_tags``.
    Messages are modified in place.

    Parameters
    ----------
    messages: list of messages

    Returns
    -------
    messages: the same list of messages, with post-processed tags
    """
    for message in messages:
        # A message without tags (None or empty) has nothing to refine;
        # iterating over None further down would raise a TypeError.
        if not message.tags:
            continue
        message.tags = self.post_process_tags(message.tags)

    return messages

def post_process_tags(self, tags: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Run the refinement rules on the tags of a single message.

    Parameters
    ----------
    tags: Original tags

    Returns
    -------
    _: Tags returned by the refinement rules
    """
    # Only rule so far: signature lines containing first/last name
    return self.detect_name_signature(tags)

def detect_name_signature(self, tags: list[tuple[str, str]]) -> list[tuple[str, str]]:
def detect_name_signature(self, tags: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Method to detect lines containing First name / Surname
Ex: Mr Joe Dupond
Parameters
----------
tags: list[tuple[str, str]]
Original tags
tags: Original tags
Returns
-------
_: list[tuple[str, str]]
Post processed tags
_: Post processed tags
"""
# First name / Last name Signatures
capitalized_words: str = r"[A-Z][-'A-za-zÀ-ÿ]{,10}"
Expand All @@ -1599,18 +1624,19 @@ def detect_name_signature(self, tags: list[tuple[str, str]]) -> list[tuple[str,
# Forbidden words (lowercase)
forbidden_words: set[str] = {"urgent", "attention"}

new_tags: list[tuple[str, str]] = list()
for tag, text in tags:
for tag_data in tags:
tag = tag_data[self.BASE_TAG_KEY]
if tag == self.default_tag:
text = tag_data[self.BASE_TAG_KEY]
match = re.match(line_with_name, text)
has_forbidden_words: bool = bool(forbidden_words.intersection(text.lower().split()))

if match and not has_forbidden_words:
tag = "SIGNATURE_NAME"

new_tags.append((tag, text))
tag_data[self.REFINED_TAG_KEY] = tag

return new_tags
return tags


class TransferredEmailProcessor(MelusineTransformer):
Expand Down

0 comments on commit cc197e3

Please sign in to comment.