Check for updated files in the backend in the retrieve* functions.
This is done by issuing a lightweight HEAD request and checking whether
the remote modification time is more recent than that of the cached
file; if so, we overwrite the cache with the new remote content.

We make sure to set the modification time correctly on all downloaded
files to ensure that we can accurately determine their age in the cache.
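
In essence, the staleness check for a single cached file reduces to the sketch below. This is an illustration only, not code from the package; is_cache_stale, cached_path and file_url are hypothetical names.

import os
import time
import requests

def is_cache_stale(cached_path: str, file_url: str, update_delay: int = 3600) -> bool:
    # Trust recently validated files as-is, to avoid hammering the API.
    last_mod = os.path.getmtime(cached_path)
    if last_mod + update_delay >= time.time():
        return False
    # Lightweight HEAD request: only headers are transferred, never the file body.
    with requests.head(file_url) as r:
        r.raise_for_status()
        remote = r.headers.get("last-modified")
        if remote is None:
            return False  # no usable header, so keep the cached copy.
        remote_time = time.mktime(time.strptime(remote, "%a, %d %b %Y %H:%M:%S %Z"))
        return remote_time > last_mod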
LTLA committed Aug 23, 2024
1 parent e3e1b6e commit 0462be7
Showing 5 changed files with 114 additions and 21 deletions.
28 changes: 28 additions & 0 deletions src/sewerrat/_utils.py
@@ -1,5 +1,9 @@
import requests
import os
import time
import shutil
import warnings
from typing import Optional


def format_error(res):
@@ -39,3 +41,29 @@ def clean_path(path: str) -> str:

keep = [""] + keep # add back the root.
return '/'.join(keep)


def parse_remote_last_modified(res) -> Optional[float]:
    if "last-modified" not in res.headers:
        warnings.warn("failed to extract the 'last-modified' header")
        return None
    try:
        mod_time = res.headers["last-modified"]
        return time.mktime(time.strptime(mod_time, "%a, %d %b %Y %H:%M:%S %Z"))
    except Exception:
        warnings.warn("failed to parse the 'last-modified' header")
        return None
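
For reference, 'last-modified' uses the fixed HTTP-date format, so the parse above behaves like this standalone snippet (the date is made up):

import time

stamp = time.strptime("Wed, 21 Oct 2015 07:28:00 GMT", "%a, %d %b %Y %H:%M:%S %Z")
print(time.mktime(stamp))  # seconds since the epoch, as expected by os.utime().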


def download_file(url: str, dest: str):
    with requests.get(url, stream=True) as r:
        if r.status_code >= 300:
            raise format_error(r)
        with open(dest, "wb") as f:
            shutil.copyfileobj(r.raw, f)

        # Setting the correct modified time; we use the
        # current time as the access time for comparison.
        modtime = parse_remote_last_modified(r)
        if modtime is not None:
            os.utime(dest, (time.time(), modtime))
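
As a rough usage sketch (the URL and destination here are hypothetical), download_file leaves the cached copy stamped with the server's timestamp rather than the time of download:

from sewerrat import _utils as ut
import os

ut.download_file("https://example.com/retrieve/file?path=%2Fmy%2Fdir%2Fmetadata.json", "/tmp/metadata.json")
# The mtime now reflects the remote 'last-modified' header (when one was provided),
# so subsequent HEAD checks can compare it against the remote modification time.
print(os.path.getmtime("/tmp/metadata.json"))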
51 changes: 36 additions & 15 deletions src/sewerrat/retrieve_directory.py
@@ -2,8 +2,8 @@
import os
import tempfile
import urllib
import time
import requests
import shutil
from . import _utils as ut


@@ -14,21 +14,34 @@ def _local_root(cache: Optional[str], url: str) -> str:
    return os.path.join(cache, urllib.parse.quote_plus(url))


def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool) -> str:
def _full_file_url(url: str, path: str) -> str:
    return url + "/retrieve/file?path=" + urllib.parse.quote_plus(path)


def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool, update_delay: int) -> str:
    target = os.path.join(cache, "LOCAL" + path)  # os.path.join behaves poorly when 'path' is an absolute path.

    if overwrite or not os.path.exists(target):
    if not overwrite:
        if not os.path.exists(target):
            overwrite = True
        else:
            last_mod = os.path.getmtime(target)
            if last_mod + update_delay < time.time():  # only check older files for updates, to avoid excessive queries.
                with requests.head(_full_file_url(url, path)) as r:
                    if r.status_code >= 300:
                        raise ut.format_error(r)
                    modtime = ut.parse_remote_last_modified(r)
                    if modtime is not None and modtime > last_mod:
                        overwrite = True

    if overwrite:
        tempdir = os.path.join(cache, "TEMP")
        os.makedirs(tempdir, exist_ok=True)
        os.makedirs(os.path.dirname(target), exist_ok=True)

        tempfid, temppath = tempfile.mkstemp(dir=tempdir)
        fd, temppath = tempfile.mkstemp(dir=tempdir)
        os.close(fd)  # close the raw descriptor; download_file reopens the path itself.
        try:
            with requests.get(url + "/retrieve/file?path=" + urllib.parse.quote_plus(path), stream=True) as r:
                if r.status_code >= 300:
                    raise ut.format_error(r)
                with os.fdopen(tempfid, 'wb') as f:
                    shutil.copyfileobj(r.raw, f)
            ut.download_file(_full_file_url(url, path), temppath)
            os.rename(temppath, target)  # this should be more or less atomic, so no need for locks.
        finally:
            if os.path.exists(temppath):
@@ -37,11 +50,11 @@ def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool) -> str:
    return target


def _acquire_file(cache: str, path: str, name: str, url: str, overwrite: bool) -> str:
    return _acquire_file_raw(cache, path + "/" + name, url, overwrite)
def _acquire_file(cache: str, path: str, name: str, url: str, overwrite: bool, update_delay: int) -> str:
    return _acquire_file_raw(cache, path + "/" + name, url, overwrite, update_delay)


def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, concurrent: int = 1) -> str:
def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, concurrent: int = 1, update_delay: int = 3600) -> str:
"""
Obtain the path to a registered directory or one of its subdirectories.
This may create a local copy of the directory's contents if the caller
Expand Down Expand Up @@ -69,6 +82,10 @@ def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_r
concurrent:
Number of concurrent downloads.
update_delay:
Maximum age of a cached file, in seconds. Older files will be
automatically checked for updates.
Returns:
Path to the subdirectory on the caller's filesystem. This is either
``path`` if it is accessible, or a path to a local cache of the
@@ -80,8 +97,12 @@ def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_r
    cache = _local_root(cache, url)
    final = os.path.join(cache, "LOCAL" + path)  # os.path.join doesn't like joining of absolute paths.
    ok = os.path.join(cache, "SUCCESS" + path, "....OK")

    if not overwrite and os.path.exists(ok) and os.path.exists(final):
        return final
        # Only check for updates every 'update_delay' seconds since the last
        # check, to avoid excessive queries to the API.
        if os.path.getmtime(ok) + update_delay >= time.time():
            return final

    res = requests.get(url + "/list?path=" + urllib.parse.quote_plus(path) + "&recursive=true")
    if res.status_code >= 300:
if res.status_code >= 300:
@@ -90,12 +111,12 @@ def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_r

if concurrent == 1:
for y in listing:
_acquire_file(cache, name=y, path=path, url=url, overwrite=overwrite)
_acquire_file(cache, name=y, path=path, url=url, overwrite=overwrite, update_delay=update_delay)
else:
import multiprocessing
import functools
with multiprocessing.Pool(concurrent) as p:
p.map(functools.partial(_acquire_file, cache, path, url=url, overwrite=overwrite), listing)
p.map(functools.partial(_acquire_file, cache, path, url=url, overwrite=overwrite, update_delay=update_delay), listing)

# We use a directory-level OK file to avoid having to scan through all
# the directory contents to indicate that it's complete.
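For callers, the new parameter just tunes how aggressively a cached directory is revalidated. A sketch of a call, where the registered path and host are placeholders:

import sewerrat

# Re-check any cached file older than ten minutes against the backend.
local = sewerrat.retrieve_directory(
    "/my/registered/dir",
    url="https://sewerrat.example.com",
    cache="/tmp/sewerrat-cache",
    force_remote=True,
    update_delay=600,
)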
8 changes: 6 additions & 2 deletions src/sewerrat/retrieve_file.py
@@ -3,7 +3,7 @@
from .retrieve_directory import _local_root, _acquire_file_raw


def retrieve_file(path, url, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False) -> str:
def retrieve_file(path, url, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, update_delay: int = 3600) -> str:
"""
Retrieve the path to a single file in a registered directory. This will
call the REST API if the caller is not on the same filesystem.
Expand All @@ -27,6 +27,10 @@ def retrieve_file(path, url, cache: Optional[str] = None, force_remote: bool = F
overwrite:
Whether to overwrite existing files in the cache.
update_delay:
Maximum age of a cached file, in seconds. Older files will be
automatically checked for updates.
Returns:
Path to the subdirectory on the caller's filesystem. This is either
``path`` if it is accessible, or a path to a local copy otherwise.
@@ -35,4 +39,4 @@ def retrieve_file(path, url, cache: Optional[str] = None, force_remote: bool = F
        return path
    else:
        cache = _local_root(cache, url)
        return _acquire_file_raw(cache, path, url=url, overwrite=overwrite)
        return _acquire_file_raw(cache, path, url=url, overwrite=overwrite, update_delay=update_delay)
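
The same applies to single files; for instance, setting update_delay=0 forces a HEAD check on every call (path and URL are again placeholders):

import sewerrat

p = sewerrat.retrieve_file(
    "/my/registered/dir/metadata.json",
    url="https://sewerrat.example.com",
    force_remote=True,
    update_delay=0,  # always ask the backend whether the cached copy is stale.
)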
6 changes: 2 additions & 4 deletions src/sewerrat/start_sewerrat.py
@@ -2,6 +2,7 @@
import requests
import os
import time
from . import _utils as ut


test_api_process = None
@@ -75,10 +76,7 @@ def _acquire_sewerrat_binary(version: str, overwrite: bool):
        import shutil
        os.makedirs(cache, exist_ok=True)
        tmp = exe + ".tmp"
        with requests.get(url, stream=True) as r:
            with open(tmp, 'wb') as f:
                shutil.copyfileobj(r.raw, f)

        ut.download_file(url, tmp)
        os.chmod(tmp, 0o755)

        # Using a write-and-rename paradigm to provide some atomicity. Note
Expand Down
42 changes: 42 additions & 0 deletions tests/test_retrieve.py
@@ -1,8 +1,10 @@
import sewerrat
from sewerrat.retrieve_directory import _local_root
import pytest
import os
import tempfile
import json
import time


@pytest.fixture(scope="module")
@@ -30,13 +32,40 @@ def test_retrieve_file(setup):
        meta = json.load(f)
    assert meta["first"] == "Aaron"

    # Caching of remotes works as expected.
    cache = tempfile.mkdtemp()
    p = sewerrat.retrieve_file(mydir + "/metadata.json", url=url, cache=cache, force_remote=True)
    assert p.startswith(cache)
    with open(p, "r") as f:
        meta = json.load(f)
    assert meta["first"] == "Aaron"

    # Subsequent requests are no-ops.
    with open(p, "w") as f:
        f.write('{ "first": "Erika" }')
    p2 = sewerrat.retrieve_file(mydir + "/metadata.json", url=url, cache=cache, force_remote=True)
    assert p == p2
    with open(p2, "r") as f:
        meta = json.load(f)
    assert meta["first"] == "Erika"

    # Overwritten successfully:
    p2 = sewerrat.retrieve_file(mydir + "/metadata.json", url=url, cache=cache, force_remote=True, overwrite=True)
    assert p == p2
    with open(p2, "r") as f:
        meta = json.load(f)
    assert meta["first"] == "Aaron"

    # We also get an update if the cached file is too old.
    with open(p, "w") as f:
        f.write('{ "first": "Erika" }')
    os.utime(p, (time.time(), time.time() - 4000))
    p2 = sewerrat.retrieve_file(mydir + "/metadata.json", url=url, cache=cache, force_remote=True)
    assert p == p2
    with open(p2, "r") as f:
        meta = json.load(f)
    assert meta["first"] == "Aaron"


def test_retrieve_metadata(setup):
    mydir = setup
@@ -76,6 +105,19 @@ def test_retrieve_directory(setup):

    # Unless we force an overwrite.
    rdir2 = sewerrat.retrieve_directory(subpath, url=url, cache=cache, force_remote=True, overwrite=True)
    assert rdir == rdir2
    with open(os.path.join(rdir2, "metadata.json"), "r") as f:
        meta = json.load(f)
    assert meta["meal"] == "lunch"

    # Updates also happen when the cached file AND the success file are both too old.
    with open(os.path.join(rdir, "metadata.json"), "w") as f:
        f.write('{ "meal": "dinner" }')
    os.utime(os.path.join(rdir, "metadata.json"), (time.time(), time.time() - 4000))
    os.utime(os.path.join(_local_root(cache, url), "SUCCESS" + subpath, "....OK"), (time.time(), time.time() - 4000))

    rdir2 = sewerrat.retrieve_directory(subpath, url=url, cache=cache, force_remote=True)
    assert rdir == rdir2
    with open(os.path.join(rdir2, "metadata.json"), "r") as f:
        meta = json.load(f)
    assert meta["meal"] == "lunch"
