From 7954af2ba5681840b2d0e87bc74a2ca70c84fecc Mon Sep 17 00:00:00 2001 From: Tomperez98 Date: Wed, 8 Jan 2025 13:27:17 -0500 Subject: [PATCH] Simplify imports --- src/resonate/__init__.py | 12 +++++++++-- src/resonate/dataclasses.py | 8 +++++--- src/resonate/handle.py | 20 ++++++++++++++++++ src/resonate/promise.py | 12 +++++++++++ src/resonate/record.py | 29 +++++---------------------- src/resonate/resonate.py | 2 +- src/resonate/scheduler/scheduler.py | 25 ++++++++++++++--------- src/resonate/scheduler/traits.py | 2 +- src/resonate/stores/__init__.py | 9 +++++++++ src/resonate/task_sources/__init__.py | 5 +++++ src/resonate/typing.py | 2 +- tests/test_functionality.py | 10 ++++----- 12 files changed, 89 insertions(+), 47 deletions(-) create mode 100644 src/resonate/handle.py create mode 100644 src/resonate/promise.py diff --git a/src/resonate/__init__.py b/src/resonate/__init__.py index 07cff42e..516ee982 100644 --- a/src/resonate/__init__.py +++ b/src/resonate/__init__.py @@ -1,5 +1,13 @@ from __future__ import annotations -from . import random +from resonate.dataclasses import DurablePromise +from resonate.handle import Handle +from resonate.promise import Promise +from resonate.resonate import Resonate -__all__ = ["random"] +__all__ = [ + "Resonate", + "Handle", + "DurablePromise", + "Promise", +] diff --git a/src/resonate/dataclasses.py b/src/resonate/dataclasses.py index 71c95ab2..e9c4e13d 100644 --- a/src/resonate/dataclasses.py +++ b/src/resonate/dataclasses.py @@ -6,12 +6,14 @@ from typing_extensions import ParamSpec, assert_never +from resonate.promise import Promise from resonate.result import Err, Ok, Result if TYPE_CHECKING: from collections.abc import Generator - from resonate.record import Handle, Record + from resonate.handle import Handle + from resonate.record import Record from resonate.scheduler.traits import IScheduler from resonate.typing import Data, DurableCoro, DurableFn, Headers, Tags, Yieldable @@ -93,7 +95,7 @@ def _send(self, v: T) -> Yieldable: self._next_child_to_yield += 1 if child.done(): continue - return child.promise + return Promise[Any](child.id) assert all(child.done() for child in self._record.children) assert not self._coro_active @@ -117,7 +119,7 @@ def _throw(self, error: Exception) -> Yieldable: self._next_child_to_yield += 1 if child.done(): continue - return child.promise + return Promise[Any](child.id) assert all( child.done() for child in self._record.children ), "All children promise must have been resolved." diff --git a/src/resonate/handle.py b/src/resonate/handle.py new file mode 100644 index 00000000..505789b2 --- /dev/null +++ b/src/resonate/handle.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Generic, TypeVar, final + +if TYPE_CHECKING: + from concurrent.futures import Future + + +T = TypeVar("T") + + +@final +@dataclass(frozen=True) +class Handle(Generic[T]): + id: str + f: Future[T] = field(repr=False) + + def result(self, timeout: float | None = None) -> T: + return self.f.result(timeout=timeout) diff --git a/src/resonate/promise.py b/src/resonate/promise.py new file mode 100644 index 00000000..f92b157f --- /dev/null +++ b/src/resonate/promise.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Generic, TypeVar, final + +T = TypeVar("T") + + +@final +@dataclass(frozen=True) +class Promise(Generic[T]): + id: str diff --git a/src/resonate/record.py b/src/resonate/record.py index e4e8fe3d..6ae432e3 100644 --- a/src/resonate/record.py +++ b/src/resonate/record.py @@ -2,7 +2,6 @@ from asyncio import iscoroutinefunction from concurrent.futures import Future -from dataclasses import dataclass, field from inspect import isfunction, isgeneratorfunction from typing import TYPE_CHECKING, Any, Generic, TypeVar, final @@ -25,22 +24,6 @@ T = TypeVar("T") -@final -@dataclass(frozen=True) -class Promise(Generic[T]): - id: str - - -@final -@dataclass(frozen=True) -class Handle(Generic[T]): - id: str - _f: Future[T] = field(repr=False) - - def result(self, timeout: float | None = None) -> T: - return self._f.result(timeout=timeout) - - @final class Record(Generic[T]): def __init__( @@ -55,7 +38,7 @@ def __init__( self.is_root: bool = ( True if self.parent is None else isinstance(invocation, RFI) ) - self._f = Future[T]() + self.f = Future[T]() self.children: list[Record[Any]] = [] self.invocation: LFI | RFI = invocation self.retry_policy: retry_policy.RetryPolicy | None @@ -77,8 +60,6 @@ def __init__( ) self._attempt: int = 1 - self.promise = Promise[T](id=id) - self.handle = Handle[T](id=self.id, _f=self._f) self.durable_promise: DurablePromiseRecord | None = None self._task: TaskRecord | None = None self.ctx = ctx @@ -168,21 +149,21 @@ def set_result(self, result: Result[T, Exception], *, deduping: bool) -> None: r.done() for r in self.children ), "All children record must be completed." if isinstance(result, Ok): - self._f.set_result(result.unwrap()) + self.f.set_result(result.unwrap()) elif isinstance(result, Err): - self._f.set_exception(result.err()) + self.f.set_exception(result.err()) else: assert_never(result) def safe_result(self) -> Result[Any, Exception]: assert self.done() try: - return Ok(self._f.result()) + return Ok(self.f.result()) except Exception as e: # noqa: BLE001 return Err(e) def done(self) -> bool: - return self._f.done() + return self.f.done() def next_child_name(self) -> str: return f"{self.id}.{self._num_children+1}" diff --git a/src/resonate/resonate.py b/src/resonate/resonate.py index a6826f36..b9dbe8a4 100644 --- a/src/resonate/resonate.py +++ b/src/resonate/resonate.py @@ -20,7 +20,7 @@ from resonate import retry_policy from resonate.context import Context - from resonate.record import Handle + from resonate.handle import Handle from resonate.scheduler.traits import IScheduler from resonate.stores.local import LocalStore from resonate.task_sources.traits import ITaskSource diff --git a/src/resonate/scheduler/scheduler.py b/src/resonate/scheduler/scheduler.py index 56b442dd..08ca80f5 100644 --- a/src/resonate/scheduler/scheduler.py +++ b/src/resonate/scheduler/scheduler.py @@ -32,10 +32,12 @@ ResonateCoro, ) from resonate.encoders import JsonEncoder +from resonate.handle import Handle from resonate.logging import logger from resonate.processor.processor import Processor +from resonate.promise import Promise from resonate.queue import DelayQueue -from resonate.record import Promise, Record +from resonate.record import Record from resonate.result import Err, Ok, Result from resonate.scheduler.traits import IScheduler from resonate.stores.record import ( @@ -49,7 +51,6 @@ if TYPE_CHECKING: from resonate.collections import FunctionRegistry from resonate.dependencies import Dependencies - from resonate.record import Handle from resonate.stores.local import LocalStore from resonate.stores.record import TaskRecord from resonate.task_sources.traits import ITaskSource @@ -117,7 +118,7 @@ def run( # If there's already a record with this ID, dedup. record = self._records.get(id) if record is not None: - return record.handle + return Handle[T](record.id, record.f) # Get function name from registry fn_name = self._registry.get_from_value(func) @@ -162,7 +163,7 @@ def run( else: self._cmd_queue.put(Invoke(record.id)) - return record.handle + return Handle[T](record.id, record.f) def _heartbeat(self) -> None: assert isinstance(self._store, RemoteStore) @@ -612,7 +613,9 @@ def _process_rfi(self, record: Record[Any], rfi: RFI) -> list[Command]: child_record = self._records.get(child_id) if child_record is not None: record.add_child(child_record) - loopbacks.extend(self._handle_continue(record.id, Ok(child_record.promise))) + loopbacks.extend( + self._handle_continue(record.id, Ok(Promise[Any](child_record.id))) + ) else: child_record = record.create_child(id=child_id, invocation=rfi) self._records[child_id] = child_record @@ -636,7 +639,9 @@ def _process_rfi(self, record: Record[Any], rfi: RFI) -> list[Command]: if durable_promise.is_completed(): value = durable_promise.get_value(self._encoder) child_record.set_result(value, deduping=True) - loopbacks.extend(self._handle_continue(record.id, Ok(child_record.promise))) + loopbacks.extend( + self._handle_continue(record.id, Ok(Promise[Any](child_record.id))) + ) return loopbacks @@ -646,7 +651,9 @@ def _process_lfi(self, record: Record[Any], lfi: LFI) -> list[Command]: child_record = self._records.get(child_id) if child_record is not None: record.add_child(child_record) - loopbacks.extend(self._handle_continue(record.id, Ok(child_record.promise))) + loopbacks.extend( + self._handle_continue(record.id, Ok(Promise[Any](child_record.id))) + ) else: child_record = record.create_child(id=child_id, invocation=lfi) self._records[child_id] = child_record @@ -672,12 +679,12 @@ def _process_lfi(self, record: Record[Any], lfi: LFI) -> list[Command]: else: loopbacks.append(Invoke(child_id)) loopbacks.extend( - self._handle_continue(record.id, Ok(child_record.promise)) + self._handle_continue(record.id, Ok(Promise[Any](child_record.id))) ) else: loopbacks.append(Invoke(child_id)) loopbacks.extend( - self._handle_continue(record.id, Ok(child_record.promise)) + self._handle_continue(record.id, Ok(Promise[Any](child_record.id))) ) return loopbacks diff --git a/src/resonate/scheduler/traits.py b/src/resonate/scheduler/traits.py index 11858cfd..72b99d68 100644 --- a/src/resonate/scheduler/traits.py +++ b/src/resonate/scheduler/traits.py @@ -6,7 +6,7 @@ from typing_extensions import ParamSpec if TYPE_CHECKING: - from resonate.record import Handle + from resonate.handle import Handle from resonate.typing import DurableCoro, DurableFn P = ParamSpec("P") diff --git a/src/resonate/stores/__init__.py b/src/resonate/stores/__init__.py index e69de29b..37b604fc 100644 --- a/src/resonate/stores/__init__.py +++ b/src/resonate/stores/__init__.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from .local import LocalStore +from .remote import RemoteStore + +__all__ = [ + "LocalStore", + "RemoteStore", +] diff --git a/src/resonate/task_sources/__init__.py b/src/resonate/task_sources/__init__.py index e69de29b..93d2cc08 100644 --- a/src/resonate/task_sources/__init__.py +++ b/src/resonate/task_sources/__init__.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +from .poller import Poller + +__all__ = ["Poller"] diff --git a/src/resonate/typing.py b/src/resonate/typing.py index 1c5961a1..9e3cc832 100644 --- a/src/resonate/typing.py +++ b/src/resonate/typing.py @@ -13,7 +13,7 @@ RFI, ) from resonate.context import Context -from resonate.record import Promise +from resonate.promise import Promise T = TypeVar("T") P = ParamSpec("P") diff --git a/tests/test_functionality.py b/tests/test_functionality.py index da1d1173..bffd13f5 100644 --- a/tests/test_functionality.py +++ b/tests/test_functionality.py @@ -9,14 +9,12 @@ import pytest -from resonate.dataclasses import DurablePromise -from resonate.record import Handle, Promise -from resonate.resonate import Resonate +from resonate import DurablePromise, Handle, Resonate +from resonate.promise import Promise from resonate.retry_policy import constant, exponential, linear, never -from resonate.stores.local import LocalStore -from resonate.stores.remote import RemoteStore +from resonate.stores import LocalStore, RemoteStore from resonate.targets import poll -from resonate.task_sources.poller import Poller +from resonate.task_sources import Poller if TYPE_CHECKING: from collections.abc import Generator