Skip to content

Commit

Permalink
Merge pull request #58 from christoph-blessing/load_entities_on_demand
Browse files Browse the repository at this point in the history
Load entities on demand
  • Loading branch information
christoph-blessing authored Feb 28, 2024
2 parents 5f4dfa7 + a8258ce commit 3cf618c
Show file tree
Hide file tree
Showing 26 changed files with 596 additions and 750 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Table:

Note that the name of the declared class must match the name of the table from which the data will be pulled.

The class returned by the decorator behaves like a regular table with some added functionality. For one it allows the browsing of rows that can be pulled from the source:
The class returned by the decorator behaves like a regular table with some added functionality. For one, it allows browsing the rows that are present in the source:

```python
Table().source
Expand Down
4 changes: 0 additions & 4 deletions link/adapters/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,3 @@ def pull(self, primary_keys: Iterable[PrimaryKey]) -> None:
def delete(self, primary_keys: Iterable[PrimaryKey]) -> None:
    """Run the delete use-case for the entities identified by the given primary keys."""
    # Translate the persistence-level keys into domain identifiers before dispatching.
    identifiers = frozenset(self._translator.to_identifiers(primary_keys))
    command = commands.DeleteEntities(identifiers)
    self._message_bus.handle(command)

def list_unshared_entities(self) -> None:
"""Execute the use-case that lists unshared entities."""
self._message_bus.handle(commands.ListUnsharedEntities())
51 changes: 36 additions & 15 deletions link/adapters/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from abc import ABC, abstractmethod
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Literal, Union
from typing import Literal

from .custom_types import PrimaryKey

Expand All @@ -14,52 +14,55 @@ class DJLinkFacade(ABC):
"""A facade around a link that is persisted using DataJoint."""

@abstractmethod
def get_assignments(self) -> DJAssignments:
"""Get the assignments of primary keys to the different components."""
def get_assignment(self, primary_key: PrimaryKey) -> DJAssignment:
"""Get the assignment of the entity with the given primary key to components."""

@abstractmethod
def get_tainted_primary_keys(self) -> list[PrimaryKey]:
"""Get all tainted primary keys."""
def get_condition(self, primary_key: PrimaryKey) -> DJCondition:
"""Get the condition of the entity with the given primary key."""

@abstractmethod
def get_processes(self) -> list[DJProcess]:
"""Get all processes associated with entities."""
def get_process(self, primary_key: PrimaryKey) -> DJProcess:
"""Get the process of the entity with the given primary key."""

@abstractmethod
def add_to_local(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Add the entity identified by the given primary key to the local component."""
"""Add the entities identified by the given primary keys to the local component."""

@abstractmethod
def remove_from_local(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Remove the entity identified by the given primary key from the local component."""
"""Remove the entities identified by the given primary keys from the local component."""

@abstractmethod
def start_pull_process(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Start the pull process for the entity identified by the given primary key."""
"""Start the pull process for the entities identified by the given primary keys."""

@abstractmethod
def finish_pull_process(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Finish the pull process for the entity identified by the given primary key."""
"""Finish the pull process for the entities identified by the given primary keys."""

@abstractmethod
def start_delete_process(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Start the delete process for the entity identified by the given primary key."""
"""Start the delete process for the entities identified by the given primary keys."""

@abstractmethod
def finish_delete_process(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Finish the delete process for the entity identified by the given primary key."""
"""Finish the delete process for the entities identified by the given primary keys."""

@abstractmethod
def deprecate(self, primary_keys: Iterable[PrimaryKey]) -> None:
    """Deprecate the entities identified by the given primary keys."""


ProcessType = Literal["PULL", "DELETE", "NONE"]


@dataclass(frozen=True)
class DJProcess:
"""An association between an entity's primary key and its process."""

primary_key: PrimaryKey
current_process: Union[Literal["PULL"], Literal["DELETE"], Literal["NONE"]]
current_process: ProcessType


@dataclass(frozen=True)
Expand All @@ -69,3 +72,21 @@ class DJAssignments:
source: list[PrimaryKey]
outbound: list[PrimaryKey]
local: list[PrimaryKey]


@dataclass(frozen=True)
class DJAssignment:
"""The presence of a specific entity in the three different tables that make up its link."""

# Primary key of the entity this assignment describes.
primary_key: PrimaryKey
# True if the entity is present in the source table.
source: bool
# True if the entity is present in the outbound table.
outbound: bool
# True if the entity is present in the local table.
local: bool


@dataclass(frozen=True)
class DJCondition:
"""The condition of a specific entity."""

# Primary key of the entity this condition describes.
primary_key: PrimaryKey
# Flag state of the entity; the gateway passes this on as the entity's `is_tainted` value.
is_flagged: bool
52 changes: 21 additions & 31 deletions link/adapters/gateway.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
"""Contains the DataJoint gateway class and related classes/functions."""
from __future__ import annotations

from collections import defaultdict
from itertools import groupby
from typing import Iterable

from link.domain import events
from link.domain.custom_types import Identifier
from link.domain.link import Link, create_link
from link.domain.state import Commands, Components, Processes
from link.domain.link import create_entity
from link.domain.state import Commands, Components, Entity, Processes
from link.service.gateway import LinkGateway

from .custom_types import PrimaryKey
from .facade import DJAssignments, DJLinkFacade, DJProcess
from .facade import DJLinkFacade
from .identification import IdentificationTranslator


Expand All @@ -24,32 +22,24 @@ def __init__(self, facade: DJLinkFacade, translator: IdentificationTranslator) -
self.facade = facade
self.translator = translator

def create_link(self) -> Link:
"""Create a link instance from persistent data."""

def translate_assignments(dj_assignments: DJAssignments) -> dict[Components, set[Identifier]]:
return {
Components.SOURCE: self.translator.to_identifiers(dj_assignments.source),
Components.OUTBOUND: self.translator.to_identifiers(dj_assignments.outbound),
Components.LOCAL: self.translator.to_identifiers(dj_assignments.local),
}

def translate_processes(dj_processes: Iterable[DJProcess]) -> dict[Processes, set[Identifier]]:
persisted_to_domain_process_map = {"PULL": Processes.PULL, "DELETE": Processes.DELETE}
domain_processes: dict[Processes, set[Identifier]] = defaultdict(set)
active_processes = [process for process in dj_processes if process.current_process != "NONE"]
for persisted_process in active_processes:
domain_process = persisted_to_domain_process_map[persisted_process.current_process]
domain_processes[domain_process].add(self.translator.to_identifier(persisted_process.primary_key))
return domain_processes

def translate_tainted_primary_keys(primary_keys: Iterable[PrimaryKey]) -> set[Identifier]:
return {self.translator.to_identifier(key) for key in primary_keys}

return create_link(
translate_assignments(self.facade.get_assignments()),
processes=translate_processes(self.facade.get_processes()),
tainted_identifiers=translate_tainted_primary_keys(self.facade.get_tainted_primary_keys()),
def create_entity(self, identifier: Identifier) -> Entity:
    """Create an entity instance from persistent data.

    The identifier is translated to its primary key once. That key is then used to ask the facade for
    the entity's assignment, condition and process, which are converted to their domain counterparts.
    """
    persisted_to_domain_process_map = {"PULL": Processes.PULL, "DELETE": Processes.DELETE, "NONE": Processes.NONE}
    # Translate a single time instead of repeating the lookup for every facade call.
    primary_key = self.translator.to_primary_key(identifier)
    dj_assignment = self.facade.get_assignment(primary_key)
    dj_condition = self.facade.get_condition(primary_key)
    dj_process = self.facade.get_process(primary_key)
    # Keep the SOURCE, OUTBOUND, LOCAL order when collecting the components the entity is present in.
    presence_by_component = (
        (Components.SOURCE, dj_assignment.source),
        (Components.OUTBOUND, dj_assignment.outbound),
        (Components.LOCAL, dj_assignment.local),
    )
    components = [component for component, is_present in presence_by_component if is_present]
    return create_entity(
        identifier,
        components=components,
        is_tainted=dj_condition.is_flagged,
        process=persisted_to_domain_process_map[dj_process.current_process],
    )

def apply(self, updates: Iterable[events.StateChanged]) -> None:
Expand Down
12 changes: 7 additions & 5 deletions link/adapters/identification.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from collections.abc import Iterable
from typing import Union
from uuid import UUID, uuid4
from uuid import uuid4

from link.domain.custom_types import Identifier

Expand All @@ -15,18 +15,20 @@ class IdentificationTranslator:

def __init__(self) -> None:
"""Initialize the translator."""
self.__mapping: dict[tuple[tuple[str, Union[str, int, float]], ...], UUID] = {}
self._key_to_identifier: dict[tuple[tuple[str, Union[str, int, float]], ...], Identifier] = {}
self._identifier_to_key: dict[Identifier, tuple[tuple[str, Union[str, int, float]], ...]] = {}

def to_identifier(self, primary_key: PrimaryKey) -> Identifier:
    """Translate the given primary key to its corresponding identifier.

    A primary key that has not been seen before is assigned a freshly generated identifier, and both
    lookup directions are recorded. Later calls with an equal primary key return the same identifier.
    """
    # Sort by attribute name so equal keys yield the same tuple regardless of dict insertion order.
    # Attribute names are unique within a key, so the values themselves are never compared.
    primary_key_tuple = tuple(sorted(primary_key.items()))
    try:
        return self._key_to_identifier[primary_key_tuple]
    except KeyError:
        # Only generate a UUID when the key is actually new.
        identifier = Identifier(uuid4())
        self._key_to_identifier[primary_key_tuple] = identifier
        self._identifier_to_key[identifier] = primary_key_tuple
        return identifier

def to_identifiers(self, primary_keys: Iterable[PrimaryKey]) -> set[Identifier]:
    """Map every given primary key to its identifier and collect the results in a set."""
    identifiers = set()
    for primary_key in primary_keys:
        identifiers.add(self.to_identifier(primary_key))
    return identifiers

def to_primary_key(self, identifier: Identifier) -> PrimaryKey:
"""Translate the given identifier to its corresponding primary key."""
primary_key_tuple = {v: k for k, v in self.__mapping.items()}[identifier]
return dict(primary_key_tuple)
return dict(self._identifier_to_key[identifier])
14 changes: 1 addition & 13 deletions link/adapters/present.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,13 @@
"""Logic associated with presenting information about finished use-cases."""
from __future__ import annotations

from typing import Callable, Iterable
from typing import Callable

from link.domain import events

from .custom_types import PrimaryKey
from .identification import IdentificationTranslator


def create_unshared_entities_updater(
translator: IdentificationTranslator, update: Callable[[Iterable[PrimaryKey]], None]
) -> Callable[[events.UnsharedEntitiesListed], None]:
"""Create a callable that when called updates the list of unshared entities."""

def update_unshared_entities(response: events.UnsharedEntitiesListed) -> None:
update(translator.to_primary_key(identifier) for identifier in response.identifiers)

return update_unshared_entities


def create_state_change_logger(
translator: IdentificationTranslator, log: Callable[[str], None]
) -> Callable[[events.StateChanged], None]:
Expand Down
139 changes: 18 additions & 121 deletions link/domain/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,126 +2,23 @@
from __future__ import annotations

from collections import deque
from typing import Any, Iterable, Iterator, Mapping, Optional, Set, TypeVar
from typing import Iterable

from .custom_types import Identifier
from .state import STATE_MAP, Components, Entity, PersistentState, Processes, State, Unshared


def create_link(
assignments: Mapping[Components, Iterable[Identifier]],
*,
tainted_identifiers: Optional[Iterable[Identifier]] = None,
processes: Optional[Mapping[Processes, Iterable[Identifier]]] = None,
) -> Link:
"""Create a new link instance."""

def pairwise_disjoint(sets: Iterable[Iterable[Any]]) -> bool:
union = set().union(*sets)
return len(union) == sum(len(set(s)) for s in sets)

T = TypeVar("T")
V = TypeVar("V")

def invert_mapping(mapping: Mapping[T, Iterable[V]]) -> dict[V, T]:
return {z: x for x, y in mapping.items() for z in y}

def validate_arguments(
assignments: Mapping[Components, Iterable[Identifier]],
tainted: Iterable[Identifier],
processes: Mapping[Processes, Iterable[Identifier]],
) -> None:
assert set(assignments[Components.OUTBOUND]) <= set(
assignments[Components.SOURCE]
), "Outbound must not be superset of source."
assert set(assignments[Components.LOCAL]) <= set(
assignments[Components.OUTBOUND]
), "Local must not be superset of source."
assert set(tainted) <= set(assignments[Components.SOURCE])
assert pairwise_disjoint(processes.values()), "Identifiers can not undergo more than one process."

def is_tainted(identifier: Identifier) -> bool:
assert tainted_identifiers is not None
return identifier in tainted_identifiers

def create_entities(
assignments: Mapping[Components, Iterable[Identifier]],
) -> set[Entity]:
def create_entity(identifier: Identifier) -> Entity:
presence = frozenset(
component for component, identifiers in assignments.items() if identifier in identifiers
)
persistent_state = PersistentState(
presence, is_tainted=is_tainted(identifier), has_process=identifier in processes_map
)
state = STATE_MAP[persistent_state]
return Entity(
identifier,
state=state,
current_process=processes_map.get(identifier, Processes.NONE),
is_tainted=is_tainted(identifier),
events=deque(),
)

return {create_entity(identifier) for identifier in assignments[Components.SOURCE]}

def assign_entities(entities: Iterable[Entity]) -> dict[Components, set[Entity]]:
def assign_to_component(component: Components) -> set[Entity]:
return {entity for entity in entities if entity.identifier in assignments[component]}

return {component: assign_to_component(component) for component in Components}

if tainted_identifiers is None:
tainted_identifiers = set()
if processes is None:
processes = {}
validate_arguments(assignments, tainted_identifiers, processes)
processes_map = invert_mapping(processes)
entity_assignments = assign_entities(create_entities(assignments))
return Link(entity_assignments[Components.SOURCE])


class Link(Set[Entity]):
"""The state of a link between two databases."""

def __init__(self, entities: Iterable[Entity]) -> None:
"""Initialize the link."""
self._entities = set(entities)

@property
def identifiers(self) -> frozenset[Identifier]:
"""Return the identifiers of all entities in the link."""
return frozenset(entity.identifier for entity in self)

def __getitem__(self, identifier: Identifier) -> Entity:
"""Return the entity with the given identifier."""
try:
return next(entity for entity in self if entity.identifier == identifier)
except StopIteration as error:
raise KeyError("Requested entity not present in link") from error

def list_unshared_entities(self) -> frozenset[Identifier]:
"""List the identifiers of all unshared entities in the link."""
return frozenset(entity.identifier for entity in self if entity.state is Unshared)

def __contains__(self, entity: object) -> bool:
"""Check if the link contains the given entity."""
return entity in self._entities

def __iter__(self) -> Iterator[Entity]:
"""Iterate over all entities in the link."""
return iter(self._entities)

def __len__(self) -> int:
"""Return the number of entities in the link."""
return len(self._entities)

def __eq__(self, other: object) -> bool:
"""Return True if both links have entities with the same identifiers and states."""
if not isinstance(other, type(self)):
raise NotImplementedError

def create_identifier_state_pairs(link: Link) -> set[tuple[Identifier, type[State]]]:
return {(entity.identifier, entity.state) for entity in link}

return create_identifier_state_pairs(self) == create_identifier_state_pairs(other)
from .state import STATE_MAP, Components, Entity, PersistentState, Processes


def create_entity(
    identifier: Identifier, *, components: Iterable[Components], is_tainted: bool, process: Processes
) -> Entity:
    """Build an entity from its persisted attributes.

    The entity's state is looked up in ``STATE_MAP`` using the set of components it is present in, its
    tainted flag and whether it currently undergoes a process. The entity starts with an empty event queue.
    """
    present_in = frozenset(components)
    # Any process other than NONE counts as the entity having a process.
    has_process = process is not Processes.NONE
    lookup_key = PersistentState(present_in, is_tainted=is_tainted, has_process=has_process)
    return Entity(
        identifier,
        state=STATE_MAP[lookup_key],
        current_process=process,
        is_tainted=is_tainted,
        events=deque(),
    )
Loading

0 comments on commit 3cf618c

Please sign in to comment.