From c251205cf41ca4ae262887324742ae2b082be27b Mon Sep 17 00:00:00 2001 From: Stefano Cristiano Date: Tue, 10 Sep 2024 22:07:03 +0200 Subject: [PATCH] Async: Use offsets only if explicitly set in AsyncFile{Read | Write} Also give up trying to use EVFILT_READ for buffered files on kqueue (macOS) as it can't report EOF reliably. --- Libraries/Async/Async.h | 30 +++++- Libraries/Async/Internal/AsyncLinux.inl | 4 +- Libraries/Async/Internal/AsyncPosix.inl | 52 +++++++--- Libraries/Async/Internal/AsyncWindows.inl | 12 ++- Libraries/Async/Tests/AsyncTest.cpp | 120 +++++++++++++++++++++- 5 files changed, 196 insertions(+), 22 deletions(-) diff --git a/Libraries/Async/Async.h b/Libraries/Async/Async.h index fffb024a..22488d0e 100644 --- a/Libraries/Async/Async.h +++ b/Libraries/Async/Async.h @@ -740,15 +740,27 @@ struct AsyncFileRead : public AsyncRequest Function callback; /// Callback called when some data has been read from the file into the buffer Span buffer; /// The writeable span of memory where to data will be written - uint64_t offset = 0; /// Offset from file start where to start reading. Not supported on pipes. FileDescriptor::Handle fileDescriptor; /// The file/pipe descriptor handle to read data from. /// Use SC::FileDescriptor or SC::PipeDescriptor to open it, with /// SC::FileDescriptorOpenOptions::blocking == false + /// @brief Returns the last offset set with AsyncFileRead::setOffset + uint64_t getOffset() const { return offset; } + + /// @brief Sets the offset in bytes at which start reading. + /// @note Setting file offset when reading is only possible on seekable files + void setOffset(uint64_t fileOffset) + { + useOffset = true; + offset = fileOffset; + } + private: friend struct AsyncEventLoop; - + bool useOffset = false; + uint64_t offset = 0; /// Offset from file start where to start reading. Not supported on pipes. #if SC_PLATFORM_WINDOWS + uint64_t readCursor = 0; detail::WinOverlappedOpaque overlapped; #endif }; @@ -812,13 +824,25 @@ struct AsyncFileWrite : public AsyncRequest Function callback; /// Callback called when descriptor is ready to be written with more data Span buffer; /// The read-only span of memory where to read the data from - uint64_t offset = 0; /// Offset to start writing from. Not supported on pipes. FileDescriptor::Handle fileDescriptor; /// The file/pipe descriptor to write data to. /// Use SC::FileDescriptor or SC::PipeDescriptor to open it, with /// SC::FileDescriptorOpenOptions::blocking == false + /// @brief Returns the last offset set with AsyncFileWrite::setOffset + uint64_t getOffset() const { return offset; } + + /// @brief Sets the offset in bytes at which start writing. + /// @note Setting write file offset when reading is only possible on seekable files + void setOffset(uint64_t fileOffset) + { + useOffset = true; + offset = fileOffset; + } + private: friend struct AsyncEventLoop; + bool useOffset = false; + uint64_t offset = 0xffffffffffffffff; /// Offset to start writing from. Not supported on pipes. #if SC_PLATFORM_WINDOWS detail::WinOverlappedOpaque overlapped; #endif diff --git a/Libraries/Async/Internal/AsyncLinux.inl b/Libraries/Async/Internal/AsyncLinux.inl index eadd35a6..1106a77d 100644 --- a/Libraries/Async/Internal/AsyncLinux.inl +++ b/Libraries/Async/Internal/AsyncLinux.inl @@ -471,7 +471,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsIoURing io_uring_sqe* submission; SC_TRY(getNewSubmission(async, submission)); globalLibURing.io_uring_prep_read(submission, async.fileDescriptor, async.buffer.data(), - async.buffer.sizeInBytes(), async.offset); + async.buffer.sizeInBytes(), async.useOffset ? async.offset : -1); globalLibURing.io_uring_sqe_set_data(submission, &async); return Result(true); } @@ -495,7 +495,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsIoURing io_uring_sqe* submission; SC_TRY(getNewSubmission(async, submission)); globalLibURing.io_uring_prep_write(submission, async.fileDescriptor, async.buffer.data(), - async.buffer.sizeInBytes(), 0); + async.buffer.sizeInBytes(), async.useOffset ? async.offset : -1); globalLibURing.io_uring_sqe_set_data(submission, &async); return Result(true); } diff --git a/Libraries/Async/Internal/AsyncPosix.inl b/Libraries/Async/Internal/AsyncPosix.inl index d6aaa197..3a218c5d 100644 --- a/Libraries/Async/Internal/AsyncPosix.inl +++ b/Libraries/Async/Internal/AsyncPosix.inl @@ -20,9 +20,10 @@ #include // For error handling #include // socketlen_t/getsocketopt/send/recv #include // kqueue -#include // timespec -#include // WIFEXITED / WEXITSTATUS -#include // read/write/pread/pwrite +#include +#include // timespec +#include // WIFEXITED / WEXITSTATUS +#include // read/write/pread/pwrite #endif @@ -327,7 +328,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix return KernelQueuePosix::setEventWatcher(async, fileDescriptor, filter); } - [[nodiscard]] static bool isDescriptorWatchable(int fd, bool& canBeWatched) + [[nodiscard]] static bool isDescriptorWriteWatchable(int fd, bool& canBeWatched) { struct stat file_stat; if (::fstat(fd, &file_stat) == -1) @@ -386,7 +387,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix return Result(true); } - [[nodiscard]] static constexpr bool isDescriptorWatchable(int, bool& canBeWatched) + [[nodiscard]] static constexpr bool isDescriptorWriteWatchable(int, bool& canBeWatched) { canBeWatched = true; // kevent can also watch regular buffered files (differently from epoll) return true; @@ -409,6 +410,19 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix } #endif + [[nodiscard]] static bool isDescriptorReadWatchable(int fd, bool& canBeWatched) + { + struct stat file_stat; + if (::fstat(fd, &file_stat) == -1) + { + return false; + } + // epoll doesn't support regular file descriptors + // kqueue doesn't report EOF on vnodes (regular files) for EVFILT_READ + canBeWatched = S_ISREG(file_stat.st_mode) == 0; + return true; + } + static struct timespec timerToTimespec(const Time::HighResolutionCounter& loopTime, const Time::HighResolutionCounter* nextTimer) { @@ -683,7 +697,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix [[nodiscard]] Result setupAsync(AsyncFileRead& async) { bool canBeWatched; - SC_TRY(isDescriptorWatchable(async.fileDescriptor, canBeWatched)); + SC_TRY(isDescriptorReadWatchable(async.fileDescriptor, canBeWatched)); if (canBeWatched) { return setEventWatcher(async, async.fileDescriptor, INPUT_EVENTS_MASK); @@ -705,19 +719,25 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix return KernelQueuePosix::stopSingleWatcherImmediate(*async.eventLoop, async.fileDescriptor, INPUT_EVENTS_MASK); } + [[nodiscard]] static Result teardownAsync(AsyncFileRead*, AsyncTeardown& teardown) + { + return KernelQueuePosix::stopSingleWatcherImmediate(*teardown.eventLoop, teardown.fileHandle, + INPUT_EVENTS_MASK); + } + [[nodiscard]] static Result executeOperation(AsyncFileRead& async, AsyncFileRead::CompletionData& completionData) { auto span = async.buffer; ssize_t res; do { - if (async.offset == 0) + if (async.useOffset) { - res = ::read(async.fileDescriptor, span.data(), span.sizeInBytes()); + res = ::pread(async.fileDescriptor, span.data(), span.sizeInBytes(), static_cast(async.offset)); } else { - res = ::pread(async.fileDescriptor, span.data(), span.sizeInBytes(), static_cast(async.offset)); + res = ::read(async.fileDescriptor, span.data(), span.sizeInBytes()); } } while ((res == -1) and (errno == EINTR)); @@ -736,7 +756,7 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix [[nodiscard]] Result setupAsync(AsyncFileWrite& async) { bool canBeWatched; - SC_TRY(isDescriptorWatchable(async.fileDescriptor, canBeWatched)); + SC_TRY(isDescriptorWriteWatchable(async.fileDescriptor, canBeWatched)); if (canBeWatched) { return setEventWatcher(async, async.fileDescriptor, OUTPUT_EVENTS_MASK); @@ -758,19 +778,25 @@ struct SC::AsyncEventLoop::Internal::KernelEventsPosix return KernelQueuePosix::stopSingleWatcherImmediate(*async.eventLoop, async.fileDescriptor, OUTPUT_EVENTS_MASK); } + [[nodiscard]] static Result teardownAsync(AsyncFileWrite*, AsyncTeardown& teardown) + { + return KernelQueuePosix::stopSingleWatcherImmediate(*teardown.eventLoop, teardown.fileHandle, + OUTPUT_EVENTS_MASK); + } + [[nodiscard]] static Result executeOperation(AsyncFileWrite& async, AsyncFileWrite::CompletionData& completionData) { auto span = async.buffer; ssize_t res; do { - if (async.offset == 0) + if (async.useOffset) { - res = ::write(async.fileDescriptor, span.data(), span.sizeInBytes()); + res = ::pwrite(async.fileDescriptor, span.data(), span.sizeInBytes(), static_cast(async.offset)); } else { - res = ::pwrite(async.fileDescriptor, span.data(), span.sizeInBytes(), static_cast(async.offset)); + res = ::write(async.fileDescriptor, span.data(), span.sizeInBytes()); } } while ((res == -1) and (errno == EINTR)); SC_TRY_MSG(res >= 0, "::write failed"); diff --git a/Libraries/Async/Internal/AsyncWindows.inl b/Libraries/Async/Internal/AsyncWindows.inl index 09c6eb6e..0203b96d 100644 --- a/Libraries/Async/Internal/AsyncWindows.inl +++ b/Libraries/Async/Internal/AsyncWindows.inl @@ -581,7 +581,13 @@ struct SC::AsyncEventLoop::Internal::KernelEvents [[nodiscard]] static Result executeOperation(AsyncFileRead& async, AsyncFileRead::CompletionData& completionData, bool synchronous = true) { - return executeFileOperation(&::ReadFile, async, completionData, synchronous, &completionData.endOfFile); + if (not async.useOffset) + { + async.offset = async.readCursor; + } + SC_TRY(executeFileOperation(&::ReadFile, async, completionData, synchronous, &completionData.endOfFile)); + async.readCursor = async.offset + async.buffer.sizeInBytes(); + return Result(true); } [[nodiscard]] static Result completeAsync(AsyncFileRead::Result& result) @@ -601,6 +607,10 @@ struct SC::AsyncEventLoop::Internal::KernelEvents [[nodiscard]] static Result executeOperation(AsyncFileWrite& async, AsyncFileWrite::CompletionData& completionData, bool synchronous = true) { + // TODO: Do the same as AsyncFileRead + // To write to the end of file, specify both the Offset and OffsetHigh members of the OVERLAPPED structure as + // 0xFFFFFFFF. This is functionally equivalent to previously calling the CreateFile function to open hFile using + // FILE_APPEND_DATA access. return executeFileOperation(&::WriteFile, async, completionData, synchronous, nullptr); } diff --git a/Libraries/Async/Tests/AsyncTest.cpp b/Libraries/Async/Tests/AsyncTest.cpp index b8b8f059..f2d706d1 100644 --- a/Libraries/Async/Tests/AsyncTest.cpp +++ b/Libraries/Async/Tests/AsyncTest.cpp @@ -84,6 +84,11 @@ struct SC::AsyncTest : public SC::TestCase fileReadWrite(false); // do not use thread-pool fileReadWrite(true); // use thread-pool } + if (test_section("file endOfFile")) + { + fileEndOfFile(false); // do not use thread-pool + fileEndOfFile(true); // use thread-pool + } if (test_section("file close")) { fileClose(); @@ -111,6 +116,7 @@ struct SC::AsyncTest : public SC::TestCase void socketClose(); void socketSendReceiveError(); void fileReadWrite(bool useThreadPool); + void fileEndOfFile(bool useThreadPool); void fileClose(); }; @@ -914,7 +920,7 @@ void SC::AsyncTest::fileReadWrite(bool useThreadPool) { SC_TEST_EXPECT(readData.sizeInBytes() == 1); params.readBuffer[params.readCount++] = readData.data()[0]; - res.getAsync().offset += readData.sizeInBytes(); + res.getAsync().setOffset(res.getAsync().getOffset() + readData.sizeInBytes()); res.reactivateRequest(true); } else @@ -949,6 +955,114 @@ void SC::AsyncTest::fileReadWrite(bool useThreadPool) SC_TEST_EXPECT(fs.removeEmptyDirectory(name)); } +void SC::AsyncTest::fileEndOfFile(bool useThreadPool) +{ + // This tests a weird edge case where doing a single file read of the entire size of file + // will not produce end of file flag + + // 1. Create ThreadPool and tasks + ThreadPool threadPool; + if (useThreadPool) + { + SC_TEST_EXPECT(threadPool.create(4)); + } + + // 2. Create EventLoop + AsyncEventLoop eventLoop; + SC_TEST_EXPECT(eventLoop.create(options)); + + // 3. Create some files on disk + StringNative<255> filePath = StringEncoding::Native; + StringNative<255> dirPath = StringEncoding::Native; + const StringView name = "AsyncTest"; + const StringView fileName = "test.txt"; + SC_TEST_EXPECT(Path::join(dirPath, {report.applicationRootDirectory, name})); + SC_TEST_EXPECT(Path::join(filePath, {dirPath.view(), fileName})); + + FileSystem fs; + SC_TEST_EXPECT(fs.init(report.applicationRootDirectory)); + SC_TEST_EXPECT(fs.makeDirectoryIfNotExists(name)); + SC_TEST_EXPECT(fs.changeDirectory(dirPath.view())); + { + char data[1024] = {0}; + SC_TEST_EXPECT(fs.write(fileName, {data, sizeof(data)})); + } + + FileDescriptor::OpenOptions openOptions; + openOptions.blocking = useThreadPool; + + FileDescriptor::Handle handle = FileDescriptor::Invalid; + FileDescriptor fd; + SC_TEST_EXPECT(fd.open(filePath.view(), FileDescriptor::ReadOnly, openOptions)); + if (not useThreadPool) + { + SC_TEST_EXPECT(eventLoop.associateExternallyCreatedFileDescriptor(fd)); + } + SC_TEST_EXPECT(fd.get(handle, Result::Error("asd"))); + + struct Context + { + int readCount = 0; + size_t readSize = 0; + } context; + AsyncFileRead asyncReadFile; + AsyncFileRead::Task asyncReadTask; + asyncReadFile.setDebugName("FileRead"); + asyncReadFile.callback = [this, &context](AsyncFileRead::Result& res) + { + Span readData; + SC_TEST_EXPECT(res.get(readData)); + if (context.readCount == 0) + { + context.readSize += readData.sizeInBytes(); + res.reactivateRequest(true); + } + else if (context.readCount == 1) + { + context.readSize += readData.sizeInBytes(); + } + else if (context.readCount == 2) + { + SC_TEST_EXPECT(res.completionData.endOfFile); + SC_TEST_EXPECT(readData.empty()); // EOF + } + else + { + SC_TEST_EXPECT(context.readCount <= 3); + } + context.readCount++; + }; + char buffer[512] = {0}; + asyncReadFile.fileDescriptor = handle; + asyncReadFile.buffer = {buffer, sizeof(buffer)}; + if (useThreadPool) + { + SC_TEST_EXPECT(asyncReadFile.start(eventLoop, threadPool, asyncReadTask)); + } + else + { + SC_TEST_EXPECT(asyncReadFile.start(eventLoop)); + } + + SC_TEST_EXPECT(eventLoop.run()); + SC_TEST_EXPECT(context.readCount == 2); + if (useThreadPool) + { + SC_TEST_EXPECT(asyncReadFile.start(eventLoop, threadPool, asyncReadTask)); + } + else + { + SC_TEST_EXPECT(asyncReadFile.start(eventLoop)); + } + SC_TEST_EXPECT(eventLoop.run()); + SC_TEST_EXPECT(context.readCount == 3); + SC_TEST_EXPECT(fd.close()); + + SC_TEST_EXPECT(fs.removeFile(fileName)); + SC_TEST_EXPECT(fs.changeDirectory(report.applicationRootDirectory)); + SC_TEST_EXPECT(fs.removeEmptyDirectory(name)); +} + void SC::AsyncTest::fileClose() { AsyncEventLoop eventLoop; @@ -1287,8 +1401,8 @@ asyncReadFile.callback = [&](AsyncFileRead::Result& res) // readData is a slice of receivedData with the received bytes console.print("Read {} bytes from file", readData.sizeInBytes()); - // IMPORTANT: Update file offset to receive next range of bytes - res.getAsync().offset += readData.sizeInBytes(); + // OPTIONAL: Update file offset to receive a different range of bytes + res.getAsync().setOffset(res.getAsync().getOffset() + readData.sizeInBytes()); // IMPORTANT: Reactivate the request to receive more data res.reactivateRequest(true);