Skip to content

Commit

Permalink
Merging in Patrick's updates from master, as well as storage updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
ABPLMC committed Jul 11, 2024
2 parents cae7e24 + 681d93c commit 7fc18cd
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 1,940 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
python: [3.6, 3.8, 3.9]
extras: ["test", "test,queable,sentry"]
python: [3.8, 3.9, "3.10", 3.12]
extras: ["test", "test,queuable,sentry"]
steps:
- name: Setup Python
uses: actions/[email protected]
with:
python-version: ${{ matrix.python }}
- name: Check out repository code
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Test
working-directory: ./client
run: |
Expand Down
2 changes: 1 addition & 1 deletion .gitlab-ci.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ publish-pypi:
script:
- cd client
- rm -rf dist
- python3 setup.py sdist bdist_wheel
- python -m build
- twine upload -r pypi --skip-existing dist/*
tags:
- {{PLANET_RUNNER_TAG}}
Expand Down
54 changes: 19 additions & 35 deletions client/datalake/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,10 @@
InsufficientConfiguration.'''
has_queue = True
try:
import pyinotify
import inotify_simple
except ImportError:
has_queue = False

class FakePyinotify(object):

class ProcessEvent(object):
pass

pyinotify = FakePyinotify


def requires_queue(f):
def wrapped(*args, **kwargs):
Expand Down Expand Up @@ -124,32 +117,19 @@ def __init__(self, archive, queue_dir, callback=None):
self._archive = archive
self._callback = callback

class EventHandler(pyinotify.ProcessEvent):

def __init__(self, callback):
super(Uploader.EventHandler, self).__init__()
self.callback = callback

def process_IN_CLOSE_WRITE(self, event):
self.callback(event.pathname)

def process_IN_MOVED_TO(self, event):
self.callback(event.pathname)
self.inotify = inotify_simple.INotify()

def _setup_watch_manager(self, timeout):
if timeout is not None:
timeout = int(timeout * 1000)
self._wm = pyinotify.WatchManager()
self._handler = Uploader.EventHandler(self._push)
self._notifier = pyinotify.Notifier(self._wm, self._handler,
timeout=timeout)
self._wm.add_watch(self.queue_dir,
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO)
flags = inotify_simple.flags
watch_flags = flags.CLOSE_WRITE | flags.MOVED_TO
self.inotify.add_watch(self.queue_dir, watch_flags)

def _push(self, filename):
if not os.path.isabs(filename):
filename = os.path.join(self.queue_dir, filename)
if os.path.basename(filename).startswith('.'):
return
if self._workers == []:
if not self._workers:
self._synchronous_push(filename)
else:
self._threaded_push(filename)
Expand Down Expand Up @@ -205,7 +185,7 @@ def _listen(self, timeout=None, workers=1):
msg = 'number of upload workers cannot be zero or negative'
raise InsufficientConfiguration(msg)
if workers > 1:
# when multipe workers are requested, the main thread monitors the
# when multiple workers are requested, the main thread monitors the
# queue directory and puts the files in a Queue that is serviced by
# the worker threads. So the word queue is a bit overloaded in this
# module.
Expand All @@ -229,12 +209,16 @@ def _create_worker(self, worker_number):
def _run(self, timeout):

self._prepare_to_track_run_time(timeout)
self._notifier.process_events()
while self._notifier.check_events():
self._notifier.read_events()
self._notifier.process_events()
if self._update_time_remaining() == 0:
break
if timeout is not None:
timeout = int(timeout * 1000)

while self._update_time_remaining() > 0:
for event in self.inotify.read(timeout=timeout):
if event.name is None:
continue
self._push(event.name)
if self._update_time_remaining() == 0:
break

def _update_time_remaining(self):
if self._run_time_remaining is self.INFINITY:
Expand Down
79 changes: 79 additions & 0 deletions client/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
[build-system]
requires = [
"setuptools",
"versioningit",
]
build-backend = "setuptools.build_meta"

[project]
name = "datalake"
authors = [
{name = "Brian Cavagnolo", email = "[email protected]"},
]
description = "datalake: a metadata-aware archive"
readme = "README.md"
requires-python = ">=3.8"
classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
]
dependencies = [
'boto3>=1.9.68',
'memoized_property>=1.0.1',
'pyblake2>=0.9.3; python_version<"3.6"',
'click>=4.1',
'python-dotenv>=0.1.3',
'requests>=2.5',
'six>=1.10.0',
'python-dateutil>=2.4.2',
'pytz>=2015.4',
]
dynamic = ["version"]

[project.optional-dependencies]
test = [
'pytest<8.0.0',
'pytest-cov>=2.5.1,<4',
'moto[s3]>4,<5',
'twine<4.0.0',
'pip>=20.0.0,<22.0.0',
'wheel<0.38.0',
'flake8>=2.5.0,<4.1',
'responses<0.22.0',
]
# the queuable feature allows users to offload their datalake pushes
# to a separate uploader process.
queuable = [
'inotify_simple>=1.3.5',
]
sentry = [
'raven>=5.0.0',
]

[project.scripts]
datalake = "datalake.scripts.cli:cli"

[tool.setuptools.packages.find]
exclude = ["test"]

[tool.versioningit]
default-version = "0.0.0-dev"

[tool.versioningit.format]
distance = "{base_version}+{distance}.{vcs}{rev}"
# Example formatted version: 1.2.3+42.ge174a1f

dirty = "{base_version}+{distance}.{vcs}{rev}.dirty"
# Example formatted version: 1.2.3+42.ge174a1f.dirty

distance-dirty = "{base_version}+{distance}.{vcs}{rev}.dirty"
# Example formatted version: 1.2.3+42.ge174a1f.dirty

[tool.pytest.ini_options]
addopts = "--cov=datalake --cov-config .coveragerc"
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')"
]
6 changes: 0 additions & 6 deletions client/setup.cfg

This file was deleted.

62 changes: 0 additions & 62 deletions client/setup.py

This file was deleted.

2 changes: 1 addition & 1 deletion client/test/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,4 @@ def test_threaded_uploader_exits(enqueuer, faulty_uploader, random_file,
random_metadata, uploaded_file_validator):
enqueuer.enqueue(random_file, **random_metadata)
with pytest.raises(KeyboardInterrupt):
faulty_uploader.listen(timeout=0.1, workers=2)
faulty_uploader.listen(timeout=1.0, workers=2)
Loading

0 comments on commit 7fc18cd

Please sign in to comment.