diff --git a/neptune/internal/channels/channels_values_sender.py b/neptune/internal/channels/channels_values_sender.py index 0fba13c67..c6a790066 100644 --- a/neptune/internal/channels/channels_values_sender.py +++ b/neptune/internal/channels/channels_values_sender.py @@ -105,7 +105,7 @@ def __init__(self, experiment, values_queue): self._values_batch = [] def run(self): - while not self.is_interrupted() or not self._values_queue.empty(): + while self.should_continue_running() or not self._values_queue.empty(): try: sleep_start = time.time() self._values_batch.append(self._values_queue.get(timeout=max(self._sleep_time, 0))) @@ -122,10 +122,6 @@ def run(self): self._process_batch() - def join(self, timeout=None): - self.interrupt() - super(ChannelsValuesSendingThread, self).join(timeout) - def _process_batch(self): send_start = time.time() if self._values_batch: diff --git a/neptune/internal/threads/aborting_thread.py b/neptune/internal/threads/aborting_thread.py index 0e72ef91e..a6a5dfbdb 100644 --- a/neptune/internal/threads/aborting_thread.py +++ b/neptune/internal/threads/aborting_thread.py @@ -30,7 +30,7 @@ def __init__(self, websocket_factory, abort_impl, experiment_id): def run(self): try: - while not self.is_interrupted(): + while self.should_continue_running(): raw_message = self._ws_client.recv() self._abort_message_processor.run(raw_message) except WebSocketConnectionClosedException: diff --git a/neptune/internal/threads/hardware_metric_reporting_thread.py b/neptune/internal/threads/hardware_metric_reporting_thread.py index b9e8f0581..51a834fd9 100644 --- a/neptune/internal/threads/hardware_metric_reporting_thread.py +++ b/neptune/internal/threads/hardware_metric_reporting_thread.py @@ -32,7 +32,7 @@ def __init__(self, metric_service, metric_sending_interval_seconds): def run(self): try: - while not self.is_interrupted(): + while self.should_continue_running(): before = time.time() try: diff --git a/neptune/internal/threads/neptune_thread.py b/neptune/internal/threads/neptune_thread.py index 1656903e5..4b81db27f 100644 --- a/neptune/internal/threads/neptune_thread.py +++ b/neptune/internal/threads/neptune_thread.py @@ -15,6 +15,8 @@ # import threading +import six + class NeptuneThread(threading.Thread): def __init__(self, is_daemon): @@ -22,8 +24,18 @@ def __init__(self, is_daemon): self.setDaemon(is_daemon) self._interrupted = threading.Event() - def is_interrupted(self): - return self._interrupted.is_set() + def should_continue_running(self): + # TODO: remove this pylint exception once we stop supporting Python 2 + # pylint: disable=no-member + if six.PY2: + all_threads = threading.enumerate() + + # pylint:disable=protected-access + main_thread_is_alive = any(t.__class__ is threading._MainThread and t.is_alive() for t in all_threads) + else: + main_thread_is_alive = threading.main_thread().is_alive() + + return not self._interrupted.is_set() and main_thread_is_alive def interrupt(self): self._interrupted.set() diff --git a/neptune/internal/threads/ping_thread.py b/neptune/internal/threads/ping_thread.py index ca40ea091..3d43f8957 100644 --- a/neptune/internal/threads/ping_thread.py +++ b/neptune/internal/threads/ping_thread.py @@ -32,7 +32,7 @@ def __init__(self, backend, experiment): self.__experiment = experiment def run(self): - while not self.is_interrupted(): + while self.should_continue_running(): try: self.__backend.ping_experiment(self.__experiment) except HTTPUnprocessableEntity: