Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for writable file-like objects #167

Merged
merged 12 commits into from
Jan 29, 2025
Merged

Conversation

kylebarron
Copy link
Member

@kylebarron kylebarron commented Jan 28, 2025

A wrapper around the object_store BufWriter that mimics the Python BufferedWriter

Some questions:

  • How should the constructor for WritableFile be defined? I'd like to have something like our existing open and open_async. I think I'd prefer to have a totally separate function entry point, so open constructs a ReadableFile and something like open_writer constructs a WritableFile. The constructors will be different enough that I don't want to have just overloads.

    I think open is too ambiguous. Maybe we should have open_reader, open_reader_async, open_writer, open_writer_async?

Todo:

  • Add Python type hinting
  • Add tests
  • Add to docs
  • Context manager (abort on error)
  • BufWriter params

For #165, for #164

@machichima
Copy link

Using open_reader, open_reader_async, open_writer, open_writer_async looks good!
Probably it will be something like this? I renamed open and open_async to open_reader and open_reader_async and add open_writer and open_writer_async.

#[pyfunction]
pub(crate) fn open_reader(
    py: Python,
    store: PyObjectStore,
    path: String,
) -> PyObjectStoreResult<PyReadableFile> {
    let store = store.into_inner();
    let runtime = get_runtime(py)?;
    let meta = py.allow_threads(|| runtime.block_on(store.head(&path.into())))?;
    let reader = Arc::new(Mutex::new(BufReader::new(store, &meta)));
    Ok(PyReadableFile::new(reader, false))
}

#[pyfunction]
pub(crate) fn open_reader_async(py: Python, store: PyObjectStore, path: String) -> PyResult<Bound<PyAny>> {
    let store = store.into_inner();
    future_into_py(py, async move {
        let meta = store
            .head(&path.into())
            .await
            .map_err(PyObjectStoreError::ObjectStoreError)?;
        let reader = Arc::new(Mutex::new(BufReader::new(store, &meta)));
        Ok(PyReadableFile::new(reader, true))
    })
}

#[pyfunction]
pub(crate) fn open_writer(
    py: Python,
    store: PyObjectStore,
    path: String,
) -> PyResult<PyWriteableFile> {
    let store = store.into_inner();
    let store_path = Path::from(path);

    let writer = Arc::new(Mutex::new(BufWriter::new(store, store_path)));
    Ok(PyWriteableFile::new(writer, false))
}

#[pyfunction]
pub(crate) fn open_writer_async(
    py: Python,
    store: PyObjectStore,
    path: String,
) -> PyResult<Bound<PyAny>> {
    let store = store.into_inner();
    future_into_py(py, async move {
        let store_path = Path::from(path);
        let writer = Arc::new(Mutex::new(BufWriter::new(store, store_path)));
        Ok(PyWriteableFile::new(writer, true))
    })
}

@kylebarron
Copy link
Member Author

Probably it will be something like this?

Yeah pretty much! We'll also add parameters to open_writer for max_concurrency, tags, and attributes https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html#method.with_max_concurrency

@kylebarron
Copy link
Member Author

@machichima Would you like to take a look at this before I merge? I think this should be enough to unblock #165?

There are two basic tests added here.

async def close(self) -> None:
"""Close the current file."""

async def closed(self) -> bool:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little unfortunate that this is a method instead of an attribute.

We need an Option somewhere in order to be able to drop the internal BufWriter to check that it has already been closed. (The object_store API will error if the file is closed twice, but doesn't give a way to check if the file has already been closed).

This being an async method is an artifact of storing the underlying BufWriter inside of an

Arc<Mutex<Option<BufWriter>>>

where the Mutex is a tokio::sync::Mutex.

Thus we need to use async to open the mutex. We could add a second layer of mutex, where the top-level mutex is a std::sync::Mutex, but I assume that two levels of mutexes would be detrimental for performance.

@kylebarron kylebarron changed the title Writable file Add support for writable file-like objects Jan 28, 2025
@machichima
Copy link

Sure! I'll have a look today

@kylebarron
Copy link
Member Author

@machichima I'm going to merge but any feedback is still welcome and we can make changes in future PRs

@kylebarron kylebarron enabled auto-merge (squash) January 29, 2025 16:42
@kylebarron kylebarron merged commit 012226a into main Jan 29, 2025
4 checks passed
@kylebarron kylebarron deleted the kyle/writable-file branch January 29, 2025 16:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants