Skip to content

Commit

Permalink
Remove top lvl race condition (#112)
Browse files Browse the repository at this point in the history
* Make RESONATE GROUP configurable through env variables

* Simplify imports

* pin rye version

* Expose execute here option

* Remove race condition

* Change API to not use recv

* Allow to import ctx from top lvl

* fix subscribe typo

* Order helper methods on scheduler

* Fix typo error and pass pid to start methods

* rename queue to delay queue

* rename onboard to forkorjoin

* fix typo loopback
  • Loading branch information
Tomperez98 authored Jan 13, 2025
1 parent 95a3be3 commit 282c84b
Show file tree
Hide file tree
Showing 28 changed files with 439 additions and 322 deletions.
1 change: 1 addition & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
- uses: eifinger/setup-rye@v2
with:
enable-cache: true
version: '0.40.0'

- name: Sync dependencies
run: rye sync --no-lock
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ ignore = [
"ANN101",
"COM812",
"ISC001",
"S101"
"S101",
"ARG002",
"ARG003",
]
logger-objects = ["resonate.logging.logger"]

Expand Down
8 changes: 6 additions & 2 deletions src/resonate/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from __future__ import annotations

from . import random
from resonate.context import Context
from resonate.dataclasses import DurablePromise
from resonate.handle import Handle
from resonate.promise import Promise
from resonate.resonate import Resonate

__all__ = ["random"]
__all__ = ["Resonate", "Handle", "DurablePromise", "Promise", "Context"]
20 changes: 16 additions & 4 deletions src/resonate/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ class RFI:
unit: Invocation[Any] | DurablePromise
opts: Options = field(default=Options())

def options(self, id: str | None = None, send_to: str | None = None) -> Self:
def options(
self,
*,
id: str | None = None,
send_to: str | None = None,
execute_here: bool = False,
) -> Self:
assert not isinstance(self.unit, DurablePromise), _ASSERT_MSG
self.opts = Options(id=id, send_to=send_to)
self.opts = Options(id=id, send_to=send_to, execute_here=execute_here)
return self


Expand All @@ -37,9 +43,15 @@ class RFC:
unit: Invocation[Any] | DurablePromise
opts: Options = field(default=Options())

def options(self, id: str | None = None, send_to: str | None = None) -> Self:
def options(
self,
*,
id: str | None = None,
send_to: str | None = None,
execute_here: bool = False,
) -> Self:
assert not isinstance(self.unit, DurablePromise), _ASSERT_MSG
self.opts = Options(id=id, send_to=send_to)
self.opts = Options(id=id, send_to=send_to, execute_here=execute_here)
return self

def to_rfi(self) -> RFI:
Expand Down
25 changes: 24 additions & 1 deletion src/resonate/cmd_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from typing_extensions import TypeAlias

if TYPE_CHECKING:
from resonate.dataclasses import Invocation
from resonate.handle import Handle
from resonate.result import Result
from resonate.stores.record import TaskRecord

Expand All @@ -16,6 +18,25 @@ class Invoke:
id: str


@dataclass(frozen=True)
class ForkOrJoin:
id: str
handle: Handle[Any]
invocation: Invocation[Any]


@dataclass(frozen=True)
class Subscribe:
id: str
handle: Handle[Any]


@dataclass(frozen=True)
class Notify:
id: str
value: Result[Any, Exception]


@dataclass(frozen=True)
class Resume:
id: str
Expand All @@ -33,5 +54,7 @@ class Claim:
record: TaskRecord


Command: TypeAlias = Union[Invoke, Resume, Complete, Claim]
Command: TypeAlias = Union[
Invoke, Resume, Complete, Claim, Subscribe, Notify, ForkOrJoin
]
CommandQ: TypeAlias = Queue[Union[Command, None]]
11 changes: 8 additions & 3 deletions src/resonate/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@

from typing_extensions import ParamSpec, assert_never

from resonate.promise import Promise
from resonate.result import Err, Ok, Result

if TYPE_CHECKING:
from collections.abc import Generator

from resonate.record import Handle, Record
from resonate.handle import Handle
from resonate.record import Record
from resonate.scheduler.traits import IScheduler
from resonate.typing import Data, DurableCoro, DurableFn, Headers, Tags, Yieldable

Expand All @@ -37,6 +39,9 @@ def __init__(
def run(self, id: str, *args: P.args, **kwargs: P.kwargs) -> Handle[T]:
return self._scheduler.run(id, self.fn, *args, **kwargs)

def get(self, id: str) -> Handle[T]:
return self._scheduler.get(id)


@final
@dataclass(frozen=True)
Expand Down Expand Up @@ -93,7 +98,7 @@ def _send(self, v: T) -> Yieldable:
self._next_child_to_yield += 1
if child.done():
continue
return child.promise
return Promise[Any](child.id)

assert all(child.done() for child in self._record.children)
assert not self._coro_active
Expand All @@ -117,7 +122,7 @@ def _throw(self, error: Exception) -> Yieldable:
self._next_child_to_yield += 1
if child.done():
continue
return child.promise
return Promise[Any](child.id)
assert all(
child.done() for child in self._record.children
), "All children promise must have been resolved."
Expand Down
2 changes: 1 addition & 1 deletion src/resonate/queue.py → src/resonate/delay_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self) -> None:
self._delayed: list[tuple[float, Invoke]] = []
self._worker_thread: Thread | None = None

def start(self, cmd_queue: CommandQ) -> None:
def start(self, cmd_queue: CommandQ, pid: str) -> None:
assert self._worker_thread is None, "Already been started."
self._worker_thread = Thread(target=self._run, args=(cmd_queue,), daemon=True)
self._worker_thread.start()
Expand Down
29 changes: 29 additions & 0 deletions src/resonate/handle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from __future__ import annotations

from concurrent.futures import Future
from dataclasses import dataclass, field
from typing import Generic, TypeVar, final

from typing_extensions import assert_never

from resonate.result import Err, Ok, Result

T = TypeVar("T")


@final
@dataclass(frozen=True)
class Handle(Generic[T]):
id: str
f: Future[T] = field(repr=False, default_factory=Future, init=False)

def result(self, timeout: float | None = None) -> T:
return self.f.result(timeout=timeout)

def set_result(self, result: Result[T, Exception]) -> None:
if isinstance(result, Ok):
self.f.set_result(result.unwrap())
elif isinstance(result, Err):
self.f.set_exception(result.err())
else:
assert_never(result)
22 changes: 8 additions & 14 deletions src/resonate/options.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, final

if TYPE_CHECKING:
from resonate import retry_policy


@final
@dataclass
class Options:
def __init__(
self,
*,
id: str | None = None,
durable: bool = True,
send_to: str | None = None,
retry_policy: retry_policy.RetryPolicy | None = None,
version: int = 1,
) -> None:
self.durable = durable
self.id = id
self.retry_policy = retry_policy
self.send_to = send_to
self.version = version
id: str | None = None
durable: bool = True
send_to: str | None = None
retry_policy: retry_policy.RetryPolicy | None = None
version: int = 1
execute_here: bool = True
4 changes: 2 additions & 2 deletions src/resonate/processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from typing import TYPE_CHECKING, Any

from resonate.cmd_queue import CommandQ, Complete
from resonate.delay_queue import Queue
from resonate.processor.traits import IProcessor
from resonate.queue import Queue
from resonate.result import Err, Ok, Result

if TYPE_CHECKING:
Expand All @@ -24,7 +24,7 @@ def __init__(

self._sq: Queue[SQE[Any]] = Queue()

def start(self, cmd_queue: CommandQ) -> None:
def start(self, cmd_queue: CommandQ, pid: str) -> None:
for _ in range(self._workers):
t = Thread(target=self._run, args=(cmd_queue,), daemon=True)
self._threads.add(t)
Expand Down
12 changes: 12 additions & 0 deletions src/resonate/promise.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, TypeVar, final

T = TypeVar("T")


@final
@dataclass(frozen=True)
class Promise(Generic[T]):
id: str
43 changes: 7 additions & 36 deletions src/resonate/record.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from __future__ import annotations

from asyncio import iscoroutinefunction
from concurrent.futures import Future
from dataclasses import dataclass, field
from inspect import isfunction, isgeneratorfunction
from typing import TYPE_CHECKING, Any, Generic, TypeVar, final

from typing_extensions import assert_never

from resonate.actions import LFI, RFI
from resonate.dataclasses import Invocation
from resonate.logging import logger
from resonate.result import Err, Ok, Result
from resonate.result import Ok, Result
from resonate.retry_policy import Never, exponential, never
from resonate.stores.record import DurablePromiseRecord, TaskRecord

Expand All @@ -25,22 +21,6 @@
T = TypeVar("T")


@final
@dataclass(frozen=True)
class Promise(Generic[T]):
id: str


@final
@dataclass(frozen=True)
class Handle(Generic[T]):
id: str
_f: Future[T] = field(repr=False)

def result(self, timeout: float | None = None) -> T:
return self._f.result(timeout=timeout)


@final
class Record(Generic[T]):
def __init__(
Expand All @@ -55,7 +35,7 @@ def __init__(
self.is_root: bool = (
True if self.parent is None else isinstance(invocation, RFI)
)
self._f = Future[T]()
self._result: Result[T, Exception] | None = None
self.children: list[Record[Any]] = []
self.invocation: LFI | RFI = invocation
self.retry_policy: retry_policy.RetryPolicy | None
Expand All @@ -77,8 +57,6 @@ def __init__(
)

self._attempt: int = 1
self.promise = Promise[T](id=id)
self.handle = Handle[T](id=self.id, _f=self._f)
self.durable_promise: DurablePromiseRecord | None = None
self._task: TaskRecord | None = None
self.ctx = ctx
Expand Down Expand Up @@ -167,22 +145,15 @@ def set_result(self, result: Result[T, Exception], *, deduping: bool) -> None:
assert all(
r.done() for r in self.children
), "All children record must be completed."
if isinstance(result, Ok):
self._f.set_result(result.unwrap())
elif isinstance(result, Err):
self._f.set_exception(result.err())
else:
assert_never(result)
assert self._result is None
self._result = result

def safe_result(self) -> Result[Any, Exception]:
assert self.done()
try:
return Ok(self._f.result())
except Exception as e: # noqa: BLE001
return Err(e)
assert self._result is not None
return self._result

def done(self) -> bool:
return self._f.done()
return self._result is not None

def next_child_name(self) -> str:
return f"{self.id}.{self._num_children+1}"
Expand Down
2 changes: 1 addition & 1 deletion src/resonate/resonate.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from resonate import retry_policy
from resonate.context import Context
from resonate.record import Handle
from resonate.handle import Handle
from resonate.scheduler.traits import IScheduler
from resonate.stores.local import LocalStore
from resonate.task_sources.traits import ITaskSource
Expand Down
Loading

0 comments on commit 282c84b

Please sign in to comment.