diff --git a/quixstreams/app.py b/quixstreams/app.py index 71528ebb9..287951eae 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -75,6 +75,49 @@ def __call__( ) -> TopicManager: ... +class ExitManager: + def __init__(self, number: Optional[int] = None, timeout: Optional[int] = None): + """ + Initialize the exit manager with either a message count limit, timeout, or both. + + The `should_exit` function will be called for every message processed, so it needs + to be highly optimized. To ensure optimal performance, the logic for determining + which checks to perform is handled in `__init__` rather than in `should_exit` itself. + + Args: + number: Maximum number of messages to process before exiting + timeout: Maximum time in seconds to run before exiting + """ + if number is not None and timeout is not None: + self._number = number + self._timeout = timeout + self._counter = 0 + self._start = time.monotonic() + self.should_exit = self._check_number_and_timeout + elif number is not None: + self._number = number + self._counter = 0 + self.should_exit = self._check_number + elif timeout is not None: + self._timeout = timeout + self._start = time.monotonic() + self.should_exit = self._check_timeout + else: + self.should_exit = lambda: False + + def _check_number(self): + if self._counter >= self._number: + return True + self._counter += 1 + return False + + def _check_timeout(self): + return (time.monotonic() - self._start) > self._timeout + + def _check_number_and_timeout(self): + return self._check_number() or self._check_timeout() + + class Application: """ The main Application class. @@ -345,6 +388,7 @@ def __init__( pausing_manager=self._pausing_manager, ) self._dataframe_registry = DataframeRegistry() + self._exit_manager = ExitManager() @property def config(self) -> "ApplicationConfig": @@ -546,6 +590,14 @@ def stop(self, fail: bool = False): if self._state_manager.using_changelogs: self._state_manager.stop_recovery() + def running(self): + if not self._running: + return False + elif self._exit_manager.should_exit(): + self.stop() + return False + return True + def _get_rowproducer( self, on_error: Optional[ProducerErrorCallback] = None, @@ -712,7 +764,12 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic ) return topic - def run(self, dataframe: Optional[StreamingDataFrame] = None): + def run( + self, + dataframe: Optional[StreamingDataFrame] = None, + number: Optional[int] = None, + timeout: Optional[int] = None, + ): """ Start processing data from Kafka using provided `StreamingDataFrame` @@ -743,6 +800,7 @@ def run(self, dataframe: Optional[StreamingDataFrame] = None): "the argument should be removed.", FutureWarning, ) + self._exit_manager = ExitManager(number, timeout) self._run() def _exception_handler(self, exc_type, exc_val, exc_tb): @@ -803,7 +861,7 @@ def _run_dataframe(self): dataframes_composed = self._dataframe_registry.compose_all() - while self._running: + while self.running(): if self._state_manager.recovery_required: self._state_manager.do_recovery() else: @@ -818,7 +876,7 @@ def _run_dataframe(self): def _run_sources(self): self._running = True self._source_manager.start_sources() - while self._running: + while self.running(): self._source_manager.raise_for_error() if not self._source_manager.is_alive():