Skip to content

Commit

Permalink
Create ExitManager
Browse files Browse the repository at this point in the history
ExistManager is responsible for stopping the application once
a certain number of messages arrives or a timeout is reached.
  • Loading branch information
gwaramadze committed Jan 27, 2025
1 parent 08177bb commit f730071
Showing 1 changed file with 61 additions and 3 deletions.
64 changes: 61 additions & 3 deletions quixstreams/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,49 @@ def __call__(
) -> TopicManager: ...


class ExitManager:
def __init__(self, number: Optional[int] = None, timeout: Optional[int] = None):
"""
Initialize the exit manager with either a message count limit, timeout, or both.
The `should_exit` function will be called for every message processed, so it needs
to be highly optimized. To ensure optimal performance, the logic for determining
which checks to perform is handled in `__init__` rather than in `should_exit` itself.
Args:
number: Maximum number of messages to process before exiting
timeout: Maximum time in seconds to run before exiting
"""
if number is not None and timeout is not None:
self._number = number
self._timeout = timeout
self._counter = 0
self._start = time.monotonic()
self.should_exit = self._check_number_and_timeout
elif number is not None:
self._number = number
self._counter = 0
self.should_exit = self._check_number
elif timeout is not None:
self._timeout = timeout
self._start = time.monotonic()
self.should_exit = self._check_timeout
else:
self.should_exit = lambda: False

def _check_number(self):
if self._counter >= self._number:
return True
self._counter += 1
return False

def _check_timeout(self):
return (time.monotonic() - self._start) > self._timeout

def _check_number_and_timeout(self):
return self._check_number() or self._check_timeout()


class Application:
"""
The main Application class.
Expand Down Expand Up @@ -345,6 +388,7 @@ def __init__(
pausing_manager=self._pausing_manager,
)
self._dataframe_registry = DataframeRegistry()
self._exit_manager = ExitManager()

@property
def config(self) -> "ApplicationConfig":
Expand Down Expand Up @@ -546,6 +590,14 @@ def stop(self, fail: bool = False):
if self._state_manager.using_changelogs:
self._state_manager.stop_recovery()

def running(self):
if not self._running:
return False
elif self._exit_manager.should_exit():
self.stop()
return False
return True

def _get_rowproducer(
self,
on_error: Optional[ProducerErrorCallback] = None,
Expand Down Expand Up @@ -712,7 +764,12 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic
)
return topic

def run(self, dataframe: Optional[StreamingDataFrame] = None):
def run(
self,
dataframe: Optional[StreamingDataFrame] = None,
number: Optional[int] = None,
timeout: Optional[int] = None,
):
"""
Start processing data from Kafka using provided `StreamingDataFrame`
Expand Down Expand Up @@ -743,6 +800,7 @@ def run(self, dataframe: Optional[StreamingDataFrame] = None):
"the argument should be removed.",
FutureWarning,
)
self._exit_manager = ExitManager(number, timeout)
self._run()

def _exception_handler(self, exc_type, exc_val, exc_tb):
Expand Down Expand Up @@ -803,7 +861,7 @@ def _run_dataframe(self):

dataframes_composed = self._dataframe_registry.compose_all()

while self._running:
while self.running():
if self._state_manager.recovery_required:
self._state_manager.do_recovery()
else:
Expand All @@ -818,7 +876,7 @@ def _run_dataframe(self):
def _run_sources(self):
self._running = True
self._source_manager.start_sources()
while self._running:
while self.running():
self._source_manager.raise_for_error()

if not self._source_manager.is_alive():
Expand Down

0 comments on commit f730071

Please sign in to comment.