Skip to content

Commit

Permalink
Add Volume atomic multi-slot appends. (#149)
Browse files Browse the repository at this point in the history
* Add MultiAppend to SlotWriter; add Buffer api for varints.

* Add Volume::MultiAppend.

* wip - multi-appends

* wip - sketch of special zero-tokens to indicate atomic range.

* Add unread_begin(), read_token() to DataReader.

* Allow moving IoRing after creating an IoRing::File.

* Create simulated log device storage (low-level).

* Add low-level log device mode to StorageSimulation (untested).

* Fix all remaining bugs in low-level LogDevice simulation.

* Refactor test env config code into llfs::testing::TestConfig.

* Remove excess logging in tests.

* Fix a few more IoRingLogDriver bugs.

* Add comments and clean up code for MR.

* Code review feedback.

* Add missing `Death` keyword to the names of tests that include EXPECT_DEATH.

* Changes in response to review feedback.

* CR feedback.

* Fix .clang-format after Xcode upgrade; CR feedback.

* Use shared_ptr instead of raw ptr in SimulatedLogDeviceStorage (CR feedback).
  • Loading branch information
tonyastolfi authored May 24, 2024
1 parent f03e41c commit 23ecf73
Show file tree
Hide file tree
Showing 60 changed files with 2,998 additions and 450 deletions.
2 changes: 0 additions & 2 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ AllowShortFunctionsOnASingleLine: None
AllowShortIfStatementsOnASingleLine: false
AllowShortLambdasOnASingleLine: false
AllowShortLoopsOnASingleLine: false
AllowAllConstructorInitializersOnNextLine: false
BreakBeforeBraces: Custom
DerivePointerAlignment: false
FixNamespaceComments: true
IndentCaseLabels: false
IndentPPDirectives: None
IndentWidth: 2
PointerAlignment: Left
Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def configure(self):


def requirements(self):
self.requires("batteries/0.52.1", **VISIBLE)
self.requires("batteries/0.52.4", **VISIBLE)
self.requires("boost/1.83.0", **VISIBLE)
self.requires("cli11/2.3.2", **VISIBLE)
self.requires("glog/0.6.0", **VISIBLE)
Expand Down
30 changes: 20 additions & 10 deletions src/llfs/basic_log_storage_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ class BasicLogStorageDriver
//
Status set_trim_pos(slot_offset_type trim_pos)
{
return impl_.set_trim_pos(trim_pos);
return this->impl_.set_trim_pos(trim_pos);
}

slot_offset_type get_trim_pos() const
{
return impl_.get_trim_pos();
return this->impl_.get_trim_pos();
}

StatusOr<slot_offset_type> await_trim_pos(slot_offset_type trim_pos)
{
return impl_.await_trim_pos(trim_pos);
return this->impl_.await_trim_pos(trim_pos);
}

//----
Expand All @@ -63,41 +63,51 @@ class BasicLogStorageDriver

slot_offset_type get_flush_pos() const
{
return impl_.get_flush_pos();
return this->impl_.get_flush_pos();
}

StatusOr<slot_offset_type> await_flush_pos(slot_offset_type flush_pos)
{
return impl_.await_flush_pos(flush_pos);
return this->impl_.await_flush_pos(flush_pos);
}

//----

Status set_commit_pos(slot_offset_type commit_pos)
{
return impl_.set_commit_pos(commit_pos);
return this->impl_.set_commit_pos(commit_pos);
}

slot_offset_type get_commit_pos() const
{
return impl_.get_commit_pos();
return this->impl_.get_commit_pos();
}

StatusOr<slot_offset_type> await_commit_pos(slot_offset_type commit_pos)
{
return impl_.await_commit_pos(commit_pos);
return this->impl_.await_commit_pos(commit_pos);
}

//----

Status open()
{
return impl_.open();
return this->impl_.open();
}

Status close()
{
return impl_.close();
return this->impl_.close();
}

void halt()
{
this->impl_.halt();
}

void join()
{
this->impl_.join();
}
//
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand Down
13 changes: 11 additions & 2 deletions src/llfs/basic_ring_buffer_log_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <llfs/basic_log_storage_driver.hpp>
#include <llfs/basic_log_storage_reader.hpp>
#include <llfs/log_device.hpp>
#include <llfs/log_storage_driver_context.hpp>
#include <llfs/ring_buffer.hpp>

namespace llfs {
Expand Down Expand Up @@ -62,6 +63,16 @@ class BasicRingBufferLogDevice

Status close() override;

void halt() override
{
this->driver_.halt();
}

void join() override
{
this->driver_.join();
}

Status sync(LogReadMode mode, SlotUpperBoundAt event) override;

driver_type& driver()
Expand Down Expand Up @@ -222,8 +233,6 @@ class BasicRingBufferLogDevice<Impl>::WriterImpl : public LogDevice::Writer

StatusOr<MutableBuffer> prepare(usize byte_count, usize head_room) override
{
BATT_CHECK(!this->prepared_offset_);

if (this->device_->closed_.load()) {
return ::llfs::make_status(StatusCode::kPrepareFailedLogClosed);
}
Expand Down
24 changes: 24 additions & 0 deletions src/llfs/data_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <llfs/logging.hpp>

#include <batteries/assert.hpp>
#include <batteries/slice.hpp>
#include <batteries/type_traits.hpp>

#include <boost/range/iterator_range.hpp>
Expand Down Expand Up @@ -98,6 +99,22 @@ class DataReader
at_end_ = false;
}

/** \brief Tries to consume a prefix of the remaining input that exactly matches `token`.
*
* \return true iff the unread data begins with token and this prefix is successfully consumed.
*/
bool read_token(const batt::Slice<const u8>& token) noexcept
{
if (this->bytes_available() < token.size()) {
return false;
}
if (std::memcmp(this->unread_.begin(), token.begin(), token.size()) != 0) {
return false;
}
this->unread_.advance_begin(token.size());
return true;
}

template <typename T>
[[nodiscard]] const T* read_record(batt::StaticType<T> = {})
{
Expand Down Expand Up @@ -240,6 +257,13 @@ class DataReader
return ConstBuffer{this->unread_.begin(), this->unread_.size()};
}

/** \brief Returns a pointer to the start of unread data.
*/
const u8* unread_begin() const
{
return this->unread_.begin();
}

private:
bool at_end_ = false;
ConstBuffer buffer_;
Expand Down
17 changes: 15 additions & 2 deletions src/llfs/file_log_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,24 @@ StatusOr<slot_offset_type> FileLogDriver::await_commit_pos(slot_offset_type min_
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
Status FileLogDriver::close()
{
this->halt();
this->join();

return OkStatus();
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
void FileLogDriver::halt()
{
this->shared_state_.halt();
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
void FileLogDriver::join()
{
if (this->trim_task_) {
this->trim_task_->join();
this->trim_task_ = None;
Expand All @@ -352,8 +367,6 @@ Status FileLogDriver::close()
this->flush_task_->join();
this->flush_task_ = None;
}

return OkStatus();
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand Down
4 changes: 4 additions & 0 deletions src/llfs/file_log_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ class FileLogDriver

Status close();

void halt();

void join();

private:
// See <llfs/file_log_driver/concurrent_shared_state.cpp>
//
Expand Down
12 changes: 7 additions & 5 deletions src/llfs/ioring_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ namespace llfs {

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
IoRing::File::File(const IoRing& io_ring, int fd) noexcept : io_ring_{&io_ring}, fd_{fd}
IoRing::File::File(const IoRing& io_ring, int fd) noexcept
: io_ring_impl_{io_ring.impl_.get()}
, fd_{fd}
{
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
IoRing::File::File(File&& that) noexcept : io_ring_{that.io_ring_}, fd_{that.fd_}
IoRing::File::File(File&& that) noexcept : io_ring_impl_{that.io_ring_impl_}, fd_{that.fd_}
{
that.fd_ = -1;
that.registered_fd_ = -1;
Expand All @@ -37,7 +39,7 @@ auto IoRing::File::operator=(File&& that) noexcept -> File&
{
File copy{std::move(that)};

std::swap(this->io_ring_, copy.io_ring_);
std::swap(this->io_ring_impl_, copy.io_ring_impl_);
std::swap(this->fd_, copy.fd_);
std::swap(this->registered_fd_, copy.registered_fd_);

Expand Down Expand Up @@ -155,7 +157,7 @@ Status IoRing::File::register_fd()
return OkStatus();
}

StatusOr<i32> rfd = this->io_ring_->register_fd(this->fd_);
StatusOr<i32> rfd = this->io_ring_impl_->register_fd(this->fd_);
if (!rfd.ok()) {
LLFS_LOG_ERROR() << "register_fd failed! " << BATT_INSPECT(rfd.status());
}
Expand All @@ -174,7 +176,7 @@ Status IoRing::File::unregister_fd()
return OkStatus();
}

Status status = this->io_ring_->unregister_fd(this->registered_fd_);
Status status = this->io_ring_impl_->unregister_fd(this->registered_fd_);
BATT_REQUIRE_OK(status);

this->registered_fd_ = -1;
Expand Down
54 changes: 27 additions & 27 deletions src/llfs/ioring_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ class IoRing::File

~File() noexcept;

const IoRing& get_io_ring() const
IoRing::Impl* get_io_ring_impl() const
{
return *this->io_ring_;
return this->io_ring_impl_;
}

// Returns the OS-native file descriptor currently owned by this object. Returns -1 if no file is
Expand Down Expand Up @@ -158,7 +158,7 @@ class IoRing::File
}

private:
const IoRing* io_ring_;
IoRing::Impl* io_ring_impl_ = nullptr;
int fd_ = -1;
int registered_fd_ = -1;
bool raw_io_ = true;
Expand All @@ -173,16 +173,16 @@ inline void IoRing::File::async_read_some(i64 offset, MutableBufferSequence&& bu
Handler&& handler)
{
LLFS_DVLOG(1) << "async_read_some(mulitple buffers)";
this->io_ring_->submit(BATT_FORWARD(buffers), BATT_FORWARD(handler),
[offset, this](struct io_uring_sqe* sqe, auto& op) {
if (this->registered_fd_ == -1) {
io_uring_prep_readv(sqe, this->fd_, op.iov_, op.iov_count_, offset);
} else {
io_uring_prep_readv(sqe, this->registered_fd_, op.iov_, op.iov_count_,
offset);
sqe->flags |= IOSQE_FIXED_FILE;
}
});
this->io_ring_impl_->submit(
BATT_FORWARD(buffers), BATT_FORWARD(handler),
[offset, this](struct io_uring_sqe* sqe, auto& op) {
if (this->registered_fd_ == -1) {
io_uring_prep_readv(sqe, this->fd_, op.iov_, op.iov_count_, offset);
} else {
io_uring_prep_readv(sqe, this->registered_fd_, op.iov_, op.iov_count_, offset);
sqe->flags |= IOSQE_FIXED_FILE;
}
});
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand All @@ -194,7 +194,7 @@ inline void IoRing::File::async_read_some(i64 offset, const MutableBuffer& buffe
static const std::vector<MutableBuffer> empty;

LLFS_DVLOG(1) << "async_read_some(single buffer)";
this->io_ring_->submit(
this->io_ring_impl_->submit(
empty, BATT_FORWARD(handler),
[&buffer, offset, this](struct io_uring_sqe* sqe, auto& /*op*/) {
if (this->registered_fd_ == -1) {
Expand All @@ -217,7 +217,7 @@ inline void IoRing::File::async_read_some_fixed(i64 offset, const MutableBuffer&
static const std::vector<MutableBuffer> empty;

LLFS_DVLOG(1) << "async_read_some(single buffer)";
this->io_ring_->submit(
this->io_ring_impl_->submit(
empty, BATT_FORWARD(handler),
[&buffer, offset, buf_index, this](struct io_uring_sqe* sqe, auto& /*op*/) {
if (this->registered_fd_ == -1) {
Expand All @@ -238,16 +238,16 @@ template <typename ConstBufferSequence, typename Handler, typename>
inline void IoRing::File::async_write_some(i64 offset, ConstBufferSequence&& buffers,
Handler&& handler)
{
this->io_ring_->submit(BATT_FORWARD(buffers), BATT_FORWARD(handler),
[offset, this](struct io_uring_sqe* sqe, auto& op) {
if (this->registered_fd_ == -1) {
io_uring_prep_writev(sqe, this->fd_, op.iov_, op.iov_count_, offset);
} else {
io_uring_prep_writev(sqe, this->registered_fd_, op.iov_, op.iov_count_,
offset);
sqe->flags |= IOSQE_FIXED_FILE;
}
});
this->io_ring_impl_->submit(
BATT_FORWARD(buffers), BATT_FORWARD(handler),
[offset, this](struct io_uring_sqe* sqe, auto& op) {
if (this->registered_fd_ == -1) {
io_uring_prep_writev(sqe, this->fd_, op.iov_, op.iov_count_, offset);
} else {
io_uring_prep_writev(sqe, this->registered_fd_, op.iov_, op.iov_count_, offset);
sqe->flags |= IOSQE_FIXED_FILE;
}
});
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand All @@ -257,7 +257,7 @@ inline void IoRing::File::async_write_some(i64 offset, const ConstBuffer& buffer
{
static const std::vector<ConstBuffer> empty;

this->io_ring_->submit(
this->io_ring_impl_->submit(
empty, BATT_FORWARD(handler),
[&buffer, offset, this](struct io_uring_sqe* sqe, auto& /*op*/) {
if (this->registered_fd_ == -1) {
Expand All @@ -277,7 +277,7 @@ inline void IoRing::File::async_write_some_fixed(i64 offset, const ConstBuffer&
{
static const std::vector<ConstBuffer> empty;

this->io_ring_->submit(
this->io_ring_impl_->submit(
empty, BATT_FORWARD(handler),
[&buffer, buf_index, offset, this](struct io_uring_sqe* sqe, auto& /*op*/) {
if (this->registered_fd_ == -1) {
Expand Down
Loading

0 comments on commit 23ecf73

Please sign in to comment.