Skip to content

Commit

Permalink
Remove methods from facade
Browse files Browse the repository at this point in the history
  • Loading branch information
christoph-blessing committed Feb 28, 2024
1 parent d3ac1ee commit a8258ce
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 25 deletions.
2 changes: 1 addition & 1 deletion link/adapters/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def finish_delete_process(self, primary_keys: Iterable[PrimaryKey]) -> None:

@abstractmethod
def deprecate(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Deprecate the entities identified by the given primary keys."""
"""Deprecate the entities identified by the given primary key."""


ProcessType = Literal["PULL", "DELETE", "NONE"]
Expand Down
26 changes: 2 additions & 24 deletions link/infrastructure/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

from collections.abc import Callable
from tempfile import TemporaryDirectory
from typing import Any, ContextManager, Iterable, Literal, Mapping, Protocol, Sequence, Union, cast
from typing import Any, ContextManager, Iterable, Literal, Mapping, Protocol, Sequence, Union

from link.adapters import PrimaryKey
from link.adapters.facade import DJAssignment, DJAssignments, DJCondition, DJProcess, ProcessType
from link.adapters.facade import DJAssignment, DJCondition, DJProcess, ProcessType
from link.adapters.facade import DJLinkFacade as AbstractDJLinkFacade


Expand Down Expand Up @@ -66,28 +66,6 @@ def __init__(self, source: Callable[[], Table], outbound: Callable[[], Table], l
self.outbound = outbound
self.local = local

def get_assignments(self) -> DJAssignments:
"""Get the assignments of primary keys to tables."""
return DJAssignments(
cast("list[PrimaryKey]", self.source().proj().fetch(as_dict=True)),
cast("list[PrimaryKey]", self.outbound().proj().fetch(as_dict=True)),
cast("list[PrimaryKey]", self.local().proj().fetch(as_dict=True)),
)

def get_processes(self) -> list[DJProcess]:
"""Get the current process (if any) from each entity in the outbound table."""
rows = self.outbound().proj("process").fetch(as_dict=True)
processes: list[DJProcess] = []
for row in rows:
process = row.pop("process")
processes.append(DJProcess(row, process))
return processes

def get_tainted_primary_keys(self) -> list[PrimaryKey]:
"""Get the flagged (i.e. tainted) primary keys from the outbound table."""
rows = (self.outbound() & 'is_flagged = "TRUE"').proj().fetch(as_dict=True)
return cast("list[PrimaryKey]", rows)

def get_assignment(self, primary_key: PrimaryKey) -> DJAssignment:
"""Get the assignment of the entity with the given primary key."""
return DJAssignment(
Expand Down

0 comments on commit a8258ce

Please sign in to comment.