From 6bd3b259faebf16c534a5af1b5dbe4a3db5e575d Mon Sep 17 00:00:00 2001 From: Nikoletos Konstantinos <47646955+Nikoletos-K@users.noreply.github.com> Date: Mon, 15 Jan 2024 12:35:17 +0200 Subject: [PATCH] Added code for r:0.1.5 Added - SCANN/FAISS - RowColumnClustering - Schema Matching & Clustering component Co-Authored-By: Jakub Maciejewski <71031279+JacobMaciejewski@users.noreply.github.com> --- docs/tutorials/SchemaMatching.ipynb | 2 +- pyproject.toml | 5 +- src/pyjedai/_version.py | 2 +- src/pyjedai/clustering.py | 155 ++++++++++++- src/pyjedai/schema/clustering.py | 215 ++++++++++++++++++ .../matching.py} | 4 +- src/pyjedai/utils.py | 3 +- src/pyjedai/vector_based_blocking.py | 64 +++++- src/pyjedai/workflow.py | 91 +++++--- 9 files changed, 495 insertions(+), 46 deletions(-) create mode 100644 src/pyjedai/schema/clustering.py rename src/pyjedai/{schema_matching.py => schema/matching.py} (98%) diff --git a/docs/tutorials/SchemaMatching.ipynb b/docs/tutorials/SchemaMatching.ipynb index 506d23b..046e850 100644 --- a/docs/tutorials/SchemaMatching.ipynb +++ b/docs/tutorials/SchemaMatching.ipynb @@ -131,7 +131,7 @@ "metadata": {}, "outputs": [], "source": [ - "from pyjedai.schema_matching import ValentineMethodBuilder, ValentineSchemaMatching\n", + "from pyjedai.schema.matching import ValentineMethodBuilder, ValentineSchemaMatching\n", "\n", "sm = ValentineSchemaMatching(ValentineMethodBuilder.cupid_matcher())\n", "sm.process(data)\n" diff --git a/pyproject.toml b/pyproject.toml index 667f981..601d9a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "pyjedai" -version = "0.1.4" +version = "0.1.5" description = "An open-source library that builds powerful end-to-end Entity Resolution workflows." readme = "README.md" authors = [ @@ -67,7 +67,8 @@ dependencies = [ "ordered-set >= 4.0", "plotly >= 5.16.0", "shapely >= 2.0", - 'scann ; platform_system == "Linux"' + 'scann ; platform_system == "Linux"', + 'falconn ; platform_system == "Linux"' ] [project.optional-dependencies] diff --git a/src/pyjedai/_version.py b/src/pyjedai/_version.py index bbab024..1276d02 100644 --- a/src/pyjedai/_version.py +++ b/src/pyjedai/_version.py @@ -1 +1 @@ -__version__ = "0.1.4" +__version__ = "0.1.5" diff --git a/src/pyjedai/clustering.py b/src/pyjedai/clustering.py index dca0293..11f14fc 100644 --- a/src/pyjedai/clustering.py +++ b/src/pyjedai/clustering.py @@ -14,6 +14,7 @@ from collections import defaultdict import random from ordered_set import OrderedSet +import math RANDOM_SEED = 42 @@ -320,10 +321,15 @@ def stats(self) -> None: class AbstractClustering(PYJEDAIFeature): + _method_name: str = "Abstract Clustering" + _method_short_name: str = "AC" + _method_info: str = "Abstract Clustering Method" + def __init__(self) -> None: super().__init__() self.data: Data self.similarity_threshold: float = 0.1 + self.execution_time: float = 0.0 def evaluate(self, prediction, @@ -367,6 +373,9 @@ def evaluate(self, def stats(self) -> None: pass + def _configuration(self) -> dict: + return {} + def export_to_df(self, prediction: list) -> pd.DataFrame: """creates a dataframe for the evaluation report @@ -769,6 +778,7 @@ class CorrelationClustering(AbstractClustering): "In essence, it implements iterative clustering, " + \ "reassigning clusters to randomly chosen entities based on the reassignment's effect on our objective function " + \ "that evaluates the quality of the newly defined clusters." 
+ def __init__(self) -> None: super().__init__() self.similarity_threshold: float @@ -777,6 +787,7 @@ def __init__(self) -> None: self.non_similarity_threshold : float self.move_limit : int self.lsi_iterations: int + def process(self, graph: Graph, data: Data, @@ -1214,7 +1225,7 @@ def process(self, for edge in edges: man, woman, similarity = edge.left_node, edge.right_node, edge.similarity new_graph.add_edge(man, woman, weight=similarity) - + clusters = list(connected_components(new_graph)) self.execution_time = time() - start_time return clusters @@ -1431,5 +1442,147 @@ def process(self, self.execution_time = time() - start_time return clusters + def _configuration(self) -> dict: + return {} + + +class RowColumnClustering(AbstractClustering): + """Ιmplements the Row Column Clustering algorithm. For each row and column find their equivalent + column and row respectively corresponding to the smallest similarity. Subsequently, chooses + either rows or columns dependent on which one has the highest out of the lowest similariities + on average. + """ + + _method_name: str = "Row Column Clustering" + _method_short_name: str = "RCC" + _method_info: str = "Ιmplements the Row Column Clustering algorithm," + \ + "In essence, it is a 3/2-approximation to the Maximum Stable Marriage (MSM) problem." + def __init__(self) -> None: + super().__init__() + self.similarity_threshold : float + + def process(self, + graph: Graph, + data: Data, + similarity_threshold: float = 0.5) -> list: + + start_time = time() + self.similarity_threshold : float = similarity_threshold + self.data = data + number_of_comparisons : int = len(graph.edges(data=True)) + self.similarity = lil_matrix((self.data.num_of_entities_1, self.data.num_of_entities_2), dtype=float) + matched_ids = set() + new_graph : Graph = Graph() + clusters : list = [] + + if(number_of_comparisons == 0): + return clusters + + if self.data.is_dirty_er: + raise ValueError(f"Kiraly MSM Approximate Clustering doesn't support Dirty ER.") + + for (v1, v2, data) in graph.edges(data=True): + d1_id, d2_id = self.sorted_indicators(v1, v2) + d1_index, d2_index = (self.id_to_index(d1_id), self.id_to_index(d2_id)) + similarity_score = data.get('weight', 0) + + if(similarity_score > self.similarity_threshold): + self.similarity[d1_index, d2_index] = similarity_score + + self.initialize(self.get_negative(self.similarity)) + self.solution_proxy = self.get_solution() + + for entry in range(len(self.solution_proxy)): + d1_index = entry + d2_index = self.solution_proxy[entry] + _similarity = self.similarity[d1_index, d2_index] + if(_similarity < self.similarity_threshold): + continue + d2_index += self.data.dataset_limit + + if(d1_index in matched_ids or d2_index in matched_ids): + continue + + matched_ids.add(d1_index) + matched_ids.add(d2_index) + new_graph.add_edge(d1_index, d2_index, weight=_similarity) + + + clusters = list(connected_components(new_graph)) + self.execution_time = time() - start_time + return clusters + + def get_min_row(self, column): + position = -1 + minimum = math.inf + + for row in range(self.similarity.shape[0]): + if(self.row_covered[row]): continue + if(self.similarity[row, column] < minimum): + position = row + minimum = self.similarity[row, column] + + return position + + def get_min_column(self, row): + position = -1 + minimum = math.inf + + for column in range(self.similarity.shape[1]): + if(self.column_covered[column]): continue + if(self.similarity[row, column] < minimum): + position = column + minimum = self.similarity[row, column] + + 
return position + + def get_row_assignment(self): + self.row_scan_cost = 0.0 + + for row in range(self.similarity.shape[0]): + self.selected_column[row] = self.get_min_column(row) + _selected_column = self.selected_column[row] + if(_selected_column == -1): break + self.column_covered[_selected_column] = True + self.row_scan_cost += self.similarity[row, _selected_column] + + def get_column_assignment(self): + self.column_scan_cost = 0.0 + + for column in range(self.similarity.shape[1]): + self.selected_row[column] = self.get_min_row(column) + _selected_row = self.selected_row[column] + if(_selected_row == -1): break + self.columns_from_selected_row[_selected_row] = column + self.row_covered[_selected_row] = True + self.column_scan_cost += self.similarity[_selected_row, column] + + def get_solution(self): + self.get_row_assignment() + self.get_column_assignment() + + if(self.row_scan_cost < self.column_scan_cost): + return self.selected_column + else: + return self.columns_from_selected_row + + def get_negative(self, similarity_matrix) -> np.array: + self.negative_similarity = lil_matrix((self.data.num_of_entities_1, self.data.num_of_entities_2), dtype=float) + + for row in range(similarity_matrix.shape[0]): + for column in range(similarity_matrix.shape[1]): + self.negative_similarity[row, column] = 1.0 - similarity_matrix[row, column] + + return self.negative_similarity + + def initialize(self, similarity_matrix) -> None: + self.similarity = similarity_matrix + self.selected_column = [0] * similarity_matrix.shape[0] + self.selected_row = [0] * similarity_matrix.shape[1] + self.row_covered = [False] * similarity_matrix.shape[0] + self.column_covered = [False] * similarity_matrix.shape[1] + + self.columns_from_selected_row = [0] * similarity_matrix.shape[0] + def _configuration(self) -> dict: return {} \ No newline at end of file diff --git a/src/pyjedai/schema/clustering.py b/src/pyjedai/schema/clustering.py new file mode 100644 index 0000000..b3cf17c --- /dev/null +++ b/src/pyjedai/schema/clustering.py @@ -0,0 +1,215 @@ +import pandas as pd +import time as time + +from tqdm import tqdm +from ..datamodel import Data, PYJEDAIFeature +from ..evaluation import Evaluation +from ..workflow import PYJEDAIWorkFlow, BlockingBasedWorkFlow +from ..clustering import AbstractClustering +from abc import abstractmethod + +from typing import Optional, List, Tuple + +class AbstractSchemaClustering(AbstractClustering): + """Abstract class for schema clustering methods + """ + + def __init__(self): + super().__init__() + self.execution_time: float = 0.0 + self.schema_clustering_execution_time: float = 0.0 + + @abstractmethod + def _configuration(self) -> dict: + pass + + @abstractmethod + def stats(self) -> None: + pass + + def report(self) -> None: + """Prints method configuration + """ + print( + "Method name: " + self._method_name + + "\nMethod info: " + self._method_info + + ("\nParameters: \n" + ''.join(['\t{0}: {1}\n'.format(k, v) for k, v in self._configuration().items()]) if self._configuration().items() else "\nParameters: Parameter-Free method\n") + + "\nRuntime (schema-clustering): {:2.4f} seconds".format(self.schema_clustering_execution_time) + + "\nRuntime (total): {:2.4f} seconds".format(self.execution_time) + ) + +class SchemaClustering(AbstractSchemaClustering): + """Class to provide schema clustering methods + """ + _method_name = "Schema Clustering" + _method_info = "Performs pyjedai workflow to the names or values of the given data schema and then for each cluster performs pyjedai workflow 
for entity resolution" + + def __init__(self): + super().__init__() + self.on: str = 'names' + self.schema_clustering_workflow: str + self.entity_resolution_workflow: str + + def _configuration(self) -> dict: + return { + 'on': self.on, + 'schema_clustering_workflow': self.schema_clustering_workflow, + 'entity_resolution_workflow': self.entity_resolution_workflow + } + + def stats(self) -> None: + pass + + def process(self, + data: Data, on='names', + pyjedai_workflow_for_clustering: PYJEDAIWorkFlow = None, + pyjedai_workflow_for_er: PYJEDAIWorkFlow = None, + verbose_schema_clustering: bool = False, + verbose_er: bool = False, + return_clusters = False) -> Optional[List[Tuple[pd.DataFrame, pd.DataFrame]]]: + + _start_time = time.time() + self.data = data + self.on = on + + if pyjedai_workflow_for_clustering == None: + pyjedai_workflow_for_clustering = BlockingBasedWorkFlow() + if data.is_dirty_er: + pyjedai_workflow_for_clustering.default_schema_clustering_workflow_der() + else: + pyjedai_workflow_for_clustering.default_schema_clustering_workflow_ccer() + + self.schema_clustering_workflow = pyjedai_workflow_for_clustering.name + + print("Workflow used for schema clustering:", pyjedai_workflow_for_clustering.name) + # D1 + entities_d1 = dict() + for column in data.dataset_1.columns: + if on == 'names': + entities_d1[column] = column + elif on == 'values': + entities_d1[column] = ' '.join(data.dataset_1[column].astype(str)) + elif on == 'hybrid': + entities_d1[column] = column + ' ' + ' '.join(data.dataset_1[column].astype(str)) + else: + raise ValueError("on parameter must be one of 'names', 'values' or 'hybrid'") + # print(entities_d1) + + entities_d2 = None + if not data.is_dirty_er: + entities_d2 = dict() + # D2 + for column in data.dataset_2.columns: + if on == 'names': + entities_d2[column] = column + elif on == 'values': + entities_d2[column] = ' '.join(data.dataset_2[column].astype(str)) + elif on == 'hybrid': + entities_d2[column] = column + ' ' + ' '.join(data.dataset_2[column].astype(str)) + else: + raise ValueError("on parameter must be one of 'names', 'values' or 'hybrid'") + # print(entities_d2) + + # Create dataframes from dictionaries + attributes_d1 = pd.DataFrame.from_dict(entities_d1, orient='index', columns=['attribute']) + attributes_d1['ids'] = range(0, len(attributes_d1)) + + if not data.is_dirty_er: + attributes_d2 = pd.DataFrame.from_dict(entities_d2, orient='index', columns=['attribute']) + attributes_d2['ids'] = range(0, len(attributes_d2)) + + # Clustering with pyJedAI + attributes_data = Data( + dataset_1=attributes_d1, + attributes_1=['attribute'], + id_column_name_1='ids', + dataset_2=attributes_d2, + attributes_2=['attribute'], + id_column_name_2='ids' + ) + + # attributes_data.print_specs() + + pyjedai_workflow_for_clustering.run(attributes_data, verbose=verbose_schema_clustering) + + clusters = pyjedai_workflow_for_clustering.clusters + + # Create a new cluster of entities not in clusters + def find_entities_not_in_clusters(ids, clusters): + all_entities = set(ids) + entities_in_clusters = set.union(*clusters) + entities_not_in_clusters = all_entities - entities_in_clusters + new_set = set(entities_not_in_clusters) + return new_set + + all_ids = set(range(0, len(attributes_data.dataset_1) + len(attributes_data.dataset_2))) + redundant_entities = find_entities_not_in_clusters(all_ids, clusters) + if len(redundant_entities) > 0: + clusters.append(redundant_entities) + + # print("\n\n\n Clusters: ", clusters) + + def contains_attributes_from_both(limit, 
cluster): + has_entity_from_d1 = any(num < limit for num in cluster) + has_entity_from_d2 = any(num >= limit for num in cluster) + + return has_entity_from_d1 and has_entity_from_d2 + + new_datasets = [] + for i, cluster in enumerate(clusters): + + if not contains_attributes_from_both(attributes_data.dataset_limit, cluster): + continue + + non_nan_indexes_d1 = set() + non_nan_indexes_d2 = set() if not data.is_dirty_er else None + + for entity in cluster: + if entity < attributes_data.dataset_limit: + attribute_name = attributes_data.dataset_1.iloc[entity].name + new_ids_d1 = set(data.dataset_1[data.dataset_1[attribute_name].notna()].index) + non_nan_indexes_d1.update(new_ids_d1) + else: + attribute_name = attributes_data.dataset_2.iloc[entity-attributes_data.dataset_limit].name + non_nan_indexes_d2_attr = set(data.dataset_2[data.dataset_2[attribute_name].notna()].index) + new_ids_2 = set(map(lambda x: x - attributes_data.dataset_limit, non_nan_indexes_d2_attr)) + non_nan_indexes_d2.update(new_ids_2) + + new_df_1 = data.dataset_1.iloc[list(non_nan_indexes_d1)] + + if not data.is_dirty_er: + new_df_2 = data.dataset_2.iloc[list(non_nan_indexes_d2)] + new_datasets.append((new_df_1, new_df_2)) + + self.schema_clustering_execution_time = time.time() - _start_time + # print(new_datasets) + if return_clusters: + return new_datasets + + if pyjedai_workflow_for_er == None: + pyjedai_workflow_for_er = BlockingBasedWorkFlow() + if data.is_dirty_er: + pyjedai_workflow_for_er.best_blocking_workflow_der() + else: + pyjedai_workflow_for_er.best_blocking_workflow_ccer() + + self.entity_resolution_workflow = pyjedai_workflow_for_er.name + + all_clusters = [] + for i in tqdm(range(len(new_datasets)), desc="Entity resolution for clusters"): + d1, d2 = new_datasets[i] + new_data = Data( + dataset_1=d1, + attributes_1=data.attributes_1, + id_column_name_1=data.id_column_name_1, + dataset_2=d2, + attributes_2=data.attributes_2, + id_column_name_2=data.id_column_name_2, + ground_truth=data.ground_truth + ) + pyjedai_workflow_for_er.run(new_data, verbose=verbose_er) + new_clusters = pyjedai_workflow_for_er.clusters + all_clusters += new_clusters + + self.execution_time = time.time() - _start_time + return all_clusters diff --git a/src/pyjedai/schema_matching.py b/src/pyjedai/schema/matching.py similarity index 98% rename from src/pyjedai/schema_matching.py rename to src/pyjedai/schema/matching.py index c7a773e..4db4d9d 100644 --- a/src/pyjedai/schema_matching.py +++ b/src/pyjedai/schema/matching.py @@ -11,8 +11,8 @@ import valentine.metrics as valentine_metrics from pandas import DataFrame, concat -from .datamodel import Block, SchemaData, PYJEDAIFeature -from .evaluation import Evaluation +from ..datamodel import Block, SchemaData, PYJEDAIFeature +from ..evaluation import Evaluation from abc import abstractmethod class AbstractSchemaMatching(PYJEDAIFeature): diff --git a/src/pyjedai/utils.py b/src/pyjedai/utils.py index 6e69d81..86a240e 100644 --- a/src/pyjedai/utils.py +++ b/src/pyjedai/utils.py @@ -1130,7 +1130,6 @@ def load_distance_matrix_from_path(self, path : str) -> np.ndarray: try: print(f"Loading Distance Matrix from: {path}") return np.load(path) - pass except FileNotFoundError: print(f"Unable to load distance matrix -> {path}") @@ -1193,7 +1192,7 @@ def fit(self, indexing : str, d1_entities : list = None, d2_entities : list = None, - save_dm : bool = True) -> None: + save_dm : bool = False) -> None: """Initializes the entities' corpus, and constructs the similarity matrix Args: metric (str): Distance 
metric for entity strings diff --git a/src/pyjedai/vector_based_blocking.py b/src/pyjedai/vector_based_blocking.py index db09355..3e081b0 100644 --- a/src/pyjedai/vector_based_blocking.py +++ b/src/pyjedai/vector_based_blocking.py @@ -16,6 +16,7 @@ RUNNING_OS = platform.system() if RUNNING_OS != "Windows": import scann + import falconn import gensim.downloader as api import networkx as nx import numpy as np @@ -27,6 +28,7 @@ BertTokenizer, DistilBertModel, DistilBertTokenizer, RobertaModel, RobertaTokenizer, XLNetModel, XLNetTokenizer) +from math import log transformers.logging.set_verbosity_error() from faiss import normalize_L2 @@ -41,14 +43,6 @@ EMBEDDINGS_DIR = os.path.abspath(EMBEDDINGS_DIR) print('Created embeddings directory at: ' + EMBEDDINGS_DIR) -LINUX_ENV=False -# try: -# if 'linux' in sys.platform: -# import falconn -# import scann -# LINUX_ENV=True -# except: -# warnings.warn(ImportWarning, "Can't use FALCONN/SCANN in windows environment") class EmbeddingsNNBlockBuilding(PYJEDAIFeature): """Block building via creation of embeddings and a Nearest Neighbor Approach. @@ -259,7 +253,7 @@ def build_blocks(self, self._faiss_metric_type = faiss.METRIC_L2 self._similarity_search_with_FAISS() elif self.similarity_search == 'falconn': - raise NotImplementedError("FALCONN") + self._similarity_search_with_FALCONN() elif self.similarity_search == 'scann': self._similarity_search_with_SCANN() else: @@ -436,7 +430,55 @@ def _similarity_search_with_FAISS(self): self.graph.add_edge(_entity_id, _neighbor_id, weight=self.distances[_entity][_neighbor_index]) def _similarity_search_with_FALCONN(self): - raise NotImplementedError("FALCONN is not implemented yet.") + _falconn_parameters = falconn.LSHConstructionParameters() + _falconn_parameters.distance_function = falconn.DistanceFunction.NegativeInnerProduct \ + if (self.similarity_distance == 'cosine') \ + else falconn.DistanceFunction.EuclideanSquared + + _normalized_source_vectors = self.vectors_1 / np.linalg.norm(self.vectors_1, axis=1)[:, np.newaxis] + _normalized_target_vectors = _normalized_source_vectors \ + if self.data.is_dirty_er \ + else self.vectors_2 / np.linalg.norm(self.vectors_2, axis=1)[:, np.newaxis] + + + _normalized_source_vectors = - _normalized_source_vectors + _normalized_target_vectors = -_normalized_target_vectors + + _falconn_parameters.dimension = _normalized_source_vectors.shape[1] + _falconn_parameters.lsh_family = falconn.LSHFamily.CrossPolytope + _falconn_parameters.storage_hash_table = falconn.StorageHashTable.FlatHashTable + _falconn_parameters.num_setup_threads = 0 + _falconn_parameters.l = 50 + _falconn_parameters.num_rotations = 1 + falconn.compute_number_of_hash_functions(int(log(_normalized_source_vectors.shape[0],2)), _falconn_parameters) + _normalized_source_vectors = _normalized_source_vectors.astype(np.float32) + _normalized_target_vectors = _normalized_target_vectors.astype(np.float32) + + index = falconn.LSHIndex(_falconn_parameters) + index.setup(_normalized_source_vectors) + query_object = index.construct_query_object() + + self.blocks = dict() + + for _taget_entity_index, _normalized_target_vector in enumerate(_normalized_target_vectors): + _target_entity_id = self._si.d1_retained_ids[_taget_entity_index] \ + if self.data.is_dirty_er \ + else self._si.d2_retained_ids[_taget_entity_index] + _source_entity_ids = query_object.find_k_nearest_neighbors(query=_normalized_target_vector, k=self.top_k) + # if _target_entity_id not in self.blocks: + # self.blocks[_target_entity_id] = set() + + for id in 
_source_entity_ids: + + _source_entity_id = self._si.d1_retained_ids[id] + + if _source_entity_id not in self.blocks: + self.blocks[_source_entity_id] = set() + + self.blocks[_source_entity_id].add(_target_entity_id) + + if self.with_entity_matching: + self.graph.add_edge(_source_entity_id, _target_entity_id) def _similarity_search_with_SCANN(self): @@ -483,7 +525,7 @@ def _similarity_search_with_SCANN(self): self.blocks[_neighbor_id] = set() self.blocks[_neighbor_id].add(_entity_id) - self.blocks[_entity_id].add(_neighbor_id) + # self.blocks[_entity_id].add(_neighbor_id) if self.with_entity_matching: self.graph.add_edge(_entity_id, _neighbor_id, weight=self.distances[_entity][_neighbor_index]) diff --git a/src/pyjedai/workflow.py b/src/pyjedai/workflow.py index 185a10d..06b2cc0 100644 --- a/src/pyjedai/workflow.py +++ b/src/pyjedai/workflow.py @@ -209,6 +209,7 @@ def get_final_scores(self) -> Tuple[float, float, float]: Tuple[float, float, float]: F-Measure, Precision, Recall. """ return self.f1[-1], self.precision[-1], self.recall[-1] + class ProgressiveWorkFlow(PYJEDAIWorkFlow): """Main module of the pyjedAI and the simplest way to create an end-to-end PER workflow. """ @@ -680,11 +681,12 @@ def run(self, if "attributes_2" in self.block_building else None, tqdm_disable=workflow_step_tqdm_disable) self.final_pairs = block_building_blocks - res = block_building_method.evaluate(block_building_blocks, - export_to_dict=True, - with_classification_report=with_classification_report, - verbose=verbose) - self._save_step(res, block_building_method.method_configuration()) + if data.ground_truth is not None: + res = block_building_method.evaluate(block_building_blocks, + export_to_dict=True, + with_classification_report=with_classification_report, + verbose=verbose) + self._save_step(res, block_building_method.method_configuration()) self._workflow_bar.update(1) # # Block cleaning step [optional]: Multiple algorithms @@ -703,11 +705,12 @@ def run(self, tqdm_disable=workflow_step_tqdm_disable) self.final_pairs = bblocks = block_cleaning_blocks - res = block_cleaning_method.evaluate(bblocks, - export_to_dict=True, - with_classification_report=with_classification_report, - verbose=verbose) - self._save_step(res, block_cleaning_method.method_configuration()) + if data.ground_truth is not None: + res = block_cleaning_method.evaluate(bblocks, + export_to_dict=True, + with_classification_report=with_classification_report, + verbose=verbose) + self._save_step(res, block_cleaning_method.method_configuration()) self._workflow_bar.update(1) # # Comparison cleaning step [optional] @@ -723,11 +726,12 @@ def run(self, else block_building_blocks, data, tqdm_disable=workflow_step_tqdm_disable) - res = comparison_cleaning_method.evaluate(comparison_cleaning_blocks, - export_to_dict=True, - with_classification_report=with_classification_report, - verbose=verbose) - self._save_step(res, comparison_cleaning_method.method_configuration()) + if data.ground_truth is not None: + res = comparison_cleaning_method.evaluate(comparison_cleaning_blocks, + export_to_dict=True, + with_classification_report=with_classification_report, + verbose=verbose) + self._save_step(res, comparison_cleaning_method.method_configuration()) self._workflow_bar.update(1) # # Entity Matching step @@ -750,11 +754,12 @@ def run(self, tqdm_disable=workflow_step_tqdm_disable, **self.entity_matching["exec_params"]) - res = entity_matching_method.evaluate(em_graph, - export_to_dict=True, - with_classification_report=with_classification_report, - 
verbose=verbose) - self._save_step(res, entity_matching_method.method_configuration()) + if data.ground_truth is not None: + res = entity_matching_method.evaluate(em_graph, + export_to_dict=True, + with_classification_report=with_classification_report, + verbose=verbose) + self._save_step(res, entity_matching_method.method_configuration()) self._workflow_bar.update(1) # # Clustering step [optional] @@ -768,15 +773,17 @@ def run(self, else: self.final_pairs = components = clustering_method.process(em_graph, data, **self.clustering["exec_params"]) - res = clustering_method.evaluate(components, - export_to_dict=True, - with_classification_report=False, - verbose=verbose) - self._save_step(res, clustering_method.method_configuration()) + self.clusters = components + if data.ground_truth is not None: + res = clustering_method.evaluate(components, + export_to_dict=True, + with_classification_report=False, + verbose=verbose) + self._save_step(res, clustering_method.method_configuration()) self.workflow_exec_time = time() - start_time self._workflow_bar.update(1) # self.runtime.append(self.workflow_exec_time) - + ############################################ # Pre-defined workflows same as JedAI # ############################################ @@ -822,6 +829,38 @@ def best_blocking_workflow_der(self) -> None: self.clustering = dict(method=ConnectedComponentsClustering), self.name="best-der-workflow" + def default_schema_clustering_workflow_der(self) -> None: + """Default D-ER workflow. + + Returns: + PYJEDAIWorkFlow: Best workflow + """ + self.block_building = dict(method=StandardBlocking) + self.block_cleaning = [ + dict(method=BlockPurging, params=dict(smoothing_factor=1.0)), + dict(method=BlockFiltering) + ] + self.entity_matching = dict(method=EntityMatching, + params=dict(metric='cosine', + similarity_threshold=0.35)) + self.clustering = dict(method=ConnectedComponentsClustering), + self.name="best-schema-clustering-der-workflow" + + + def default_schema_clustering_workflow_ccer(self) -> None: + """Default CC-ER workflow. + """ + self.block_building = dict(method=StandardBlocking) + self.block_cleaning = [ + dict(method=BlockPurging, params=dict(smoothing_factor=1.0)), + dict(method=BlockFiltering) + ] + self.entity_matching = dict(method=EntityMatching, + metric='cosine', + similarity_threshold=0.35) + self.clustering = dict(method=ConnectedComponentsClustering) + self.name="default-schema-clustering-ccer-workflow" + def default_blocking_workflow_ccer(self) -> None: """Default CC-ER workflow. """
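
---
Appended usage sketch (not part of the patch itself): the snippet below illustrates how the pieces added in r0.1.5 are meant to fit together — the relocated schema-matching module (pyjedai.schema.matching), the new SchemaClustering step, and RowColumnClustering as a drop-in clustering method. Only the import paths and method signatures are taken from the hunks above; the file names, id/attribute column names, and the choice of predefined workflow are placeholders/assumptions, not part of the patch.

import pandas as pd

from pyjedai.datamodel import Data
# Schema matching moved from pyjedai.schema_matching to the new pyjedai.schema package.
from pyjedai.schema.matching import ValentineMethodBuilder, ValentineSchemaMatching
# New in this release: schema clustering performed before entity resolution.
from pyjedai.schema.clustering import SchemaClustering
# New in this release: Row Column Clustering (a 3/2-approximation of Maximum Stable Marriage).
from pyjedai.clustering import RowColumnClustering
from pyjedai.workflow import BlockingBasedWorkFlow

# Placeholder input files and column names -- replace with a real Clean-Clean ER dataset.
d1 = pd.read_csv("dataset_1.csv")
d2 = pd.read_csv("dataset_2.csv")
gt = pd.read_csv("ground_truth.csv")

data = Data(
    dataset_1=d1, attributes_1=["title", "description"], id_column_name_1="id",
    dataset_2=d2, attributes_2=["title", "description"], id_column_name_2="id",
    ground_truth=gt,
)

# Schema matching: same API as 0.1.4, only the import path changed.
sm = ValentineSchemaMatching(ValentineMethodBuilder.cupid_matcher())
sm.process(data)

# Schema clustering: group attributes by 'names', 'values' or 'hybrid' text, then run a
# (default) blocking-based ER workflow inside every attribute cluster that spans both datasets.
sc = SchemaClustering()
clusters = sc.process(data, on="hybrid")
sc.report()

# RowColumnClustering.process(graph, data, similarity_threshold) follows the same signature
# as the other clustering methods, so it should be usable as the clustering step of a
# predefined workflow (assuming the workflow accepts any method with this signature).
wf = BlockingBasedWorkFlow()
wf.best_blocking_workflow_ccer()
wf.clustering = dict(method=RowColumnClustering)
wf.run(data)

Because the per-step evaluate() calls in PYJEDAIWorkFlow.run() are now guarded by "if data.ground_truth is not None", the same sketch should also work on unlabeled data by simply omitting ground_truth from the Data constructor.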