diff --git a/conanfile.py b/conanfile.py index 67e8431..004a43d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -7,8 +7,6 @@ #+++++++++++-+-+--+----- --- -- - - - - from conan import ConanFile -from conan.tools.cmake import CMakeToolchain, CMake, cmake_layout, CMakeDeps -from conan.tools.files import copy import os, sys, platform @@ -16,22 +14,37 @@ #==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - # Import batt helper utilities module. # -sys.path.append(os.path.join(os.path.dirname(__file__), 'script')) -import batt +import script.batt as batt +from script.batt import VISIBLE, OVERRIDE # #+++++++++++-+-+--+----- --- -- - - - - class LlfsConan(ConanFile): name = "llfs" + + #----- --- -- - - - - # version is set automatically from Git tags - DO NOT SET IT HERE + #----- --- -- - - - - + license = "Apache Public License 2.0" + author = "The MathWorks, Inc." + url = "https://github.com/mathworks/llfs" + description = "Low-Level File System Utilities (C++)" + settings = "os", "compiler", "build_type", "arch" - options = {"shared": [True, False]} - default_options = {"shared": False} + + options = { + "shared": [True, False], + } + + default_options = { + "shared": False, + } + build_policy = "missing" exports = [ @@ -56,9 +69,7 @@ def configure(self): def requirements(self): - from batt import VISIBLE, OVERRIDE - - self.requires("batteries/0.49.6", **VISIBLE) + self.requires("batteries/0.50.2", **VISIBLE) self.requires("boost/1.83.0", **VISIBLE) self.requires("cli11/2.3.2", **VISIBLE) self.requires("glog/0.6.0", **VISIBLE) @@ -75,12 +86,12 @@ def requirements(self): #+++++++++++-+-+--+----- --- -- - - - - - from batt import set_version_from_git_tags as set_version - from batt import cmake_in_src_layout as layout - from batt import default_cmake_generate as generate - from batt import default_cmake_build as build - from batt import default_cmake_lib_package as package - from batt import default_lib_package_info as package_info - from batt import default_lib_package_id as package_id + from script.batt import set_version_from_git_tags as set_version + from script.batt import cmake_in_src_layout as layout + from script.batt import default_cmake_generate as generate + from script.batt import default_cmake_build as build + from script.batt import default_cmake_lib_package as package + from script.batt import default_lib_package_info as package_info + from script.batt import default_lib_package_id as package_id #+++++++++++-+-+--+----- --- -- - - - - diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 072997b..c70bde4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -170,6 +170,7 @@ endmacro() LLFS_DefineLibrary(llfs ./llfs batteries::batteries liburing::liburing + OpenSSL::Crypto ) #=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- @@ -219,6 +220,7 @@ target_link_libraries(llfs_fuse Boost::context Boost::stacktrace_backtrace libbacktrace::libbacktrace + OpenSSL::Crypto CLI11::CLI11 dl stdc++fs) diff --git a/src/llfs/buffer.hpp b/src/llfs/buffer.hpp index d1c4148..14ee91d 100644 --- a/src/llfs/buffer.hpp +++ b/src/llfs/buffer.hpp @@ -14,6 +14,7 @@ #include +#include #include namespace llfs { @@ -25,14 +26,18 @@ using batt::mutable_buffer_from_struct; using batt::MutableBuffer; using batt::resize_buffer; -// Returns the distance, in bytes, from `begin` to `end`. If `end` is less than `begin`, the result -// is negative. +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // +/** \brief Returns the distance, in bytes, from `begin` to `end`. If `end` is less than `begin`, + * the result is negative. + */ inline isize byte_distance(const void* begin, const void* end) { return static_cast(end) - static_cast(begin); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// /** \brief Returns an empty sequence of ConstBuffer objects. */ inline const std::vector& no_buffers() @@ -42,6 +47,41 @@ inline const std::vector& no_buffers() return no_buffers_; } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/** \brief Returns the least-upper bound (exclusive) address of the passed buffer. + */ +inline const void* get_buffer_end(const batt::ConstBuffer& buffer) noexcept +{ + return static_cast(buffer.data()) + buffer.size(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/** \brief Returns the pointer that is offset from `ptr` by `delta` bytes. + */ +inline const void* advance_pointer(const void* ptr, isize delta) +{ + return static_cast(ptr) + delta; +} + } // namespace llfs +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ + +namespace std { + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/** \brief Resizes the given buffer storage object and returns a MutableBuffer of the same size. + */ +inline llfs::MutableBuffer resize_buffer_storage(std::unique_ptr& p_storage, + llfs::usize size) +{ + p_storage.reset(new llfs::u8[size]); + return llfs::MutableBuffer{p_storage.get(), size}; +} + +} //namespace std + #endif // LLFS_BUFFER_HPP diff --git a/src/llfs/buffer.test.cpp b/src/llfs/buffer.test.cpp new file mode 100644 index 0000000..60e2ec4 --- /dev/null +++ b/src/llfs/buffer.test.cpp @@ -0,0 +1,30 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// +#include + +#include +#include + +namespace { + +using namespace llfs::int_types; + +TEST(BufferTest, ResizeBufferStorage) +{ + std::unique_ptr storage; + llfs::MutableBuffer buffer = resize_buffer_storage(storage, 1977); + + EXPECT_NE(buffer.data(), nullptr); + EXPECT_EQ((void*)buffer.data(), (void*)storage.get()); + EXPECT_EQ(buffer.size(), 1977u); +} + +} // namespace diff --git a/src/llfs/ioring_buffer_pool.cpp b/src/llfs/ioring_buffer_pool.cpp index 3b92bad..67c8f55 100644 --- a/src/llfs/ioring_buffer_pool.cpp +++ b/src/llfs/ioring_buffer_pool.cpp @@ -402,6 +402,16 @@ IoRingBufferPool::Buffer::Buffer() noexcept : allocated_{nullptr} /*explicit*/ IoRingBufferPool::Buffer::Buffer(batt::SharedPtr&& allocated) noexcept : allocated_{std::move(allocated)} { + LLFS_VLOG_IF(1, this->allocated_) + << BATT_INSPECT(this->allocated_->use_count()) << "->" << (this->allocated_->use_count() + 1); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +IoRingBufferPool::Buffer::~Buffer() noexcept +{ + LLFS_VLOG_IF(1, this->allocated_) + << BATT_INSPECT(this->allocated_->use_count()) << "->" << (this->allocated_->use_count() - 1); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - diff --git a/src/llfs/ioring_buffer_pool.hpp b/src/llfs/ioring_buffer_pool.hpp index 7cc1f02..e86ebee 100644 --- a/src/llfs/ioring_buffer_pool.hpp +++ b/src/llfs/ioring_buffer_pool.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -160,6 +161,13 @@ class IoRingBufferPool */ Buffer(const Buffer&) = default; + /** \brief Destroys this Buffer object, decrementing the ref count on the underlying memory by + * one. + */ + ~Buffer() noexcept; + + //----- --- -- - - - - + /** This class is copyable; all copies share the same underlying buffer memory. When the last * copy is destroyed, the buffer is returned to the pool. */ @@ -229,12 +237,12 @@ class IoRingBufferPool using BufferVec = batt::SmallVec; - using AbstractHandler = batt::BasicAbstractHandler&&>; + using AbstractHandler = batt::BasicAbstractHandler&&>; template - using HandlerImpl = batt::BasicHandlerImpl&&>; + using HandlerImpl = batt::BasicHandlerImpl&&>; - using HandlerList = batt::BasicHandlerList&&>; + using HandlerList = batt::BasicHandlerList&&>; using BufferFreePoolList = boost::intrusive::slist, // @@ -246,9 +254,10 @@ class IoRingBufferPool * * Automatically registers the buffers in the pool with the passed IoRing context. */ - static batt::StatusOr> make_new( - const IoRing& io_ring, BufferCount count, - BufferSize size = BufferSize{Self::kMemoryUnitSize}) noexcept; + static StatusOr> make_new(const IoRing& io_ring, + BufferCount count, + BufferSize size = BufferSize{ + Self::kMemoryUnitSize}) noexcept; //+++++++++++-+-+--+----- --- -- - - - - @@ -278,30 +287,30 @@ class IoRingBufferPool /** \brief Asynchronously allocate a new buffer. Waits until a buffer becomes available and then * invokes the passed handler. * - * The signature of the handler is: `void (batt::StatusOr)`. + * The signature of the handler is: `void (StatusOr)`. */ - template &&)> + template &&)> void async_allocate(Handler&& handler) { this->async_allocate( BufferCount{1}, - batt::bind_handler( - BATT_FORWARD(handler), [](Handler&& handler, batt::StatusOr&& buffers) { + batt::bind_handler( // + BATT_FORWARD(handler), [](Handler&& handler, StatusOr&& buffers) { if (!buffers.ok()) { - BATT_FORWARD(handler)(batt::StatusOr{buffers.status()}); + BATT_FORWARD(handler)(StatusOr{buffers.status()}); return; } BATT_CHECK_EQ(buffers->size(), 1u); - BATT_FORWARD(handler)(batt::StatusOr{std::move(buffers->front())}); + BATT_FORWARD(handler)(StatusOr{std::move(buffers->front())}); })); } /** \brief Asynchronously allocate the specified number of buffers. Waits until enough buffers * become available and then invokes the passed handler. * - * The signature of the handler is: `void (batt::StatusOr)`. + * The signature of the handler is: `void (StatusOr)`. */ - template &&)> + template &&)> void async_allocate(BufferCount count, Fn&& fn) { AbstractHandler* handler = HandlerImpl::make_new(BATT_FORWARD(fn)); @@ -316,17 +325,17 @@ class IoRingBufferPool /** \brief Blocks the current Task until a buffer becomes available, then allocates and returns * it. */ - auto await_allocate() -> batt::StatusOr; + auto await_allocate() -> StatusOr; /** \brief Blocks the current Task until a buffer becomes available, then allocates and returns * it. */ - auto await_allocate(BufferCount count) -> batt::StatusOr; + auto await_allocate(BufferCount count) -> StatusOr; /** \brief Attempts to allocate a buffer without blocking; if successful, returns the buffer; else * returns batt::StatusCode::kResourceExhausted. */ - auto try_allocate() -> batt::StatusOr; + auto try_allocate() -> StatusOr; /** \brief Returns the number of buffers in the pool which are currently in use (allocated). */ diff --git a/src/llfs/ioring_buffer_view.cpp b/src/llfs/ioring_buffer_view.cpp new file mode 100644 index 0000000..372146a --- /dev/null +++ b/src/llfs/ioring_buffer_view.cpp @@ -0,0 +1,61 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// + +namespace llfs { + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +template +auto BasicIoRingBufferView::split(usize byte_offset) noexcept -> Self +{ + byte_offset = std::min(byte_offset, this->slice.size()); + + Self prefix{ + this->buffer, + SliceT{this->slice.data(), byte_offset}, + }; + + this->slice += byte_offset; + + return prefix; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +template +bool BasicIoRingBufferView::can_merge_with(const Self& other) const noexcept +{ + return this->buffer == other.buffer // + && get_buffer_end(this->slice) == other.slice.data(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +template +bool BasicIoRingBufferView::merge_with(const Self& other) noexcept +{ + if (!this->can_merge_with(other)) { + return false; + } + this->slice = SliceT{ + this->slice.data(), + this->slice.size() + other.slice.size(), + }; + return true; +} + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// Explicitly specialize for MutableBuffer and ConstBuffer only. + +template class BasicIoRingBufferView; +template class BasicIoRingBufferView; + +} //namespace llfs diff --git a/src/llfs/ioring_buffer_view.hpp b/src/llfs/ioring_buffer_view.hpp new file mode 100644 index 0000000..842e8cf --- /dev/null +++ b/src/llfs/ioring_buffer_view.hpp @@ -0,0 +1,117 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#pragma once +#ifndef LLFS_IORING_BUFFER_VIEW_HPP +#define LLFS_IORING_BUFFER_VIEW_HPP + +#include +// +#include +#include +#include + +namespace llfs { + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// +/** \brief A read-only view (slice) of a pooled buffer. + */ +template +struct BasicIoRingBufferView { + using Self = BasicIoRingBufferView; + + //+++++++++++-+-+--+----- --- -- - - - - + + IoRingBufferPool::Buffer buffer; + SliceT slice; + + //+++++++++++-+-+--+----- --- -- - - - - + + BasicIoRingBufferView() = default; + + explicit BasicIoRingBufferView(IoRingBufferPool::Buffer&& buffer, const SliceT& slice) noexcept + : buffer{std::move(buffer)} + , slice{slice} + { + } + + explicit BasicIoRingBufferView(const IoRingBufferPool::Buffer& buffer, + const SliceT& slice) noexcept + : buffer{buffer} + , slice{slice} + { + } + + template >> + /*implicit*/ BasicIoRingBufferView(const BasicIoRingBufferView& other) noexcept + : buffer{other.buffer} + , slice{other.slice} + { + } + + template >, + typename = void> + /*implicit*/ BasicIoRingBufferView(BasicIoRingBufferView&& other) noexcept + : buffer{std::move(other.buffer)} + , slice{other.slice} + { + } + + BasicIoRingBufferView(const Self&) = default; + BasicIoRingBufferView& operator=(const Self&) = default; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Returns a pointer to the start of the viewed data (`this->slice.data()`). + */ + auto* data() const noexcept + { + return this->slice.data(); + } + + /** \brief Returns the size of the viewed data (`this->slice.size()`). + */ + usize size() const noexcept + { + return this->slice.size(); + } + + /** \brief Returns true iff this->size() is 0. + */ + bool empty() const noexcept + { + return this->size() == 0; + } + + /** \brief Returns a new buffer view containing the prefix of `this` ending at + * `byte_offset`; `this` is adjusted to start at `byte_offset` (immediately after the + * returned prefix). + * + * If byte_offset is after the end of `this->slice`, then it is automatically truncated to + * `this->slice.size()`, a copy of `this` is returned, and `this` becomes empty. + */ + Self split(usize byte_offset) noexcept; + + /** \brief Returns true iff other is a view of the same Buffer as this, and other.slice + * comes immediately after this->slice. + */ + bool can_merge_with(const Self& other) const noexcept; + + /** \brief If this->can_merge_with(other) would return true, then this view's slice is + * extended to include other.slice and true is returned. Otherwise returns false. + */ + bool merge_with(const Self& other) noexcept; +}; + +using IoRingConstBufferView = BasicIoRingBufferView; +using IoRingMutableBufferView = BasicIoRingBufferView; + +} //namespace llfs + +#endif // LLFS_IORING_BUFFER_VIEW_HPP diff --git a/src/llfs/ioring_buffer_view.test.cpp b/src/llfs/ioring_buffer_view.test.cpp new file mode 100644 index 0000000..92f207f --- /dev/null +++ b/src/llfs/ioring_buffer_view.test.cpp @@ -0,0 +1,507 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// +#include + +#include +#include + +namespace { + +// Test Plan: +// 1. Construct with empty buffer/slice. +// 2. Construct with non-empty buffer/slice. +// 3. can_merge_with +// a. merge empty with empty, can_merge == true +// b. merge empty with non-empty, can_merge == true +// c. merge non-empty with empty, can_merge == true +// d. merge non-empty with non-empty, can_merge == true +// e. merge empty with empty, can_merge == false +// f. merge empty with non-empty, can_merge == false +// g. merge non-empty with empty, can_merge == false +// h. merge non-empty with non-empty, can_merge == false +// 4. merge_with +// a. merge empty with empty, can_merge == true +// b. merge empty with non-empty, can_merge == true +// c. merge non-empty with empty, can_merge == true +// d. merge non-empty with non-empty, can_merge == true +// e. merge empty with empty, can_merge == false +// f. merge empty with non-empty, can_merge == false +// g. merge non-empty with empty, can_merge == false +// h. merge non-empty with non-empty, can_merge == false + +using namespace llfs::int_types; + +usize kTestBufferCount = 4; +usize kTestBufferSize = 4096; + +class IoringBufferViewTest : public ::testing::Test +{ + public: + void SetUp() override + { + this->scoped_io_ring = + llfs::ScopedIoRing::make_new(llfs::MaxQueueDepth{8}, llfs::ThreadPoolSize{1}); + + ASSERT_TRUE(this->scoped_io_ring.ok()) << BATT_INSPECT(this->scoped_io_ring.status()); + + this->io_ring = std::addressof(this->scoped_io_ring->get_io_ring()); + + this->status_or_buffer_pool = llfs::IoRingBufferPool::make_new( + *this->io_ring, llfs::BufferCount{kTestBufferCount}, llfs::BufferSize{kTestBufferSize}); + + ASSERT_TRUE(this->status_or_buffer_pool.ok()) + << BATT_INSPECT(this->status_or_buffer_pool.status()); + + this->buffer_pool = this->status_or_buffer_pool->get(); + + this->buffer_1 = this->buffer_pool->await_allocate(); + + ASSERT_TRUE(this->buffer_1.ok()) << BATT_INSPECT(this->buffer_1.status()); + } + + llfs::StatusOr scoped_io_ring; + + const llfs::IoRing* io_ring = nullptr; + + llfs::StatusOr> status_or_buffer_pool; + + llfs::IoRingBufferPool* buffer_pool = nullptr; + + llfs::StatusOr buffer_1; +}; + +//+++++++++++-+-+--+----- --- -- - - - - +// 1. Construct with empty buffer/slice. +// +TEST_F(IoringBufferViewTest, ConstructEmpty) +{ + llfs::IoRingConstBufferView view; + + EXPECT_EQ(view.slice.data(), nullptr); + EXPECT_EQ(view.slice.size(), 0u); + EXPECT_EQ(view.data(), nullptr); + EXPECT_EQ(view.size(), 0u); + + EXPECT_FALSE(view.buffer); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 2. Construct with non-empty buffer/slice. +// +TEST_F(IoringBufferViewTest, ConstructNonEmpty) +{ + llfs::IoRingConstBufferView view{ + *this->buffer_1, + this->buffer_1->get(), + }; + + EXPECT_NE(view.slice.data(), nullptr); + EXPECT_EQ(view.slice.data(), this->buffer_1->data()); + EXPECT_EQ(view.slice.size(), kTestBufferSize); + EXPECT_EQ(view.data(), this->buffer_1->data()); + EXPECT_EQ(view.size(), kTestBufferSize); + + EXPECT_TRUE(view.buffer); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 3. can_merge_with +// a. merge empty with empty, can_merge == true +// 4. merge_with +// a. merge empty with empty, can_merge == true +// +TEST_F(IoringBufferViewTest, MergeEmptyWithEmptyTrue) +{ + llfs::IoRingConstBufferView view_1{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 0}, + }; + + llfs::IoRingConstBufferView view_2 = view_1; + + EXPECT_TRUE(view_1.can_merge_with(view_2)); + EXPECT_TRUE(view_1.merge_with(view_2)); + + EXPECT_EQ(view_1.slice.data(), this->buffer_1->data()); + EXPECT_EQ(view_1.slice.size(), 0u); + EXPECT_EQ(view_1.data(), this->buffer_1->data()); + EXPECT_EQ(view_1.size(), 0u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 3. can_merge_with +// b. merge empty with non-empty, can_merge == true +// +// 4. merge_with +// b. merge empty with non-empty, can_merge == true +// +TEST_F(IoringBufferViewTest, MergeEmptyWithNonEmptyTrue) +{ + llfs::IoRingConstBufferView view_1{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 0}, + }; + + llfs::IoRingConstBufferView view_2{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 100}, + }; + + EXPECT_EQ(view_1.slice.size(), 0u); + + EXPECT_TRUE(view_1.can_merge_with(view_2)); + EXPECT_TRUE(view_1.merge_with(view_2)); + + EXPECT_EQ(view_1.slice.data(), this->buffer_1->data()); + EXPECT_EQ(view_1.slice.size(), 100u); + EXPECT_EQ(view_1.data(), this->buffer_1->data()); + EXPECT_EQ(view_1.size(), 100u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 3. can_merge_with +// c. merge non-empty with empty, can_merge == true +// 4. merge_with +// c. merge non-empty with empty, can_merge == true +// +TEST_F(IoringBufferViewTest, MergeNonEmptyWithEmptyTrue) +{ + llfs::IoRingConstBufferView view_1{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 100}, + }; + + llfs::IoRingConstBufferView view_2{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 100} + 100, + }; + + EXPECT_EQ(view_1.slice.size(), 100u); + EXPECT_EQ(view_2.slice.size(), 0u); + EXPECT_EQ(view_1.size(), 100u); + EXPECT_EQ(view_2.size(), 0u); + + EXPECT_TRUE(view_1.can_merge_with(view_2)); + EXPECT_TRUE(view_1.merge_with(view_2)); + + EXPECT_EQ(view_1.slice.data(), this->buffer_1->data()); + EXPECT_EQ(view_1.slice.size(), 100u); + EXPECT_EQ(view_1.data(), this->buffer_1->data()); + EXPECT_EQ(view_1.size(), 100u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 3. can_merge_with +// d. merge non-empty with non-empty, can_merge == true +// 4. merge_with +// d. merge non-empty with non-empty, can_merge == true +// +TEST_F(IoringBufferViewTest, MergeNonEmptyWithNonEmptyTrue) +{ + llfs::IoRingConstBufferView view_1{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 100}, + }; + + llfs::IoRingConstBufferView view_2{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 200} + 100, + }; + + EXPECT_EQ(view_1.slice.size(), 100u); + EXPECT_EQ(view_1.size(), 100u); + + EXPECT_TRUE(view_1.can_merge_with(view_2)); + EXPECT_TRUE(view_1.merge_with(view_2)); + + EXPECT_EQ(view_1.slice.data(), this->buffer_1->data()); + EXPECT_EQ(view_1.slice.size(), 200u); + EXPECT_EQ(view_1.data(), this->buffer_1->data()); + EXPECT_EQ(view_1.size(), 200u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 3. can_merge_with +// e. merge empty with empty, can_merge == false +// 4. merge_with +// e. merge empty with empty, can_merge == false +// +TEST_F(IoringBufferViewTest, MergeEmptyWithEmptyFalse) +{ + llfs::IoRingConstBufferView view_1{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 100} + 100, + }; + + llfs::IoRingConstBufferView view_2{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 200} + 200, + }; + + EXPECT_EQ(view_1.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view_1.slice.size(), 0u); + EXPECT_EQ(view_2.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 200)); + EXPECT_EQ(view_2.slice.size(), 0u); + EXPECT_EQ(view_1.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view_1.size(), 0u); + EXPECT_EQ(view_2.data(), llfs::advance_pointer(this->buffer_1->data(), 200)); + EXPECT_EQ(view_2.size(), 0u); + + EXPECT_FALSE(view_1.can_merge_with(view_2)); + EXPECT_FALSE(view_1.merge_with(view_2)); + + EXPECT_EQ(view_1.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view_1.slice.size(), 0u); + EXPECT_EQ(view_2.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 200)); + EXPECT_EQ(view_2.slice.size(), 0u); + EXPECT_EQ(view_1.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view_1.size(), 0u); + EXPECT_EQ(view_2.data(), llfs::advance_pointer(this->buffer_1->data(), 200)); + EXPECT_EQ(view_2.size(), 0u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 3. can_merge_with +// f. merge empty with non-empty, can_merge == false +// 4. merge_with +// f. merge empty with non-empty, can_merge == false +// +TEST_F(IoringBufferViewTest, MergeEmptyWithNonEmptyFalse) +{ + llfs::IoRingConstBufferView view_1{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 100} + 100, + }; + + llfs::IoRingConstBufferView view_2{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 200} + 150, + }; + + EXPECT_EQ(view_1.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view_1.slice.size(), 0u); + EXPECT_EQ(view_2.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 150)); + EXPECT_EQ(view_2.slice.size(), 50u); + EXPECT_EQ(view_1.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view_1.size(), 0u); + EXPECT_EQ(view_2.data(), llfs::advance_pointer(this->buffer_1->data(), 150)); + EXPECT_EQ(view_2.size(), 50u); + + EXPECT_FALSE(view_1.can_merge_with(view_2)); + EXPECT_FALSE(view_1.merge_with(view_2)); + + EXPECT_EQ(view_1.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view_1.slice.size(), 0u); + EXPECT_EQ(view_2.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 150)); + EXPECT_EQ(view_2.slice.size(), 50u); + EXPECT_EQ(view_1.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view_1.size(), 0u); + EXPECT_EQ(view_2.data(), llfs::advance_pointer(this->buffer_1->data(), 150)); + EXPECT_EQ(view_2.size(), 50u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 3. can_merge_with +// g. merge non-empty with empty, can_merge == false +// 4. merge_with +// g. merge non-empty with empty, can_merge == false +// +TEST_F(IoringBufferViewTest, MergeNonEmptyWithEmptyFalse) +{ + llfs::IoRingConstBufferView view_1{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 100} + 50, + }; + + llfs::IoRingConstBufferView view_2{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 200} + 200, + }; + + EXPECT_EQ(view_1.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(view_1.slice.size(), 50u); + EXPECT_EQ(view_2.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 200)); + EXPECT_EQ(view_2.slice.size(), 0u); + EXPECT_EQ(view_1.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(view_1.size(), 50u); + EXPECT_EQ(view_2.data(), llfs::advance_pointer(this->buffer_1->data(), 200)); + EXPECT_EQ(view_2.size(), 0u); + + EXPECT_FALSE(view_1.can_merge_with(view_2)); + EXPECT_FALSE(view_1.merge_with(view_2)); + + EXPECT_EQ(view_1.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(view_1.slice.size(), 50u); + EXPECT_EQ(view_2.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 200)); + EXPECT_EQ(view_2.slice.size(), 0u); + EXPECT_EQ(view_1.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(view_1.size(), 50u); + EXPECT_EQ(view_2.data(), llfs::advance_pointer(this->buffer_1->data(), 200)); + EXPECT_EQ(view_2.size(), 0u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 3. can_merge_with +// h. merge non-empty with non-empty, can_merge == false +// 4. merge_with +// h. merge non-empty with non-empty, can_merge == false +// +TEST_F(IoringBufferViewTest, MergeNonEmptyWithNonEmptyFalse) +{ + llfs::IoRingConstBufferView view_1{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 100} + 50, + }; + + llfs::IoRingConstBufferView view_2{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 200} + 150, + }; + + EXPECT_EQ(view_1.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(view_1.slice.size(), 50u); + EXPECT_EQ(view_2.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 150)); + EXPECT_EQ(view_2.slice.size(), 50u); + EXPECT_EQ(view_1.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(view_1.size(), 50u); + EXPECT_EQ(view_2.data(), llfs::advance_pointer(this->buffer_1->data(), 150)); + EXPECT_EQ(view_2.size(), 50u); + + EXPECT_FALSE(view_1.can_merge_with(view_2)); + EXPECT_FALSE(view_1.merge_with(view_2)); + + EXPECT_EQ(view_1.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(view_1.slice.size(), 50u); + EXPECT_EQ(view_2.slice.data(), llfs::advance_pointer(this->buffer_1->data(), 150)); + EXPECT_EQ(view_2.slice.size(), 50u); + EXPECT_EQ(view_1.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(view_1.size(), 50u); + EXPECT_EQ(view_2.data(), llfs::advance_pointer(this->buffer_1->data(), 150)); + EXPECT_EQ(view_2.size(), 50u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// IoRingMutableBufferView => IoRingConstBufferView. +// +TEST_F(IoringBufferViewTest, ConstFromMutable) +{ + const llfs::IoRingMutableBufferView mutable_view{ + *this->buffer_1, + batt::MutableBuffer{this->buffer_1->data(), 100}, + }; + + EXPECT_NE(mutable_view.data(), nullptr); + EXPECT_EQ(mutable_view.data(), this->buffer_1->data()); + EXPECT_EQ(mutable_view.size(), 100u); + EXPECT_EQ(mutable_view.slice.data(), this->buffer_1->data()); + EXPECT_EQ(mutable_view.slice.size(), 100u); + + static_assert(std::is_same_v); + + { + const llfs::IoRingConstBufferView const_view = mutable_view; + + static_assert(std::is_same_v); + + EXPECT_EQ(const_view.data(), this->buffer_1->data()); + EXPECT_EQ(const_view.size(), 100u); + } + { + llfs::IoRingConstBufferView const_view; + const_view = mutable_view; + + EXPECT_EQ(const_view.data(), this->buffer_1->data()); + EXPECT_EQ(const_view.size(), 100u); + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringBufferViewTest, Split) +{ + const llfs::IoRingConstBufferView non_empty_view{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 100} + 50, + }; + + const llfs::IoRingConstBufferView empty_view{ + *this->buffer_1, + batt::ConstBuffer{this->buffer_1->data(), 0}, + }; + + // split: empty -> at 0 + { + llfs::IoRingConstBufferView view = empty_view; + llfs::IoRingConstBufferView prefix = view.split(0); + + EXPECT_EQ(prefix.data(), this->buffer_1->data()); + EXPECT_EQ(prefix.size(), 0u); + EXPECT_EQ(view.data(), this->buffer_1->data()); + EXPECT_EQ(view.size(), 0u); + } + + // split: empty -> past 0 + { + llfs::IoRingConstBufferView view = empty_view; + llfs::IoRingConstBufferView prefix = view.split(1); + + EXPECT_EQ(prefix.data(), this->buffer_1->data()); + EXPECT_EQ(prefix.size(), 0u); + EXPECT_EQ(view.data(), this->buffer_1->data()); + EXPECT_EQ(view.size(), 0u); + } + + // split: non-empty -> at 0 + { + llfs::IoRingConstBufferView view = non_empty_view; + llfs::IoRingConstBufferView prefix = view.split(0); + + EXPECT_EQ(prefix.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(prefix.size(), 0u); + EXPECT_EQ(view.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(view.size(), 50u); + } + + // split: non-empty -> >0 buffer_1->data(), 50)); + EXPECT_EQ(prefix.size(), 10u); + EXPECT_EQ(view.data(), llfs::advance_pointer(this->buffer_1->data(), 60)); + EXPECT_EQ(view.size(), 40u); + } + + // split: non-empty -> at end + { + llfs::IoRingConstBufferView view = non_empty_view; + llfs::IoRingConstBufferView prefix = view.split(50); + + EXPECT_EQ(prefix.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(prefix.size(), 50u); + EXPECT_EQ(view.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view.size(), 0u); + } + + // split: non-empty -> past end + { + llfs::IoRingConstBufferView view = non_empty_view; + llfs::IoRingConstBufferView prefix = view.split(51); + + EXPECT_EQ(prefix.data(), llfs::advance_pointer(this->buffer_1->data(), 50)); + EXPECT_EQ(prefix.size(), 50u); + EXPECT_EQ(view.data(), llfs::advance_pointer(this->buffer_1->data(), 100)); + EXPECT_EQ(view.size(), 0u); + } +} + +} // namespace diff --git a/src/llfs/ioring_stream_buffer.cpp b/src/llfs/ioring_stream_buffer.cpp new file mode 100644 index 0000000..80cf8bd --- /dev/null +++ b/src/llfs/ioring_stream_buffer.cpp @@ -0,0 +1,299 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// +#include + +#include + +namespace llfs { + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// class IoRingStreamBuffer + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*explicit*/ IoRingStreamBuffer::IoRingStreamBuffer(IoRingBufferPool& buffer_pool) + : shared_buffer_pool_{buffer_pool} +{ +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +Status IoRingStreamBuffer::initialize() noexcept +{ + if (!this->private_buffer_pool_) { + BATT_ASSIGN_OK_RESULT( + IoRingBufferPool::BufferVec buffers, + this->shared_buffer_pool_.await_allocate(BufferCount{this->queue_capacity_})); + + this->private_buffer_pool_.emplace(std::move(buffers)); + } + return batt::OkStatus(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +usize IoRingStreamBuffer::size() const noexcept +{ + // NOTE: we must read these in this order to make sure we don't (falsely) observe an + // inconsistent state due to race conditions. + // + const i64 observed_consume_pos = this->consume_pos_.get_value(); + const i64 observed_commit_pos = this->commit_pos_.get_value(); + + BATT_CHECK_GE(observed_commit_pos, observed_consume_pos); + + // Clamp to the known maximum capacity. + // + return std::min(this->max_size(), + BATT_CHECKED_CAST(usize, observed_commit_pos - observed_consume_pos)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void IoRingStreamBuffer::close() +{ + { + batt::ScopedLock locked{this->queue_}; + this->end_of_stream_.store(true); + this->check_for_end_of_stream(locked); + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr IoRingStreamBuffer::prepare() +{ + LLFS_VLOG(1) << "minimum consume pos reached; allocating buffer..."; + + if (this->end_of_stream_.load()) { + return {batt::StatusCode::kClosed}; + } + + // Allocate a buffer from the pool. + // + BATT_REQUIRE_OK(this->initialize()); + BATT_ASSIGN_OK_RESULT(IoRingBufferPool::Buffer buffer, + this->private_buffer_pool_->await_allocate()); + + LLFS_VLOG(1) << "buffer allocated;" << BATT_INSPECT(buffer.size()); + + MutableBuffer slice = buffer.get(); + + return IoRingMutableBufferView{std::move(buffer), slice}; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void IoRingStreamBuffer::commit(BufferView&& view) +{ + BATT_CHECK_EQ(std::addressof(view.buffer.pool()), std::addressof(*this->private_buffer_pool_)) + << "IoRingStreamBuffer::commit only accepts buffer view objects for buffers returned from " + "IoRingStreamBuffer::prepare on the same stream."; + + usize byte_count = view.slice.size(); + { + batt::ScopedLock locked{this->queue_}; + + locked->push(std::move(view)); + + BATT_CHECK_LE(locked->view_count(), this->queue_capacity_); + } + this->commit_pos_.fetch_add(byte_count); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +auto IoRingStreamBuffer::consume(i64 start, i64 end) -> StatusOr +{ + LLFS_VLOG(1) << "IoRingStreamBuffer::consume(" << start << ", " << end << ")" + << BATT_INSPECT(this->commit_pos_.get_value()); + + // Wait for the target range to be committed (i.e., wait for the commit pos to be >= end). + // + StatusOr final_commit_pos = this->commit_pos_.await_true([&](i64 observed_commit_pos) { + return observed_commit_pos - end >= 0; + }); + BATT_REQUIRE_OK(final_commit_pos); + + LLFS_VLOG(1) + << "IoRingStreamBuffer::consume() commit_pos reached; waiting for all prev consumers to " + "finish..."; + + // Wait for all consumers of lower ranges to update the consume pos. + // + Status consume_pos_reached = this->consume_pos_.await_equal(start); + BATT_REQUIRE_OK(consume_pos_reached); + + LLFS_VLOG(1) << "IoRingStreamBuffer::consume() grabbing data and returning!" + << BATT_INSPECT(this->consume_pos_.get_value()); + + Fragment result; + { + batt::ScopedLock locked{this->queue_}; + + BATT_CHECK_GE(end, start); + const usize n_to_pop = end - start; + + result = locked->pop(n_to_pop); + this->check_for_end_of_stream(locked); + } + this->consume_pos_.set_value(end); + + return result; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +auto IoRingStreamBuffer::consume_some() -> StatusOr +{ + const i64 observed_consume_pos = this->consume_pos_.get_value(); + + // Wait for the commit pos to advance past the consume pos. + // + StatusOr final_commit_pos = + this->commit_pos_.await_true([&observed_consume_pos](i64 observed_commit_pos) { + return observed_commit_pos - observed_consume_pos > 0; + }); + + if (BATT_HINT_FALSE(final_commit_pos.status() == batt::StatusCode::kClosed)) { + const i64 observed_commit_pos = this->commit_pos_.get_value(); + if (observed_commit_pos - observed_consume_pos > 0) { + BATT_UNTESTED_LINE(); + final_commit_pos = observed_commit_pos; + } + } + BATT_REQUIRE_OK(final_commit_pos); + + Fragment result; + { + batt::ScopedLock locked{this->queue_}; + + std::swap(result, *locked); + this->consume_pos_.fetch_add(BATT_CHECKED_CAST(i64, result.byte_size())); + // ^^ + // We must check for end-of-stream *after* updating consume_pos to make sure no data is + // dropped at the end of the stream. + // vv + this->check_for_end_of_stream(locked); + + if (BATT_HINT_FALSE(result.empty() && this->commit_pos_.is_closed())) { + BATT_UNTESTED_LINE(); + return {batt::StatusCode::kClosed}; + } + } + + return result; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +usize IoRingStreamBuffer::buffer_size() const noexcept +{ + return this->shared_buffer_pool_.buffer_size(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void IoRingStreamBuffer::check_for_end_of_stream(batt::ScopedLock& locked) +{ + if (this->end_of_stream_.load() && locked->empty()) { + this->commit_pos_.close(); + this->consume_pos_.close(); + } +} + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// class IoRingStreamBuffer::Fragment + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +bool IoRingStreamBuffer::Fragment::empty() const noexcept +{ + return this->views_.empty(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +usize IoRingStreamBuffer::Fragment::view_count() const noexcept +{ + return this->views_.size(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void IoRingStreamBuffer::Fragment::push(BufferView&& view) +{ + if (!this->views_.empty() && this->views_.back().merge_with(view)) { + return; + } + this->views_.emplace_back(std::move(view)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +auto IoRingStreamBuffer::Fragment::pop(usize max_byte_count) -> Fragment +{ + usize bytes_popped = 0; + Fragment result; + + while (!this->views_.empty() && bytes_popped < max_byte_count) { + BufferView& this_view = this->views_.front(); + + const usize bytes_this_view = std::min(this_view.slice.size(), max_byte_count - bytes_popped); + + result.views_.emplace_back(BufferView{ + this_view.buffer, + ConstBuffer{ + this_view.slice.data(), + bytes_this_view, + }, + }); + + bytes_popped += bytes_this_view; + this_view.slice += bytes_this_view; + + if (this_view.slice.size() == 0) { + this->views_.erase(this->views_.begin()); + } + } + + return result; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +usize IoRingStreamBuffer::Fragment::byte_size() const noexcept +{ + return this->as_seq() // + | batt::seq::map([](const BufferView& view) -> usize { + return view.slice.size(); + }) // + | batt::seq::sum(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +ConstBuffer IoRingStreamBuffer::Fragment::gather_impl(MutableBuffer dst) const noexcept +{ + const void* dst_begin = dst.data(); + usize n_copied = 0; + + for (const BufferView& view : this->views_) { + const usize n_to_copy = std::min(view.slice.size(), dst.size()); + std::memcpy(dst.data(), view.slice.data(), n_to_copy); + dst += n_to_copy; + n_copied += n_to_copy; + } + + return ConstBuffer{dst_begin, n_copied}; +} + +} //namespace llfs diff --git a/src/llfs/ioring_stream_buffer.hpp b/src/llfs/ioring_stream_buffer.hpp new file mode 100644 index 0000000..f7351d6 --- /dev/null +++ b/src/llfs/ioring_stream_buffer.hpp @@ -0,0 +1,216 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#pragma once +#ifndef LLFS_IORING_STREAM_BUFFER_HPP +#define LLFS_IORING_STREAM_BUFFER_HPP + +#include +// +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace llfs { + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// +/** \brief A zero-copy stream of IoRing pooled buffers. + */ +class IoRingStreamBuffer +{ + public: + /** \brief The maximum number of buffers of committed data (i.e., instances of + * IoRingBufferPool::Buffer allocated from the underlying pool passed in at construction time) + * that a stream can contain. + */ + static constexpr usize kMaxBuffersCapacity = 2; + + using BufferView = IoRingConstBufferView; + using PreparedView = IoRingMutableBufferView; + + //+++++++++++-+-+--+----- --- -- - - - - + // + /** \brief A (short) sequence of IoRingStreamBuffer::BufferView slices. + */ + struct Fragment { + public: + /** \brief Returns the contents of this queue as a Seq of const BufferView&. + */ + auto as_seq() const noexcept + { + return batt::as_seq(this->views_); + } + + /** \brief Returns true iff this sequence is empty. + */ + bool empty() const noexcept; + + /** \brief Returns the number of BufferView slices in this queue. + */ + usize view_count() const noexcept; + + /** \brief Returns the total bytes count across all views in this. + */ + usize byte_size() const noexcept; + + /** \brief Pushes the specified BufferView onto the end of this sequence. + */ + void push(BufferView&& view); + + /** \brief Removes up to the specified number of bytes from the beginning of this sequence, + * returning the resulting BufferView slices as a Fragment. + */ + Fragment pop(usize max_byte_count); + + /** \brief Consolidates the contents of this Fragment as a single contiguous chunk of + * data. + */ + template + ConstBuffer gather(std::variant& storage) const noexcept + { + if (this->views_.empty()) { + return ConstBuffer{}; + } + + if (this->views_.size() == 1) { + storage.template emplace(this->views_.front().buffer); + return this->views_.front().slice; + } + + T& typed_storage = storage.template emplace(); + + return this->gather_impl(resize_buffer_storage(typed_storage, this->byte_size())); + } + + //----- --- -- - - - - + private: + ConstBuffer gather_impl(MutableBuffer dst) const noexcept; + + //----- --- -- - - - - + + batt::SmallVec views_; + }; + + //+++++++++++-+-+--+----- --- -- - - - - + + explicit IoRingStreamBuffer(IoRingBufferPool& buffer_pool); + + /** \brief IoRingStreamBuffer is not copyable. + */ + IoRingStreamBuffer(const IoRingStreamBuffer&) = delete; + + /** \brief IoRingStreamBuffer is not copyable. + */ + IoRingStreamBuffer& operator=(const IoRingStreamBuffer&) = delete; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Returns the maximum size (in bytes) that the stream buffer can hold. + */ + usize max_size() const noexcept + { + return this->queue_capacity_ * this->buffer_size(); + } + + /** \brief Returns the current size (in bytes) of the buffered data available in this stream. + */ + usize size() const noexcept; + + /** \brief Closes the stream for writing. If there is data in the stream, it is still consumable; + * the stream is now in "draining" mode. + */ + void close(); + + /** \brief Allocate from the pool a buffer that will later be inserted into the stream via + * IoRingStreamBuffer::commit. + * + * NOT SAFE to invoke concurrently on the same object. + */ + StatusOr prepare(); + + /** \brief Inserts part of all of a previously allocated IoRingBufferPool::Buffer into the stream. + * BufferView objects committed to the stream using this function must be adjacent, + * in-order slices of Buffers allocated by `this->prepare()`. + */ + void commit(BufferView&& view); + + /** \brief Consume a specific range of the stream. Will block until this range is available. + */ + StatusOr consume(i64 start, i64 end); + + /** \brief Returns (and removes) the next non-empty sequence of available committed data from the + * stream. + */ + StatusOr consume_some(); + + /** \brief The size of the buffers in the pool from which this stream allocates. + */ + usize buffer_size() const noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + private: + /** \brief Checks to see if the stream has been closed for write _and_ all data has been drained; + * if both of these conditions are met, then all waiting consumers are unblocked (with kClosed + * status code). + */ + void check_for_end_of_stream(batt::ScopedLock& locked); + + /** \brief Performs one-time initialization of this->private_buffer_pool_. + */ + Status initialize() noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + + const usize queue_capacity_ = kMaxBuffersCapacity; + + /** \brief The pool from which registered memory buffers are allocated. + */ + IoRingBufferPool& shared_buffer_pool_; + + /** \brief A subpool of `this->shared_buffer_pool_` that guarantees that the maximum number of + * buffers will be available for allocation via this->prepare. + */ + Optional private_buffer_pool_; + + /** \brief The committed data in the stream. + */ + batt::Mutex queue_; + + /** \brief The offset in bytes of the first byte in `this->queue_`, relative to the start of the + * stream. + */ + batt::Watch consume_pos_; + + /** \brief One past the offset in bytes of last byte in `this->queue_`, relative to the start of + * the stream. This represents the stream offset of the _next_ slice of data that will be passed + * to this->commit. + */ + batt::Watch commit_pos_; + + /** \brief Set to true when `this->close()` is called. Used by `this->check_for_end_of_stream()` + * to detect when we are transitioning from "normal" (open) to "draining" + * (closed-for-write/commit) to "drained" (fully-closed/end-of-stream) states. + */ + std::atomic end_of_stream_{false}; +}; + +} //namespace llfs + +#endif // LLFS_IORING_STREAM_BUFFER_HPP diff --git a/src/llfs/ioring_stream_buffer.test.cpp b/src/llfs/ioring_stream_buffer.test.cpp new file mode 100644 index 0000000..f4e07d7 --- /dev/null +++ b/src/llfs/ioring_stream_buffer.test.cpp @@ -0,0 +1,399 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// +#include + +#include + +#include +#include + +#include +#include + +namespace { + +using namespace llfs::int_types; + +using llfs::testing::IoringStreamBufferClosedEmptyTest; +using llfs::testing::IoringStreamBufferEmptyTest; +using llfs::testing::IoringStreamBufferFullTest; +using llfs::testing::IoringStreamBufferNotEmptyTest; +using llfs::testing::IoringStreamBufferTest; + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferTest, CreateDestroy) +{ + llfs::IoRingStreamBuffer stream_buffer{*this->buffer_pool_}; + + EXPECT_EQ(stream_buffer.buffer_size(), this->buffer_size_); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferTest, EmptyFragment) +{ + llfs::IoRingStreamBuffer::Fragment fragment; + + EXPECT_TRUE(fragment.empty()); + EXPECT_EQ(fragment.view_count(), 0u); + EXPECT_EQ(fragment.byte_size(), 0u); + + std::variant> storage; + llfs::ConstBuffer gathered = fragment.gather(storage); + + EXPECT_EQ(gathered.size(), 0u); + + llfs::IoRingStreamBuffer::Fragment fragment2 = fragment.pop(1); + + EXPECT_TRUE(fragment.empty()); + EXPECT_EQ(fragment.view_count(), 0u); + EXPECT_EQ(fragment.byte_size(), 0u); + EXPECT_TRUE(fragment2.empty()); + EXPECT_EQ(fragment2.view_count(), 0u); + EXPECT_EQ(fragment2.byte_size(), 0u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferNotEmptyTest, ConsumeSomeOkNoBlock) +{ + llfs::StatusOr fragment = + this->stream_buffer_->consume_some(); + + ASSERT_TRUE(fragment.ok()) << BATT_INSPECT(fragment.status()); + EXPECT_GT(fragment->byte_size(), 0u); + EXPECT_EQ(fragment->byte_size(), this->unverified_data_.size()); + + this->verify_data(*fragment); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferFullTest, ConsumeSomeOkNoBlockFull) +{ + llfs::StatusOr fragment = + this->stream_buffer_->consume_some(); + + ASSERT_TRUE(fragment.ok()) << BATT_INSPECT(fragment.status()); + EXPECT_GT(fragment->byte_size(), 0u); + EXPECT_EQ(fragment->byte_size(), this->buffer_size_ * 2); + EXPECT_EQ(fragment->byte_size(), this->unverified_data_.size()); + + this->verify_data(*fragment); + + EXPECT_EQ(this->unverified_data_.size(), 0u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferNotEmptyTest, ConsumeSomeOkNoBlockAfterClosed) +{ + this->stream_buffer_->close(); + + llfs::StatusOr fragment = + this->stream_buffer_->consume_some(); + + ASSERT_TRUE(fragment.ok()) << BATT_INSPECT(fragment.status()); + EXPECT_GT(fragment->byte_size(), 0u); + EXPECT_EQ(fragment->byte_size(), this->unverified_data_.size()); + + this->verify_data(*fragment); + + llfs::StatusOr fragment2 = + this->stream_buffer_->consume_some(); + + EXPECT_EQ(fragment2.status(), batt::StatusCode::kClosed); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferClosedEmptyTest, ConsumeSomeClosedNoBlock) +{ + llfs::StatusOr fragment = + this->stream_buffer_->consume_some(); + + EXPECT_EQ(fragment.status(), batt::StatusCode::kClosed); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferEmptyTest, ConsumeSomeWaitClosed) +{ + batt::StatusOr fragment; + + this->run_blocking_test( + //----- --- -- - - - - + /*blocked_op=*/ + [&] { + fragment = this->stream_buffer_->consume_some(); + }, + //----- --- -- - - - - + /*unblock_op=*/ + [&] { + EXPECT_EQ(fragment.status(), batt::StatusCode::kUnknown); + + this->stream_buffer_->close(); + }); + + EXPECT_EQ(fragment.status(), batt::StatusCode::kClosed); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferEmptyTest, ConsumeSomeWaitOk) +{ + batt::StatusOr fragment; + + this->run_blocking_test( + //----- --- -- - - - - + /*blocked_op=*/ + [&] { + fragment = this->stream_buffer_->consume_some(); + }, + //----- --- -- - - - - + /*unblock_op=*/ + [&] { + EXPECT_EQ(fragment.status(), batt::StatusCode::kUnknown); + + LLFS_VLOG(1) << "committing some test data"; + + this->commit_test_data(10); + }); + + ASSERT_TRUE(fragment.ok()); + EXPECT_EQ(fragment->byte_size(), 10); + + this->verify_data(*fragment); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferEmptyTest, ConsumeSomeWaitOkThenClose) +{ + batt::StatusOr fragment; + + this->run_blocking_test( + //----- --- -- - - - - + /*blocked_op=*/ + [&] { + fragment = this->stream_buffer_->consume_some(); + }, + //----- --- -- - - - - + /*unblock_op=*/ + [&] { + EXPECT_EQ(fragment.status(), batt::StatusCode::kUnknown); + + LLFS_VLOG(1) << "committing some test data"; + + this->commit_test_data(10); + this->stream_buffer_->close(); + + EXPECT_EQ(fragment.status(), batt::StatusCode::kUnknown); + }); + + ASSERT_TRUE(fragment.ok()); + EXPECT_EQ(fragment->byte_size(), 10); + + this->verify_data(*fragment); + + //+++++++++++-+-+--+----- --- -- - - - - + + fragment = batt::Status{batt::StatusCode::kUnknown}; + fragment = this->stream_buffer_->consume_some(); + + EXPECT_EQ(fragment.status(), batt::StatusCode::kClosed); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferFullTest, ConsumeRangesInOrder) +{ + constexpr usize kSizePerConsume = 8; + + const usize n_to_consume = this->stream_buffer_->size(); + const usize n_iter = n_to_consume / kSizePerConsume; + + BATT_CHECK_EQ(n_iter * kSizePerConsume, n_to_consume); + + for (usize i = 0; i < n_iter; ++i) { + if (i == n_iter / 2) { + this->stream_buffer_->close(); + } + + batt::StatusOr fragment = this->stream_buffer_->consume( + /*start=*/i * kSizePerConsume, // + /*end=*/(i + 1) * kSizePerConsume); + + ASSERT_TRUE(fragment.ok()) << BATT_INSPECT(fragment.status()); + + this->verify_data(*fragment); + } + + batt::StatusOr fragment = + this->stream_buffer_->consume(n_to_consume, n_to_consume + 1); + + EXPECT_EQ(fragment.status(), batt::StatusCode::kClosed); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferNotEmptyTest, ConsumeRangeWaitOk) +{ + batt::StatusOr fragment1; + batt::StatusOr fragment2; + + const i64 fragment1_start = 0; + const i64 fragment1_end = this->stream_buffer_->size() / 2; + + const i64 fragment2_start = fragment1_end; + const i64 fragment2_end = this->stream_buffer_->size(); + + this->run_blocking_test( + //----- --- -- - - - - + /*blocked_op=*/ + [&] { + fragment2 = this->stream_buffer_->consume(fragment2_start, fragment2_end); + }, + //----- --- -- - - - - + /*unblock_op=*/ + [&] { + EXPECT_EQ(fragment1.status(), batt::StatusCode::kUnknown); + EXPECT_EQ(fragment2.status(), batt::StatusCode::kUnknown); + + fragment1 = this->stream_buffer_->consume(fragment1_start, fragment1_end); + }); + + ASSERT_TRUE(fragment1.ok()) << BATT_INSPECT(fragment1.status()); + ASSERT_TRUE(fragment2.ok()) << BATT_INSPECT(fragment2.status()); + + this->verify_data(*fragment1); + this->verify_data(*fragment2); + + EXPECT_EQ(this->unverified_data_.size(), 0u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferNotEmptyTest, ConsumeRangeWaitClosed) +{ + batt::StatusOr fragment1; + batt::StatusOr fragment2; + + const i64 fragment1_start = 0; + const i64 fragment1_end = this->stream_buffer_->size(); + + const i64 fragment2_start = this->stream_buffer_->size(); + const i64 fragment2_end = this->stream_buffer_->size() + 1; + + this->run_blocking_test( + //----- --- -- - - - - + /*blocked_op=*/ + [&] { + fragment2 = this->stream_buffer_->consume(fragment2_start, fragment2_end); + }, + //----- --- -- - - - - + /*unblock_op=*/ + [&] { + EXPECT_EQ(fragment1.status(), batt::StatusCode::kUnknown); + EXPECT_EQ(fragment2.status(), batt::StatusCode::kUnknown); + + this->stream_buffer_->close(); + fragment1 = this->stream_buffer_->consume(fragment1_start, fragment1_end); + }); + + EXPECT_EQ(fragment2.status(), batt::StatusCode::kClosed); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferNotEmptyTest, PrepareAfterClose) +{ + this->stream_buffer_->close(); + + batt::StatusOr view = this->stream_buffer_->prepare(); + + EXPECT_EQ(view.status(), batt::StatusCode::kClosed); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferFullTest, PrepareWaitOk1) +{ + batt::StatusOr view; + + this->run_blocking_test( + //----- --- -- - - - - + /*blocked_op=*/ + [&] { + view = this->stream_buffer_->prepare(); + }, + //----- --- -- - - - - + /*unblock_op=*/ + [&] { + EXPECT_EQ(view.status(), batt::StatusCode::kUnknown); + + batt::StatusOr fragment = + this->stream_buffer_->consume_some(); + + ASSERT_TRUE(fragment.ok()) << BATT_INSPECT(fragment.status()); + + this->verify_data(*fragment); + + EXPECT_EQ(this->unverified_data_.size(), 0u); + }); + + EXPECT_TRUE(view.ok()) << BATT_INSPECT(view.ok()); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +TEST_F(IoringStreamBufferFullTest, PrepareWaitOk2) +{ + batt::StatusOr fragment; + batt::StatusOr view; + + this->run_blocking_test( + //----- --- -- - - - - + /*blocked_op=*/ + [&] { + fragment = this->stream_buffer_->consume_some(); + + ASSERT_TRUE(fragment.ok()) << BATT_INSPECT(fragment.status()); + EXPECT_EQ(fragment->byte_size(), this->stream_buffer_->max_size()); + + this->verify_data(*fragment); + + EXPECT_EQ(this->unverified_data_.size(), 0u); + EXPECT_EQ(this->stream_buffer_->size(), 0u); + + LLFS_VLOG(1) << "About to call prepare (this should block)..."; + + view = this->stream_buffer_->prepare(); + }, + //----- --- -- - - - - + /*unblock_op=*/ + [&] { + EXPECT_EQ(view.status(), batt::StatusCode::kUnknown); + EXPECT_TRUE(fragment.ok()); + EXPECT_FALSE(this->prepared_view_); + + LLFS_VLOG(1) << "Setting fragment to unknown status (this should free the Fragment)"; + + fragment = batt::StatusCode::kUnknown; + + LLFS_VLOG(1) << "Leaving the unblock op..."; + }); + + EXPECT_TRUE(view.ok()) << BATT_INSPECT(view.ok()); +} + +} // namespace diff --git a/src/llfs/ioring_stream_buffer.test.hpp b/src/llfs/ioring_stream_buffer.test.hpp new file mode 100644 index 0000000..78bd82f --- /dev/null +++ b/src/llfs/ioring_stream_buffer.test.hpp @@ -0,0 +1,301 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#pragma once +#ifndef LLFS_IORING_STREAM_BUFFER_TEST_HPP +#define LLFS_IORING_STREAM_BUFFER_TEST_HPP + +#include +#include + +#include + +#include +#include + +#include + +namespace llfs { +namespace testing { + +constexpr usize kTestQueueDepth = 16; +constexpr usize kTestNumThreads = 1; +constexpr usize kTestBufferCount = 4; +constexpr usize kTestBufferSize = 64; +constexpr usize kTestDataSize = 4096; +constexpr u32 kTestDataRandomSeed = 0; + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// +class IoringStreamBufferTest : public ::testing::Test +{ + public: + using Self = IoringStreamBufferTest; + + //+++++++++++-+-+--+----- --- -- - - - - + + static const std::array& test_data() + { + static std::array data; + + [[maybe_unused]] const bool initialized = [] { + std::default_random_engine rng{kTestDataRandomSeed}; + std::uniform_int_distribution pick_byte{0x00, 0xff}; + + for (u8& value : data) { + value = pick_byte(rng); + } + + return true; + }(); + + return data; + } + + //+++++++++++-+-+--+----- --- -- - - - - + + void SetUp() override + { + batt::enable_dump_tasks(); + + StatusOr status_or_scoped_io_ring = + ScopedIoRing::make_new(MaxQueueDepth{kTestQueueDepth}, ThreadPoolSize{kTestNumThreads}); + + ASSERT_TRUE(status_or_scoped_io_ring.ok()) << BATT_INSPECT(status_or_scoped_io_ring.status()); + + this->scoped_io_ring_ = std::move(*status_or_scoped_io_ring); + this->io_ring_ = std::addressof(this->scoped_io_ring_.get_io_ring()); + + StatusOr> status_or_buffer_pool = + IoRingBufferPool::make_new(*this->io_ring_, this->buffer_count_, this->buffer_size_); + + ASSERT_TRUE(status_or_buffer_pool.ok()) << BATT_INSPECT(status_or_buffer_pool.status()); + + this->buffer_pool_ = std::move(*status_or_buffer_pool); + } + + void data_written(usize byte_count) + { + this->unwritten_data_ += byte_count; + this->unverified_data_ = ConstBuffer{ + this->unverified_data_.data(), + this->unverified_data_.size() + byte_count, + }; + } + + void verify_data(const ConstBuffer& to_verify) + { + ASSERT_LE(to_verify.size(), this->unverified_data_.size()); + ASSERT_EQ(0, std::memcmp(to_verify.data(), this->unverified_data_.data(), to_verify.size())); + + this->unverified_data_ += to_verify.size(); + } + + void verify_data(const IoRingStreamBuffer::Fragment& fragment) + { + std::variant> storage; + llfs::ConstBuffer to_verify = fragment.gather(storage); + + ASSERT_EQ(to_verify.size(), fragment.byte_size()); + + this->verify_data(to_verify); + } + + void run_blocking_test(const std::function& blocked_op, + const std::function& unblock_op) + { + boost::asio::io_context io; + + LLFS_VLOG(1) << "test entered"; + + bool op_entered = false; + + batt::Task blocked_op_task{ + io.get_executor(), + [&] { + LLFS_VLOG(1) << "inside blocked_op_task"; + op_entered = true; + blocked_op(); + + LLFS_VLOG(1) << "leaving blocked_op_task"; + }, + "blocked_op_task", + }; + + //+++++++++++-+-+--+----- --- -- - - - - + LLFS_VLOG(1) << "calling poll the first time"; + + io.poll(); + io.reset(); + + EXPECT_TRUE(op_entered); + EXPECT_FALSE(blocked_op_task.try_join()); + + //+++++++++++-+-+--+----- --- -- - - - - + LLFS_VLOG(1) << "unblocking the operation..."; + + unblock_op(); + + //+++++++++++-+-+--+----- --- -- - - - - + LLFS_VLOG(1) << "calling poll the second time"; + + io.poll(); + io.reset(); + + //+++++++++++-+-+--+----- --- -- - - - - + LLFS_VLOG(1) << "joining the consumer task"; + + BATT_CHECK(blocked_op_task.try_join()); + } + + //+++++++++++-+-+--+----- --- -- - - - - + + ScopedIoRing scoped_io_ring_; + + const IoRing* io_ring_ = nullptr; + + BufferCount buffer_count_{kTestBufferCount}; + + BufferSize buffer_size_{kTestBufferSize}; + + std::unique_ptr buffer_pool_; + + ConstBuffer unwritten_data_{Self::test_data().data(), Self::test_data().size()}; + + ConstBuffer unverified_data_{Self::test_data().data(), 0u}; +}; + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// +class IoringStreamBufferEmptyTest : public IoringStreamBufferTest +{ + public: + using Super = IoringStreamBufferTest; + using Self = IoringStreamBufferEmptyTest; + + //+++++++++++-+-+--+----- --- -- - - - - + + void SetUp() override + { + ASSERT_NO_FATAL_FAILURE(Super::SetUp()); + ASSERT_NE(this->buffer_pool_, nullptr); + + this->stream_buffer_.emplace(*this->buffer_pool_); + + EXPECT_EQ(this->stream_buffer_->size(), 0u); + } + + void commit_test_data(usize n_to_commit) + { + BATT_CHECK(this->stream_buffer_); + BATT_CHECK_LE(n_to_commit, this->unwritten_data_.size()); + BATT_CHECK_LE(n_to_commit + this->stream_buffer_->size(), this->stream_buffer_->max_size()); + + while (n_to_commit > 0) { + // If there is no prepared buffer we can use, then allocate one now. + // + if (!this->prepared_view_) { + StatusOr view = this->stream_buffer_->prepare(); + + ASSERT_TRUE(view.ok()) << BATT_INSPECT(view.status()); + + this->prepared_view_.emplace(std::move(*view)); + } + + // Copy test data into the prepared buffer. + // + const usize n_to_copy = std::min(this->prepared_view_->size(), n_to_commit); + std::memcpy(this->prepared_view_->data(), this->unwritten_data_.data(), n_to_copy); + + // Commit the data. + // + this->stream_buffer_->commit(this->prepared_view_->split(n_to_copy)); + + // Update steam states. + // + n_to_commit -= n_to_copy; + this->data_written(n_to_copy); + if (this->prepared_view_->empty()) { + this->prepared_view_ = None; + } + } + } + + //+++++++++++-+-+--+----- --- -- - - - - + + Optional stream_buffer_; + + Optional prepared_view_; +}; + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// +class IoringStreamBufferClosedEmptyTest : public IoringStreamBufferEmptyTest +{ + public: + using Super = IoringStreamBufferEmptyTest; + using Self = IoringStreamBufferClosedEmptyTest; + + //+++++++++++-+-+--+----- --- -- - - - - + + void SetUp() override + { + ASSERT_NO_FATAL_FAILURE(Super::SetUp()); + + this->stream_buffer_->close(); + } +}; + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// +class IoringStreamBufferNotEmptyTest : public IoringStreamBufferEmptyTest +{ + public: + using Super = IoringStreamBufferEmptyTest; + using Self = IoringStreamBufferNotEmptyTest; + + //+++++++++++-+-+--+----- --- -- - - - - + + void SetUp() override + { + ASSERT_NO_FATAL_FAILURE(Super::SetUp()); + + // Commit the test data in two steps, to exercise code paths that merge buffer views inside a + // fragment. + // + this->commit_test_data(this->buffer_size_ - 1); + this->commit_test_data(1); + + EXPECT_EQ(this->stream_buffer_->size(), this->buffer_size_); + } +}; + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- +// +class IoringStreamBufferFullTest : public IoringStreamBufferEmptyTest +{ + public: + using Super = IoringStreamBufferEmptyTest; + using Self = IoringStreamBufferFullTest; + + //+++++++++++-+-+--+----- --- -- - - - - + + void SetUp() override + { + ASSERT_NO_FATAL_FAILURE(Super::SetUp()); + + this->commit_test_data(this->stream_buffer_->max_size()); + + EXPECT_EQ(this->stream_buffer_->size(), this->stream_buffer_->max_size()); + } +}; + +} //namespace testing +} //namespace llfs + +#endif // LLFS_IORING_STREAM_BUFFER_TEST_HPP diff --git a/src/llfs/logging.hpp b/src/llfs/logging.hpp index 8d74cb6..b8de2d8 100644 --- a/src/llfs/logging.hpp +++ b/src/llfs/logging.hpp @@ -94,6 +94,7 @@ struct NullStream { #define LLFS_LOG_WARNING_FIRST_N(n) LLFS_LOG_NO_OUTPUT() #define LLFS_LOG_INFO_FIRST_N(n) LLFS_LOG_NO_OUTPUT() #define LLFS_VLOG_EVERY_N(verbosity, n) LLFS_LOG_NO_OUTPUT() +#define LLFS_VLOG_IF(verbosity, condition) LLFS_LOG_NO_OUTPUT() //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- #elif defined(LLFS_USE_GLOG) @@ -110,6 +111,7 @@ struct NullStream { #define LLFS_LOG_WARNING_FIRST_N(n) LOG_FIRST_N(WARNING, (n)) #define LLFS_LOG_INFO_FIRST_N(n) LOG_FIRST_N(INFO, (n)) #define LLFS_VLOG_EVERY_N(verbosity, n) VLOG_EVERY_N((verbosity), (n)) +#define LLFS_VLOG_IF(verbosity, condition) VLOG_IF((verbosity), (condition)) //=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- #elif defined(LLFS_USE_BOOST_LOG)