Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename "idle" and "pulled" states #55

Merged
merged 2 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions docs/entity_states.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ Links contain and operate on entities. A specific entity is unique within a link

## States
Each entity is in one of the following states at any given time:
* Idle: This is the default state that entities start in.
* Unshared: This is the default state that entities start in.
* Activated: The entity is in the process of being pulled/deleted to/from the local side. It is only present in the source side of the link.
* Received: The entity is in the process of being pulled/deleted to/from the local side. It is present in both sides of the link.
* Pulled: The entity has been copied from the source to the local side.
* Shared: The entity has been copied from the source to the local side.
* Tainted: The entity was flagged by the source side indicating to the local side to delete it.
* Deprecated: The entity was flagged by the source side and subsequently deleted by the local side.

Expand All @@ -15,19 +15,19 @@ The following state diagram shows the different states that entities can be in a

```mermaid
stateDiagram-v2
[*] --> Idle
Idle --> Activated: pulled / start pull process
[*] --> Unshared
Unshared --> Activated: pulled / start pull process
Activated --> Received: processed [in pull process and not flagged] / add to local
Received --> Pulled: processed [in pull process and not flagged] / finish pull process
Received --> Shared: processed [in pull process and not flagged] / finish pull process
Received --> Tainted: processed [in pull process and flagged] / finish pull process
Pulled --> Received: deleted / start delete process
Shared --> Received: deleted / start delete process
Received --> Activated: processed [in delete process] / remove from local
Activated --> Idle: processed [in delete process and not flagged] / finish delete process
Pulled --> Tainted: flagged
Tainted --> Pulled: unflagged
Activated --> Unshared: processed [in delete process and not flagged] / finish delete process
Shared --> Tainted: flagged
Tainted --> Shared: unflagged
Tainted --> Received: deleted / start delete process
Activated --> Deprecated: processed [flagged] / deprecate
Deprecated --> Idle: unflagged
Deprecated --> Unshared: unflagged
```

The diagram adheres to the following rule to avoid entities with invalid states due to interruptions (e.g. connection losses):
Expand All @@ -39,7 +39,7 @@ Not following this rule can lead to entities in invalid states due to modifying
The `pulled`, `processed` and `deleted` events are triggered by the application, whereas the `flagged` and `unflagged` events are triggered by the source side directly by modifying the persistent data. The `flagged` and `unflagged` events are also not associated with activities for the same reason.

## Processes
Idle entities can be pulled from the source side into the local side and once they are pulled they can be deleted from the local side. Activated and received entities are currently undergoing one of these two processes. The name of the specific process is associated with entities that are in the aforementioned states. This allows us to correctly transition these entities. For example without associating the process with the entity we would not be able to determine whether an activated entity should become a received one (pull) or an idle one (delete).
Unshared entities can be pulled from the source side into the local side and once they are shared they can be deleted from the local side. Activated and received entities are currently undergoing one of these two processes. The name of the specific process is associated with entities that are in the aforementioned states. This allows us to correctly transition these entities. For example without associating the process with the entity we would not be able to determine whether an activated entity should become a received one (pull) or an unshared one (delete).

## Persistence

Expand All @@ -50,10 +50,10 @@ The following table illustrates the chosen mapping:

| In source | In outbound | In local | Has process | Is flagged | State |
|--------------------|--------------------|--------------------|--------------------|--------------------------|------------|
| :white_check_mark: | :x: | :x: | :x: | :x: | Idle |
| :white_check_mark: | :x: | :x: | :x: | :x: | Unshared |
| :white_check_mark: | :white_check_mark: | :x: | :white_check_mark: | :white_check_mark: / :x: | Activated |
| :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: / :x: | Received |
| :white_check_mark: | :white_check_mark: | :white_check_mark: | :x: | :x: | Pulled |
| :white_check_mark: | :white_check_mark: | :white_check_mark: | :x: | :x: | Shared |
| :white_check_mark: | :white_check_mark: | :white_check_mark: | :x: | :white_check_mark: | Tainted |
| :white_check_mark: | :white_check_mark: | :x: | :x: | :white_check_mark: | Deprecated |

Expand Down
6 changes: 3 additions & 3 deletions link/adapters/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ def delete(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Execute the delete use-case."""
self._message_bus.handle(commands.DeleteEntities(frozenset(self._translator.to_identifiers(primary_keys))))

def list_idle_entities(self) -> None:
"""Execute the use-case that lists idle entities."""
self._message_bus.handle(commands.ListIdleEntities())
def list_unshared_entities(self) -> None:
"""Execute the use-case that lists unshared entities."""
self._message_bus.handle(commands.ListUnsharedEntities())
10 changes: 5 additions & 5 deletions link/adapters/present.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
from .identification import IdentificationTranslator


def create_idle_entities_updater(
def create_unshared_entities_updater(
translator: IdentificationTranslator, update: Callable[[Iterable[PrimaryKey]], None]
) -> Callable[[events.IdleEntitiesListed], None]:
"""Create a callable that when called updates the list of idle entities."""
) -> Callable[[events.UnsharedEntitiesListed], None]:
"""Create a callable that when called updates the list of unshared entities."""

def update_idle_entities(response: events.IdleEntitiesListed) -> None:
def update_unshared_entities(response: events.UnsharedEntitiesListed) -> None:
update(translator.to_primary_key(identifier) for identifier in response.identifiers)

return update_idle_entities
return update_unshared_entities


def create_state_change_logger(
Expand Down
2 changes: 1 addition & 1 deletion link/domain/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ class DeleteEntities(Command):


@dataclass(frozen=True)
class ListIdleEntities(Command):
class ListUnsharedEntities(Command):
"""Start the delete process for the requested entities."""
4 changes: 2 additions & 2 deletions link/domain/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class StateChanged(OperationApplied):


@dataclass(frozen=True)
class IdleEntitiesListed(Event):
"""Idle entities in a link have been listed."""
class UnsharedEntitiesListed(Event):
"""Unshared entities in a link have been listed."""

identifiers: frozenset[Identifier]

Expand Down
8 changes: 4 additions & 4 deletions link/domain/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Iterable, Iterator, Mapping, Optional, Set, TypeVar

from .custom_types import Identifier
from .state import STATE_MAP, Components, Entity, Idle, PersistentState, Processes, State
from .state import STATE_MAP, Components, Entity, PersistentState, Processes, State, Unshared


def create_link(
Expand Down Expand Up @@ -100,9 +100,9 @@ def __getitem__(self, identifier: Identifier) -> Entity:
except StopIteration as error:
raise KeyError("Requested entity not present in link") from error

def list_idle_entities(self) -> frozenset[Identifier]:
"""List the identifiers of all idle entities in the link."""
return frozenset(entity.identifier for entity in self if entity.state is Idle)
def list_unshared_entities(self) -> frozenset[Identifier]:
"""List the identifiers of all unshared entities in the link."""
return frozenset(entity.identifier for entity in self if entity.state is Unshared)

def __contains__(self, entity: object) -> bool:
"""Check if the link contains the given entity."""
Expand Down
24 changes: 12 additions & 12 deletions link/domain/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def register(self, state: type[State]) -> None:
states = States()


class Idle(State):
class Unshared(State):
"""The default state of an entity."""

@classmethod
Expand All @@ -74,7 +74,7 @@ def start_pull(cls, entity: Entity) -> None:
return cls._transition_entity(entity, Operations.START_PULL, Activated, new_process=Processes.PULL)


states.register(Idle)
states.register(Unshared)


class Activated(State):
Expand All @@ -89,7 +89,7 @@ def process(cls, entity: Entity) -> None:
elif entity.current_process is Processes.PULL:
return transition_entity(Received)
elif entity.current_process is Processes.DELETE:
return transition_entity(Idle, new_process=Processes.NONE)
return transition_entity(Unshared, new_process=Processes.NONE)
raise RuntimeError


Expand All @@ -107,7 +107,7 @@ def process(cls, entity: Entity) -> None:
if entity.is_tainted:
return transition_entity(Tainted, new_process=Processes.NONE)
else:
return transition_entity(Pulled, new_process=Processes.NONE)
return transition_entity(Shared, new_process=Processes.NONE)
elif entity.current_process is Processes.DELETE:
return transition_entity(Activated)
raise RuntimeError
Expand All @@ -116,7 +116,7 @@ def process(cls, entity: Entity) -> None:
states.register(Received)


class Pulled(State):
class Shared(State):
"""The state of an entity that has been copied to the local side."""

@classmethod
Expand All @@ -125,7 +125,7 @@ def start_delete(cls, entity: Entity) -> None:
return cls._transition_entity(entity, Operations.START_DELETE, Received, new_process=Processes.DELETE)


states.register(Pulled)
states.register(Shared)


class Tainted(State):
Expand Down Expand Up @@ -172,14 +172,14 @@ class Commands(Enum):


TRANSITION_MAP: dict[Transition, Commands] = {
Transition(Idle, Activated): Commands.START_PULL_PROCESS,
Transition(Unshared, Activated): Commands.START_PULL_PROCESS,
Transition(Activated, Received): Commands.ADD_TO_LOCAL,
Transition(Activated, Idle): Commands.FINISH_DELETE_PROCESS,
Transition(Activated, Unshared): Commands.FINISH_DELETE_PROCESS,
Transition(Activated, Deprecated): Commands.DEPRECATE,
Transition(Received, Pulled): Commands.FINISH_PULL_PROCESS,
Transition(Received, Shared): Commands.FINISH_PULL_PROCESS,
Transition(Received, Tainted): Commands.FINISH_PULL_PROCESS,
Transition(Received, Activated): Commands.REMOVE_FROM_LOCAL,
Transition(Pulled, Received): Commands.START_DELETE_PROCESS,
Transition(Shared, Received): Commands.START_DELETE_PROCESS,
Transition(Tainted, Received): Commands.START_DELETE_PROCESS,
}

Expand Down Expand Up @@ -222,7 +222,7 @@ class PersistentState:
frozenset({Components.SOURCE}),
is_tainted=False,
has_process=False,
): Idle,
): Unshared,
PersistentState(
frozenset({Components.SOURCE, Components.OUTBOUND}),
is_tainted=False,
Expand All @@ -247,7 +247,7 @@ class PersistentState:
frozenset({Components.SOURCE, Components.OUTBOUND, Components.LOCAL}),
is_tainted=False,
has_process=False,
): Pulled,
): Shared,
PersistentState(
frozenset({Components.SOURCE, Components.OUTBOUND, Components.LOCAL}),
is_tainted=True,
Expand Down
14 changes: 8 additions & 6 deletions link/infrastructure/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from link.adapters.custom_types import PrimaryKey
from link.adapters.gateway import DJLinkGateway
from link.adapters.identification import IdentificationTranslator
from link.adapters.present import create_idle_entities_updater, create_state_change_logger
from link.adapters.present import create_state_change_logger, create_unshared_entities_updater
from link.adapters.progress import DJProgressDisplayAdapter
from link.domain import commands, events
from link.service.handlers import (
Expand All @@ -20,7 +20,7 @@
inform_batch_processing_started,
inform_current_process_finished,
inform_next_process_started,
list_idle_entities,
list_unshared_entities,
log_state_change,
pull,
pull_entity,
Expand Down Expand Up @@ -59,7 +59,9 @@ def inner(obj: type) -> Any:
gateway = DJLinkGateway(facade, translator)
uow = UnitOfWork(gateway)
source_restriction: IterationCallbackList[PrimaryKey] = IterationCallbackList()
idle_entities_updater = create_idle_entities_updater(translator, create_content_replacer(source_restriction))
unshared_entities_updater = create_unshared_entities_updater(
translator, create_content_replacer(source_restriction)
)
logger = logging.getLogger(obj.__name__)

command_handlers: CommandHandlers = {}
Expand All @@ -69,8 +71,8 @@ def inner(obj: type) -> Any:
command_handlers[commands.DeleteEntity] = partial(delete_entity, uow=uow, message_bus=bus)
command_handlers[commands.PullEntities] = partial(pull, message_bus=bus)
command_handlers[commands.DeleteEntities] = partial(delete, message_bus=bus)
command_handlers[commands.ListIdleEntities] = partial(
list_idle_entities, uow=uow, output_port=idle_entities_updater
command_handlers[commands.ListUnsharedEntities] = partial(
list_unshared_entities, uow=uow, output_port=unshared_entities_updater
)
progress_view = TQDMProgressView()
display = DJProgressDisplayAdapter(translator, progress_view)
Expand All @@ -84,7 +86,7 @@ def inner(obj: type) -> Any:
event_handlers[events.InvalidOperationRequested] = [lambda event: None]

controller = DJController(bus, translator)
source_restriction.callback = controller.list_idle_entities
source_restriction.callback = controller.list_unshared_entities

return create_local_endpoint(controller, tables, source_restriction, progress_view)

Expand Down
4 changes: 2 additions & 2 deletions link/infrastructure/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class SourceEndpoint(Table):
_progress_view: ProgressView

def pull(self, *, display_progress: bool = False) -> None:
"""Pull idle entities from the source table into the local table."""
"""Pull unshared entities from the source table into the local table."""
if display_progress:
self._progress_view.enable()
primary_keys = self.proj().fetch(as_dict=True)
Expand Down Expand Up @@ -70,7 +70,7 @@ class LocalEndpoint(Table):
_progress_view: ProgressView

def delete(self, *, display_progress: bool = False) -> None:
"""Delete pulled entities from the local table."""
"""Delete shared entities from the local table."""
if display_progress:
self._progress_view.enable()
primary_keys = self.proj().fetch(as_dict=True)
Expand Down
16 changes: 8 additions & 8 deletions link/service/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def pull_entity(command: commands.PullEntity, *, uow: UnitOfWork, message_bus: M


def delete_entity(command: commands.DeleteEntity, *, uow: UnitOfWork, message_bus: MessageBus) -> None:
"""Delete a pulled entity."""
"""Delete a shared entity."""
message_bus.handle(events.ProcessStarted(Processes.DELETE, command.requested))
with uow:
uow.link[command.requested].delete()
Expand All @@ -38,23 +38,23 @@ def pull(command: commands.PullEntities, *, message_bus: MessageBus) -> None:


def delete(command: commands.DeleteEntities, *, message_bus: MessageBus) -> None:
"""Delete pulled entities."""
"""Delete shared entities."""
message_bus.handle(events.BatchProcessingStarted(Processes.DELETE, command.requested))
for identifier in command.requested:
message_bus.handle(commands.DeleteEntity(identifier))
message_bus.handle(events.BatchProcessingFinished(Processes.DELETE, command.requested))


def list_idle_entities(
command: commands.ListIdleEntities,
def list_unshared_entities(
command: commands.ListUnsharedEntities,
*,
uow: UnitOfWork,
output_port: Callable[[events.IdleEntitiesListed], None],
output_port: Callable[[events.UnsharedEntitiesListed], None],
) -> None:
"""List all idle entities."""
"""List all unshared entities."""
with uow:
idle = uow.link.list_idle_entities()
output_port(events.IdleEntitiesListed(idle))
unshared = uow.link.list_unshared_entities()
output_port(events.UnsharedEntitiesListed(unshared))


def log_state_change(event: events.StateChanged, log: Callable[[events.StateChanged], None]) -> None:
Expand Down
Loading