Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fsspec storage plugin #114

Open
wants to merge 31 commits into
base: main
Choose a base branch
from

Conversation

shicheng0829
Copy link

@shicheng0829 shicheng0829 commented Oct 21, 2022

Fixes #102

dev-requirements.txt Outdated Show resolved Hide resolved
elif protocol.startswith("fsspec-"):
from torchsnapshot.storage_plugins.fsspec import FSSpecStoragePlugin

return FSSpecStoragePlugin(root=path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might run into conflicts this PR which adds the storage options to the storage plugin constructor: #108

Copy link
Author

@shicheng0829 shicheng0829 Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. Merged with the branch feature/storage_kwargs.

async def delete(self, path: str) -> None:
await self._init_session()
path = os.path.join(self.root, path)
await self.fs._rm_file(path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it certain this path is always a file? do we need an explicit check and handling for directory paths?

Copy link
Author

@shicheng0829 shicheng0829 Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the input with directory path should be considered.

I think we can use self.fs._rm(path, recursive=True) to replace this.

@codecov
Copy link

codecov bot commented Oct 21, 2022

Codecov Report

Merging #114 (32358df) into main (7fd1ece) will decrease coverage by 6.10%.
The diff coverage is 26.78%.

❗ Current head 32358df differs from pull request most recent head f282366. Consider uploading reports for the commit f282366 to get more accurate results

@@            Coverage Diff             @@
##             main     #114      +/-   ##
==========================================
- Coverage   91.82%   85.71%   -6.11%     
==========================================
  Files          31       32       +1     
  Lines        2557     2612      +55     
==========================================
- Hits         2348     2239     -109     
- Misses        209      373     +164     
Impacted Files Coverage Δ
torchsnapshot/storage_plugins/gcs.py 0.00% <ø> (-78.90%) ⬇️
torchsnapshot/storage_plugin.py 44.73% <25.00%> (-23.84%) ⬇️
torchsnapshot/storage_plugins/fsspec.py 26.92% <26.92%> (ø)
torchsnapshot/storage_plugins/s3.py 26.82% <0.00%> (-63.42%) ⬇️
torchsnapshot/memoryview_stream.py 63.33% <0.00%> (-6.67%) ⬇️

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Contributor

@yifuwang yifuwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution @shicheng0829!

path = os.path.join(self.root, write_io.path)
await self.fs._pipe_file(path, bytes(write_io.buf))

async def read(self, read_io: ReadIO) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If byte_range is present in ReadIO, we need to honor it.

IIUC, not all fsspec plugins support seek (or efficient seek). We could provide an inefficient backoff here (we can look into that later).

@dataclass
class ReadIO:
    path: str
    buf: io.BytesIO = field(default_factory=io.BytesIO)
    byte_range: Optional[Tuple[int, int]] = None

async def write(self, write_io: WriteIO) -> None:
await self._init_session()
path = os.path.join(self.root, write_io.path)
await self.fs._pipe_file(path, bytes(write_io.buf))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write_io.buf can be a memoryview, in which case bytes(write_io.buf) will result in an extra copy. Do you know if _pipe_file accepts memoryviews?

Copy link
Author

@shicheng0829 shicheng0829 Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actuallly, when I test my implement with the fsspec's s3fs plugin, it will result to error if we just put a memoryview as a _pipe_file input. Because it seems that the memoryview doesn't have the len method.

the implement of s3fs plugin is here:
https://github.com/fsspec/s3fs/blob/main/s3fs/core.py#L998

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think memoryviews have the len method (https://docs.python.org/3/c-api/buffer.html#c.Py_buffer.len). Do you mind double checking?

Copy link
Author

@shicheng0829 shicheng0829 Oct 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind. After my double checking of using memoryview, I get the exception from botocore.validate:

    def serialize_to_request(self, parameters, operation_model):
        input_shape = operation_model.input_shape
        if input_shape is not None:
            report = self._param_validator.validate(
                parameters, operation_model.input_shape
            )
            if report.has_errors():
>               raise ParamValidationError(report=report.generate_report())
E               botocore.exceptions.ParamValidationError: Parameter validation failed:
E               Invalid type for parameter Body, value: <memory at 0x7fbfa202f280>, type: <class 'memoryview'>, valid types: <class 'bytes'>, <class 'bytearray'>, file-like object

../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/botocore/validate.py:381: ParamValidationError

And the call stack

../torchsnapshot/snapshot.py:226: in take
    pending_io_work.sync_complete(event_loop=event_loop)
../torchsnapshot/scheduler.py:217: in sync_complete
    event_loop.run_until_complete(self.complete())
../../../.conda/envs/torchsnapshot/lib/python3.10/asyncio/base_events.py:646: in run_until_complete
    return future.result()
../torchsnapshot/scheduler.py:200: in complete
    write_pipeline: _WritePipeline = d.result()
../torchsnapshot/scheduler.py:85: in write_buffer
    await self.storage.write(write_io=write_io)
../torchsnapshot/storage_plugins/fsspec.py:51: in write
    await self._fs._pipe_file(path, write_io.buf)
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/s3fs/core.py:1003: in _pipe_file
    return await self._call_s3(
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/s3fs/core.py:338: in _call_s3
    return await _error_wrapper(
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/s3fs/core.py:138: in _error_wrapper
    raise err
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/s3fs/core.py:111: in _error_wrapper
    return await func(*args, **kwargs)
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/aiobotocore/client.py:321: in _make_api_call
    request_dict = await self._convert_to_request_dict(
../../../.conda/envs/torchsnapshot/lib/python3.10/site-packages/aiobotocore/client.py:386: in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(

if self._session is None:
self._session = await self.fs.set_session()

async def write(self, write_io: WriteIO) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StoragePlugin.write is responsible for creating parent directories if there are any. For example, if write_io.path is foo/bar/baz, the method needs to ensure that both foo and foo/bar exist.

There's a caveat: not all fsspec plugins support mkdir. One example I can think of is s3fs. For such plugins, we don't need to create directories, and we can't call mkdir which will raise an exception.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, the write method should be responsible for creating parent path and not all plugins support mkdir.

I think we can raise an exception if the plugin doesn't have mkdir method.

But it seems that the s3fs also provide the mkdir method.

The implement is here:

https://github.com/fsspec/s3fs/blob/main/s3fs/core.py#L803

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it seems that the s3fs also provide the mkdir method

Thanks for the pointer. Apparently I remembered it wrong.

I think we can raise an exception if the plugin doesn't have mkdir method.

If a plugin doesn't implement mkdir, most likely it's not needed and I think we can just suppress the exception.

async def read(self, read_io: ReadIO) -> None:
await self._init_session()
path = os.path.join(self.root, read_io.path)
result = await self.fs._cat_file(path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are _pipe_file, _cat_file, and _rm_file async only APIs? IIUC, not all fsspec plugins have async support right?

Copy link
Author

@shicheng0829 shicheng0829 Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, It may differ according to the implement of fsspec storage plugin. But I investigate some storage plugin of fsspec such as s3, gcsfs and azure blob. At least _pipe, _cat, _rm async method is supported.

all 3rd storage plugin for fsspec is listed here:
https://github.com/fsspec

And for s3fs, they don't support async _open, so I use the _pipe and _cat instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your due diligence! It is very informative.

I think we can start with the assumption that these methods are present in most fsspec plugins. Later we can add a thread pool-based fall back.

@facebook-github-bot
Copy link
Contributor

Hi @shicheng0829!

Thank you for your pull request and welcome to our community.

Action Required

In order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you.

Process

In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (eg your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA.

Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with CLA signed. The tagging process may take up to 1 hour after signing. Please give it that time before contacting us about it.

If you have received this in error or have any questions, please contact us at [email protected]. Thanks!

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Oct 24, 2022
@facebook-github-bot
Copy link
Contributor

Thank you for signing our Contributor License Agreement. We can now accept your code for this (and any) Meta Open Source project. Thanks!

@shicheng0829 shicheng0829 changed the title add fsspec storage plugin [WIP] Add fsspec storage plugin Oct 26, 2022
@shicheng0829 shicheng0829 changed the title [WIP] Add fsspec storage plugin Add fsspec storage plugin Oct 26, 2022
@shicheng0829
Copy link
Author

Fix some bugs and add more unit test. Please review the pr again if you have free time. Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FSSpec support for TorchSnapshot?
5 participants