Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove staging area functionality #97

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions examples/lindi_demo.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -138,50 +138,6 @@
"# Save the changes to a new .nwb.lindi.json file\n",
"client.write_lindi_file('new.nwb.lindi.json')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fe19e0f6-1c62-42e9-9af0-4a57c8a61364",
"metadata": {},
"outputs": [],
"source": [
"# Now load that file\n",
"client2 = lindi.LindiH5pyFile.from_lindi_file('new.nwb.lindi.json')\n",
"print(client2.attrs['new_attribute'])"
]
},
{
"cell_type": "markdown",
"id": "4a2addfc-58ed-4e79-9c64-b7ec95cb12f5",
"metadata": {},
"source": [
"### Add datasets to a .nwb.lindi.json file using a local staging area"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6e87640d-1927-43c1-89c1-c1274a11f185",
"metadata": {},
"outputs": [],
"source": [
"import lindi\n",
"\n",
"# URL of the remote .nwb.lindi.json file\n",
"url = 'https://lindi.neurosift.org/dandi/dandisets/000939/assets/56d875d6-a705-48d3-944c-53394a389c85/nwb.lindi.json'\n",
"\n",
"# Load the h5py-like client for the reference file system\n",
"# in read-write mode with a staging area\n",
"with lindi.StagingArea.create(base_dir='lindi_staging') as staging_area:\n",
" client = lindi.LindiH5pyFile.from_lindi_file(\n",
" url,\n",
" mode=\"r+\",\n",
" staging_area=staging_area\n",
" )\n",
" # add datasets to client using pynwb or other tools\n",
" # upload the changes to the remote .nwb.lindi.json file"
]
}
],
"metadata": {
Expand Down
34 changes: 0 additions & 34 deletions lindi/File/File.py

This file was deleted.

Empty file removed lindi/File/__init__.py
Empty file.
124 changes: 11 additions & 113 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union, Literal, Callable
from typing import Union, Literal
import os
import json
import tempfile
Expand All @@ -12,8 +12,6 @@
from .LindiH5pyReference import LindiH5pyReference
from .LindiReferenceFileSystemStore import LindiReferenceFileSystemStore

from ..LindiStagingStore.StagingArea import StagingArea
from ..LindiStagingStore.LindiStagingStore import LindiStagingStore, _apply_templates
from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts

from ..LocalCache.LocalCache import LocalCache
Expand All @@ -26,10 +24,6 @@

LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"]

# Accepts a string path to a file, uploads (or copies) it somewhere, and returns a string URL
# (or local path)
UploadFileFunc = Callable[[str], str]


class LindiH5pyFile(h5py.File):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
Expand All @@ -53,7 +47,7 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non
self._is_open = True

@staticmethod
def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None):
def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None):
"""
Create a LindiH5pyFile from a URL or path to a .lindi.json file.

Expand All @@ -62,7 +56,6 @@ def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area
return LindiH5pyFile.from_reference_file_system(
url_or_path,
mode=mode,
staging_area=staging_area,
local_cache=local_cache
)

Expand Down Expand Up @@ -108,7 +101,7 @@ def from_hdf5_file(
)

@staticmethod
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False):
"""
Create a LindiH5pyFile from a reference file system.

Expand All @@ -120,9 +113,6 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
be created.
mode : Literal["r", "r+", "w", "w-", "x", "a"], optional
The mode to open the file object in, by default "r".
staging_area : Union[StagingArea, None], optional
The staging area to use for writing data, preparing for upload. This
is only used in write mode, by default None.
local_cache : Union[LocalCache, None], optional
The local cache to use for caching data, by default None.
_source_url_or_path : Union[str, None], optional
Expand Down Expand Up @@ -152,7 +142,6 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
return LindiH5pyFile.from_reference_file_system(
data,
mode=mode,
staging_area=staging_area,
local_cache=local_cache,
_source_tar_file=tar_file,
_source_url_or_path=rfs,
Expand Down Expand Up @@ -193,7 +182,6 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
return LindiH5pyFile.from_reference_file_system(
data,
mode=mode,
staging_area=staging_area,
local_cache=local_cache,
_source_url_or_path=rfs,
_source_tar_file=tar_file,
Expand All @@ -208,11 +196,7 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
_source_tar_file=_source_tar_file
)
source_is_url = _source_url_or_path is not None and (_source_url_or_path.startswith("http://") or _source_url_or_path.startswith("https://"))
if staging_area:
if _source_tar_file and not source_is_url:
raise Exception("Cannot use staging area when source is a local tar file")
store = LindiStagingStore(base_store=store, staging_area=staging_area)
elif _source_url_or_path and _source_tar_file and not source_is_url:
if _source_url_or_path and _source_tar_file and not source_is_url:
store = LindiTarStore(base_store=store, tar_file=_source_tar_file)
return LindiH5pyFile.from_zarr_store(
store,
Expand Down Expand Up @@ -274,9 +258,6 @@ def to_reference_file_system(self):
if self._zarr_store is None:
raise Exception("Cannot convert to reference file system without zarr store")
zarr_store = self._zarr_store
if isinstance(zarr_store, LindiStagingStore):
zarr_store.consolidate_chunks()
zarr_store = zarr_store._base_store
if isinstance(zarr_store, LindiTarStore):
zarr_store = zarr_store._base_store
if isinstance(zarr_store, LindiH5ZarrStore):
Expand All @@ -289,58 +270,6 @@ def to_reference_file_system(self):
LindiReferenceFileSystemStore.use_templates_in_rfs(rfs_copy)
return rfs_copy

def upload(
self,
*,
on_upload_blob: UploadFileFunc,
on_upload_main: UploadFileFunc
):
"""
Consolidate the chunks in the staging area, upload them to a storage
system, updating the references in the base store, and then upload the
updated reference file system .json file.

Parameters
----------
on_upload_blob : StoreFileFunc
A function that takes a string path to a blob file, uploads or copies it
somewhere, and returns a string URL (or local path).
on_upload_main : StoreFileFunc
A function that takes a string path to the main .json file, stores
it somewhere, and returns a string URL (or local path).

Returns
-------
str
The URL (or local path) of the uploaded reference file system .json
file.
"""
rfs = self.to_reference_file_system()
blobs_to_upload = set()
# Get the set of all local URLs in rfs['refs']
for k, v in rfs['refs'].items():
if isinstance(v, list) and len(v) == 3:
url = _apply_templates(v[0], rfs.get('templates', {}))
if not url.startswith("http://") and not url.startswith("https://"):
local_path = url
blobs_to_upload.add(local_path)
# Upload each of the local blobs using the given upload function and get a mapping from
# the original file paths to the URLs of the uploaded files
blob_mapping = _upload_blobs(blobs_to_upload, on_upload_blob=on_upload_blob)
# Replace the local URLs in rfs['refs'] with URLs of the uploaded files
for k, v in rfs['refs'].items():
if isinstance(v, list) and len(v) == 3:
url1 = _apply_templates(v[0], rfs.get('templates', {}))
url2 = blob_mapping.get(url1, None)
if url2 is not None:
v[0] = url2
# Write the updated LINDI file to a temp directory and upload it
with tempfile.TemporaryDirectory() as tmpdir:
rfs_fname = f"{tmpdir}/rfs.lindi.json"
LindiReferenceFileSystemStore.use_templates_in_rfs(rfs)
_write_rfs_to_file(rfs=rfs, output_file_name=rfs_fname)
return on_upload_main(rfs_fname)

def write_lindi_file(self, filename: str, *, generation_metadata: Union[dict, None] = None):
"""
Write the reference file system to a lindi or .lindi.json file.
Expand Down Expand Up @@ -568,15 +497,6 @@ def require_dataset(self, name, shape, dtype, exact=False, **kwds):
raise Exception("Cannot require dataset in read-only mode")
return self._the_group.require_dataset(name, shape, dtype, exact=exact, **kwds)

##############################
# staging store
@property
def staging_store(self):
store = self._zarr_store
if not isinstance(store, LindiStagingStore):
return None
return store


def _download_file(url: str, filename: str) -> None:
headers = {
Expand Down Expand Up @@ -650,35 +570,6 @@ def _deep_copy(obj):
return obj


def _upload_blobs(
    blobs: set,
    *,
    on_upload_blob: UploadFileFunc
) -> dict:
    """
    Upload every blob in *blobs* via the supplied callback and return a dict
    mapping each original local file path to the URL (or local path) that the
    callback reported the upload landed at.
    """
    total = len(blobs)
    mapping = {}
    for index, local_path in enumerate(blobs, start=1):
        num_bytes = os.path.getsize(local_path)
        # Progress line per blob so long uploads are observable on the console.
        print(f'Uploading blob {index} of {total} {local_path} ({_format_size_bytes(num_bytes)})')
        mapping[local_path] = on_upload_blob(local_path)
    return mapping


def _format_size_bytes(size_bytes: int) -> str:
if size_bytes < 1024:
return f"{size_bytes} bytes"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / 1024 / 1024:.1f} MB"
else:
return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB"


def _load_rfs_from_url(url: str):
file_size = _get_file_size_of_remote_file(url)
if file_size < 1024 * 1024 * 2:
Expand Down Expand Up @@ -832,3 +723,10 @@ def _update_internal_references_to_remote_tar_file(rfs: dict, remote_url: str, r
raise Exception(f"Unexpected length for reference: {len(v)}")

LindiReferenceFileSystemStore.use_templates_in_rfs(rfs)


def _apply_templates(x: str, templates: dict) -> str:
if '{{' in x and '}}' in x:
for key, val in templates.items():
x = x.replace('{{' + key + '}}', val)
return x
Loading