Skip to content

Commit

Permalink
Fix stopping daemon threads in Python >= 3.7 (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
pitercl authored Apr 2, 2020
1 parent 3e6ec0e commit 0a84c1d
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 10 deletions.
6 changes: 1 addition & 5 deletions neptune/internal/channels/channels_values_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __init__(self, experiment, values_queue):
self._values_batch = []

def run(self):
while not self.is_interrupted() or not self._values_queue.empty():
while self.should_continue_running() or not self._values_queue.empty():
try:
sleep_start = time.time()
self._values_batch.append(self._values_queue.get(timeout=max(self._sleep_time, 0)))
Expand All @@ -122,10 +122,6 @@ def run(self):

self._process_batch()

def join(self, timeout=None):
self.interrupt()
super(ChannelsValuesSendingThread, self).join(timeout)

def _process_batch(self):
send_start = time.time()
if self._values_batch:
Expand Down
2 changes: 1 addition & 1 deletion neptune/internal/threads/aborting_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, websocket_factory, abort_impl, experiment_id):

def run(self):
try:
while not self.is_interrupted():
while self.should_continue_running():
raw_message = self._ws_client.recv()
self._abort_message_processor.run(raw_message)
except WebSocketConnectionClosedException:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, metric_service, metric_sending_interval_seconds):

def run(self):
try:
while not self.is_interrupted():
while self.should_continue_running():
before = time.time()

try:
Expand Down
16 changes: 14 additions & 2 deletions neptune/internal/threads/neptune_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@
#
import threading

import six


class NeptuneThread(threading.Thread):
def __init__(self, is_daemon):
super(NeptuneThread, self).__init__(target=self.run)
self.setDaemon(is_daemon)
self._interrupted = threading.Event()

def is_interrupted(self):
return self._interrupted.is_set()
def should_continue_running(self):
# TODO: remove this pylint exception once we stop supporting Python 2
# pylint: disable=no-member
if six.PY2:
all_threads = threading.enumerate()

# pylint:disable=protected-access
main_thread_is_alive = any(t.__class__ is threading._MainThread and t.is_alive() for t in all_threads)
else:
main_thread_is_alive = threading.main_thread().is_alive()

return not self._interrupted.is_set() and main_thread_is_alive

def interrupt(self):
self._interrupted.set()
Expand Down
2 changes: 1 addition & 1 deletion neptune/internal/threads/ping_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, backend, experiment):
self.__experiment = experiment

def run(self):
while not self.is_interrupted():
while self.should_continue_running():
try:
self.__backend.ping_experiment(self.__experiment)
except HTTPUnprocessableEntity:
Expand Down

0 comments on commit 0a84c1d

Please sign in to comment.