Skip to content

Commit

Permalink
Move to inotify_simple from unsupported pyinotify
Browse files Browse the repository at this point in the history
  • Loading branch information
oldpatricka committed Jul 2, 2024
1 parent b8a67d2 commit f428ea6
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 38 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
python: [3.6, 3.8, 3.9]
extras: ["test", "test,queable,sentry"]
python: [3.6, 3.8, 3.9, "3.10"]
extras: ["test", "test,queuable,sentry"]
steps:
- name: Setup Python
uses: actions/[email protected]
Expand Down
54 changes: 19 additions & 35 deletions client/datalake/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,10 @@
InsufficientConfiguration.'''
has_queue = True
try:
import pyinotify
import inotify_simple
except ImportError:
has_queue = False

class FakePyinotify(object):

class ProcessEvent(object):
pass

pyinotify = FakePyinotify


def requires_queue(f):
def wrapped(*args, **kwargs):
Expand Down Expand Up @@ -124,32 +117,19 @@ def __init__(self, archive, queue_dir, callback=None):
self._archive = archive
self._callback = callback

class EventHandler(pyinotify.ProcessEvent):

def __init__(self, callback):
super(Uploader.EventHandler, self).__init__()
self.callback = callback

def process_IN_CLOSE_WRITE(self, event):
self.callback(event.pathname)

def process_IN_MOVED_TO(self, event):
self.callback(event.pathname)
self.inotify = inotify_simple.INotify()

def _setup_watch_manager(self, timeout):
if timeout is not None:
timeout = int(timeout * 1000)
self._wm = pyinotify.WatchManager()
self._handler = Uploader.EventHandler(self._push)
self._notifier = pyinotify.Notifier(self._wm, self._handler,
timeout=timeout)
self._wm.add_watch(self.queue_dir,
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO)
flags = inotify_simple.flags
watch_flags = flags.CLOSE_WRITE | flags.MOVED_TO
self.inotify.add_watch(self.queue_dir, watch_flags)

def _push(self, filename):
if not os.path.isabs(filename):
filename = os.path.join(self.queue_dir, filename)
if os.path.basename(filename).startswith('.'):
return
if self._workers == []:
if not self._workers:
self._synchronous_push(filename)
else:
self._threaded_push(filename)
Expand Down Expand Up @@ -205,7 +185,7 @@ def _listen(self, timeout=None, workers=1):
msg = 'number of upload workers cannot be zero or negative'
raise InsufficientConfiguration(msg)
if workers > 1:
# when multipe workers are requested, the main thread monitors the
# when multiple workers are requested, the main thread monitors the
# queue directory and puts the files in a Queue that is serviced by
# the worker threads. So the word queue is a bit overloaded in this
# module.
Expand All @@ -229,12 +209,16 @@ def _create_worker(self, worker_number):
def _run(self, timeout):

self._prepare_to_track_run_time(timeout)
self._notifier.process_events()
while self._notifier.check_events():
self._notifier.read_events()
self._notifier.process_events()
if self._update_time_remaining() == 0:
break
if timeout is not None:
timeout = int(timeout * 1000)

while self._update_time_remaining() > 0:
for event in self.inotify.read(timeout=timeout):
if event.name is None:
continue
self._push(event.name)
if self._update_time_remaining() == 0:
break

def _update_time_remaining(self):
if self._run_time_remaining is self.INFINITY:
Expand Down
2 changes: 1 addition & 1 deletion client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def get_version():
# the queuable feature allows users to offload their datalake pushes
# to a separate uploader process.
'queuable': [
'pyinotify>=0.9.4',
'inotify_simple>=1.3.5',
],
'sentry': [
'raven>=5.0.0',
Expand Down

0 comments on commit f428ea6

Please sign in to comment.