diff --git a/CHANGELOG.md b/CHANGELOG.md index c8d3e79c..c5c22482 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,13 @@ Release notes ============= +Version 0.8.66 +-------------- + +IMPROVEMENT + +* Improve memory requirements by using vector I/O + Version 0.8.65 -------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index a11d4ef4..80ee470b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8) set(APP_VERSION_MAJOR "0") set(APP_VERSION_MINOR "8") -set(APP_VERSION_PATCH "65") +set(APP_VERSION_PATCH "66") set(APP_VERSION "${APP_VERSION_MAJOR}.${APP_VERSION_MINOR}.${APP_VERSION_PATCH}") add_definitions(-DAKU_VERSION="${APP_VERSION}") diff --git a/README.md b/README.md index 257ae618..1cc9a24a 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ Features |Incremental backup |- |+ | |Clustering |- |+ | |Replication |- |+ | -|ARM support |- |+ | +|ARM support |+ |+ | |Windows support |- |+ | |Query language features |Current version|Future versions| @@ -51,7 +51,7 @@ Features |Aggregate series |+ |+ | |Merge & aggregate |+ |+ | |Group-aggregate |+ |+ | -|Group-aggregate & merge |- |+ | +|Group-aggregate & merge |+ |+ | |Join |+ |+ | |Join & merge |- |+ | |Join & group-aggregate |- |+ | @@ -75,6 +75,7 @@ are available via packagecloud: * Ubuntu 14.04 * Ubuntu 16.04 +* Ubuntu 18.04 * Debian Jessie * Debian Stretch * CentOS 7 @@ -87,5 +88,6 @@ Tools for monitoring Akumuli supports OpenTSDB telnet-style API for writing. This means that many collectors works with it without any trouble, for instance `netdata`, `collectd`, and `tcollector`. Grafana [datasource](https://github.com/akumuli/akumuli-datasource) plugin is availabe as well. +Akumuli can be used as a long-term storage for Prometheus using [akumuli-prometheus-adapter](https://github.com/akumuli/akumuli-prometheus-adapter). [Google group](https://groups.google.com/forum/#!forum/akumuli) diff --git a/libakumuli/storage2.cpp b/libakumuli/storage2.cpp index 953b7651..3d18f4b1 100644 --- a/libakumuli/storage2.cpp +++ b/libakumuli/storage2.cpp @@ -900,18 +900,18 @@ void dump_tree(std::ostream &stream, }; if (type == StackItemType::NORMAL || type == StackItemType::RECOVERY) { - std::shared_ptr block; + std::unique_ptr block; aku_Status status; - std::tie(status, block) = bstore->read_block(curr); + std::tie(status, block) = bstore->read_iovec_block(curr); if (status != AKU_SUCCESS) { stream << _tag("addr") << afmt(curr) << "" << std::endl; stream << _tag("fail") << StatusUtil::c_str(status) << "" << std::endl; continue; } - auto subtreeref = reinterpret_cast(block->get_cdata()); + auto subtreeref = block->get_cheader(); if (subtreeref->type == NBTreeBlockType::LEAF) { // Dump leaf node's content - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); SubtreeRef const* ref = leaf.get_leafmeta(); stream << _tag("type") << "Leaf" << "\n"; stream << _tag("addr") << afmt(curr) << "\n"; @@ -940,19 +940,19 @@ void dump_tree(std::ostream &stream, stack.push(std::make_tuple(EMPTY_ADDR, indent + 1, StackItemType::CLOSE_NODE)); stack.push(std::make_tuple(prev, indent + 2, StackItemType::NORMAL)); stack.push(std::make_tuple(EMPTY_ADDR, indent + 1, StackItemType::OPEN_NODE)); - std::tie(status, block) = bstore->read_block(prev); + std::tie(status, block) = bstore->read_iovec_block(prev); if (status != AKU_SUCCESS) { // Block was deleted but it should be on the stack anyway break; } - NBTreeLeaf lnext(block); + IOVecLeaf lnext(std::move(block)); prev = lnext.get_prev_addr(); } stack.push(std::make_tuple(EMPTY_ADDR, indent, StackItemType::OPEN_FANOUT)); } } else { // Dump inner node's content and children - NBTreeSuperblock sblock(block); + IOVecSuperblock sblock(std::move(block)); SubtreeRef const* ref = sblock.get_sblockmeta(); stream << _tag("addr") << afmt(curr) << "\n"; stream << _tag("type") << "Superblock" << "\n"; @@ -981,12 +981,12 @@ void dump_tree(std::ostream &stream, stack.push(std::make_tuple(EMPTY_ADDR, indent + 1, StackItemType::CLOSE_NODE)); stack.push(std::make_tuple(prev, indent + 2, StackItemType::NORMAL)); stack.push(std::make_tuple(EMPTY_ADDR, indent + 1, StackItemType::OPEN_NODE)); - std::tie(status, block) = bstore->read_block(prev); + std::tie(status, block) = bstore->read_iovec_block(prev); if (status != AKU_SUCCESS) { // Block was deleted but it should be on the stack anyway break; } - NBTreeSuperblock sbnext(block); + IOVecSuperblock sbnext(std::move(block)); prev = sbnext.get_prev_addr(); } stack.push(std::make_tuple(EMPTY_ADDR, indent, StackItemType::OPEN_FANOUT)); diff --git a/libakumuli/storage_engine/blockstore.cpp b/libakumuli/storage_engine/blockstore.cpp index 16d4e2a5..5e579a51 100644 --- a/libakumuli/storage_engine/blockstore.cpp +++ b/libakumuli/storage_engine/blockstore.cpp @@ -100,55 +100,6 @@ BlockCache::PBlock BlockCache::loockup(LogicAddr addr) { } -Block::Block(LogicAddr addr, std::vector&& data) - : data_(std::move(data)) - , addr_(addr) - , zptr_(nullptr) -{ -} - -Block::Block(LogicAddr addr, const u8* ptr) - : addr_(addr) - , zptr_(ptr) -{ -} - -Block::Block() - : data_(static_cast(AKU_BLOCK_SIZE), 0) - , addr_(EMPTY_ADDR) - , zptr_(nullptr) -{ -} - -const u8* Block::get_data() const { - return zptr_ ? zptr_ : data_.data(); -} - -const u8* Block::get_cdata() const { - return zptr_ ? zptr_ : data_.data(); -} - -bool Block::is_readonly() const { - return zptr_ != nullptr || addr_ != EMPTY_ADDR; -} - -u8* Block::get_data() { - assert(is_readonly() == false); - return data_.data(); -} - -size_t Block::get_size() const { - return zptr_ ? AKU_BLOCK_SIZE : data_.size(); -} - -LogicAddr Block::get_addr() const { - return addr_; -} - -void Block::set_addr(LogicAddr addr) { - addr_ = addr; -} - FileStorage::FileStorage(std::shared_ptr meta) @@ -264,42 +215,20 @@ static LogicAddr make_logic(u32 gen, BlockAddr addr) { return static_cast(gen) << 32 | addr; } -std::tuple FileStorage::append_block(std::shared_ptr data) { - std::lock_guard guard(lock_); AKU_UNUSED(guard); - BlockAddr block_addr; - aku_Status status; - std::tie(status, block_addr) = volumes_[current_volume_]->append_block(data->get_data()); - if (status == AKU_EOVERFLOW) { - // transition to new/next volume - handle_volume_transition(); - std::tie(status, block_addr) = volumes_.at(current_volume_)->append_block(data->get_data()); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, 0ull); - } - } - data->set_addr(block_addr); - status = meta_->set_nblocks(current_volume_, block_addr + 1); - if (status != AKU_SUCCESS) { - AKU_PANIC("Invalid BlockStore state, " + StatusUtil::str(status)); - } - dirty_[current_volume_]++; - return std::make_tuple(status, make_logic(current_gen_, block_addr)); -} - -std::tuple FileStorage::append_block(std::shared_ptr data) { +std::tuple FileStorage::append_block(IOVecBlock& data) { std::lock_guard guard(lock_); AKU_UNUSED(guard); BlockAddr block_addr; aku_Status status; - std::tie(status, block_addr) = volumes_[current_volume_]->append_block(data.get()); + std::tie(status, block_addr) = volumes_[current_volume_]->append_block(&data); if (status == AKU_EOVERFLOW) { // transition to new/next volume handle_volume_transition(); - std::tie(status, block_addr) = volumes_.at(current_volume_)->append_block(data.get()); + std::tie(status, block_addr) = volumes_.at(current_volume_)->append_block(&data); if (status != AKU_SUCCESS) { return std::make_tuple(status, 0ull); } } - data->set_addr(block_addr); + data.set_addr(block_addr); status = meta_->set_nblocks(current_volume_, block_addr + 1); if (status != AKU_SUCCESS) { AKU_PANIC("Invalid BlockStore state, " + StatusUtil::str(status)); @@ -427,45 +356,7 @@ bool FixedSizeFileStorage::exists(LogicAddr addr) const { return actual_gen == gen && vol < nblocks; } -std::tuple> FixedSizeFileStorage::read_block(LogicAddr addr) { - std::lock_guard guard(lock_); AKU_UNUSED(guard); - aku_Status status; - auto gen = extract_gen(addr); - auto vol = extract_vol(addr); - auto volix = gen % static_cast(volumes_.size()); - u32 actual_gen; - u32 nblocks; - std::tie(status, actual_gen) = meta_->get_generation(volix); - if (status != AKU_SUCCESS) { - return std::make_tuple(AKU_EBAD_ARG, std::unique_ptr()); - } - std::tie(status, nblocks) = meta_->get_nblocks(volix); - if (status != AKU_SUCCESS) { - return std::make_tuple(AKU_EBAD_ARG, std::unique_ptr()); - } - if (actual_gen != gen || vol >= nblocks) { - return std::make_tuple(AKU_EUNAVAILABLE, std::unique_ptr()); - } - // Try to use zero-copy if possible - const u8* mptr; - std::tie(status, mptr) = volumes_[volix]->read_block_zero_copy(vol); - if (status == AKU_SUCCESS) { - std::shared_ptr zblock = std::make_shared(addr, mptr); - return std::make_tuple(status, std::move(zblock)); - } else if (status == AKU_EUNAVAILABLE) { - // Fallback to copying if not possible - std::vector dest(AKU_BLOCK_SIZE, 0); - status = volumes_[volix]->read_block(vol, dest.data()); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, std::unique_ptr()); - } - auto block = std::make_shared(addr, std::move(dest)); - return std::make_tuple(status, std::move(block)); - } - return std::make_tuple(status, std::unique_ptr()); -} - -std::tuple> FixedSizeFileStorage::read_iovec_block(LogicAddr addr) { +std::tuple> FixedSizeFileStorage::read_iovec_block(LogicAddr addr) { std::lock_guard guard(lock_); AKU_UNUSED(guard); aku_Status status; auto gen = extract_gen(addr); @@ -529,44 +420,7 @@ bool ExpandableFileStorage::exists(LogicAddr addr) const { return actual_gen == gen && vol < nblocks; } -std::tuple> ExpandableFileStorage::read_block(LogicAddr addr) { - std::lock_guard guard(lock_); AKU_UNUSED(guard); - aku_Status status; - auto gen = extract_gen(addr); - auto vol = extract_vol(addr); - u32 actual_gen; - u32 nblocks; - std::tie(status, actual_gen) = meta_->get_generation(gen); - if (status != AKU_SUCCESS) { - return std::make_tuple(AKU_EBAD_ARG, std::unique_ptr()); - } - std::tie(status, nblocks) = meta_->get_nblocks(gen); - if (status != AKU_SUCCESS) { - return std::make_tuple(AKU_EBAD_ARG, std::unique_ptr()); - } - if (actual_gen != gen || vol >= nblocks) { - return std::make_tuple(AKU_EUNAVAILABLE, std::unique_ptr()); - } - // Try to use zero-copy if possible - const u8* mptr; - std::tie(status, mptr) = volumes_[gen]->read_block_zero_copy(vol); - if (status == AKU_SUCCESS) { - std::shared_ptr zblock = std::make_shared(addr, mptr); - return std::make_tuple(status, std::move(zblock)); - } else if (status == AKU_EUNAVAILABLE) { - // Fallback to copying if not possible - std::vector dest(AKU_BLOCK_SIZE, 0); - status = volumes_[gen]->read_block(vol, dest.data()); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, std::unique_ptr()); - } - auto block = std::make_shared(addr, std::move(dest)); - return std::make_tuple(status, std::move(block)); - } - return std::make_tuple(status, std::unique_ptr()); -} - -std::tuple> ExpandableFileStorage::read_iovec_block(LogicAddr addr) { +std::tuple> ExpandableFileStorage::read_iovec_block(LogicAddr addr) { std::lock_guard guard(lock_); AKU_UNUSED(guard); aku_Status status; auto gen = extract_gen(addr); @@ -586,7 +440,7 @@ std::tuple> ExpandableFileStorage::read_ } // Read the volume std::unique_ptr block; - std::tie(status, block) = volumes_[vol]->read_block(vol); + std::tie(status, block) = volumes_[gen]->read_block(vol); if (status == AKU_SUCCESS) { return std::make_tuple(status, std::move(block)); } @@ -676,30 +530,7 @@ u32 MemStore::checksum(const IOVecBlock& block, size_t offset , size_t size) con return crc32; } -std::tuple> MemStore::read_block(LogicAddr addr) { - addr -= MEMSTORE_BASE; - std::lock_guard guard(lock_); AKU_UNUSED(guard); - std::shared_ptr block; - u32 offset = static_cast(AKU_BLOCK_SIZE * addr); - if (addr < removed_pos_) { - return std::make_tuple(AKU_EUNAVAILABLE, block); - } - if (buffer_.size() < (offset + AKU_BLOCK_SIZE)) { - return std::make_tuple(AKU_EBAD_ARG, block); - } - std::vector data; - data.reserve(AKU_BLOCK_SIZE); - auto begin = buffer_.begin() + offset; - auto end = begin + AKU_BLOCK_SIZE; - std::copy(begin, end, std::back_inserter(data)); - block.reset(new Block(addr + MEMSTORE_BASE, std::move(data))); - if (read_callback_) { - read_callback_(addr); - } - return std::make_tuple(AKU_SUCCESS, block); -} - -std::tuple> MemStore::read_iovec_block(LogicAddr addr) { +std::tuple> MemStore::read_iovec_block(LogicAddr addr) { addr -= MEMSTORE_BASE; std::lock_guard guard(lock_); AKU_UNUSED(guard); u32 offset = static_cast(AKU_BLOCK_SIZE * addr); @@ -722,24 +553,11 @@ std::tuple> MemStore::read_iovec_block(L return std::make_tuple(AKU_SUCCESS, std::move(block)); } -std::tuple MemStore::append_block(std::shared_ptr data) { - std::lock_guard guard(lock_); AKU_UNUSED(guard); - assert(data->get_size() == AKU_BLOCK_SIZE); - std::copy(data->get_data(), data->get_data() + AKU_BLOCK_SIZE, std::back_inserter(buffer_)); - if (append_callback_) { - append_callback_(write_pos_ + MEMSTORE_BASE); - } - auto addr = write_pos_++; - addr += MEMSTORE_BASE; - data->set_addr(addr); - return std::make_tuple(AKU_SUCCESS, addr); -} - -std::tuple MemStore::append_block(std::shared_ptr data) { +std::tuple MemStore::append_block(IOVecBlock &data) { std::lock_guard guard(lock_); AKU_UNUSED(guard); for (int i = 0; i < IOVecBlock::NCOMPONENTS; i++) { - if (data->get_size(i) != 0) { - const u8* p = data->get_cdata(i); + if (data.get_size(i) != 0) { + const u8* p = data.get_cdata(i); std::copy(p, p + IOVecBlock::COMPONENT_SIZE, std::back_inserter(buffer_)); } else { std::fill_n(std::back_inserter(buffer_), IOVecBlock::COMPONENT_SIZE, 0); @@ -750,7 +568,7 @@ std::tuple MemStore::append_block(std::shared_ptrset_addr(addr); + data.set_addr(addr); return std::make_tuple(AKU_SUCCESS, addr); } diff --git a/libakumuli/storage_engine/blockstore.h b/libakumuli/storage_engine/blockstore.h index 728da5d1..70fbc8ca 100644 --- a/libakumuli/storage_engine/blockstore.h +++ b/libakumuli/storage_engine/blockstore.h @@ -27,10 +27,8 @@ namespace Akumuli { namespace StorageEngine { -class Block; - struct BlockCache { - typedef std::shared_ptr PBlock; + typedef std::shared_ptr PBlock; std::vector block_cache_; const u32 bits_; // RNG @@ -71,17 +69,13 @@ struct BlockStore { /** Read block from blockstore */ - virtual std::tuple> read_block(LogicAddr addr) = 0; - - virtual std::tuple> read_iovec_block(LogicAddr addr) = 0; + virtual std::tuple> read_iovec_block(LogicAddr addr) = 0; /** Add block to blockstore. * @param data Pointer to buffer. * @return Status and block's logic address. */ - virtual std::tuple append_block(std::shared_ptr data) = 0; - - virtual std::tuple append_block(std::shared_ptr data) = 0; + virtual std::tuple append_block(IOVecBlock& data) = 0; //! Flush all pending changes. virtual void flush() = 0; @@ -89,8 +83,8 @@ struct BlockStore { //! Check if addr exists in block-store virtual bool exists(LogicAddr addr) const = 0; - //! Compute checksum of the input data. - virtual u32 checksum(u8 const* begin, size_t size) const = 0; + //! Compute checksum + virtual u32 checksum(u8 const* data, size_t size) const = 0; //! Compute checksum of the iovec block virtual u32 checksum(const IOVecBlock& block, size_t offset, size_t size) const = 0; @@ -134,9 +128,7 @@ class FileStorage : public BlockStore { * @param data Pointer to buffer. * @return Status and block's logic address. */ - virtual std::tuple append_block(std::shared_ptr data); - - virtual std::tuple append_block(std::shared_ptr data); + virtual std::tuple append_block(IOVecBlock &data); virtual void flush(); @@ -168,8 +160,7 @@ class FixedSizeFileStorage : public FileStorage, /** Read block from blockstore */ - virtual std::tuple> read_block(LogicAddr addr); - virtual std::tuple> read_iovec_block(LogicAddr addr); + virtual std::tuple > read_iovec_block(LogicAddr addr); }; class ExpandableFileStorage : public FileStorage, @@ -197,8 +188,7 @@ class ExpandableFileStorage : public FileStorage, /** Read block from blockstore */ - virtual std::tuple> read_block(LogicAddr addr); - virtual std::tuple> read_iovec_block(LogicAddr addr); + virtual std::tuple > read_iovec_block(LogicAddr addr); }; @@ -218,10 +208,8 @@ struct MemStore : BlockStore, std::enable_shared_from_this { MemStore(std::function append_cb, std::function read_cb); - virtual std::tuple > read_block(LogicAddr addr); - virtual std::tuple> read_iovec_block(LogicAddr addr); - virtual std::tuple append_block(std::shared_ptr data); - virtual std::tuple append_block(std::shared_ptr data); + virtual std::tuple> read_iovec_block(LogicAddr addr); + virtual std::tuple append_block(IOVecBlock& data); virtual void flush(); virtual bool exists(LogicAddr addr) const; virtual u32 checksum(const IOVecBlock &block, size_t offset, size_t size) const; @@ -240,37 +228,6 @@ struct MemStore : BlockStore, std::enable_shared_from_this { u32 reset_write_pos(u32 pos); }; - -//! Represents memory block -class Block { - std::vector data_; - LogicAddr addr_; - const u8* zptr_; - -public: - Block(LogicAddr addr, std::vector&& data); - - //! This c-tor is used in zero-copy mechanism, ptr should outlive the Block object - Block(LogicAddr addr, const u8* ptr); - - Block(); - - bool is_readonly() const; - - const u8* get_data() const; - - const u8* get_cdata() const; - - u8* get_data(); - - size_t get_size() const; - - LogicAddr get_addr() const; - - void set_addr(LogicAddr addr); -}; - - //! Should be used to create blockstore struct BlockStoreBuilder { static std::shared_ptr create_memstore(); diff --git a/libakumuli/storage_engine/compression.h b/libakumuli/storage_engine/compression.h index 6f23de52..ba408b96 100644 --- a/libakumuli/storage_engine/compression.h +++ b/libakumuli/storage_engine/compression.h @@ -484,6 +484,12 @@ struct IOVecVByteStreamWriter { u32 cnt_; u64 prev_; + IOVecVByteStreamWriter() + : block_(nullptr) + , cnt_(0) + , prev_(0) + {} + IOVecVByteStreamWriter(BlockT* block) : block_(block) , cnt_(0) @@ -505,7 +511,7 @@ struct IOVecVByteStreamWriter { return block_->allocate(n); } - bool empty() const { return block_->size() == 0; } + bool empty() const { return !block_ || block_->size() == 0; } //! Perform combined write (TVal should be integer) template bool encode(TVal fst, TVal snd) { diff --git a/libakumuli/storage_engine/nbtree.cpp b/libakumuli/storage_engine/nbtree.cpp index 955bf16c..3459cefd 100644 --- a/libakumuli/storage_engine/nbtree.cpp +++ b/libakumuli/storage_engine/nbtree.cpp @@ -37,9 +37,9 @@ namespace StorageEngine { std::ostream& operator << (std::ostream& out, NBTreeBlockType blocktype) { if (blocktype == NBTreeBlockType::LEAF) { - out << "NBTreeLeaf"; + out << "Leaf"; } else { - out << "NBTreeSuperblock"; + out << "Superblock"; } return out; } @@ -106,80 +106,58 @@ static std::string to_string(const SubtreeRef& ref) { return fmt.str(); } -static SubtreeRef* subtree_cast(u8* p) { - return reinterpret_cast(p); -} - -static SubtreeRef const* subtree_cast(u8 const* p) { - return reinterpret_cast(p); -} - -static std::tuple> read_and_check(std::shared_ptr bstore, LogicAddr curr) { +static std::tuple> read_and_check(std::shared_ptr bstore, LogicAddr curr) { aku_Status status; - std::shared_ptr block; - std::tie(status, block) = bstore->read_block(curr); + std::unique_ptr block; + std::tie(status, block) = bstore->read_iovec_block(curr); if (status != AKU_SUCCESS) { - return std::tie(status, block); - } - // Check consistency (works with both inner and leaf nodes). - u8 const* data = block->get_cdata(); - SubtreeRef const* subtree = subtree_cast(data); - u32 crc = bstore->checksum(data + sizeof(SubtreeRef), subtree->payload_size); - if (crc != subtree->checksum) { - std::stringstream fmt; - fmt << "Invalid checksum (addr: " << curr << ", level: " << subtree->level << ")"; - Logger::msg(AKU_LOG_ERROR, fmt.str()); - status = AKU_EBAD_DATA; + return std::make_tuple(status, std::move(block)); + } + if (block->get_size(0) == AKU_BLOCK_SIZE) { + // This check only makes sense when reading data back. In this case IOVecBlock will + // contain one large component. + u8 const* data = block->get_cdata(0); + SubtreeRef const* subtree = block->get_cheader(); + u32 crc = bstore->checksum(data + sizeof(SubtreeRef), subtree->payload_size); + if (crc != subtree->checksum) { + std::stringstream fmt; + fmt << "Invalid checksum (addr: " << curr << ", level: " << subtree->level << ")"; + Logger::msg(AKU_LOG_ERROR, fmt.str()); + status = AKU_EBAD_DATA; + } } - return std::tie(status, block); + return std::make_tuple(status, std::move(block)); } //! Read block from blockstoroe with all the checks. Panic on error! -static std::shared_ptr read_block_from_bstore(std::shared_ptr bstore, LogicAddr curr) { - aku_Status status; - std::shared_ptr block; - std::tie(status, block) = bstore->read_block(curr); - if (status != AKU_SUCCESS) { - Logger::msg(AKU_LOG_ERROR, "Can't read block @" + std::to_string(curr) + ", error: " + StatusUtil::str(status)); - AKU_PANIC("Can't read block - " + StatusUtil::str(status)); - } - // Check consistency (works with both inner and leaf nodes). - u8 const* data = block->get_cdata(); - SubtreeRef const* subtree = subtree_cast(data); - u32 crc = bstore->checksum(data + sizeof(SubtreeRef), subtree->payload_size); - if (crc != subtree->checksum) { - std::stringstream fmt; - fmt << "Invalid checksum (addr: " << curr << ", level: " << subtree->level << ")"; - AKU_PANIC(fmt.str()); - } - return block; -} - -//! Read block from blockstoroe with all the checks. Panic on error! -static std::shared_ptr read_iovec_block_from_bstore(std::shared_ptr bstore, LogicAddr curr) { +static std::unique_ptr read_iovec_block_from_bstore(std::shared_ptr bstore, LogicAddr curr) { aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = bstore->read_iovec_block(curr); if (status != AKU_SUCCESS) { Logger::msg(AKU_LOG_ERROR, "Can't read block @" + std::to_string(curr) + ", error: " + StatusUtil::str(status)); AKU_PANIC("Can't read block - " + StatusUtil::str(status)); } // Check consistency (works with both inner and leaf nodes). - u8 const* data = block->get_cdata(0); - SubtreeRef const* subtree = subtree_cast(data); - u32 crc = bstore->checksum(data + sizeof(SubtreeRef), subtree->payload_size); - if (crc != subtree->checksum) { - std::stringstream fmt; - fmt << "Invalid checksum (addr: " << curr << ", level: " << subtree->level << ")"; - AKU_PANIC(fmt.str()); + if (block->get_size(0) == AKU_BLOCK_SIZE) { + // This check only makes sense when reading data back. In this case IOVecBlock will + // contain one large component. + u8 const* data = block->get_cdata(0); + SubtreeRef const* subtree = block->get_cheader(); + u32 crc = bstore->checksum(data + sizeof(SubtreeRef), subtree->payload_size); + if (crc != subtree->checksum) { + std::stringstream fmt; + fmt << "Invalid checksum (addr: " << curr << ", level: " << subtree->level << ")"; + AKU_PANIC(fmt.str()); + } } return block; } //! Initialize object from leaf node -aku_Status init_subtree_from_leaf(const NBTreeLeaf& leaf, SubtreeRef& out) { +aku_Status init_subtree_from_leaf(const IOVecLeaf& leaf, SubtreeRef& out) { if (leaf.nelements() == 0) { return AKU_EBAD_ARG; } @@ -192,21 +170,7 @@ aku_Status init_subtree_from_leaf(const NBTreeLeaf& leaf, SubtreeRef& out) { return AKU_SUCCESS; } -//! Initialize object from leaf node -static aku_Status init_subtree_from_leaf(const IOVecLeaf& leaf, SubtreeRef& out) { - if (leaf.nelements() == 0) { - return AKU_EBAD_ARG; - } - SubtreeRef const* meta = leaf.get_leafmeta(); - out = *meta; - out.payload_size = 0; - out.checksum = 0; - out.addr = EMPTY_ADDR; // Leaf metadta stores address of the previous node! - out.type = NBTreeBlockType::LEAF; - return AKU_SUCCESS; -} - -aku_Status init_subtree_from_subtree(const NBTreeSuperblock& node, SubtreeRef& backref) { +aku_Status init_subtree_from_subtree(const IOVecSuperblock& node, SubtreeRef& backref) { std::vector refs; aku_Status status = node.read_all(&refs); if (status != AKU_SUCCESS) { @@ -556,7 +520,8 @@ struct NBTreeSBlockIteratorBase : SeriesOperator { { } - NBTreeSBlockIteratorBase(std::shared_ptr bstore, NBTreeSuperblock const& sblock, aku_Timestamp begin, aku_Timestamp end) + template + NBTreeSBlockIteratorBase(std::shared_ptr bstore, SuperblockT const& sblock, aku_Timestamp begin, aku_Timestamp end) : begin_(begin) , end_(end) , addr_(EMPTY_ADDR) @@ -575,12 +540,12 @@ struct NBTreeSBlockIteratorBase : SeriesOperator { aku_Status init() { aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, addr_); if (status != AKU_SUCCESS) { return status; } - NBTreeSuperblock current(block); + IOVecSuperblock current(std::move(block)); status = current.read_all(&refs_); refs_pos_ = begin_ < end_ ? 0 : static_cast(refs_.size()) - 1; return status; @@ -684,7 +649,8 @@ struct NBTreeSBlockIterator : NBTreeSBlockIteratorBase { { } - NBTreeSBlockIterator(std::shared_ptr bstore, NBTreeSuperblock const& sblock, aku_Timestamp begin, aku_Timestamp end) + template + NBTreeSBlockIterator(std::shared_ptr bstore, SuperblockT const& sblock, aku_Timestamp begin, aku_Timestamp end) : NBTreeSBlockIteratorBase(bstore, sblock, begin, end) { } @@ -693,15 +659,15 @@ struct NBTreeSBlockIterator : NBTreeSBlockIteratorBase { virtual std::tuple make_leaf_iterator(const SubtreeRef &ref) { assert(ref.type == NBTreeBlockType::LEAF); aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, ref.addr); if (status != AKU_SUCCESS) { return std::make_tuple(status, std::unique_ptr()); } - auto blockref = subtree_cast(block->get_cdata()); + auto blockref = block->get_cheader(); assert(blockref->type == ref.type); AKU_UNUSED(blockref); - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); std::unique_ptr result; result.reset(new NBTreeLeafIterator(begin_, end_, leaf)); return std::make_tuple(AKU_SUCCESS, std::move(result)); @@ -780,8 +746,9 @@ struct NBTreeSBlockFilter : NBTreeSBlockIteratorBase { { } + template NBTreeSBlockFilter(std::shared_ptr bstore, - NBTreeSuperblock const& sblock, + SuperblockT const& sblock, aku_Timestamp begin, aku_Timestamp end, const ValueFilter& filter) @@ -794,24 +761,24 @@ struct NBTreeSBlockFilter : NBTreeSBlockIteratorBase { virtual std::tuple make_leaf_iterator(const SubtreeRef &ref) { assert(ref.type == NBTreeBlockType::LEAF); aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, ref.addr); if (status != AKU_SUCCESS) { return std::make_tuple(status, std::unique_ptr()); } - auto blockref = subtree_cast(block->get_cdata()); + auto blockref = block->get_cheader(); assert(blockref->type == ref.type); std::unique_ptr result; switch (filter_.get_overlap(*blockref)) { case RangeOverlap::FULL_OVERLAP: { // Return normal leaf iterator because it's faster - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); result.reset(new NBTreeLeafIterator(begin_, end_, leaf)); break; } case RangeOverlap::PARTIAL_OVERLAP: { // Return filtering leaf operator - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); result.reset(new NBTreeLeafFilter(begin_, end_, filter_, leaf)); break; } @@ -991,8 +958,9 @@ class NBTreeSBlockAggregator : public NBTreeSBlockIteratorBase NBTreeSBlockAggregator(std::shared_ptr bstore, - NBTreeSuperblock const& sblock, + SuperblockT const& sblock, aku_Timestamp begin, aku_Timestamp end) : NBTreeSBlockIteratorBase(bstore, sblock, begin, end) @@ -1067,13 +1035,13 @@ std::tuple > NBTreeSBlockAggregat return std::make_tuple(AKU_EUNAVAILABLE, std::move(empty)); } aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, ref.addr); if (status != AKU_SUCCESS) { return std::make_tuple(status, std::unique_ptr()); } leftmost_leaf_found_ = true; - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); std::unique_ptr result; result.reset(new NBTreeLeafAggregator(begin_, end_, leaf)); return std::make_tuple(AKU_SUCCESS, std::move(result)); @@ -1241,8 +1209,9 @@ class NBTreeSBlockGroupAggregator : public NBTreeSBlockIteratorBase NBTreeSBlockGroupAggregator(std::shared_ptr bstore, - NBTreeSuperblock const& sblock, + SuperblockT const& sblock, aku_Timestamp begin, aku_Timestamp end, u64 step) @@ -1426,12 +1395,12 @@ std::tuple NBTreeSBlockGroupAggregator::read(aku_Timestamp * std::tuple> NBTreeSBlockGroupAggregator::make_leaf_iterator(SubtreeRef const& ref) { aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, ref.addr); if (status != AKU_SUCCESS) { return std::make_tuple(status, std::unique_ptr()); } - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); std::unique_ptr result; result.reset(new NBTreeLeafGroupAggregator(begin_, end_, step_, leaf)); return std::make_tuple(AKU_SUCCESS, std::move(result)); @@ -1519,8 +1488,9 @@ class NBTreeGroupAggregateFilter : public AggregateOperator { class NBTreeSBlockCandlesticsIter : public NBTreeSBlockIteratorBase { NBTreeCandlestickHint hint_; public: + template NBTreeSBlockCandlesticsIter(std::shared_ptr bstore, - NBTreeSuperblock const& sblock, + SuperblockT const& sblock, aku_Timestamp begin, aku_Timestamp end, NBTreeCandlestickHint hint) @@ -1582,18 +1552,20 @@ std::tuple NBTreeSBlockCandlesticsIter::read(aku_Timestamp * } -// //////////////// // -// NBTreeLeaf // -// //////////////// // +// ///////// // +// IOVecLeaf // -NBTreeLeaf::NBTreeLeaf(aku_ParamId id, LogicAddr prev, u16 fanout_index) +IOVecLeaf::IOVecLeaf(aku_ParamId id, LogicAddr prev, u16 fanout_index) : prev_(prev) - , block_(std::make_shared()) - , writer_(id, block_->get_data() + sizeof(SubtreeRef), AKU_BLOCK_SIZE - sizeof(SubtreeRef)) + , block_(new IOVecBlock()) + , writer_(block_.get()) , fanout_index_(fanout_index) { // Check that invariant holds. - SubtreeRef* subtree = subtree_cast(block_->get_data()); + SubtreeRef* subtree = block_->allocate(); + if (subtree == nullptr) { + AKU_PANIC("Can't allocate space in IOVecBlock"); + } subtree->addr = prev; subtree->level = 0; // Leaf node subtree->type = NBTreeBlockType::LEAF; @@ -1612,41 +1584,45 @@ NBTreeLeaf::NBTreeLeaf(aku_ParamId id, LogicAddr prev, u16 fanout_index) subtree->max_time = std::numeric_limits::lowest(); subtree->first = .0; subtree->last = .0; + + // Initialize the writer + writer_.init(id); } -NBTreeLeaf::NBTreeLeaf(std::shared_ptr bstore, LogicAddr curr) - : NBTreeLeaf(read_block_from_bstore(bstore, curr)) +IOVecLeaf::IOVecLeaf(std::shared_ptr bstore, LogicAddr curr) + : IOVecLeaf(read_iovec_block_from_bstore(bstore, curr)) { } -NBTreeLeaf::NBTreeLeaf(std::shared_ptr block) +IOVecLeaf::IOVecLeaf(std::unique_ptr block) : prev_(EMPTY_ADDR) + , block_(std::move(block)) { - block_ = block; - const SubtreeRef* subtree = subtree_cast(block_->get_cdata()); + const SubtreeRef* subtree = block_->get_cheader(); prev_ = subtree->addr; fanout_index_ = subtree->fanout_index; } -static std::shared_ptr clone(std::shared_ptr block) { - auto res = std::make_shared(); - memcpy(res->get_data(), block->get_cdata(), AKU_BLOCK_SIZE); +static std::unique_ptr clone(const std::unique_ptr& block) { + std::unique_ptr res(new IOVecBlock()); + res->copy_from(*block); return res; } -static aku_ParamId getid(std::shared_ptr const& block) { - auto ptr = reinterpret_cast(block->get_cdata()); +static aku_ParamId getid(std::unique_ptr const& block) { + auto ptr = block->get_header(); return ptr->id; } -NBTreeLeaf::NBTreeLeaf(std::shared_ptr block, NBTreeLeaf::CloneTag) +IOVecLeaf::IOVecLeaf(std::unique_ptr block, IOVecLeaf::CloneTag) : prev_(EMPTY_ADDR) , block_(clone(block)) - , writer_(getid(block_), block_->get_data() + sizeof(SubtreeRef), AKU_BLOCK_SIZE - sizeof(SubtreeRef)) + , writer_(block_.get()) { + writer_.init(getid(block)); // Re-insert the data - DataBlockReader reader(block->get_cdata() + sizeof(SubtreeRef), block->get_size()); + IOVecBlockReader reader(block.get(), static_cast(sizeof(SubtreeRef))); size_t sz = reader.nelements(); for (size_t ix = 0; ix < sz; ix++) { aku_Status status; @@ -1666,332 +1642,7 @@ NBTreeLeaf::NBTreeLeaf(std::shared_ptr block, NBTreeLeaf::CloneTag) } } - const SubtreeRef* subtree = subtree_cast(block_->get_cdata()); - prev_ = subtree->addr; - fanout_index_ = subtree->fanout_index; -} - -size_t NBTreeLeaf::_get_uncommitted_size() const { - return static_cast(writer_.get_write_index()); -} - -SubtreeRef const* NBTreeLeaf::get_leafmeta() const { - return subtree_cast(block_->get_cdata()); -} - -size_t NBTreeLeaf::nelements() const { - SubtreeRef const* subtree = subtree_cast(block_->get_cdata()); - return subtree->count; -} - -u16 NBTreeLeaf::get_fanout() const { - return fanout_index_; -} - -aku_ParamId NBTreeLeaf::get_id() const { - SubtreeRef const* subtree = subtree_cast(block_->get_cdata()); - return subtree->id; -} - -std::tuple NBTreeLeaf::get_timestamps() const { - SubtreeRef const* subtree = subtree_cast(block_->get_cdata()); - return std::make_tuple(subtree->begin, subtree->end); -} - -void NBTreeLeaf::set_prev_addr(LogicAddr addr) { - prev_ = addr; - SubtreeRef* subtree = subtree_cast(block_->get_data()); - subtree->addr = addr; -} - -void NBTreeLeaf::set_node_fanout(u16 fanout) { - assert(fanout <= AKU_NBTREE_FANOUT); - fanout_index_ = fanout; - SubtreeRef* subtree = subtree_cast(block_->get_data()); - subtree->fanout_index = fanout; -} - -LogicAddr NBTreeLeaf::get_addr() const { - return block_->get_addr(); -} - -LogicAddr NBTreeLeaf::get_prev_addr() const { - // Should be set correctly no metter how NBTreeLeaf was created. - return prev_; -} - - -aku_Status NBTreeLeaf::read_all(std::vector* timestamps, - std::vector* values) const -{ - int windex = writer_.get_write_index(); - DataBlockReader reader(block_->get_cdata() + sizeof(SubtreeRef), block_->get_size()); - size_t sz = reader.nelements(); - timestamps->reserve(sz); - values->reserve(sz); - for (size_t ix = 0; ix < sz; ix++) { - aku_Status status; - aku_Timestamp ts; - double value; - std::tie(status, ts, value) = reader.next(); - if (status != AKU_SUCCESS) { - return status; - } - timestamps->push_back(ts); - values->push_back(value); - } - // Read tail elements from `writer_` - if (windex != 0) { - writer_.read_tail_elements(timestamps, values); - } - return AKU_SUCCESS; -} - -aku_Status NBTreeLeaf::append(aku_Timestamp ts, double value) { - aku_Status status = writer_.put(ts, value); - if (status == AKU_SUCCESS) { - SubtreeRef* subtree = subtree_cast(block_->get_data()); - subtree->end = ts; - subtree->last = value; - if (subtree->count == 0) { - subtree->begin = ts; - subtree->first = value; - } - subtree->count++; - subtree->sum += value; - if (subtree->max < value) { - subtree->max = value; - subtree->max_time = ts; - } - if (subtree->min > value) { - subtree->min = value; - subtree->min_time = ts; - } - } - return status; -} - -std::tuple NBTreeLeaf::commit(std::shared_ptr bstore) { - assert(nelements() != 0); - u16 size = static_cast(writer_.commit()); - assert(size); - SubtreeRef* subtree = subtree_cast(block_->get_data()); - subtree->payload_size = size; - if (prev_ != EMPTY_ADDR && fanout_index_ > 0) { - subtree->addr = prev_; - } else { - // addr = EMPTY indicates that there is - // no link to previous node. - subtree->addr = EMPTY_ADDR; - // Invariant: fanout index should be 0 in this case. - } - subtree->version = AKUMULI_VERSION; - subtree->level = 0; - subtree->type = NBTreeBlockType::LEAF; - subtree->fanout_index = fanout_index_; - // Compute checksum - subtree->checksum = bstore->checksum(block_->get_cdata() + sizeof(SubtreeRef), size); - return bstore->append_block(block_); -} - - -std::unique_ptr NBTreeLeaf::range(aku_Timestamp begin, aku_Timestamp end) const { - std::unique_ptr it; - it.reset(new NBTreeLeafIterator(begin, end, *this)); - return it; -} - -std::unique_ptr NBTreeLeaf::filter(aku_Timestamp begin, - aku_Timestamp end, - const ValueFilter& filter) const -{ - std::unique_ptr it; - it.reset(new NBTreeLeafFilter(begin, end, filter, *this)); - return it; -} - -std::unique_ptr NBTreeLeaf::aggregate(aku_Timestamp begin, aku_Timestamp end) const { - std::unique_ptr it; - it.reset(new NBTreeLeafAggregator(begin, end, *this)); - return it; -} - -std::unique_ptr NBTreeLeaf::candlesticks(aku_Timestamp begin, aku_Timestamp end, NBTreeCandlestickHint hint) const { - AKU_UNUSED(hint); - auto agg = INIT_AGGRES; - const SubtreeRef* subtree = subtree_cast(block_->get_cdata()); - agg.copy_from(*subtree); - std::unique_ptr result; - AggregateOperator::Direction dir = begin < end ? AggregateOperator::Direction::FORWARD : AggregateOperator::Direction::BACKWARD; - result.reset(new ValueAggregator(subtree->end, agg, dir)); - return result; -} - -std::unique_ptr NBTreeLeaf::group_aggregate(aku_Timestamp begin, aku_Timestamp end, u64 step) const { - std::unique_ptr it; - it.reset(new NBTreeLeafGroupAggregator(begin, end, step, *this)); - return it; -} - -std::tuple NBTreeLeaf::split_into(std::shared_ptr bstore, - aku_Timestamp pivot, - bool preserve_backrefs, - u16 *fanout_index, - NBTreeSuperblock* top_level) -{ - /* When the method is called from NBTreeSuperblock::split method, the - * top_level node will be provided. Otherwise it will be null. - */ - aku_Status status; - std::vector xss; - std::vector tss; - status = read_all(&tss, &xss); - if (status != AKU_SUCCESS || tss.size() == 0) { - return std::make_tuple(status, EMPTY_ADDR); - } - // Make new superblock with two leafs - // Left hand side leaf node - u32 ixbase = 0; - NBTreeLeaf lhs(get_id(), preserve_backrefs ? prev_ : EMPTY_ADDR, *fanout_index); - for (u32 i = 0; i < tss.size(); i++) { - if (tss[i] < pivot) { - status = lhs.append(tss[i], xss[i]); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, EMPTY_ADDR); - } - } else { - ixbase = i; - break; - } - } - SubtreeRef lhs_ref; - if (ixbase == 0) { - // Special case, the lhs node is empty - lhs_ref.addr = EMPTY_ADDR; - } else { - LogicAddr lhs_addr; - std::tie(status, lhs_addr) = lhs.commit(bstore); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, EMPTY_ADDR); - } - status = init_subtree_from_leaf(lhs, lhs_ref); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, EMPTY_ADDR); - } - lhs_ref.addr = lhs_addr; - (*fanout_index)++; - } - // Right hand side leaf node, it can't be empty in any case - // because the leaf node is not empty. - auto prev = lhs_ref.addr == EMPTY_ADDR ? prev_ : lhs_ref.addr; - NBTreeLeaf rhs(get_id(), prev, *fanout_index); - for (u32 i = ixbase; i < tss.size(); i++) { - status = rhs.append(tss[i], xss[i]); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, EMPTY_ADDR); - } - } - SubtreeRef rhs_ref; - if (ixbase == tss.size()) { - // Special case, rhs is empty - rhs_ref.addr = EMPTY_ADDR; - } else { - LogicAddr rhs_addr; - std::tie(status, rhs_addr) = rhs.commit(bstore); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, EMPTY_ADDR); - } - status = init_subtree_from_leaf(rhs, rhs_ref); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, EMPTY_ADDR); - } - rhs_ref.addr = rhs_addr; - (*fanout_index)++; - } - // Superblock - if (lhs_ref.addr != EMPTY_ADDR) { - status = top_level->append(lhs_ref); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, EMPTY_ADDR); - } - } - if (rhs_ref.addr != EMPTY_ADDR) { - status = top_level->append(rhs_ref); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, EMPTY_ADDR); - } - } - return std::make_tuple(AKU_SUCCESS, EMPTY_ADDR); -} - -std::tuple NBTreeLeaf::split(std::shared_ptr bstore, - aku_Timestamp pivot, - bool preserve_backrefs) -{ - // New superblock - NBTreeSuperblock sblock(get_id(), preserve_backrefs ? get_prev_addr() : EMPTY_ADDR, get_fanout(), 0); - aku_Status status; - LogicAddr addr; - u16 fanout = 0; - std::tie(status, addr) = split_into(bstore, pivot, false, &fanout, &sblock); - if (status != AKU_SUCCESS || sblock.nelements() == 0) { - return std::make_tuple(status, EMPTY_ADDR); - } - std::tie(status, addr) = sblock.commit(bstore); - if (status != AKU_SUCCESS) { - return std::make_tuple(status, EMPTY_ADDR); - } - return std::make_tuple(AKU_SUCCESS, addr); -} - -// ///////// // -// IOVecLeaf // - -IOVecLeaf::IOVecLeaf(aku_ParamId id, LogicAddr prev, u16 fanout_index) - : prev_(prev) - , block_(std::make_shared()) - , writer_(block_.get()) - , fanout_index_(fanout_index) -{ - // Check that invariant holds. - SubtreeRef* subtree = block_->allocate(); - if (subtree == nullptr) { - AKU_PANIC("Can't allocate space in IOVecBlock"); - } - subtree->addr = prev; - subtree->level = 0; // Leaf node - subtree->type = NBTreeBlockType::LEAF; - subtree->id = id; - subtree->version = AKUMULI_VERSION; - subtree->payload_size = 0; - subtree->fanout_index = fanout_index; - // values that should be updated by insert - subtree->begin = std::numeric_limits::max(); - subtree->end = 0; - subtree->count = 0; - subtree->min = std::numeric_limits::max(); - subtree->max = std::numeric_limits::lowest(); - subtree->sum = 0; - subtree->min_time = std::numeric_limits::max(); - subtree->max_time = std::numeric_limits::lowest(); - subtree->first = .0; - subtree->last = .0; - - // Initialize the writer - writer_.init(id); -} - - -IOVecLeaf::IOVecLeaf(std::shared_ptr bstore, LogicAddr curr) - : IOVecLeaf(read_iovec_block_from_bstore(bstore, curr)) -{ -} - -IOVecLeaf::IOVecLeaf(std::shared_ptr block) - : prev_(EMPTY_ADDR) - , block_(block) -{ - const SubtreeRef* subtree = subtree_cast(block_->get_cdata(0)); + const SubtreeRef* subtree = block_->get_cheader(); prev_ = subtree->addr; fanout_index_ = subtree->fanout_index; } @@ -2009,11 +1660,11 @@ size_t IOVecLeaf::bytes_used() const { } SubtreeRef const* IOVecLeaf::get_leafmeta() const { - return subtree_cast(block_->get_cdata(0)); + return block_->get_cheader(); } size_t IOVecLeaf::nelements() const { - SubtreeRef const* subtree = subtree_cast(block_->get_cdata(0)); + SubtreeRef const* subtree = block_->get_cheader(); return subtree->count; } @@ -2022,25 +1673,25 @@ u16 IOVecLeaf::get_fanout() const { } aku_ParamId IOVecLeaf::get_id() const { - SubtreeRef const* subtree = subtree_cast(block_->get_cdata(0)); + SubtreeRef const* subtree = block_->get_cheader(); return subtree->id; } std::tuple IOVecLeaf::get_timestamps() const { - SubtreeRef const* subtree = subtree_cast(block_->get_cdata(0)); + SubtreeRef const* subtree = block_->get_cheader(); return std::make_tuple(subtree->begin, subtree->end); } void IOVecLeaf::set_prev_addr(LogicAddr addr) { prev_ = addr; - SubtreeRef* subtree = subtree_cast(block_->get_data(0)); + SubtreeRef* subtree = block_->get_header(); subtree->addr = addr; } void IOVecLeaf::set_node_fanout(u16 fanout) { assert(fanout <= AKU_NBTREE_FANOUT); fanout_index_ = fanout; - SubtreeRef* subtree = subtree_cast(block_->get_data(0)); + SubtreeRef* subtree = block_->get_header(); subtree->fanout_index = fanout; } @@ -2083,7 +1734,7 @@ aku_Status IOVecLeaf::read_all(std::vector* timestamps, aku_Status IOVecLeaf::append(aku_Timestamp ts, double value) { aku_Status status = writer_.put(ts, value); if (status == AKU_SUCCESS) { - SubtreeRef* subtree = subtree_cast(block_->get_data(0)); + SubtreeRef* subtree = block_->get_header(); subtree->end = ts; subtree->last = value; if (subtree->count == 0) { @@ -2108,7 +1759,7 @@ std::tuple IOVecLeaf::commit(std::shared_ptr assert(nelements() != 0); u16 size = static_cast(writer_.commit()) - sizeof(SubtreeRef); assert(size); - SubtreeRef* subtree = subtree_cast(block_->get_data(0)); + SubtreeRef* subtree = block_->get_header(); subtree->payload_size = size; if (prev_ != EMPTY_ADDR && fanout_index_ > 0) { subtree->addr = prev_; @@ -2124,7 +1775,7 @@ std::tuple IOVecLeaf::commit(std::shared_ptr subtree->fanout_index = fanout_index_; // Compute checksum subtree->checksum = bstore->checksum(*block_, sizeof(SubtreeRef), size); - return bstore->append_block(block_); + return bstore->append_block(*block_); } @@ -2152,7 +1803,7 @@ std::unique_ptr IOVecLeaf::aggregate(aku_Timestamp begin, aku std::unique_ptr IOVecLeaf::candlesticks(aku_Timestamp begin, aku_Timestamp end, NBTreeCandlestickHint hint) const { AKU_UNUSED(hint); auto agg = INIT_AGGRES; - const SubtreeRef* subtree = subtree_cast(block_->get_cdata(0)); + const SubtreeRef* subtree = block_->get_cheader(); agg.copy_from(*subtree); std::unique_ptr result; AggregateOperator::Direction dir = begin < end ? AggregateOperator::Direction::FORWARD : AggregateOperator::Direction::BACKWARD; @@ -2170,9 +1821,9 @@ std::tuple IOVecLeaf::split_into(std::shared_ptr IOVecLeaf::split(std::shared_ptr b bool preserve_backrefs) { // New superblock - NBTreeSuperblock sblock(get_id(), preserve_backrefs ? get_prev_addr() : EMPTY_ADDR, get_fanout(), 0); + IOVecSuperblock sblock(get_id(), preserve_backrefs ? get_prev_addr() : EMPTY_ADDR, get_fanout(), 0); aku_Status status; LogicAddr addr; u16 fanout = 0; @@ -2278,12 +1929,12 @@ std::tuple IOVecLeaf::split(std::shared_ptr b } -// //////////////////////// // -// NBTreeSuperblock // -// //////////////////////// // +// /////////////// // +// IOVecSuperblock // +// /////////////// // -NBTreeSuperblock::NBTreeSuperblock(aku_ParamId id, LogicAddr prev, u16 fanout, u16 lvl) - : block_(std::make_shared()) +IOVecSuperblock::IOVecSuperblock(aku_ParamId id, LogicAddr prev, u16 fanout, u16 lvl) + : block_(new IOVecBlock()) , id_(id) , write_pos_(0) , fanout_index_(fanout) @@ -2291,17 +1942,18 @@ NBTreeSuperblock::NBTreeSuperblock(aku_ParamId id, LogicAddr prev, u16 fanout, u , prev_(prev) , immutable_(false) { - SubtreeRef* pref = subtree_cast(block_->get_data()); - pref->type = NBTreeBlockType::INNER; + SubtreeRef ref = {}; + ref.type = NBTreeBlockType::INNER; + block_->append_chunk(&ref, sizeof(ref)); assert(prev_ != 0); } -NBTreeSuperblock::NBTreeSuperblock(std::shared_ptr block) - : block_(block) +IOVecSuperblock::IOVecSuperblock(std::unique_ptr block) + : block_(std::move(block)) , immutable_(true) { // Use zero-copy here. - SubtreeRef const* ref = subtree_cast(block->get_cdata()); + SubtreeRef const* ref = block_->get_cheader(); assert(ref->type == NBTreeBlockType::INNER); id_ = ref->id; fanout_index_ = ref->fanout_index; @@ -2311,17 +1963,17 @@ NBTreeSuperblock::NBTreeSuperblock(std::shared_ptr block) assert(prev_ != 0); } -NBTreeSuperblock::NBTreeSuperblock(LogicAddr addr, std::shared_ptr bstore) - : NBTreeSuperblock(read_block_from_bstore(bstore, addr)) +IOVecSuperblock::IOVecSuperblock(LogicAddr addr, std::shared_ptr bstore) + : IOVecSuperblock(read_iovec_block_from_bstore(bstore, addr)) { } -NBTreeSuperblock::NBTreeSuperblock(LogicAddr addr, std::shared_ptr bstore, bool remove_last) - : block_(std::make_shared()) +IOVecSuperblock::IOVecSuperblock(LogicAddr addr, std::shared_ptr bstore, bool remove_last) + : block_(new IOVecBlock()) , immutable_(false) { - std::shared_ptr block = read_block_from_bstore(bstore, addr); - SubtreeRef const* ref = subtree_cast(block->get_cdata()); + std::unique_ptr block = read_iovec_block_from_bstore(bstore, addr); + SubtreeRef const* ref = block->get_cheader(); assert(ref->type == NBTreeBlockType::INNER); id_ = ref->id; fanout_index_ = ref->fanout_index; @@ -2333,51 +1985,55 @@ NBTreeSuperblock::NBTreeSuperblock(LogicAddr addr, std::shared_ptr b } assert(prev_ != 0); // We can't use zero-copy here because `block` belongs to other node. - memcpy(block_->get_data(), block->get_cdata(), AKU_BLOCK_SIZE); + block_->copy_from(*block); + + // Shrink block size if possible to save memory + int bytes_used = static_cast((write_pos_ + 1) * sizeof(SubtreeRef)); + block_->set_write_pos_and_shrink(bytes_used); } -SubtreeRef const* NBTreeSuperblock::get_sblockmeta() const { - SubtreeRef const* pref = subtree_cast(block_->get_cdata()); +SubtreeRef const* IOVecSuperblock::get_sblockmeta() const { + SubtreeRef const* pref = block_->get_cheader(); return pref; } -size_t NBTreeSuperblock::nelements() const { +size_t IOVecSuperblock::nelements() const { return write_pos_; } -u16 NBTreeSuperblock::get_level() const { +u16 IOVecSuperblock::get_level() const { return level_; } -u16 NBTreeSuperblock::get_fanout() const { +u16 IOVecSuperblock::get_fanout() const { return fanout_index_; } -aku_ParamId NBTreeSuperblock::get_id() const { +aku_ParamId IOVecSuperblock::get_id() const { return id_; } -LogicAddr NBTreeSuperblock::get_prev_addr() const { +LogicAddr IOVecSuperblock::get_prev_addr() const { return prev_; } -void NBTreeSuperblock::set_prev_addr(LogicAddr addr) { +void IOVecSuperblock::set_prev_addr(LogicAddr addr) { assert(addr != 0); prev_ = addr; - subtree_cast(block_->get_data())->addr = addr; + block_->get_header()->addr = addr; } -void NBTreeSuperblock::set_node_fanout(u16 newfanout) { +void IOVecSuperblock::set_node_fanout(u16 newfanout) { assert(newfanout <= AKU_NBTREE_FANOUT); fanout_index_ = newfanout; - subtree_cast(block_->get_data())->fanout_index = newfanout; + block_->get_header()->fanout_index = newfanout; } -LogicAddr NBTreeSuperblock::get_addr() const { +LogicAddr IOVecSuperblock::get_addr() const { return block_->get_addr(); } -aku_Status NBTreeSuperblock::append(const SubtreeRef &p) { +aku_Status IOVecSuperblock::append(const SubtreeRef &p) { if (is_full()) { return AKU_EOVERFLOW; } @@ -2385,9 +2041,11 @@ aku_Status NBTreeSuperblock::append(const SubtreeRef &p) { return AKU_EBAD_DATA; } // Write data into buffer - SubtreeRef* pref = subtree_cast(block_->get_data()); - auto it = pref + 1 + write_pos_; - *it = p; + u32 bytes_written = block_->append_chunk(&p, sizeof(SubtreeRef)); + if (bytes_written == 0) { + return AKU_ENO_MEM; + } + SubtreeRef* pref = reinterpret_cast(block_->get_data(0)); if (write_pos_ == 0) { pref->begin = p.begin; } @@ -2396,12 +2054,12 @@ aku_Status NBTreeSuperblock::append(const SubtreeRef &p) { return AKU_SUCCESS; } -std::tuple NBTreeSuperblock::commit(std::shared_ptr bstore) { +std::tuple IOVecSuperblock::commit(std::shared_ptr bstore) { assert(nelements() != 0); if (immutable_) { return std::make_tuple(AKU_EBAD_DATA, EMPTY_ADDR); } - SubtreeRef* backref = subtree_cast(block_->get_data()); + SubtreeRef* backref = block_->get_header(); auto status = init_subtree_from_subtree(*this, *backref); if (status != AKU_SUCCESS) { return std::make_tuple(status, EMPTY_ADDR); @@ -2415,34 +2073,41 @@ std::tuple NBTreeSuperblock::commit(std::shared_ptrtype = NBTreeBlockType::INNER; backref->version = AKUMULI_VERSION; // add checksum - backref->checksum = bstore->checksum(block_->get_cdata() + sizeof(SubtreeRef), backref->payload_size); - return bstore->append_block(block_); + backref->checksum = bstore->checksum(block_->get_cdata(0) + sizeof(SubtreeRef), backref->payload_size); + return bstore->append_block(*block_); } -bool NBTreeSuperblock::is_full() const { +bool IOVecSuperblock::is_full() const { return write_pos_ >= AKU_NBTREE_FANOUT; } -aku_Status NBTreeSuperblock::read_all(std::vector* refs) const { - SubtreeRef const* ref = subtree_cast(block_->get_cdata()); +aku_Status IOVecSuperblock::read_all(std::vector* refs) const { for(u32 ix = 0u; ix < write_pos_; ix++) { - auto p = ref + 1 + ix; - refs->push_back(*p); + SubtreeRef item; + u32 res = block_->read_chunk(&item, sizeof(SubtreeRef) * (ix + 1), sizeof(SubtreeRef)); + if (res == 0) { + return AKU_EBAD_DATA; + } + refs->push_back(item); } return AKU_SUCCESS; } -bool NBTreeSuperblock::top(SubtreeRef* outref) const { +bool IOVecSuperblock::top(SubtreeRef* outref) const { if (write_pos_ == 0) { return false; } - SubtreeRef const* ref = subtree_cast(block_->get_cdata()); - auto p = ref + write_pos_; - *outref = *p; + SubtreeRef item; + u32 offset = sizeof(item) * write_pos_; + u32 res = block_->read_chunk(&item, offset, sizeof(item)); + if (res == 0) { + return false; + } + *outref = item; return true; } -bool NBTreeSuperblock::top(LogicAddr* outaddr) const { +bool IOVecSuperblock::top(LogicAddr* outaddr) const { SubtreeRef child; if (top(&child)) { *outaddr = child.addr; @@ -2451,12 +2116,12 @@ bool NBTreeSuperblock::top(LogicAddr* outaddr) const { return false; } -std::tuple NBTreeSuperblock::get_timestamps() const { - SubtreeRef const* pref = subtree_cast(block_->get_cdata()); +std::tuple IOVecSuperblock::get_timestamps() const { + SubtreeRef const* pref = block_->get_cheader(); return std::tie(pref->begin, pref->end); } -std::unique_ptr NBTreeSuperblock::search(aku_Timestamp begin, +std::unique_ptr IOVecSuperblock::search(aku_Timestamp begin, aku_Timestamp end, std::shared_ptr bstore) const { @@ -2465,7 +2130,7 @@ std::unique_ptr NBTreeSuperblock::search(aku_Timestamp begin return result; } -std::unique_ptr NBTreeSuperblock::filter(aku_Timestamp begin, +std::unique_ptr IOVecSuperblock::filter(aku_Timestamp begin, aku_Timestamp end, const ValueFilter& filter, std::shared_ptr bstore) const @@ -2475,7 +2140,7 @@ std::unique_ptr NBTreeSuperblock::filter(aku_Timestamp begin return result; } -std::unique_ptr NBTreeSuperblock::aggregate(aku_Timestamp begin, +std::unique_ptr IOVecSuperblock::aggregate(aku_Timestamp begin, aku_Timestamp end, std::shared_ptr bstore) const { @@ -2484,7 +2149,7 @@ std::unique_ptr NBTreeSuperblock::aggregate(aku_Timestamp beg return result; } -std::unique_ptr NBTreeSuperblock::candlesticks(aku_Timestamp begin, aku_Timestamp end, +std::unique_ptr IOVecSuperblock::candlesticks(aku_Timestamp begin, aku_Timestamp end, std::shared_ptr bstore, NBTreeCandlestickHint hint) const { @@ -2493,7 +2158,7 @@ std::unique_ptr NBTreeSuperblock::candlesticks(aku_Timestamp return result; } -std::unique_ptr NBTreeSuperblock::group_aggregate(aku_Timestamp begin, +std::unique_ptr IOVecSuperblock::group_aggregate(aku_Timestamp begin, aku_Timestamp end, u64 step, std::shared_ptr bstore) const @@ -2503,10 +2168,10 @@ std::unique_ptr NBTreeSuperblock::group_aggregate(aku_Timesta return result; } -std::tuple NBTreeSuperblock::split_into(std::shared_ptr bstore, +std::tuple IOVecSuperblock::split_into(std::shared_ptr bstore, aku_Timestamp pivot, bool preserve_horizontal_links, - NBTreeSuperblock* root) + SuperblockAppender *root) { // for each node in BFS order: // if pivot is inside the node: @@ -2528,25 +2193,25 @@ std::tuple NBTreeSuperblock::split_into(std::shared_ptrappend(refs[j]); current_fanout++; } - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore, refs[i].addr); if (status != AKU_SUCCESS) { return std::make_tuple(status, EMPTY_ADDR); } - auto refsi = subtree_cast(block->get_cdata()); + auto refsi = block->get_cheader(); assert(refsi->count == refs[i].count); assert(refsi->type == refs[i].type); assert(refsi->begin == refs[i].begin); AKU_UNUSED(refsi); if (refs[i].type == NBTreeBlockType::INNER) { - NBTreeSuperblock sblock(block); + IOVecSuperblock sblock(std::move(block)); LogicAddr ignored; std::tie(status, new_ith_child_addr, ignored) = sblock.split(bstore, pivot, false); if (status != AKU_SUCCESS) { return std::make_tuple(status, EMPTY_ADDR); } } else { - NBTreeLeaf oldleaf(block); + IOVecLeaf oldleaf(std::move(block)); if ((refs.size() - AKU_NBTREE_FANOUT) > 1) { // Split in-place std::tie(status, new_ith_child_addr) = oldleaf.split_into(bstore, pivot, preserve_horizontal_links, ¤t_fanout, root); @@ -2563,8 +2228,8 @@ std::tuple NBTreeSuperblock::split_into(std::shared_ptr block = read_block_from_bstore(bstore, new_ith_child_addr); - NBTreeSuperblock child(block); + auto block = read_iovec_block_from_bstore(bstore, new_ith_child_addr); + IOVecSuperblock child(std::move(block)); status = init_subtree_from_subtree(child, newref); if (status != AKU_SUCCESS) { return std::make_tuple(status, EMPTY_ADDR); @@ -2583,7 +2248,7 @@ std::tuple NBTreeSuperblock::split_into(std::shared_ptr NBTreeSuperblock::split_into(std::shared_ptr child_block; + std::unique_ptr child_block; std::tie(status, child_block) = read_and_check(bstore, refs[j].addr); - NBTreeLeaf cloned_child(child_block, NBTreeLeaf::CloneTag()); + IOVecLeaf cloned_child(std::move(child_block), IOVecLeaf::CloneTag()); cloned_child.set_prev_addr(last_child_addr); cloned_child.set_node_fanout(current_fanout); current_fanout++; @@ -2636,13 +2301,13 @@ std::tuple NBTreeSuperblock::split_into(std::shared_ptr NBTreeSuperblock::split(std::shared_ptr bstore, +std::tuple IOVecSuperblock::split(std::shared_ptr bstore, aku_Timestamp pivot, bool preserve_horizontal_links) { aku_Status status; LogicAddr last_child; - NBTreeSuperblock new_sblock(id_, prev_, get_fanout(), level_); + IOVecSuperblock new_sblock(id_, prev_, get_fanout(), level_); std::tie(status, last_child) = split_into(bstore, pivot, preserve_horizontal_links, &new_sblock); if (status != AKU_SUCCESS || new_sblock.nelements() == 0) { return std::make_tuple(status, EMPTY_ADDR, EMPTY_ADDR); @@ -2654,6 +2319,7 @@ std::tuple NBTreeSuperblock::split(std::shared } return std::tie(status, newaddr, last_child); } + // //////////////////////// // // NBTreeExtent // // //////////////////////// // @@ -2686,7 +2352,7 @@ struct NBTreeLeafExtent : NBTreeExtent { if (last_ != EMPTY_ADDR) { // Load previous node and calculate fanout. aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, last_); if (status == AKU_EUNAVAILABLE) { // Can't read previous node (retention) @@ -2696,7 +2362,7 @@ struct NBTreeLeafExtent : NBTreeExtent { Logger::msg(AKU_LOG_ERROR, "Can't read block @" + std::to_string(last_) + ", error: " + StatusUtil::str(status)); AKU_PANIC("Invalid argument, " + StatusUtil::str(status)); } else { - auto psubtree = subtree_cast(block->get_cdata()); + auto psubtree = block->get_cheader(); fanout_index_ = psubtree->fanout_index + 1; if (fanout_index_ == AKU_NBTREE_FANOUT) { fanout_index_ = 0; @@ -2740,12 +2406,12 @@ struct NBTreeLeafExtent : NBTreeExtent { aku_Status get_prev_subtreeref(SubtreeRef &payload) { aku_Status status = AKU_SUCCESS; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, last_); if (status != AKU_SUCCESS) { return status; } - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); status = init_subtree_from_leaf(leaf, payload); payload.addr = last_; return status; @@ -2756,7 +2422,6 @@ struct NBTreeLeafExtent : NBTreeExtent { } void reset_leaf() { - //leaf_.reset(new NBTreeLeaf(id_, last_, fanout_index_)); leaf_.reset(new IOVecLeaf(id_, last_, fanout_index_)); } @@ -2888,7 +2553,7 @@ std::tuple NBTreeLeafExtent::append(aku_Timestamp ts, double va // Commit full node std::tie(parent_saved, addr) = commit(false); // Stack overflow here means that there is a logic error in - // the program that results in NBTreeLeaf::append always + // the program that results in IOVecLeaf::append always // returning AKU_EOVERFLOW. append(ts, value); return std::make_tuple(parent_saved, addr); @@ -2991,8 +2656,8 @@ std::tuple NBTreeLeafExtent::split(aku_Timestamp pivot) { if (status != AKU_SUCCESS || addr == EMPTY_ADDR) { return std::make_tuple(false, EMPTY_ADDR); } - auto block = read_block_from_bstore(bstore_, addr); - NBTreeSuperblock sblock(block); + auto block = read_iovec_block_from_bstore(bstore_, addr); + IOVecSuperblock sblock(std::move(block)); // Gather stats and send them to upper-level node SubtreeRef payload = INIT_SUBTREE_REF; status = init_subtree_from_subtree(sblock, payload); @@ -3028,6 +2693,7 @@ std::tuple NBTreeLeafExtent::split(aku_Timestamp pivot) { return std::make_tuple(parent_saved, addr); } + // ////////////////////// // // NBTreeSBlockExtent // // ////////////////////// // @@ -3035,7 +2701,7 @@ std::tuple NBTreeLeafExtent::split(aku_Timestamp pivot) { struct NBTreeSBlockExtent : NBTreeExtent { std::shared_ptr bstore_; std::weak_ptr roots_; - std::unique_ptr curr_; + std::unique_ptr curr_; aku_ParamId id_; LogicAddr last_; u16 fanout_index_; @@ -3060,7 +2726,7 @@ struct NBTreeSBlockExtent : NBTreeExtent { // `addr` is not empty. Node should be restored from // block-store. aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, addr); if (status == AKU_EUNAVAILABLE) { addr = EMPTY_ADDR; @@ -3069,7 +2735,7 @@ struct NBTreeSBlockExtent : NBTreeExtent { Logger::msg(AKU_LOG_ERROR, "Can't read @" + std::to_string(addr) + ", error: " + StatusUtil::str(status)); AKU_PANIC("Invalid argument, " + StatusUtil::str(status)); } else { - auto psubtree = subtree_cast(block->get_cdata()); + auto psubtree = block->get_cheader(); fanout_index_ = psubtree->fanout_index + 1; if (fanout_index_ == AKU_NBTREE_FANOUT) { fanout_index_ = 0; @@ -3080,10 +2746,10 @@ struct NBTreeSBlockExtent : NBTreeExtent { } if (addr != EMPTY_ADDR) { // CoW constructor should be used here. - curr_.reset(new NBTreeSuperblock(addr, bstore_, false)); + curr_.reset(new IOVecSuperblock(addr, bstore_, false)); } else { // `addr` is not set. Node should be created from scratch. - curr_.reset(new NBTreeSuperblock(id, EMPTY_ADDR, 0, level)); + curr_.reset(new IOVecSuperblock(id, EMPTY_ADDR, 0, level)); } } @@ -3116,7 +2782,7 @@ struct NBTreeSBlockExtent : NBTreeExtent { } void reset_subtree() { - curr_.reset(new NBTreeSuperblock(id_, last_, fanout_index_, level_)); + curr_.reset(new IOVecSuperblock(id_, last_, fanout_index_, level_)); } u16 get_fanout_index() const { @@ -3201,22 +2867,22 @@ void NBTreeSBlockExtent::debug_dump(std::ostream& stream, int base_indent, std:: switch(action) { case Action::DUMP_NODE: { aku_Status status; - std::shared_ptr block; - std::tie(status, block) = bstore_->read_block(addr); + std::unique_ptr block; + std::tie(status, block) = bstore_->read_iovec_block(addr); if (status != AKU_SUCCESS) { stream << tag("addr") << addr << "\n"; stream << tag("fail") << StatusUtil::c_str(status) << "" << std::endl; continue; } - auto subtreeref = reinterpret_cast(block->get_cdata()); + auto subtreeref = block->get_cheader(); if (subtreeref->type == NBTreeBlockType::LEAF) { // leaf node - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); SubtreeRef const* ref = leaf.get_leafmeta(); dump_subtree_ref(stream, ref, leaf.get_prev_addr(), indent, leaf.get_addr(), tsformat, mask); } else { // superblock - NBTreeSuperblock sblock(block); + IOVecSuperblock sblock(std::move(block)); SubtreeRef const* ref = sblock.get_sblockmeta(); dump_subtree_ref(stream, ref, sblock.get_prev_addr(), indent, sblock.get_addr(), tsformat, mask); std::vector children; @@ -3344,8 +3010,8 @@ bool NBTreeSBlockExtent::is_dirty() const { std::tuple NBTreeSBlockExtent::split(aku_Timestamp pivot) { const auto empty_res = std::make_tuple(false, EMPTY_ADDR); aku_Status status; - std::unique_ptr clone; - clone.reset(new NBTreeSuperblock(id_, curr_->get_prev_addr(), curr_->get_fanout(), curr_->get_level())); + std::unique_ptr clone; + clone.reset(new IOVecSuperblock(id_, curr_->get_prev_addr(), curr_->get_fanout(), curr_->get_level())); LogicAddr last_child_addr; std::tie(status, last_child_addr) = curr_->split_into(bstore_, pivot, true, clone.get()); // The addr variable should be empty, because we're using the clone @@ -3357,15 +3023,16 @@ std::tuple NBTreeSBlockExtent::split(aku_Timestamp pivot) { } +template static void check_superblock_consistency(std::shared_ptr bstore, - NBTreeSuperblock const* sblock, + SuperblockT const* sblock, u16 required_level, bool check_backrefs) { // For each child. std::vector refs; aku_Status status = sblock->read_all(&refs); if (status != AKU_SUCCESS) { - AKU_PANIC("NBTreeSuperblock.read_all failed, exit code: " + StatusUtil::str(status)); + AKU_PANIC("IOVecSuperblock.read_all failed, exit code: " + StatusUtil::str(status)); } std::vector nodes2follow; // Check nodes. @@ -3391,23 +3058,23 @@ static void check_superblock_consistency(std::shared_ptr bstore, } } // Try to read block and check stats - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore, refs[i].addr); if (status == AKU_EUNAVAILABLE) { // block was deleted due to retention. Logger::msg(AKU_LOG_INFO, "Block " + std::to_string(refs[i].addr)); } else if (status == AKU_SUCCESS) { SubtreeRef out = INIT_SUBTREE_REF; - const SubtreeRef* iref = reinterpret_cast(block->get_cdata()); + const SubtreeRef* iref = block->get_cheader(); if (iref->type == NBTreeBlockType::LEAF) { - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); status = init_subtree_from_leaf(leaf, out); if (status != AKU_SUCCESS) { AKU_PANIC("Can't summarize leaf node at " + std::to_string(refs[i].addr) + " error: " + StatusUtil::str(status)); } } else { - NBTreeSuperblock superblock(block); + IOVecSuperblock superblock(std::move(block)); status = init_subtree_from_subtree(superblock, out); if (status != AKU_SUCCESS) { AKU_PANIC("Can't summarize inner node at " + std::to_string(refs[i].addr) + " error: " @@ -3466,11 +3133,11 @@ static void check_superblock_consistency(std::shared_ptr bstore, // Recur for (auto addr: nodes2follow) { - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore, addr); - const SubtreeRef* iref = reinterpret_cast(block->get_cdata()); + const SubtreeRef* iref = block->get_cheader(); if (iref->type == NBTreeBlockType::INNER) { - NBTreeSuperblock child(addr, bstore); + IOVecSuperblock child(addr, bstore); // We need to check backrefs only on top level that is used for crash recovery. // In all other levels backreferences is not used for anything. check_superblock_consistency(bstore, &child, required_level == 0 ? 0 : required_level - 1, false); @@ -3632,16 +3299,16 @@ void NBTreeExtentsList::check_rescue_points(u32 i) const { assert(false); } - NBTreeSuperblock sblock(id_, EMPTY_ADDR, 0, 0); + IOVecSuperblock sblock(id_, EMPTY_ADDR, 0, 0); std::vector refs; while(addr != EMPTY_ADDR) { - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, addr); if (status == AKU_EUNAVAILABLE) { // Block removed due to retention. Can't actually check anything. return; } - const SubtreeRef* ref = subtree_cast(block->get_cdata()); + const SubtreeRef* ref = block->get_cheader(); SubtreeRef tmp = *ref; tmp.addr = addr; refs.push_back(tmp); @@ -3696,14 +3363,14 @@ std::tuple NBTreeExtentsList::_split(aku_Timestamp pivot) bool parent_saved = false; std::tie(parent_saved, paddr) = extents_.at(extent_index)->split(pivot); if (paddr != EMPTY_ADDR) { - std::shared_ptr rblock; + std::unique_ptr rblock; std::tie(status, rblock) = read_and_check(bstore_, paddr); if (status != AKU_SUCCESS) { Logger::msg(AKU_LOG_ERROR, "Can't read @" + std::to_string(paddr) + ", error: " + StatusUtil::str(status)); AKU_PANIC("Can't read back the data"); } // extent_index and the level of the node can mismatch - auto pnode = subtree_cast(rblock->get_cdata()); + auto pnode = rblock->get_cheader(); if (rescue_points_.size() > pnode->level) { rescue_points_.at(pnode->level) = paddr; } else { @@ -3827,7 +3494,7 @@ void NBTreeExtentsList::open() { // Read old leaf node. Add single element to the root. LogicAddr addr = rescue_points_.front(); - std::shared_ptr leaf_block; + std::unique_ptr leaf_block; aku_Status status; std::tie(status, leaf_block) = read_and_check(bstore_, addr); if (status != AKU_SUCCESS) { @@ -3838,7 +3505,7 @@ void NBTreeExtentsList::open() { initialized_ = true; return; } - NBTreeLeaf leaf(leaf_block); // fully loaded leaf + IOVecLeaf leaf(std::move(leaf_block)); // fully loaded leaf SubtreeRef sref = INIT_SUBTREE_REF; status = init_subtree_from_leaf(leaf, sref); if (status != AKU_SUCCESS) { @@ -3965,7 +3632,7 @@ void NBTreeExtentsList::repair() { continue; } aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore_, curr); if (status != AKU_SUCCESS) { // Stop collecting data and force building of the current extent. @@ -3973,9 +3640,9 @@ void NBTreeExtentsList::repair() { // The node was deleted because of retention process we should continue; // with the next rescue point which may be newer. } - const SubtreeRef* curr_pref = reinterpret_cast(block->get_cdata()); + const SubtreeRef* curr_pref = block->get_cheader(); if (curr_pref->type == NBTreeBlockType::LEAF) { - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); SubtreeRef ref = INIT_SUBTREE_REF; status = init_subtree_from_leaf(leaf, ref); if (status != AKU_SUCCESS) { @@ -3987,7 +3654,7 @@ void NBTreeExtentsList::repair() { refs.push_back(ref); stack.push(leaf.get_prev_addr()); // get_prev_addr() can return EMPTY_ADDR } else { - NBTreeSuperblock sblock(block); + IOVecSuperblock sblock(std::move(block)); SubtreeRef ref = INIT_SUBTREE_REF; status = init_subtree_from_subtree(sblock, ref); if (status != AKU_SUCCESS) { @@ -4203,8 +3870,8 @@ NBTreeExtentsList::RepairStatus NBTreeExtentsList::repair_status(std::vector block) { - auto ref = reinterpret_cast(block->get_cdata()); +static NBTreeBlockType _dbg_get_block_type(const std::unique_ptr& block) { + auto ref = block->get_cheader(); return ref->level == 0 ? NBTreeBlockType::LEAF : NBTreeBlockType::INNER; } @@ -4215,14 +3882,14 @@ void NBTreeExtentsList::debug_print(LogicAddr root, std::shared_ptr return; } aku_Status status; - std::shared_ptr block; + std::unique_ptr block; std::tie(status, block) = read_and_check(bstore, root); if (status != AKU_SUCCESS) { std::cout << pad << "ERROR: Can't read block at " << root << " " << StatusUtil::str(status) << std::endl; } auto type = _dbg_get_block_type(block); if (type == NBTreeBlockType::LEAF) { - NBTreeLeaf leaf(block); + IOVecLeaf leaf(std::move(block)); std::vector ts; std::vector xs; status = leaf.read_all(&ts, &xs); @@ -4232,7 +3899,7 @@ void NBTreeExtentsList::debug_print(LogicAddr root, std::shared_ptr std::cout << pad << "Leaf at " << root << " TS: [" << ts.front() << ", " << ts.back() << "]" << std::endl; std::cout << pad << " " << root << " XS: [" << ts.front() << ", " << ts.back() << "]" << std::endl; } else { - NBTreeSuperblock inner(root, bstore); + IOVecSuperblock inner(root, bstore); std::vector refs; status = inner.read_all(&refs); if (status != AKU_SUCCESS) { diff --git a/libakumuli/storage_engine/nbtree.h b/libakumuli/storage_engine/nbtree.h index 91ce3e27..ac39d224 100644 --- a/libakumuli/storage_engine/nbtree.h +++ b/libakumuli/storage_engine/nbtree.h @@ -94,190 +94,61 @@ struct NBTreeCandlestickHint { aku_Timestamp min_delta; }; +struct SuperblockAppender { + ~SuperblockAppender() = default; + virtual aku_Status append(SubtreeRef const& p) = 0; + virtual bool top(SubtreeRef* outref) const = 0; + virtual bool top(LogicAddr* outaddr) const = 0; +}; + class NBTreeSuperblock; +class IOVecSuperblock; + /** NBTree leaf node. Supports append operation. * Can be commited to block store when full. */ -class NBTreeLeaf { +class IOVecLeaf { //! Root address LogicAddr prev_; //! Buffer for pending updates - std::shared_ptr block_; + std::unique_ptr block_; //! DataBlockWriter for pending `append` operations. - DataBlockWriter writer_; + IOVecBlockWriter writer_; //! Fanout index u16 fanout_index_; public: + //! Empty tag to choose c-tor struct CloneTag {}; - enum class LeafLoadMethod { - FULL_PAGE_LOAD, - ONLY_HEADER, - }; - //! Only for testing and benchmarks size_t _get_uncommitted_size() const; + size_t bytes_used() const; + /** Create empty leaf node. * @param id Series id. * @param link to block store. * @param prev Prev element of the tree. * @param fanout_index Index inside current fanout */ - NBTreeLeaf(aku_ParamId id, LogicAddr prev, u16 fanout_index); + IOVecLeaf(aku_ParamId id, LogicAddr prev, u16 fanout_index); /** Load from block store. * @param block Leaf's serialized data. * @param load Load method. * @note This c-tor panics if block is invalid or doesn't exists. */ - NBTreeLeaf(std::shared_ptr bstore); + IOVecLeaf(std::unique_ptr bstore); /** * @brief Clone leaf node * @param block is a pointer to block that contains leaf's data */ - NBTreeLeaf(std::shared_ptr block, CloneTag tag); - - /** Load from block store. - * @param bstore Block store. - * @param curr Address of the current leaf-node. - * @param load Load method. - */ - NBTreeLeaf(std::shared_ptr bstore, LogicAddr curr); - - //! Get leaf metadata. - SubtreeRef const* get_leafmeta() const; - - //! Returns number of elements. - size_t nelements() const; - - //! Read timestamps - std::tuple get_timestamps() const; - - //! Get logic address of the previous node - LogicAddr get_prev_addr() const; - - //! Set prev addr (works only on mutable node) - void set_prev_addr(LogicAddr addr); - - //! Set fanout index of the node - void set_node_fanout(u16 fanout); - - //! Return address of the node itself (or EMPTY_ADDR if not saved yet) - LogicAddr get_addr() const; - - /** Read all elements from the leaf node. - * @param timestamps Destination for timestamps. - * @param values Destination for values. - * @return status. - */ - aku_Status read_all(std::vector* timestamps, std::vector* values) const; - - //! Append values to NBTree - aku_Status append(aku_Timestamp ts, double value); - - /** Flush all pending changes to block store and close. - * Calling this function too often can result in unoptimal space usage. - */ - std::tuple commit(std::shared_ptr bstore); - - //! Return node's fanout index - u16 get_fanout() const; - - //! Return id of the tree - aku_ParamId get_id() const; - - //! Return iterator that outputs all values in time range that is stored in this leaf. - std::unique_ptr range(aku_Timestamp begin, aku_Timestamp end) const; - - /** - * @brief Return filtering operator - * @param begin is a beginning of the search range (inclusive) - * @param end is an end of the search range (exclusive) - * @param filter is a value filter - * @return pointer to operator (it can be invalid due to I/O error) - */ - std::unique_ptr filter(aku_Timestamp begin, - aku_Timestamp end, - const ValueFilter& filter) const; - - std::unique_ptr aggregate(aku_Timestamp begin, aku_Timestamp end) const; - - //! Return iterator that returns candlesticks - std::unique_ptr candlesticks(aku_Timestamp begin, aku_Timestamp end, NBTreeCandlestickHint hint) const; - - //! Group-aggregate query results iterator - std::unique_ptr group_aggregate(aku_Timestamp begin, aku_Timestamp end, u64 step) const; - - // Node split experiment // - - /** - * @brief Split the node into the specified top node - * @param bstore is a pointer to blockstore - * @param pivot is a pivot point of the split - * @param preserve_backrefs is a flag that controls the backrefs (ignored) - * @param top_level is a top level node (the method will add links to this node - * instead of creating new inner node, the commit method of the `top_level` node wouldn't be called) - * @return status and address of the new topmost node (always EMPTY_ADDR) - */ - std::tuple split_into(std::shared_ptr bstore, - aku_Timestamp pivot, - bool preserve_backrefs, u16 *fanout_index, - NBTreeSuperblock* top_level); - - - /** - * @brief Split the node - * @param bstore is a pointer to blockstore - * @param pivot is a pivot point of the split - * @param preserve_backrefs is a flag that controls the backrefs (ignored) - * @return status and address of the new topmost node - */ - std::tuple split(std::shared_ptr bstore, - aku_Timestamp pivot, - bool preserve_backrefs); -}; - - -/** NBTree leaf node. Supports append operation. - * Can be commited to block store when full. - */ -class IOVecLeaf { - //! Root address - LogicAddr prev_; - //! Buffer for pending updates - std::shared_ptr block_; - //! DataBlockWriter for pending `append` operations. - IOVecBlockWriter writer_; - //! Fanout index - u16 fanout_index_; - -public: - - //! Only for testing and benchmarks - size_t _get_uncommitted_size() const; - - size_t bytes_used() const; - - /** Create empty leaf node. - * @param id Series id. - * @param link to block store. - * @param prev Prev element of the tree. - * @param fanout_index Index inside current fanout - */ - IOVecLeaf(aku_ParamId id, LogicAddr prev, u16 fanout_index); - - /** Load from block store. - * @param block Leaf's serialized data. - * @param load Load method. - * @note This c-tor panics if block is invalid or doesn't exists. - */ - IOVecLeaf(std::shared_ptr bstore); + IOVecLeaf(std::unique_ptr block, CloneTag); /** Load from block store. * @param bstore Block store. @@ -364,7 +235,7 @@ class IOVecLeaf { std::tuple split_into(std::shared_ptr bstore, aku_Timestamp pivot, bool preserve_backrefs, u16 *fanout_index, - NBTreeSuperblock* top_level); + SuperblockAppender* top_level); /** @@ -382,27 +253,27 @@ class IOVecLeaf { /** NBTree superblock. Stores refs to subtrees. */ -class NBTreeSuperblock { - std::shared_ptr block_; - aku_ParamId id_; - u32 write_pos_; - u16 fanout_index_; - u16 level_; - LogicAddr prev_; - bool immutable_; +class IOVecSuperblock : public SuperblockAppender { + std::unique_ptr block_; + aku_ParamId id_; + u32 write_pos_; + u16 fanout_index_; + u16 level_; + LogicAddr prev_; + bool immutable_; public: //! Create new writable node. - NBTreeSuperblock(aku_ParamId id, LogicAddr prev, u16 fanout, u16 lvl); + IOVecSuperblock(aku_ParamId id, LogicAddr prev, u16 fanout, u16 lvl); //! Read immutable node from block-store. - NBTreeSuperblock(std::shared_ptr block); + IOVecSuperblock(std::unique_ptr block); //! Read immutable node from block-store. - NBTreeSuperblock(LogicAddr addr, std::shared_ptr bstore); + IOVecSuperblock(LogicAddr addr, std::shared_ptr bstore); //! Copy on write c-tor. Create new node, copy content referenced by address, remove last entery if needed. - NBTreeSuperblock(LogicAddr addr, std::shared_ptr bstore, bool remove_last); + IOVecSuperblock(LogicAddr addr, std::shared_ptr bstore, bool remove_last); //! Append subtree ref aku_Status append(SubtreeRef const& p); @@ -479,7 +350,7 @@ class NBTreeSuperblock { std::tuple split_into(std::shared_ptr bstore, aku_Timestamp pivot, bool preserve_horizontal_links, - NBTreeSuperblock *root); + SuperblockAppender *root); /** * @brief Split the node @@ -755,7 +626,7 @@ class NBTreeExtentsList : public std::enable_shared_from_this * @param out is an output parameter * @return status */ -aku_Status init_subtree_from_leaf(const NBTreeLeaf& leaf, SubtreeRef& out); +aku_Status init_subtree_from_leaf(const IOVecLeaf& leaf, SubtreeRef& out); /** * @brief Initialize SubtreeRef by reading subtree (addr field is not set) @@ -764,6 +635,7 @@ aku_Status init_subtree_from_leaf(const NBTreeLeaf& leaf, SubtreeRef& out); * @return status */ aku_Status init_subtree_from_subtree(const NBTreeSuperblock& node, SubtreeRef& backref); +aku_Status init_subtree_from_subtree(const IOVecSuperblock& node, SubtreeRef& backref); } } // namespaces diff --git a/libakumuli/storage_engine/volume.cpp b/libakumuli/storage_engine/volume.cpp index 5e4ae1b6..e19f9bfd 100644 --- a/libakumuli/storage_engine/volume.cpp +++ b/libakumuli/storage_engine/volume.cpp @@ -19,7 +19,9 @@ #include #include #include +#include #include +//#include #include @@ -147,6 +149,137 @@ void IOVecBlock::set_write_pos(int pos) { pos_ = pos; } +void IOVecBlock::copy_from(const IOVecBlock& other) { + // Single chunk + if (other.data_[0].size() == AKU_BLOCK_SIZE) { + u32 cons = 0; + for (int i = 0; i < NCOMPONENTS; i++) { + data_[i].resize(COMPONENT_SIZE); + memcpy(data_[i].data(), other.data_[0].data() + cons, COMPONENT_SIZE); + cons += COMPONENT_SIZE; + } + } + // Four components + else { + for (int i = 0; i < NCOMPONENTS; i++) { + if (other.data_[i].size() == 0) { + return; + } + data_[i].resize(COMPONENT_SIZE); + memcpy(data_[i].data(), other.data_[i].data(), COMPONENT_SIZE); + } + } +} + +u32 IOVecBlock::read_chunk(void* dest, u32 offset, u32 size) { + if (data_[0].size() == AKU_BLOCK_SIZE) { + memcpy(dest, data_[0].data() + offset, size); + } + else { + // Locate the component first + u32 ixbegin = offset / IOVecBlock::COMPONENT_SIZE; + u32 offbegin = offset % IOVecBlock::COMPONENT_SIZE; + u32 end = offset + size; + u32 ixend = end / IOVecBlock::COMPONENT_SIZE; + if (ixbegin == ixend) { + // Fast path, access single component + if (data_[ixbegin].size() == 0) { + return 0; + } + u8* source = data_[ixbegin].data(); + memcpy(dest, source + offbegin, size); + } + else { + // Read from two components + if (data_[ixbegin].size() == 0) { + return 0; + } + u8* c1 = data_[ixbegin].data(); + u32 l1 = IOVecBlock::COMPONENT_SIZE - offbegin; + memcpy(dest, c1 + offbegin, l1); + // Write second component + if (data_[ixend].size() == 0) { + return 0; + } + u32 l2 = size - l1; + u8* c2 = data_[ixend].data(); + memcpy(static_cast(dest) + l1, c2, l2); + } + } + return size; +} + +u32 IOVecBlock::append_chunk(const void* source, u32 size) { + if (is_readonly()) { + return 0; + } + if (data_[0].size() == AKU_BLOCK_SIZE) { + // Fast path + if (pos_ + size > AKU_BLOCK_SIZE) { + return 0; + } + memcpy(data_[0].data() + pos_, source, size); + } + else { + int ixbegin = pos_ / IOVecBlock::COMPONENT_SIZE; + int offbegin = pos_ % IOVecBlock::COMPONENT_SIZE; + int end = pos_ + size; + int ixend = end / IOVecBlock::COMPONENT_SIZE; + if (ixbegin >= IOVecBlock::NCOMPONENTS || ixend >= IOVecBlock::NCOMPONENTS) { + return 0; + } + if (ixbegin == ixend) { + // Fast path, write to the single component + if (data_[ixbegin].size() == 0) { + int ixadd = add(); + if (ixadd != ixbegin) { + AKU_PANIC("IOVec block corrupted"); + } + } + u8* dest = data_[ixbegin].data(); + memcpy(dest + offbegin, source, size); + } + else { + // Write to two components + if (data_[ixbegin].size() == 0) { + int ixadd = add(); + if (ixadd != ixbegin) { + AKU_PANIC("First IOVec block corrupted"); + } + } + u8* c1 = data_[ixbegin].data(); + u32 l1 = IOVecBlock::COMPONENT_SIZE - offbegin; + memcpy(c1 + offbegin, source, l1); + // Write second component + if (data_[ixend].size() == 0) { + int ixadd = add(); + if (ixadd != ixend) { + AKU_PANIC("Second IOVec blcok corrupted"); + } + } + u32 l2 = size - l1; + u8* c2 = data_[ixend].data(); + memcpy(c2, static_cast(source) + l1, l2); + } + } + pos_ += size; + return pos_; +} + +void IOVecBlock::set_write_pos_and_shrink(int top) { + set_write_pos(top); + if (is_readonly() || data_[0].size() == AKU_BLOCK_SIZE) { + return; + } + int component = pos_ / IOVecBlock::COMPONENT_SIZE; + for (int ix = IOVecBlock::NCOMPONENTS; ix --> 0;) { + if (ix > component) { + data_[ix].resize(0); + data_[ix].shrink_to_fit(); + } + } +} + //-- const u8* IOVecBlock::get_data(int component) const { @@ -426,10 +559,11 @@ Volume::Volume(const char* path, size_t write_pos) , apr_file_handle_(_open_file(path, apr_pool_.get())) , file_size_(static_cast(_get_file_size(apr_file_handle_.get())/AKU_BLOCK_SIZE)) , write_pos_(static_cast(write_pos)) + //, synced_pos_(static_cast(write_pos)) , path_(path) , mmap_ptr_(nullptr) { -#if UINTPTR_MAX == 0xFFFFFFFFFFFFFFF0 +#if 0//UINTPTR_MAX == 0xFFFFFFFFFFFFFFFF // 64-bit architecture, we can use mmap for speed mmap_.reset(new MemoryMappedFile(path, false)); if (mmap_->is_bad()) { @@ -451,6 +585,7 @@ Volume::Volume(const char* path, size_t write_pos) void Volume::reset() { write_pos_ = 0; + //synced_pos_ = 0; } void Volume::create_new(const char* path, size_t capacity) { @@ -552,8 +687,28 @@ std::tuple Volume::read_block_zero_copy(u32 ix) const { } void Volume::flush() { + apr_status_t status = apr_file_flush(apr_file_handle_.get()); panic_on_error(status, "Volume flush error"); + /* + apr_os_file_t fh; + status = apr_os_file_get(&fh, apr_file_handle_.get()); + panic_on_error(status, "Can't extract file handle"); + int advstatus= posix_fadvise(fh, + synced_pos_ * AKU_BLOCK_SIZE, + (write_pos_ - synced_pos_) * AKU_BLOCK_SIZE, + POSIX_FADV_DONTNEED); + if (advstatus != 0) { + if (advstatus == EINVAL) { + AKU_PANIC("Invalid fadvise parameters"); + } + if (advstatus == EBADF) { + AKU_PANIC("Invalid file descriptor"); + } + AKU_PANIC("Unknown error"); + } + synced_pos_ = write_pos_; + */ } u32 Volume::get_size() const { diff --git a/libakumuli/storage_engine/volume.h b/libakumuli/storage_engine/volume.h index d00e00aa..5f98ec1f 100644 --- a/libakumuli/storage_engine/volume.h +++ b/libakumuli/storage_engine/volume.h @@ -147,9 +147,47 @@ struct IOVecBlock { const u8* get_cdata(int component) const; + template + const Header* get_header() const { + static_assert(sizeof(Header) < 1024, "Header should be less than 1KB"); + const u8* ptr = get_data(0); + return reinterpret_cast(ptr); + } + + template + const Header* get_cheader() const { + return get_header
(); + } + + template + Header* get_header() { + static_assert(sizeof(Header) < 1024, "Header should be less than 1KB"); + u8* ptr = get_data(0); + return reinterpret_cast(ptr); + } + u8* get_data(int component); size_t get_size(int component) const; + + /** Copy content of the 'other' block into current block. + */ + void copy_from(const IOVecBlock& other); + + /** Copy 'size' bytes into 'dest' starting from 'offset'. + * Return number of copied bytes. + */ + u32 read_chunk(void *dest, u32 offset, u32 size); + + /** Copy 'size' bytes into the block starting from 'source'. + * Return number of new write pos or 0 on error. + */ + u32 append_chunk(const void *source, u32 size); + + /** Adjust write pos. + * Try to shrink the block by deallocating unused chunks. + */ + void set_write_pos_and_shrink(int top); }; typedef std::unique_ptr AprPoolPtr; @@ -229,6 +267,7 @@ class Volume { AprFilePtr apr_file_handle_; u32 file_size_; u32 write_pos_; + //u32 synced_pos_; std::string path_; // Optional mmap std::unique_ptr mmap_; diff --git a/perftests/CMakeLists.txt b/perftests/CMakeLists.txt index 9386d5a9..f3018d66 100644 --- a/perftests/CMakeLists.txt +++ b/perftests/CMakeLists.txt @@ -240,3 +240,34 @@ target_link_libraries( ${Boost_LIBRARIES} ) set_target_properties(perf_nbtree PROPERTIES EXCLUDE_FROM_ALL 1) + +# IOVec perftest +add_executable( + perf_iovec + perf_iovec.cpp + perftest_tools.cpp + ../libakumuli/storage_engine/blockstore.cpp + ../libakumuli/storage_engine/volume.cpp + ../libakumuli/storage_engine/nbtree.cpp + ../libakumuli/storage_engine/ref_store.cpp + ../libakumuli/storage_engine/compression.cpp + ../libakumuli/storage_engine/operators/operator.cpp + ../libakumuli/storage_engine/operators/aggregate.cpp + ../libakumuli/storage_engine/operators/scan.cpp + ../libakumuli/storage_engine/operators/join.cpp + ../libakumuli/storage_engine/operators/merge.cpp + ../libakumuli/util.cpp + ../libakumuli/status_util.cpp + ../libakumuli/log_iface.cpp + ../libakumuli/crc32c.cpp +) + +target_link_libraries( + perf_iovec + pthread + "${JEMALLOC_LIBRARY}" + "${APRUTIL_LIBRARY}" + "${APR_LIBRARY}" + ${Boost_LIBRARIES} +) +set_target_properties(perf_iovec PROPERTIES EXCLUDE_FROM_ALL 1) diff --git a/perftests/perf_iovec.cpp b/perftests/perf_iovec.cpp new file mode 100644 index 00000000..47bab420 --- /dev/null +++ b/perftests/perf_iovec.cpp @@ -0,0 +1,331 @@ +// C++ headers +#include +#include + +// Lib headers +#include +#include + +// App headers +#include "storage_engine/blockstore.h" +#include "storage_engine/volume.h" +#include "storage_engine/nbtree.h" +#include "log_iface.h" +#include "util.h" + + +using namespace Akumuli::StorageEngine; + +class Timer +{ +public: + Timer() { gettimeofday(&_start_time, nullptr); } + void restart() { gettimeofday(&_start_time, nullptr); } + double elapsed() const { + timeval curr; + gettimeofday(&curr, nullptr); + return double(curr.tv_sec - _start_time.tv_sec) + + double(curr.tv_usec - _start_time.tv_usec)/1000000.0; + } +private: + timeval _start_time; +}; + + +static void console_logger(aku_LogLevel lvl, const char* msg) { + switch(lvl) { + case AKU_LOG_ERROR: + std::cerr << "ERROR: " << msg << std::endl; + break; + case AKU_LOG_INFO: + std::cout << "INFO: " << msg << std::endl; + break; + case AKU_LOG_TRACE: + //std::cerr << "trace: " << msg << std::endl; + break; + }; +} + +int main() { + apr_initialize(); + + Akumuli::Logger::set_logger(console_logger); + + + const double start = 0.0; + const double inc = 0.1; + const double factor = 1.1; + const int N = 1000000; + + std::cout << "NBTree[w,r]\tIOVec[w,r]" << std::endl; + const auto MDBL = std::numeric_limits::max(); + double t[4] = {MDBL, MDBL, MDBL, MDBL}; + for (int o = 0; o < 10; o++) + { + { + std::unique_ptr leaf(new NBTreeLeaf(42, EMPTY_ADDR, 0)); + int leaf_append_cnt = 0; + LogicAddr last_addr = EMPTY_ADDR; + LogicAddr first_addr = EMPTY_ADDR; + auto bstore = BlockStoreBuilder::create_memstore([&](LogicAddr a) { + if (first_addr == EMPTY_ADDR) { + first_addr = a; + } + leaf_append_cnt++; + last_addr = a; + }); + Timer tm; + double x = start; + for (int i = 0; i < N; i++) { + x += inc; + x *= factor; + auto status = leaf->append(i, x); + if (status == AKU_EOVERFLOW) { + LogicAddr addr; + std::tie(status, addr) = leaf->commit(bstore); + if (status != AKU_SUCCESS) { + std::cout << "Failed to commit leaf" << std::endl; + std::abort(); + } + if (addr != last_addr) { + std::cout << "Unexpected address " << addr << " returned, " << last_addr << " expected" << std::endl; + std::abort(); + } + leaf.reset(new NBTreeLeaf(42, EMPTY_ADDR, 0)); + } + } + t[0] = std::min(tm.elapsed(), t[0]); + + // Read back + std::vector ts; + std::vector xs; + ts.reserve(5000); + xs.reserve(5000); + + tm.restart(); + + for (LogicAddr addr = first_addr; addr < last_addr; addr++) { + NBTreeLeaf rdleaf(bstore, addr); + auto status = rdleaf.read_all(&ts, &xs); + if(status != AKU_SUCCESS) { + std::cout << "Failed to read block " << addr << std::endl; + std::abort(); + } + } + + t[1] = std::min(tm.elapsed(), t[1]); + } + { + std::unique_ptr leaf(new IOVecLeaf(42, EMPTY_ADDR, 0)); + int leaf_append_cnt = 0; + LogicAddr last_addr = EMPTY_ADDR; + LogicAddr first_addr = EMPTY_ADDR; + auto bstore = BlockStoreBuilder::create_memstore([&](LogicAddr a) { + if (first_addr == EMPTY_ADDR) { + first_addr = a; + } + leaf_append_cnt++; + last_addr = a; + }); + Timer tm; + double x = start; + for (int i = 0; i < N; i++) { + x += inc; + x *= factor; + auto status = leaf->append(i, x); + if (status == AKU_EOVERFLOW) { + LogicAddr addr; + std::tie(status, addr) = leaf->commit(bstore); + if (status != AKU_SUCCESS) { + std::cout << "Failed to commit leaf" << std::endl; + std::abort(); + } + if (addr != last_addr) { + std::cout << "Unexpected address " << addr << " returned, " << last_addr << " expected" << std::endl; + std::abort(); + } + leaf.reset(new IOVecLeaf(42, EMPTY_ADDR, 0)); + } + } + t[2] = std::min(tm.elapsed(), t[2]); + + // Read back + std::vector ts; + std::vector xs; + ts.reserve(5000); + xs.reserve(5000); + + tm.restart(); + + for (LogicAddr addr = first_addr; addr < last_addr; addr++) { + NBTreeLeaf rdleaf(bstore, addr); + auto status = rdleaf.read_all(&ts, &xs); + if(status != AKU_SUCCESS) { + std::cout << "Failed to read block " << addr << std::endl; + std::abort(); + } + } + + t[3] = std::min(tm.elapsed(), t[3]); + } + } + std::cout << t[0] << ", " << t[1] << "\t" << t[2] << ", " << t[3] << std::endl; + + // Check superblock + + for(int i = 0; i < 4; i++) { + t[i] = MDBL; + } + std::cout << "SBlock[w,r]\tIOVec[w,r]" << std::endl; + for (int o = 0; o < 10; o++) + { + { + std::unique_ptr inner(new NBTreeSuperblock(42, EMPTY_ADDR, 0, 1)); + int leaf_append_cnt = 0; + LogicAddr last_addr = EMPTY_ADDR; + LogicAddr first_addr = EMPTY_ADDR; + auto bstore = BlockStoreBuilder::create_memstore([&](LogicAddr a) { + if (first_addr == EMPTY_ADDR) { + first_addr = a; + } + leaf_append_cnt++; + last_addr = a; + }); + Timer tm; + const auto btype = NBTreeBlockType::INNER; + SubtreeRef ref = { + 1003, // count + 42, // id + 400, // begin + 500, // end + 114, // addr + start, // min + 341, // min time + 210.4, // max + 311, // max time + 21320.0, // sum + 4.4, // first + 4.1, // last + btype, // block type + 1, // level + 4000, // payload size + 1, // version + 0, // fanout index + 0, // checksum + }; + for (int i = 0; i < N; i++) { + ref.min += inc; + ref.min *= factor; + auto status = inner->append(ref); + if (status == AKU_EOVERFLOW) { + LogicAddr addr; + std::tie(status, addr) = inner->commit(bstore); + if (status != AKU_SUCCESS) { + std::cout << "Failed to commit superblock" << std::endl; + std::abort(); + } + if (addr != last_addr) { + std::cout << "Unexpected superblock address " << addr << " returned, " << last_addr << " expected" << std::endl; + std::abort(); + } + inner.reset(new NBTreeSuperblock(42, EMPTY_ADDR, 0, 1)); + } + } + t[0] = std::min(tm.elapsed(), t[0]); + + // Read back + std::vector ts; + std::vector xs; + ts.reserve(5000); + xs.reserve(5000); + + tm.restart(); + + for (LogicAddr addr = first_addr; addr < last_addr; addr++) { + NBTreeSuperblock rdleaf(addr, bstore); + auto status = rdleaf.read_all(&xs); + if(status != AKU_SUCCESS) { + std::cout << "Failed to read block " << addr << std::endl; + std::abort(); + } + } + + t[1] = std::min(tm.elapsed(), t[1]); + } + { + std::unique_ptr inner(new IOVecSuperblock(42, EMPTY_ADDR, 0, 1)); + int leaf_append_cnt = 0; + LogicAddr last_addr = EMPTY_ADDR; + LogicAddr first_addr = EMPTY_ADDR; + auto bstore = BlockStoreBuilder::create_memstore([&](LogicAddr a) { + if (first_addr == EMPTY_ADDR) { + first_addr = a; + } + leaf_append_cnt++; + last_addr = a; + }); + Timer tm; + const auto btype = NBTreeBlockType::INNER; + SubtreeRef ref = { + 1003, // count + 42, // id + 400, // begin + 500, // end + 114, // addr + start, // min + 341, // min time + 210.4, // max + 311, // max time + 21320.0, // sum + 4.4, // first + 4.1, // last + btype, // block type + 1, // level + 4000, // payload size + 1, // version + 0, // fanout index + 0, // checksum + }; + for (int i = 0; i < N; i++) { + ref.min += inc; + ref.min *= factor; + auto status = inner->append(ref); + if (status == AKU_EOVERFLOW) { + LogicAddr addr; + std::tie(status, addr) = inner->commit(bstore); + if (status != AKU_SUCCESS) { + std::cout << "Failed to commit superblock" << std::endl; + std::abort(); + } + if (addr != last_addr) { + std::cout << "Unexpected superblock address " << addr << " returned, " << last_addr << " expected" << std::endl; + std::abort(); + } + inner.reset(new IOVecSuperblock(42, EMPTY_ADDR, 0, 1)); + } + } + t[2] = std::min(tm.elapsed(), t[2]); + + // Read back + std::vector ts; + std::vector xs; + ts.reserve(5000); + xs.reserve(5000); + + tm.restart(); + + for (LogicAddr addr = first_addr; addr < last_addr; addr++) { + IOVecSuperblock rdleaf(addr, bstore); + auto status = rdleaf.read_all(&xs); + if(status != AKU_SUCCESS) { + std::cout << "Failed to read block " << addr << std::endl; + std::abort(); + } + } + + t[3] = std::min(tm.elapsed(), t[3]); + } + } + std::cout << t[0] << ", " << t[1] << "\t" << t[2] << ", " << t[3] << std::endl; + return 0; +} diff --git a/unittests/test_blockstore.cpp b/unittests/test_blockstore.cpp index 93215240..011b4236 100644 --- a/unittests/test_blockstore.cpp +++ b/unittests/test_blockstore.cpp @@ -115,28 +115,29 @@ BOOST_AUTO_TEST_CASE(Test_blockstore_0) { delete_blockstore(); create_blockstore(); auto bstore = open_blockstore(); - std::shared_ptr block; + std::shared_ptr block; aku_Status status; // Should be unreadable - std::tie(status, block) = bstore->read_block(0); + std::tie(status, block) = bstore->read_iovec_block(0); BOOST_REQUIRE_NE(status, AKU_SUCCESS); // Append first block - auto buffer = std::make_shared(); - buffer->get_data()[0] = 1; + auto buffer = std::make_shared(); + buffer->add(); + buffer->get_data(0)[0] = 1; LogicAddr addr; - std::tie(status, addr) = bstore->append_block(buffer); + std::tie(status, addr) = bstore->append_block(*buffer); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); BOOST_REQUIRE_EQUAL(addr, 0); // Should be readable now - std::tie(status, block) = bstore->read_block(0); + std::tie(status, block) = bstore->read_iovec_block(0); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); - const u8* block_data = block->get_cdata(); - size_t block_size = block->get_size(); + const u8* block_data = block->get_cdata(0); + size_t block_size = block->get_size(0); BOOST_REQUIRE_EQUAL(block_size, 4096); BOOST_REQUIRE_EQUAL(block_data[0], 1); @@ -155,25 +156,26 @@ BOOST_AUTO_TEST_CASE(Test_blockstore_1) { aku_Status status; for (int i = 0; i < 17; i++) { - auto buffer = std::make_shared(); - buffer->get_data()[0] = static_cast(i); - std::tie(status, addr) = bstore->append_block(buffer); + auto buffer = std::make_shared(); + buffer->add(); + buffer->get_data(0)[0] = static_cast(i); + std::tie(status, addr) = bstore->append_block(*buffer); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); } BOOST_REQUIRE_EQUAL(addr, (2ull << 32)); - std::shared_ptr block; + std::shared_ptr block; // Should be unreadable now - std::tie(status, block) = bstore->read_block(0); + std::tie(status, block) = bstore->read_iovec_block(0); BOOST_REQUIRE_EQUAL(status, AKU_EUNAVAILABLE); // Reada this block - std::tie(status, block) = bstore->read_block(2ull << 32); + std::tie(status, block) = bstore->read_iovec_block(2ull << 32); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); - const u8* block_data = block->get_cdata(); - size_t block_size = block->get_size(); + const u8* block_data = block->get_cdata(0); + size_t block_size = block->get_size(0); BOOST_REQUIRE_EQUAL(block_size, 4096); BOOST_REQUIRE_EQUAL(block_data[0], 16); @@ -185,27 +187,28 @@ BOOST_AUTO_TEST_CASE(Test_blockstore_3) { delete_expandable_storage(); create_expandable_storage(); auto bstore = open_expandable_storage(); - std::shared_ptr block; + std::shared_ptr block; aku_Status status; // Should be clean - std::tie(status, block) = bstore->read_block(0); + std::tie(status, block) = bstore->read_iovec_block(0); BOOST_REQUIRE_NE(status, AKU_SUCCESS); // Append first block - auto buffer = std::make_shared(); - buffer->get_data()[0] = 1; + auto buffer = std::make_shared(); + buffer->add(); + buffer->get_data(0)[0] = 1; LogicAddr addr; - std::tie(status, addr) = bstore->append_block(buffer); + std::tie(status, addr) = bstore->append_block(*buffer); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); BOOST_REQUIRE_EQUAL(addr, 0); // Should be readable now - std::tie(status, block) = bstore->read_block(0); + std::tie(status, block) = bstore->read_iovec_block(0); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); - const u8* block_data = block->get_cdata(); - size_t block_size = block->get_size(); + const u8* block_data = block->get_cdata(0); + size_t block_size = block->get_size(0); BOOST_REQUIRE_EQUAL(block_size, 4096); BOOST_REQUIRE_EQUAL(block_data[0], 1); @@ -220,25 +223,27 @@ BOOST_AUTO_TEST_CASE(Test_blockstore_4) { create_expandable_storage(); std::shared_ptr mock; auto bstore = open_expandable_storage(&mock); - std::shared_ptr block; + std::shared_ptr block; aku_Status status; bool exist = boost::filesystem::exists(expected_path); BOOST_REQUIRE(!exist); for (u32 i = 0; i < CAPACITIES.at(0); i++) { - auto buffer = std::make_shared(); - buffer->get_data()[0] = 1; + auto buffer = std::make_shared(); + buffer->add(); + buffer->get_data(0)[0] = 1; LogicAddr addr; - std::tie(status, addr) = bstore->append_block(buffer); + std::tie(status, addr) = bstore->append_block(*buffer); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); BOOST_REQUIRE_EQUAL(addr, i); exist = boost::filesystem::exists(expected_path); BOOST_REQUIRE(!exist); } - auto buffer = std::make_shared(); - buffer->get_data()[0] = 1; + auto buffer = std::make_shared(); + buffer->add(); + buffer->get_data(0)[0] = 1; LogicAddr addr; - std::tie(status, addr) = bstore->append_block(buffer); + std::tie(status, addr) = bstore->append_block(*buffer); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); std::string new_vol_path = mock->volumes.at(1).path; @@ -249,11 +254,11 @@ BOOST_AUTO_TEST_CASE(Test_blockstore_4) { BOOST_REQUIRE_EQUAL(new_vol_path, std::string(expected_path)); // Should be readable now - std::tie(status, block) = bstore->read_block(addr); + std::tie(status, block) = bstore->read_iovec_block(addr); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); - const u8* block_data = block->get_cdata(); - size_t block_size = block->get_size(); + const u8* block_data = block->get_cdata(0); + size_t block_size = block->get_size(0); BOOST_REQUIRE_EQUAL(block_size, 4096); BOOST_REQUIRE_EQUAL(block_data[0], 1); diff --git a/unittests/test_nbtree.cpp b/unittests/test_nbtree.cpp index 29f6598c..8a7afc25 100644 --- a/unittests/test_nbtree.cpp +++ b/unittests/test_nbtree.cpp @@ -571,7 +571,7 @@ BOOST_AUTO_TEST_CASE(Test_nbtree_recovery_7) { // Test iteration void test_nbtree_leaf_iteration(aku_Timestamp begin, aku_Timestamp end) { - NBTreeLeaf leaf(42, EMPTY_ADDR, 0); + IOVecLeaf leaf(42, EMPTY_ADDR, 0); aku_Timestamp last_successfull = 100; aku_Timestamp first_timestamp = 100; for (size_t ix = first_timestamp; true; ix++) { @@ -688,7 +688,7 @@ AggregationResult calculate_expected_value(std::vector const& xss) { } void test_nbtree_leaf_aggregation(aku_Timestamp begin, aku_Timestamp end) { - NBTreeLeaf leaf(42, EMPTY_ADDR, 0); + IOVecLeaf leaf(42, EMPTY_ADDR, 0); aku_Timestamp first_timestamp = 100; std::vector xss; RandomWalk rwalk(0.0, 1.0, 1.0); @@ -1448,7 +1448,7 @@ BOOST_AUTO_TEST_CASE(Test_group_aggregate_backward) { } template -static void fill_leaf(NBTreeLeaf* leaf, Cont tss) { +static void fill_leaf(IOVecLeaf* leaf, Cont tss) { for (auto ts: tss) { auto status = leaf->append(ts, ts*0.1); if (status != AKU_SUCCESS) { @@ -1457,7 +1457,7 @@ static void fill_leaf(NBTreeLeaf* leaf, Cont tss) { } } -static LogicAddr save_leaf(NBTreeLeaf* leaf, NBTreeSuperblock* parent, std::shared_ptr bstore) { +static LogicAddr save_leaf(IOVecLeaf* leaf, IOVecSuperblock* parent, std::shared_ptr bstore) { aku_Status status; LogicAddr result; std::tie(status, result) = leaf->commit(bstore); @@ -1480,10 +1480,10 @@ static LogicAddr save_leaf(NBTreeLeaf* leaf, NBTreeSuperblock* parent, std::shar /** * @brief Read block from block store, expect success */ -static std::shared_ptr read_block(std::shared_ptr bstore, LogicAddr addr) { +static std::unique_ptr read_block(std::shared_ptr bstore, LogicAddr addr) { aku_Status status; - std::shared_ptr block; - std::tie(status, block) = bstore->read_block(addr); + std::unique_ptr block; + std::tie(status, block) = bstore->read_iovec_block(addr); if (status != AKU_SUCCESS) { throw std::runtime_error("can't read block"); } @@ -1515,17 +1515,17 @@ void test_node_split_algorithm_lvl2(aku_Timestamp pivot, std::map bstore = BlockStoreBuilder::create_memstore(); const aku_ParamId id = 42; LogicAddr prev = EMPTY_ADDR; - NBTreeSuperblock sblock(id, EMPTY_ADDR, 0, 1); + IOVecSuperblock sblock(id, EMPTY_ADDR, 0, 1); // 0 - NBTreeLeaf l0(id, prev, 0); + IOVecLeaf l0(id, prev, 0); fill_leaf(&l0, tss[0]); prev = save_leaf(&l0, &sblock, bstore); // 1 - NBTreeLeaf l1(id, prev, 1); + IOVecLeaf l1(id, prev, 1); fill_leaf(&l1, tss[1]); prev = save_leaf(&l1, &sblock, bstore); // 2 - NBTreeLeaf l2(id, prev, 2); + IOVecLeaf l2(id, prev, 2); fill_leaf(&l2, tss[2]); prev = save_leaf(&l2, &sblock, bstore); @@ -1541,7 +1541,7 @@ void test_node_split_algorithm_lvl2(aku_Timestamp pivot, std::map it = new_sblock.search(0, 100, bstore); auto actual = extract_timestamps(*it); @@ -1622,7 +1622,7 @@ BOOST_AUTO_TEST_CASE(Test_node_split_algorithm_3) { test_node_split_algorithm_lvl2(30, tss, 3); } -static LogicAddr append_inner_node(NBTreeSuperblock& root, NBTreeSuperblock& child, std::shared_ptr bstore) { +static LogicAddr append_inner_node(IOVecSuperblock& root, IOVecSuperblock& child, std::shared_ptr bstore) { aku_Status status; LogicAddr child_addr; std::tie(status, child_addr) = child.commit(bstore); @@ -1636,14 +1636,14 @@ static LogicAddr append_inner_node(NBTreeSuperblock& root, NBTreeSuperblock& chi return child_addr; } -static void check_backrefs(NBTreeSuperblock& root, std::shared_ptr bstore) { +static void check_backrefs(IOVecSuperblock& root, std::shared_ptr bstore) { // Get the children and check connections std::vector refs; auto status = root.read_all(&refs); BOOST_REQUIRE(status == AKU_SUCCESS); LogicAddr prev_node_addr = EMPTY_ADDR; for (auto ref: refs) { - NBTreeSuperblock curr(read_block(bstore, ref.addr)); + IOVecSuperblock curr(read_block(bstore, ref.addr)); BOOST_REQUIRE_EQUAL(curr.get_prev_addr(), prev_node_addr); prev_node_addr = ref.addr; } @@ -1674,25 +1674,25 @@ void test_node_split_algorithm_lvl3(aku_Timestamp pivot, std::map it = new_sblock.search(0, 100, bstore); @@ -1946,7 +1946,7 @@ static std::tuple count_nbtree_nodes(std::shared_ptr bstor number_of_leaf_nodes++; } else { number_of_inner_nodes++; - NBTreeSuperblock sblock(read_block(bstore, addr)); + IOVecSuperblock sblock(read_block(bstore, addr)); std::vector refs; auto status = sblock.read_all(&refs); BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); @@ -1976,10 +1976,10 @@ void test_node_split_algorithm_lvl2_split_twice(aku_Timestamp pivot1, std::shared_ptr bstore = BlockStoreBuilder::create_memstore(); const aku_ParamId id = 42; LogicAddr prev = EMPTY_ADDR; - NBTreeSuperblock sblock(id, EMPTY_ADDR, 0, 1); + IOVecSuperblock sblock(id, EMPTY_ADDR, 0, 1); for(auto kv: tss) { - NBTreeLeaf leaf(id, prev, static_cast(kv.first)); + IOVecLeaf leaf(id, prev, static_cast(kv.first)); fill_leaf(&leaf, kv.second); prev = save_leaf(&leaf, &sblock, bstore); } @@ -1998,7 +1998,7 @@ void test_node_split_algorithm_lvl2_split_twice(aku_Timestamp pivot1, BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); BOOST_REQUIRE_NE(new_root1 - root, 0); - NBTreeSuperblock new_sblock1(read_block(bstore, new_root1)); + IOVecSuperblock new_sblock1(read_block(bstore, new_root1)); std::unique_ptr it = new_sblock1.search(0, 100, bstore); auto actual = extract_timestamps(*it); @@ -2016,7 +2016,7 @@ void test_node_split_algorithm_lvl2_split_twice(aku_Timestamp pivot1, BOOST_REQUIRE_EQUAL(status, AKU_SUCCESS); BOOST_REQUIRE_NE(new_root2 - new_root1, 0); - NBTreeSuperblock new_sblock2(read_block(bstore, new_root2)); + IOVecSuperblock new_sblock2(read_block(bstore, new_root2)); it = new_sblock2.search(0, 100, bstore); actual = extract_timestamps(*it); @@ -2222,7 +2222,7 @@ BOOST_AUTO_TEST_CASE(Test_value_filter_7) { } void test_nbtreeleaf_filter_operator(aku_Timestamp begin, aku_Timestamp end) { - NBTreeLeaf leaf(42, EMPTY_ADDR, 0); + IOVecLeaf leaf(42, EMPTY_ADDR, 0); aku_Timestamp first_timestamp = 100; std::vector xss; std::vector tss;