From 870269d73a78878e2554b150aa0ad379d832dcc1 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Fri, 20 Sep 2024 07:21:20 -0400 Subject: [PATCH 1/2] remove staging area functionality Delete LindiStagingStore functionality --- examples/lindi_demo.ipynb | 44 ---- lindi/File/File.py | 34 --- lindi/File/__init__.py | 0 lindi/LindiH5pyFile/LindiH5pyFile.py | 124 +---------- lindi/LindiStagingStore/LindiStagingStore.py | 218 ------------------- lindi/LindiStagingStore/StagingArea.py | 109 ---------- lindi/LindiStagingStore/__init__.py | 1 - lindi/__init__.py | 2 - tests/test_staging_area.py | 66 ------ 9 files changed, 11 insertions(+), 587 deletions(-) delete mode 100644 lindi/File/File.py delete mode 100644 lindi/File/__init__.py delete mode 100644 lindi/LindiStagingStore/LindiStagingStore.py delete mode 100644 lindi/LindiStagingStore/StagingArea.py delete mode 100644 lindi/LindiStagingStore/__init__.py delete mode 100644 tests/test_staging_area.py diff --git a/examples/lindi_demo.ipynb b/examples/lindi_demo.ipynb index 5082386..460a5e1 100644 --- a/examples/lindi_demo.ipynb +++ b/examples/lindi_demo.ipynb @@ -138,50 +138,6 @@ "# Save the changes to a new .nwb.lindi.json file\n", "client.write_lindi_file('new.nwb.lindi.json')" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "fe19e0f6-1c62-42e9-9af0-4a57c8a61364", - "metadata": {}, - "outputs": [], - "source": [ - "# Now load that file\n", - "client2 = lindi.LindiH5pyFile.from_lindi_file('new.nwb.lindi.json')\n", - "print(client2.attrs['new_attribute'])" - ] - }, - { - "cell_type": "markdown", - "id": "4a2addfc-58ed-4e79-9c64-b7ec95cb12f5", - "metadata": {}, - "source": [ - "### Add datasets to a .nwb.lindi.json file using a local staging area" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6e87640d-1927-43c1-89c1-c1274a11f185", - "metadata": {}, - "outputs": [], - "source": [ - "import lindi\n", - "\n", - "# URL of the remote .nwb.lindi.json file\n", - "url = 'https://lindi.neurosift.org/dandi/dandisets/000939/assets/56d875d6-a705-48d3-944c-53394a389c85/nwb.lindi.json'\n", - "\n", - "# Load the h5py-like client for the reference file system\n", - "# in read-write mode with a staging area\n", - "with lindi.StagingArea.create(base_dir='lindi_staging') as staging_area:\n", - " client = lindi.LindiH5pyFile.from_lindi_file(\n", - " url,\n", - " mode=\"r+\",\n", - " staging_area=staging_area\n", - " )\n", - " # add datasets to client using pynwb or other tools\n", - " # upload the changes to the remote .nwb.lindi.json file" - ] } ], "metadata": { diff --git a/lindi/File/File.py b/lindi/File/File.py deleted file mode 100644 index 59e0f4c..0000000 --- a/lindi/File/File.py +++ /dev/null @@ -1,34 +0,0 @@ -from typing import Literal -import os -import h5py -from ..LindiH5pyFile.LindiH5pyFile import LindiH5pyFile -from ..LindiStagingStore.StagingArea import StagingArea -from ..LocalCache.LocalCache import LocalCache - - -class File(h5py.File): - """ - A drop-in replacement for h5py.File that is either a lindi.LindiH5pyFile or - h5py.File depending on whether the file name ends with .lindi.json or not. - """ - def __new__(cls, name, mode: Literal['r', 'r+', 'w', 'w-', 'x', 'a'] = 'r', **kwds): - if isinstance(name, str) and name.endswith('.lindi.json'): - # should we raise exceptions on select unsupported kwds? or just go with the flow? 
- if mode != 'r': - staging_area = StagingArea.create(dir=name + '.d') - else: - staging_area = None - local_cache_dir = os.environ.get('LINDI_LOCAL_CACHE_DIR', None) - if local_cache_dir is not None: - local_cache = LocalCache(cache_dir=local_cache_dir) - else: - local_cache = None - - return LindiH5pyFile.from_lindi_file( - name, - mode=mode, - staging_area=staging_area, - local_cache=local_cache - ) - else: - return h5py.File(name, mode=mode, **kwds) diff --git a/lindi/File/__init__.py b/lindi/File/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py index 37681c7..6d7ecc7 100644 --- a/lindi/LindiH5pyFile/LindiH5pyFile.py +++ b/lindi/LindiH5pyFile/LindiH5pyFile.py @@ -1,4 +1,4 @@ -from typing import Union, Literal, Callable +from typing import Union, Literal import os import json import tempfile @@ -12,8 +12,6 @@ from .LindiH5pyReference import LindiH5pyReference from .LindiReferenceFileSystemStore import LindiReferenceFileSystemStore -from ..LindiStagingStore.StagingArea import StagingArea -from ..LindiStagingStore.LindiStagingStore import LindiStagingStore, _apply_templates from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts from ..LocalCache.LocalCache import LocalCache @@ -26,10 +24,6 @@ LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"] -# Accepts a string path to a file, uploads (or copies) it somewhere, and returns a string URL -# (or local path) -UploadFileFunc = Callable[[str], str] - class LindiH5pyFile(h5py.File): def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False): @@ -53,7 +47,7 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non self._is_open = True @staticmethod - def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None): + def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None): """ Create a LindiH5pyFile from a URL or path to a .lindi.json file. @@ -62,7 +56,6 @@ def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area return LindiH5pyFile.from_reference_file_system( url_or_path, mode=mode, - staging_area=staging_area, local_cache=local_cache ) @@ -108,7 +101,7 @@ def from_hdf5_file( ) @staticmethod - def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False): + def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None, _close_source_tar_file_on_close: bool = False): """ Create a LindiH5pyFile from a reference file system. @@ -120,9 +113,6 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo be created. mode : Literal["r", "r+", "w", "w-", "x", "a"], optional The mode to open the file object in, by default "r". 
- staging_area : Union[StagingArea, None], optional - The staging area to use for writing data, preparing for upload. This - is only used in write mode, by default None. local_cache : Union[LocalCache, None], optional The local cache to use for caching data, by default None. _source_url_or_path : Union[str, None], optional @@ -152,7 +142,6 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo return LindiH5pyFile.from_reference_file_system( data, mode=mode, - staging_area=staging_area, local_cache=local_cache, _source_tar_file=tar_file, _source_url_or_path=rfs, @@ -193,7 +182,6 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo return LindiH5pyFile.from_reference_file_system( data, mode=mode, - staging_area=staging_area, local_cache=local_cache, _source_url_or_path=rfs, _source_tar_file=tar_file, @@ -208,11 +196,7 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo _source_tar_file=_source_tar_file ) source_is_url = _source_url_or_path is not None and (_source_url_or_path.startswith("http://") or _source_url_or_path.startswith("https://")) - if staging_area: - if _source_tar_file and not source_is_url: - raise Exception("Cannot use staging area when source is a local tar file") - store = LindiStagingStore(base_store=store, staging_area=staging_area) - elif _source_url_or_path and _source_tar_file and not source_is_url: + if _source_url_or_path and _source_tar_file and not source_is_url: store = LindiTarStore(base_store=store, tar_file=_source_tar_file) return LindiH5pyFile.from_zarr_store( store, @@ -274,9 +258,6 @@ def to_reference_file_system(self): if self._zarr_store is None: raise Exception("Cannot convert to reference file system without zarr store") zarr_store = self._zarr_store - if isinstance(zarr_store, LindiStagingStore): - zarr_store.consolidate_chunks() - zarr_store = zarr_store._base_store if isinstance(zarr_store, LindiTarStore): zarr_store = zarr_store._base_store if isinstance(zarr_store, LindiH5ZarrStore): @@ -289,58 +270,6 @@ def to_reference_file_system(self): LindiReferenceFileSystemStore.use_templates_in_rfs(rfs_copy) return rfs_copy - def upload( - self, - *, - on_upload_blob: UploadFileFunc, - on_upload_main: UploadFileFunc - ): - """ - Consolidate the chunks in the staging area, upload them to a storage - system, updating the references in the base store, and then upload the - updated reference file system .json file. - - Parameters - ---------- - on_upload_blob : StoreFileFunc - A function that takes a string path to a blob file, uploads or copies it - somewhere, and returns a string URL (or local path). - on_upload_main : StoreFileFunc - A function that takes a string path to the main .json file, stores - it somewhere, and returns a string URL (or local path). - - Returns - ------- - str - The URL (or local path) of the uploaded reference file system .json - file. 
- """ - rfs = self.to_reference_file_system() - blobs_to_upload = set() - # Get the set of all local URLs in rfs['refs'] - for k, v in rfs['refs'].items(): - if isinstance(v, list) and len(v) == 3: - url = _apply_templates(v[0], rfs.get('templates', {})) - if not url.startswith("http://") and not url.startswith("https://"): - local_path = url - blobs_to_upload.add(local_path) - # Upload each of the local blobs using the given upload function and get a mapping from - # the original file paths to the URLs of the uploaded files - blob_mapping = _upload_blobs(blobs_to_upload, on_upload_blob=on_upload_blob) - # Replace the local URLs in rfs['refs'] with URLs of the uploaded files - for k, v in rfs['refs'].items(): - if isinstance(v, list) and len(v) == 3: - url1 = _apply_templates(v[0], rfs.get('templates', {})) - url2 = blob_mapping.get(url1, None) - if url2 is not None: - v[0] = url2 - # Write the updated LINDI file to a temp directory and upload it - with tempfile.TemporaryDirectory() as tmpdir: - rfs_fname = f"{tmpdir}/rfs.lindi.json" - LindiReferenceFileSystemStore.use_templates_in_rfs(rfs) - _write_rfs_to_file(rfs=rfs, output_file_name=rfs_fname) - return on_upload_main(rfs_fname) - def write_lindi_file(self, filename: str, *, generation_metadata: Union[dict, None] = None): """ Write the reference file system to a lindi or .lindi.json file. @@ -568,15 +497,6 @@ def require_dataset(self, name, shape, dtype, exact=False, **kwds): raise Exception("Cannot require dataset in read-only mode") return self._the_group.require_dataset(name, shape, dtype, exact=exact, **kwds) - ############################## - # staging store - @property - def staging_store(self): - store = self._zarr_store - if not isinstance(store, LindiStagingStore): - return None - return store - def _download_file(url: str, filename: str) -> None: headers = { @@ -650,35 +570,6 @@ def _deep_copy(obj): return obj -def _upload_blobs( - blobs: set, - *, - on_upload_blob: UploadFileFunc -) -> dict: - """ - Upload all the blobs in a set to a storage system and return a mapping from - the original file paths to the URLs of the uploaded files. 
- """ - blob_mapping = {} - for i, blob in enumerate(blobs): - size = os.path.getsize(blob) - print(f'Uploading blob {i + 1} of {len(blobs)} {blob} ({_format_size_bytes(size)})') - blob_url = on_upload_blob(blob) - blob_mapping[blob] = blob_url - return blob_mapping - - -def _format_size_bytes(size_bytes: int) -> str: - if size_bytes < 1024: - return f"{size_bytes} bytes" - elif size_bytes < 1024 * 1024: - return f"{size_bytes / 1024:.1f} KB" - elif size_bytes < 1024 * 1024 * 1024: - return f"{size_bytes / 1024 / 1024:.1f} MB" - else: - return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB" - - def _load_rfs_from_url(url: str): file_size = _get_file_size_of_remote_file(url) if file_size < 1024 * 1024 * 2: @@ -832,3 +723,10 @@ def _update_internal_references_to_remote_tar_file(rfs: dict, remote_url: str, r raise Exception(f"Unexpected length for reference: {len(v)}") LindiReferenceFileSystemStore.use_templates_in_rfs(rfs) + + +def _apply_templates(x: str, templates: dict) -> str: + if '{{' in x and '}}' in x: + for key, val in templates.items(): + x = x.replace('{{' + key + '}}', val) + return x diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py deleted file mode 100644 index 019d982..0000000 --- a/lindi/LindiStagingStore/LindiStagingStore.py +++ /dev/null @@ -1,218 +0,0 @@ -import os -from zarr.storage import Store as ZarrStore -from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore -from .StagingArea import StagingArea, _random_str - - -class LindiStagingStore(ZarrStore): - """ - A Zarr store that allows supplementing a base LindiReferenceFileSystemStore - where the large data blobs are stored in a staging area. After writing new - data to the store, the data blobs can be consolidated into larger files and - then uploaded to a custom storage system, for example DANDI or a cloud - bucket. - """ - def __init__(self, *, base_store: LindiReferenceFileSystemStore, staging_area: StagingArea): - """ - Create a LindiStagingStore. - - Parameters - ---------- - base_store : LindiReferenceFileSystemStore - The base store that this store supplements. - staging_area : StagingArea - The staging area where large data blobs are stored. - """ - self._base_store = base_store - self._staging_area = staging_area - - def __getitem__(self, key: str): - return self._base_store.__getitem__(key) - - def __setitem__(self, key: str, value: bytes): - key_parts = key.split("/") - key_base_name = key_parts[-1] - if key_base_name.startswith('.') or key_base_name.endswith('.json'): # always inline .zattrs, .zgroup, .zarray, zarr.json - inline = True - else: - # presumably it is a chunk of an array - if not isinstance(value, bytes): - raise ValueError("Value must be bytes") - size = len(value) - inline = size < 1000 # this should be a configurable threshold - if inline: - # If inline, save in memory - return self._base_store.__setitem__(key, value) - else: - # If not inline, save it as a file in the staging directory - key_without_initial_slash = key if not key.startswith("/") else key[1:] - stored_file_path = self._staging_area.store_file(key_without_initial_slash, value) - - self._set_ref_reference(key_without_initial_slash, stored_file_path, 0, len(value)) - - def __delitem__(self, key: str): - # We don't delete the file from the staging directory, because that - # would be dangerous if the file was part of a consolidated file. 
-        return self._base_store.__delitem__(key)
-
-    def __iter__(self):
-        return self._base_store.__iter__()
-
-    def __len__(self):
-        return self._base_store.__len__()
-
-    # These methods are overridden from BaseStore
-    def is_readable(self):
-        return True
-
-    def is_writeable(self):
-        return True
-
-    def is_listable(self):
-        return True
-
-    def is_erasable(self):
-        return False
-
-    def _set_ref_reference(self, key: str, filename: str, offset: int, size: int):
-        rfs = self._base_store.rfs
-        if 'refs' not in rfs:
-            # this shouldn't happen, but we'll be defensive
-            rfs['refs'] = {}
-        rfs['refs'][key] = [
-            filename,
-            offset,
-            size
-        ]
-
-    def consolidate_chunks(self):
-        """
-        Consolidate the chunks in the staging area.
-        """
-        rfs = self._base_store.rfs
-        refs_keys_by_reference_parent_path = {}
-        for k, v in rfs['refs'].items():
-            if isinstance(v, list) and len(v) == 3:
-                url = v[0]
-                if not url.startswith(self._staging_area.directory + '/'):
-                    continue
-                parent_path = os.path.dirname(url)
-                if parent_path not in refs_keys_by_reference_parent_path:
-                    refs_keys_by_reference_parent_path[parent_path] = []
-                refs_keys_by_reference_parent_path[parent_path].append(k)
-        for root, dirs, files1 in os.walk(self._staging_area._directory):
-            files = [
-                f for f in files1
-                if not f.startswith('.') and not f.endswith('.json') and not f.startswith('consolidated.')
-            ]
-            if len(files) <= 1:
-                continue
-            refs_keys_for_this_dir = refs_keys_by_reference_parent_path.get(root, [])
-            if len(refs_keys_for_this_dir) <= 1:
-                continue
-
-            # sort so that the files are in order 0.0.0, 0.0.1, 0.0.2, ...
-            files = _sort_by_chunk_key(files)
-
-            print(f'Consolidating {len(files)} files in {root}')
-
-            offset = 0
-            offset_maps = {}
-            consolidated_id = _random_str(8)
-            consolidated_index = 0
-            max_size_of_consolidated_file = 1024 * 1024 * 1024  # 1 GB, a good size for cloud bucket files
-            consolidated_fname = f"{root}/consolidated.{consolidated_id}.{consolidated_index}"
-            consolidated_f = open(consolidated_fname, "wb")
-            try:
-                for fname in files:
-                    full_fname = f"{root}/{fname}"
-                    with open(full_fname, "rb") as f2:
-                        consolidated_f.write(f2.read())
-                    offset_maps[full_fname] = (consolidated_fname, offset)
-                    offset += os.path.getsize(full_fname)
-                    if offset > max_size_of_consolidated_file:
-                        consolidated_f.close()
-                        consolidated_index += 1
-                        consolidated_fname = f"{root}/consolidated.{consolidated_id}.{consolidated_index}"
-                        consolidated_f = open(consolidated_fname, "wb")
-                        offset = 0
-            finally:
-                consolidated_f.close()
-            for key in refs_keys_for_this_dir:
-                filename, old_offset, old_size = rfs['refs'][key]
-                if filename not in offset_maps:
-                    continue
-                consolidated_fname, new_offset = offset_maps[filename]
-                rfs['refs'][key] = [consolidated_fname, new_offset + old_offset, old_size]
-            # remove the old files
-            for fname in files:
-                os.remove(f"{root}/{fname}")
-
-    def copy_chunks_to_staging_area(self, *, download_remote: bool):
-        """
-        Copy the chunks in the base store to the staging area. This is done
-        in preparation for uploading to a storage system.
-
-        Parameters
-        ----------
-        download_remote : bool
-            If True, download the remote chunks to the staging area. If False,
-            just copy the local chunks.
- """ - if download_remote: - raise NotImplementedError("Downloading remote chunks not yet implemented") - rfs = self._base_store.rfs - templates = rfs.get('templates', {}) - for k, v in rfs['refs'].items(): - if isinstance(v, list) and len(v) == 3: - url = _apply_templates(v[0], templates) - if url.startswith('http://') or url.startswith('https://'): - if download_remote: - raise NotImplementedError("Downloading remote chunks not yet implemented") - continue - elif url.startswith(self._staging_area.directory + '/'): - # already in the staging area - continue - else: - # copy the local file to the staging area - path0 = url - chunk_data = _read_chunk_data(path0, v[1], v[2]) - stored_file_path = self._staging_area.store_file(k, chunk_data) - self._set_ref_reference(k, stored_file_path, 0, v[2]) - - -def _apply_templates(x: str, templates: dict) -> str: - if '{{' in x and '}}' in x: - for key, val in templates.items(): - x = x.replace('{{' + key + '}}', val) - return x - - -def _sort_by_chunk_key(files: list) -> list: - # first verify that all the files have the same number of parts - num_parts = None - for fname in files: - parts = fname.split('.') - if num_parts is None: - num_parts = len(parts) - elif len(parts) != num_parts: - raise ValueError(f"Files have different numbers of parts: {files}") - # Verify that all the parts are integers - for fname in files: - parts = fname.split('.') - for p in parts: - try: - int(p) - except ValueError: - raise ValueError(f"File part is not an integer: {fname}") - - def _chunk_key(fname: str) -> tuple: - parts = fname.split('.') - return tuple(int(p) for p in parts) - return sorted(files, key=_chunk_key) - - -def _read_chunk_data(filename: str, offset: int, size: int) -> bytes: - with open(filename, "rb") as f: - f.seek(offset) - return f.read(size) diff --git a/lindi/LindiStagingStore/StagingArea.py b/lindi/LindiStagingStore/StagingArea.py deleted file mode 100644 index 460bc9b..0000000 --- a/lindi/LindiStagingStore/StagingArea.py +++ /dev/null @@ -1,109 +0,0 @@ -from typing import Union -import os -import random -import string -import datetime -import shutil - - -class StagingArea: - """ - A staging area where files can be stored temporarily before being - consolidated and uploaded to a storage system. - - This class is a context manager, so it can be used in a `with` statement to - ensure that the staging area is cleaned up when it is no longer needed. - """ - def __init__(self, *, _directory: str) -> None: - """ - Do not call this constructor directly. Instead, use the `create` method - to create a new staging area. - """ - self._directory = os.path.abspath(_directory) - - @staticmethod - def create(*, base_dir: Union[str, None] = None, dir: Union[str, None] = None) -> 'StagingArea': - """ - Create a new staging area. Provide either `base_dir` or `dir`, but not - both. - - Parameters - ---------- - base_dir : str or None - If provided, the base directory where the staging area will be - created. The staging directory will be a subdirectory of this - directory. - dir : str or None - If provided, the exact directory where the staging area will be - created. It is okay if this directory already exists. 
- """ - if base_dir is not None and dir is not None: - raise ValueError("Provide either base_dir or dir, but not both") - if base_dir is not None: - dir = os.path.join(base_dir, _create_random_id()) - if dir is None: - raise ValueError("Provide either base_dir or dir") - return StagingArea(_directory=dir) - - def cleanup(self) -> None: - """ - Clean up the staging area, deleting all files in it. This method is - called automatically when the staging area is used as a context manager - in a `with` statement. - """ - if os.path.exists(self._directory): - shutil.rmtree(self._directory) - - def __enter__(self) -> 'StagingArea': - return self - - def __exit__(self, exc_type, exc_value, traceback) -> None: - self.cleanup() - - @property - def directory(self) -> str: - """ - The directory where the files are stored. - """ - return self._directory - - def store_file(self, relpath: str, value: bytes) -> str: - """ - Store a file in the staging area. - - Parameters - ---------- - relpath : str - The relative path to the file, relative to the staging area root. - value : bytes - The contents of the file. - """ - path = os.path.join(self._directory, relpath) - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, 'wb') as f: - f.write(value) - return path - - def get_full_path(self, relpath: str) -> str: - """ - Get the full path to a file in the staging area. - - Parameters - ---------- - relpath : str - The relative path to the file, relative to the staging area root. - """ - return os.path.join(self._directory, relpath) - - -def _create_random_id(): - # This is going to be a timestamp suitable for alphabetical chronological order plus a random string - return f"{_timestamp_str()}-{_random_str(8)}" - - -def _timestamp_str(): - return datetime.datetime.now().strftime("%Y%m%d%H%M%S") - - -def _random_str(n): - return ''.join(random.choices(string.ascii_lowercase + string.digits, k=n)) diff --git a/lindi/LindiStagingStore/__init__.py b/lindi/LindiStagingStore/__init__.py deleted file mode 100644 index 7ab3d49..0000000 --- a/lindi/LindiStagingStore/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .LindiStagingStore import LindiStagingStore, StagingArea # noqa: F401 diff --git a/lindi/__init__.py b/lindi/__init__.py index be60471..6e5e7f6 100644 --- a/lindi/__init__.py +++ b/lindi/__init__.py @@ -1,6 +1,4 @@ from .LindiH5ZarrStore import LindiH5ZarrStore, LindiH5ZarrStoreOpts # noqa: F401 from .LindiH5pyFile import LindiH5pyFile, LindiH5pyGroup, LindiH5pyDataset, LindiH5pyHardLink, LindiH5pySoftLink # noqa: F401 -from .LindiStagingStore import LindiStagingStore, StagingArea # noqa: F401 from .LocalCache.LocalCache import LocalCache, ChunkTooLargeError # noqa: F401 -from .File.File import File # noqa: F401 from .LindiRemfile.additional_url_resolvers import add_additional_url_resolver # noqa: F401 diff --git a/tests/test_staging_area.py b/tests/test_staging_area.py deleted file mode 100644 index 763cb45..0000000 --- a/tests/test_staging_area.py +++ /dev/null @@ -1,66 +0,0 @@ -import tempfile -import os -import numpy as np -import lindi -import shutil - - -def test_staging_area(): - with tempfile.TemporaryDirectory() as tmpdir: - staging_area = lindi.StagingArea.create(base_dir=tmpdir + '/staging_area') - client = lindi.LindiH5pyFile.from_reference_file_system(None, mode='r+', staging_area=staging_area) - X = np.random.randn(1000, 1000).astype(np.float32) - client.create_dataset('large_array', data=X, chunks=(400, 400)) - total_size = _get_total_size_of_directory(tmpdir) - assert total_size >= 
X.nbytes * 0.5, f'{total_size} < {X.nbytes} * 0.5'  # take into consideration compression
-        rfs = client.to_reference_file_system()
-        client2 = lindi.LindiH5pyFile.from_reference_file_system(rfs, mode='r')
-        assert isinstance(client2, lindi.LindiH5pyFile)
-        X1 = client['large_array']
-        assert isinstance(X1, lindi.LindiH5pyDataset)
-        X2 = client2['large_array']
-        assert isinstance(X2, lindi.LindiH5pyDataset)
-        assert np.allclose(X1[:], X2[:])
-
-        upload_dir = f'{tmpdir}/upload_dir'
-        os.makedirs(upload_dir, exist_ok=True)
-        output_fname = f'{tmpdir}/output.lindi.json'
-
-        def on_upload_blob(fname: str):
-            random_fname = f'{upload_dir}/{_random_string(10)}'
-            shutil.copy(fname, random_fname)
-            return random_fname
-
-        def on_upload_main(fname: str):
-            shutil.copy(fname, output_fname)
-            return output_fname
-
-        assert client.staging_store
-        client.upload(
-            on_upload_blob=on_upload_blob,
-            on_upload_main=on_upload_main
-        )
-
-        client3 = lindi.LindiH5pyFile.from_lindi_file(output_fname, mode='r')
-        X3 = client3['large_array']
-        assert isinstance(X3, lindi.LindiH5pyDataset)
-        assert np.allclose(X1[:], X3[:])
-
-
-def _get_total_size_of_directory(directory):
-    total_size = 0
-    for dirpath, dirnames, filenames in os.walk(directory):
-        for f in filenames:
-            fp = os.path.join(dirpath, f)
-            total_size += os.path.getsize(fp)
-    return total_size
-
-
-def _random_string(n):
-    import random
-    import string
-    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=n))
-
-
-if __name__ == '__main__':
-    test_staging_area()

From 1da28ecbc773a62b5f2825bfabcf2332f677d480 Mon Sep 17 00:00:00 2001
From: Jeremy Magland
Date: Fri, 20 Sep 2024 07:24:27 -0400
Subject: [PATCH 2/2] remove test_lindi_file.py

Remove test_lindi_file.py unit test
---
 tests/test_lindi_file.py | 23 -----------------------
 1 file changed, 23 deletions(-)
 delete mode 100644 tests/test_lindi_file.py

diff --git a/tests/test_lindi_file.py b/tests/test_lindi_file.py
deleted file mode 100644
index 720579a..0000000
--- a/tests/test_lindi_file.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import tempfile
-import numpy as np
-import h5py
-import lindi
-
-
-def test_lindi_file():
-    with tempfile.TemporaryDirectory() as tmpdir:
-        fname = f'{tmpdir}/test.lindi.json'
-        with lindi.File(fname, 'w') as f:
-            f.create_dataset('data', data=np.arange(500000, dtype=np.uint32), chunks=(100000,))
-
-        with lindi.File(fname, 'r') as f:
-            ds = f['data']
-            assert isinstance(ds, h5py.Dataset)
-            assert ds.shape == (500000,)
-            assert ds.chunks == (100000,)
-            assert ds.dtype == np.uint32
-            assert np.all(ds[:] == np.arange(500000, dtype=np.uint32))
-
-
-if __name__ == '__main__':
-    test_lindi_file()
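# Illustrative sketch (not part of the patches above): the write workflow that remains
# after the staging area is removed, based on the cells kept in examples/lindi_demo.ipynb.
# The URL and the 'new_attribute' edit come from that notebook; the attribute value below
# is a placeholder.
import lindi

url = 'https://lindi.neurosift.org/dandi/dandisets/000939/assets/56d875d6-a705-48d3-944c-53394a389c85/nwb.lindi.json'

# Open the remote .nwb.lindi.json reference file system in read-write mode
# (no staging_area argument is passed anymore).
client = lindi.LindiH5pyFile.from_lindi_file(url, mode='r+')

# Make edits with the h5py-style API ...
client.attrs['new_attribute'] = 'new_value'  # placeholder value

# ... and save the changes to a new local .nwb.lindi.json file.
client.write_lindi_file('new.nwb.lindi.json')

# Read the result back.
client2 = lindi.LindiH5pyFile.from_lindi_file('new.nwb.lindi.json')
print(client2.attrs['new_attribute'])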