-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for writable file-like objects #167
Conversation
Using #[pyfunction]
pub(crate) fn open_reader(
py: Python,
store: PyObjectStore,
path: String,
) -> PyObjectStoreResult<PyReadableFile> {
let store = store.into_inner();
let runtime = get_runtime(py)?;
let meta = py.allow_threads(|| runtime.block_on(store.head(&path.into())))?;
let reader = Arc::new(Mutex::new(BufReader::new(store, &meta)));
Ok(PyReadableFile::new(reader, false))
}
#[pyfunction]
pub(crate) fn open_reader_async(py: Python, store: PyObjectStore, path: String) -> PyResult<Bound<PyAny>> {
let store = store.into_inner();
future_into_py(py, async move {
let meta = store
.head(&path.into())
.await
.map_err(PyObjectStoreError::ObjectStoreError)?;
let reader = Arc::new(Mutex::new(BufReader::new(store, &meta)));
Ok(PyReadableFile::new(reader, true))
})
}
#[pyfunction]
pub(crate) fn open_writer(
py: Python,
store: PyObjectStore,
path: String,
) -> PyResult<PyWriteableFile> {
let store = store.into_inner();
let store_path = Path::from(path);
let writer = Arc::new(Mutex::new(BufWriter::new(store, store_path)));
Ok(PyWriteableFile::new(writer, false))
}
#[pyfunction]
pub(crate) fn open_writer_async(
py: Python,
store: PyObjectStore,
path: String,
) -> PyResult<Bound<PyAny>> {
let store = store.into_inner();
future_into_py(py, async move {
let store_path = Path::from(path);
let writer = Arc::new(Mutex::new(BufWriter::new(store, store_path)));
Ok(PyWriteableFile::new(writer, true))
})
} |
Yeah pretty much! We'll also add parameters to |
Does this mean the capacity doesn't do anything, because we're always calling `put` directly?
@machichima Would you like to take a look at this before I merge? I think this should be enough to unblock #165? There are two basic tests added here. |
async def close(self) -> None: | ||
"""Close the current file.""" | ||
|
||
async def closed(self) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a little unfortunate that this is a method instead of an attribute.
We need an Option
somewhere in order to be able to drop the internal BufWriter
to check that it has already been closed. (The object_store API will error if the file is closed twice, but doesn't give a way to check if the file has already been closed).
This being an async method is an artifact of storing the underlying BufWriter inside of an
Arc<Mutex<Option<BufWriter>>>
where the Mutex
is a tokio::sync::Mutex
.
Thus we need to use async to open the mutex. We could add a second layer of mutex, where the top-level mutex is a std::sync::Mutex
, but I assume that two levels of mutexes would be detrimental for performance.
Sure! I'll have a look today |
@machichima I'm going to merge but any feedback is still welcome and we can make changes in future PRs |
A wrapper around the
object_store
BufWriter
that mimics the PythonBufferedWriter
Some questions:
How should the constructor for
WritableFile
be defined? I'd like to have something like our existingopen
andopen_async
. I think I'd prefer to have a totally separate function entry point, soopen
constructs aReadableFile
and something likeopen_writer
constructs aWritableFile
. The constructors will be different enough that I don't want to have just overloads.I think
open
is too ambiguous. Maybe we should haveopen_reader
,open_reader_async
,open_writer
,open_writer_async
?Todo:
For #165, for #164