Add fsspec storage plugin #114
base: main
Conversation
Feature/fsspec plugin
torchsnapshot/storage_plugin.py
Outdated
```python
elif protocol.startswith("fsspec-"):
    from torchsnapshot.storage_plugins.fsspec import FSSpecStoragePlugin

    return FSSpecStoragePlugin(root=path)
```
This might run into conflicts with PR #108, which adds the storage options to the storage plugin constructor.
Done. Merged with the feature/storage_kwargs branch.
```python
async def delete(self, path: str) -> None:
    await self._init_session()
    path = os.path.join(self.root, path)
    await self.fs._rm_file(path)
```
Is it certain this path is always a file? Do we need an explicit check and handling for directory paths?
Yes, directory paths should be handled as well.
I think we can use self.fs._rm(path, recursive=True) to replace this.
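The suggestion above can be sketched as follows. This is a minimal illustration, not the PR's actual code: _StubAsyncFS is a hypothetical stand-in for an fsspec AsyncFileSystem, used here only to show that _rm(path, recursive=True) is a single call that covers both single files and directory trees, unlike _rm_file.

```python
import asyncio
import os


class _StubAsyncFS:
    """Hypothetical stand-in for an fsspec AsyncFileSystem (illustration only)."""

    def __init__(self):
        self.calls = []

    async def _rm(self, path, recursive=False):
        # A real fsspec backend would delete the file or directory tree here.
        self.calls.append((path, recursive))


class FSSpecDeleteSketch:
    def __init__(self, fs, root):
        self.fs = fs
        self.root = root

    async def delete(self, path: str) -> None:
        # _rm(recursive=True) handles both files and directory trees,
        # whereas _rm_file assumes the path is a single file.
        full = os.path.join(self.root, path)
        await self.fs._rm(full, recursive=True)


fs = _StubAsyncFS()
plugin = FSSpecDeleteSketch(fs, "bucket/snapshots")
asyncio.run(plugin.delete("ckpt-0"))
print(fs.calls)  # [('bucket/snapshots/ckpt-0', True)]
```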
Codecov Report

```
@@            Coverage Diff             @@
##             main     #114      +/-   ##
==========================================
- Coverage   91.82%   85.71%    -6.11%
==========================================
  Files          31       32        +1
  Lines        2557     2612       +55
==========================================
- Hits         2348     2239      -109
- Misses        209      373      +164
```
Thanks for the contribution @shicheng0829!
```python
    path = os.path.join(self.root, write_io.path)
    await self.fs._pipe_file(path, bytes(write_io.buf))

async def read(self, read_io: ReadIO) -> None:
```
If byte_range is present in ReadIO, we need to honor it.
IIUC, not all fsspec plugins support seek (or efficient seek). We could provide an inefficient fallback here (we can look into that later).
```python
@dataclass
class ReadIO:
    path: str
    buf: io.BytesIO = field(default_factory=io.BytesIO)
    byte_range: Optional[Tuple[int, int]] = None
```
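A read that honors byte_range could look roughly like the sketch below. This is an assumption-laden illustration, not the PR's code: _StubAsyncFS is a hypothetical in-memory stand-in whose _cat_file mimics the (start, end) slicing that fsspec's cat_file supports.

```python
import asyncio
import io
import os
from dataclasses import dataclass, field
from typing import Optional, Tuple


@dataclass
class ReadIO:
    # Redefined here so the sketch is self-contained.
    path: str
    buf: io.BytesIO = field(default_factory=io.BytesIO)
    byte_range: Optional[Tuple[int, int]] = None


class _StubAsyncFS:
    """Hypothetical stand-in for an fsspec AsyncFileSystem; _cat_file
    mimics the (start, end) parameters of fsspec's cat_file."""

    def __init__(self, data):
        self.data = data

    async def _cat_file(self, path, start=None, end=None):
        return self.data[path][start:end]


async def read(fs, root: str, read_io: ReadIO) -> None:
    path = os.path.join(root, read_io.path)
    if read_io.byte_range is None:
        result = await fs._cat_file(path)
    else:
        # Honor the requested byte range instead of fetching the whole object.
        start, end = read_io.byte_range
        result = await fs._cat_file(path, start=start, end=end)
    read_io.buf = io.BytesIO(result)


fs = _StubAsyncFS({"root/obj": b"0123456789"})
rio = ReadIO(path="obj", byte_range=(2, 5))
asyncio.run(read(fs, "root", rio))
print(rio.buf.getvalue())  # b'234'
```

For backends without efficient range reads, the else-branch could instead fetch the whole object and slice it locally, which is the "inefficient fallback" mentioned above.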
```python
async def write(self, write_io: WriteIO) -> None:
    await self._init_session()
    path = os.path.join(self.root, write_io.path)
    await self.fs._pipe_file(path, bytes(write_io.buf))
```
write_io.buf can be a memoryview, in which case bytes(write_io.buf) will result in an extra copy. Do you know if _pipe_file accepts memoryviews?
Actually, when I tested my implementation with fsspec's s3fs plugin, passing a memoryview directly to _pipe_file resulted in an error, because it seems that memoryview doesn't have the len method.
The s3fs implementation is here:
https://github.com/fsspec/s3fs/blob/main/s3fs/core.py#L998
I think memoryviews have the len method (https://docs.python.org/3/c-api/buffer.html#c.Py_buffer.len). Do you mind double checking?
Never mind. After double-checking with memoryview, I get this exception from botocore.validate:
```
def serialize_to_request(self, parameters, operation_model):
    input_shape = operation_model.input_shape
    if input_shape is not None:
        report = self._param_validator.validate(
            parameters, operation_model.input_shape
        )
        if report.has_errors():
>           raise ParamValidationError(report=report.generate_report())
E           botocore.exceptions.ParamValidationError: Parameter validation failed:
E           Invalid type for parameter Body, value: <memory at 0x7fbfa202f280>, type: <class 'memoryview'>, valid types: <class 'bytes'>, <class 'bytearray'>, file-like object

../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/botocore/validate.py:381: ParamValidationError
```
And the call stack:

```
../torchsnapshot/snapshot.py:226: in take
    pending_io_work.sync_complete(event_loop=event_loop)
../torchsnapshot/scheduler.py:217: in sync_complete
    event_loop.run_until_complete(self.complete())
../../../.conda/envs/torchsnapshot/lib/python3.10/asyncio/base_events.py:646: in run_until_complete
    return future.result()
../torchsnapshot/scheduler.py:200: in complete
    write_pipeline: _WritePipeline = d.result()
../torchsnapshot/scheduler.py:85: in write_buffer
    await self.storage.write(write_io=write_io)
../torchsnapshot/storage_plugins/fsspec.py:51: in write
    await self._fs._pipe_file(path, write_io.buf)
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/s3fs/core.py:1003: in _pipe_file
    return await self._call_s3(
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/s3fs/core.py:338: in _call_s3
    return await _error_wrapper(
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/s3fs/core.py:138: in _error_wrapper
    raise err
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/s3fs/core.py:111: in _error_wrapper
    return await func(*args, **kwargs)
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/aiobotocore/client.py:321: in _make_api_call
    request_dict = await self._convert_to_request_dict(
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/aiobotocore/client.py:386: in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
```
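To summarize the thread above: memoryview does support len(); the botocore failure is a type check on the request Body, not a missing __len__. A small self-contained illustration (as_body is a hypothetical helper, not part of the PR):

```python
# memoryview supports len() just fine.
mv = memoryview(b"hello")
assert len(mv) == 5

# botocore only accepts bytes, bytearray, or file-like objects for Body,
# so a write path has to convert. The bytes() call copies, so a helper
# can do it lazily, only for types botocore would reject:
def as_body(buf):
    """Hypothetical helper: pass through botocore-accepted types, copy otherwise."""
    return buf if isinstance(buf, (bytes, bytearray)) else bytes(buf)


print(as_body(mv))      # b'hello' (copied from the memoryview)
print(as_body(b"raw"))  # b'raw'   (passed through, no copy)
```

This keeps the extra copy confined to the memoryview case discussed above.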
```python
    if self._session is None:
        self._session = await self.fs.set_session()

async def write(self, write_io: WriteIO) -> None:
```
StoragePlugin.write is responsible for creating parent directories if there are any. For example, if write_io.path is foo/bar/baz, the method needs to ensure that both foo and foo/bar exist.

There's a caveat: not all fsspec plugins support mkdir. One example I can think of is s3fs. For such plugins, we don't need to create directories, and we can't call mkdir, which would raise an exception.
You're right, the write method should be responsible for creating the parent path, and not all plugins support mkdir.
I think we can raise an exception if the plugin doesn't have a mkdir method.
But it seems that s3fs also provides a mkdir method.
The implementation is here:
"But it seems that s3fs also provides a mkdir method"

Thanks for the pointer. Apparently I remembered it wrong.

"I think we can raise an exception if the plugin doesn't have a mkdir method."

If a plugin doesn't implement mkdir, most likely it's not needed and I think we can just suppress the exception.
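The suppress-on-failure idea could look like this sketch. It is illustrative only: _NoMkdirFS is a hypothetical stand-in for an fsspec plugin without directory support, and the broad suppress reflects the discussion above (a real implementation might narrow the exception types).

```python
import asyncio
import contextlib
import os


class _NoMkdirFS:
    """Hypothetical stand-in for an fsspec plugin with no real directories."""

    async def _makedirs(self, path, exist_ok=False):
        raise NotImplementedError("this backend has no directories")


async def ensure_parent_dirs(fs, full_path: str) -> str:
    parent = os.path.dirname(full_path)
    # Object stores may raise here; for them nothing needs to be created,
    # so per the discussion above we suppress the error and continue.
    with contextlib.suppress(Exception):
        await fs._makedirs(parent, exist_ok=True)
    return parent


parent = asyncio.run(ensure_parent_dirs(_NoMkdirFS(), "root/foo/bar/baz"))
print(parent)  # root/foo/bar — and no exception escaped the write path
```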
```python
async def read(self, read_io: ReadIO) -> None:
    await self._init_session()
    path = os.path.join(self.root, read_io.path)
    result = await self.fs._cat_file(path)
```
Are _pipe_file, _cat_file, and _rm_file async-only APIs? IIUC, not all fsspec plugins have async support, right?
Yes, it may differ depending on each fsspec storage plugin's implementation. But I investigated some fsspec storage plugins, such as s3fs, gcsfs, and Azure Blob, and at least the _pipe, _cat, and _rm async methods are supported.
All third-party storage plugins for fsspec are listed here:
https://github.com/fsspec
As for s3fs, it doesn't support an async _open, so I use _pipe and _cat instead.
Thank you for your due diligence! It is very informative.
I think we can start with the assumption that these methods are present in most fsspec plugins. Later we can add a thread pool-based fallback.
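The thread pool-based fallback mentioned above could be sketched like this. _SyncOnlyFS is a hypothetical stand-in for a plugin that only offers blocking methods; the point is the dispatch pattern, not a real fsspec API.

```python
import asyncio


class _SyncOnlyFS:
    """Hypothetical stand-in for an fsspec plugin with only blocking methods."""

    def __init__(self, data):
        self.data = data

    def cat_file(self, path):
        return self.data[path]


async def cat_file_any(fs, path: str) -> bytes:
    # Prefer the native coroutine when the plugin provides one; otherwise
    # run the blocking call in the default thread pool so the event loop
    # stays responsive while the I/O happens.
    if hasattr(fs, "_cat_file"):
        return await fs._cat_file(path)
    return await asyncio.to_thread(fs.cat_file, path)


result = asyncio.run(cat_file_any(_SyncOnlyFS({"obj": b"data"}), "obj"))
print(result)  # b'data'
```

asyncio.to_thread requires Python 3.9+; on older versions, loop.run_in_executor(None, ...) achieves the same effect.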
Hi @shicheng0829! Thank you for your pull request and welcome to our community.

Action Required: In order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you.

Process: In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (e.g. your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA.

Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged accordingly.

If you have received this in error or have any questions, please contact us at [email protected]. Thanks!
Thank you for signing our Contributor License Agreement. We can now accept your code for this (and any) Meta Open Source project. Thanks!
Fixed some bugs and added more unit tests. Please review the PR again when you have free time. Thank you!
Fixes #102