Support obstore as storage for df.to_parquet() #164

machichima opened this issue Jan 23, 2025 · 20 comments

@machichima

Currently, using obstore's AsyncFsspecStore for df.to_parquet() does not work (no error, but also no files uploaded). It seems like we only need to add a write() method to the BufferedFileSimple class and use obstore.put() there.

```python
def _open(self, path, mode="rb", **kwargs):
    """Return raw bytes-mode file-like from the file-system"""
    return BufferedFileSimple(self, path, mode, **kwargs)


class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
    def __init__(self, fs, path, mode="rb", **kwargs):
        if mode != "rb":
            raise ValueError("Only 'rb' mode is currently supported")
        super().__init__(fs, path, mode, **kwargs)
```
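
As a rough illustration (not the actual implementation), a write-mode counterpart could buffer everything in memory and issue a single obstore.put() when the file is closed. The class name BufferedWriteSimple and the fs.store attribute below are assumptions:

```python
import fsspec.spec
import obstore


class BufferedWriteSimple(fsspec.spec.AbstractBufferedFile):
    """Hypothetical write-mode file: buffer in memory, upload once on close."""

    def __init__(self, fs, path, mode="wb", **kwargs):
        if mode != "wb":
            raise ValueError("Only 'wb' mode is currently supported")
        super().__init__(fs, path, mode, **kwargs)

    def _initiate_upload(self):
        # nothing to set up; data accumulates in self.buffer until close()
        pass

    def _upload_chunk(self, final=False):
        if final:
            # assumes the filesystem keeps its obstore store on `self.fs.store`
            obstore.put(self.fs.store, self.path, self.buffer.getvalue())
        # returning False before the final flush keeps the buffer accumulating
        return final
```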

I would like to ask if there is any existing progress on this. If not, I can help work on it.

@machichima machichima changed the title from "Support use obstore as storage for df.to_parquet()" to "Support obstore as storage for df.to_parquet()" on Jan 23, 2025
@kylebarron
Member

kylebarron commented Jan 23, 2025

A PR for this would be great!

cc @martindurant who implemented it in #63

@martindurant
Contributor

Does obstore support multi-part uploads, or would whole files need to be buffered in memory before a call to put()?

@kylebarron
Member

kylebarron commented Jan 23, 2025

It uses multipart uploads automatically (unless use_multipart is False); there's some mention of this in https://developmentseed.org/obstore/latest/api/put/.

You can pass a sync or async iterator to put
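
For example (a minimal sketch; it assumes put() accepts a plain generator of bytes chunks, as described above):

```python
import obstore
from obstore.store import MemoryStore

store = MemoryStore()


def chunks():
    # the data is produced lazily instead of being held in memory all at once
    yield b"first chunk, "
    yield b"second chunk"


obstore.put(store, "example/data.bin", chunks())
```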

@martindurant
Contributor

> You can pass a sync or async iterator to put

That's not much use from the file API, I'm afraid - you need to be able to write, send some data and return; so the iterator would be the wrong way around.

@machichima
Author

Hi @kylebarron ,

I tried it out using the iterator. However, just as @martindurant mentioned, the whole file will be buffered in memory, which is not ideal. Probably the Rust part needs to be updated to support this.

@martindurant
Contributor

At the moment, obstore doesn't implement a file API at all - I guess that's in the name. As well as buffering in memory, you could also use fsspec's simplecache to buffer to a local file and then upload that - also not an entirely perfect solution.

@kylebarron
Member

> At the moment, obstore doesn't implement a file API at all

obstore implements a file API for reading but it's not yet implemented for writing. See https://developmentseed.org/obstore/v0.3.0/api/file/

I'm open to having a similar API for writing that wraps https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html

@kylebarron
Member

> so the iterator would be the wrong way around.

I'm not sure I fully understand this use case, but if the iterator is the wrong way around, then maybe instead of a "push-based" iterator, you could pass in a "pull-based" file-like object? That's already supported.

@machichima
Author

> I'm not sure I fully understand this use case, but if the iterator is the wrong way around, then maybe instead of a "push-based" iterator, you could pass in a "pull-based" file-like object? That's already supported.

Hi,
The problem here is that to_parquet() will call write() multiple times, each time providing a chunk of data (bytes). So what we can pass to obstore.put() is either the bytes or an iterator, not a file-like object.
However, calling obstore.put() multiple times, each with a different data chunk, overwrites the original file. Hence, what I can do now is collect the data chunks received in write() into one buffer and send it to obstore.put() at once, which loads all the data into memory.

Please let me know if I understand anything wrong. Thanks!

@kylebarron
Member

kylebarron commented Jan 28, 2025

Ah that makes sense (sorry, going through my notifications while pretty tired).

I think the easiest way to solve this is to implement the writable file API that https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html implements. (It should be pretty similar to the existing readable file API)

@kylebarron
Member

kylebarron commented Jan 28, 2025

#167 is a stub that adds writable file support. We basically just need constructors to create the WritableFile/AsyncWritableFile and to decide on the API.
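
For illustration only, usage could end up looking something like this (every name below is hypothetical until the API is decided):

```python
import obstore
from obstore.store import MemoryStore

store = MemoryStore()

# hypothetical constructor; the real name and signature are still open in #167
with obstore.open_writer(store, "data/output.parquet") as f:
    f.write(b"first chunk")
    f.write(b"second chunk")
# on close, buffered data is flushed through the underlying BufWriter
```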

@kylebarron
Member

Also, once we get this working, we should add an example to the docs showing that it works with pandas (I assume your df.to_parquet example is pandas).

@machichima
Author

Currently, I'm facing a small problem. df.to_parquet and df.read_parquet pass the path with the bucket name included (e.g. bucket_name/path/to/file), which is the format accepted by fsspec. However, obstore expects the path without the bucket name, so bucket_name/path/to/file will create a folder named bucket_name.

What s3fs and gcsfs do is split the path into bucket name and path in all functions that use it. Should we do something like that? And where should we add split_path, in fsspec.py or in Rust?
Thanks!
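
Something along these lines in fsspec.py could do the splitting (a sketch only, not the code from the PR):

```python
def _split_path(path: str) -> tuple[str, str]:
    """Split 'bucket_name/path/to/file' into ('bucket_name', 'path/to/file')."""
    path = path.lstrip("/")
    if "/" not in path:
        return path, ""
    bucket, key = path.split("/", 1)
    return bucket, key
```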

@kylebarron
Member

Can we add that as a helper function in the fsspec.py module?

@martindurant
Contributor

The filesystems here are not yet registered with fsspec, so you can only use them explicitly. To use them with fsspec.open, you would, yes, need to split off the bucket part to pass to obstore, but also map the URL protocols to implementations.

@machichima
Author

I've added the _split_path() helper function to split the bucket name from the path. I've only added it to the _open() function for now. Should I also add it to the other functions in fsspec.py in this PR, or maybe open a new PR after this one is merged?

@martindurant
Contributor

Where is the PR? I can have a look.

It depends on whether we want to allow creating, say, an "obs-s3" instance which doesn't yet have an obstore instance with bucket configured, but creates them on demand. I assume your _open does something like this, so that if there were an fsspec.open() with a list of various buckets, you would end up making a bunch of obstore instances all at once.

@kylebarron
Member

> also map the URL protocols to implementations.

We support that for most URL protocols now: https://developmentseed.org/obstore/v0.4.0-beta.1/api/store/#obstore.store.from_url
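
For example (the exact keyword arguments depend on the store type and are an assumption here):

```python
from obstore.store import from_url

# picks the store implementation (S3 here) based on the URL scheme
store = from_url("s3://my-s3-bucket/", region="us-east-1")
```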

> Where is the PR? I can have a look.

It's the same one: #165

> It depends on whether we want to allow creating, say, an "obs-s3" instance which doesn't yet have an obstore instance with bucket configured, but creates them on demand.

That could make sense. We could store a cache of obstore instances, keyed by bucket name.
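
A rough sketch of that idea (the helper and its configuration are hypothetical):

```python
from functools import lru_cache

from obstore.store import S3Store


@lru_cache(maxsize=16)
def _construct_store(bucket: str) -> S3Store:
    # one store per bucket, created on first use and reused afterwards
    return S3Store.from_env(bucket)
```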

@machichima
Author

The way I use fsspec with obstore is to create the store instance first and pass it to AsyncFsspecStore. That's why I directly strip the bucket name from the path.

```python
store = S3Store.from_env(
    "my-s3-bucket",
    endpoint="http://localhost:30002",
    access_key_id="minio",
    secret_access_key="miniostorage",
    virtual_hosted_style_request=False,  # do not include the bucket name in the domain name
    client_options={"timeout": "99999s", "allow_http": "true"},
)

fsspec.register_implementation("s3", AsyncFsspecStore)

obstore_fs = fsspec.filesystem("s3", store=store)

path = "s3://my-s3-bucket/uploaded/file.txt"

with obstore_fs.open(path, "wb") as f:
    f.write(b"something")
```

I think creating the instance in AsyncFsspecStore is better. I'll have a look at how to implement this and maybe open a new PR, since adding it to #165 may make that PR too large.

@machichima
Author

Hi @kylebarron @martindurant ,

I just created another PR here: #198, which uses lru_cache to cache the store instances with the bucket name as the key. Please have a look and see if this approach is OK, then I'll apply it to all the other functions.

Thanks!
