
Commit

Merge pull request #308 from akumuli/sblock-memory-saving
Sblock memory saving
Lazin authored Jun 26, 2019
2 parents 2c4a641 + 16b1175 commit cc375b5
Showing 15 changed files with 938 additions and 1,048 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,13 @@
Release notes
=============

Version 0.8.66
--------------

IMPROVEMENT

* Reduce memory requirements by using vector I/O

Version 0.8.65
--------------

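The memory saving referenced in this changelog entry comes from replacing the old contiguous `Block` (a single heap buffer of `AKU_BLOCK_SIZE` bytes, removed in `blockstore.cpp` below) with `IOVecBlock`, which the new `MemStore::append_block` iterates component by component (`NCOMPONENTS`, `COMPONENT_SIZE`, `get_size(i)`, `get_cdata(i)`). The class definition itself is not part of this excerpt, so the following is only a minimal sketch of the idea, with a hypothetical component count and size:

    // Minimal sketch of a lazily allocated, component-based block. The real
    // IOVecBlock is defined elsewhere in the Akumuli sources; only its use is
    // visible in this diff, so the sizes below are hypothetical.
    #include <algorithm>
    #include <array>
    #include <cstdint>
    #include <cstring>
    #include <vector>

    using LogicAddr = uint64_t;

    struct IOVecBlockSketch {
        enum { NCOMPONENTS = 4, COMPONENT_SIZE = 1024 };   // hypothetical sizes

        std::array<std::vector<uint8_t>, NCOMPONENTS> components_;
        LogicAddr addr_ = 0;
        size_t    pos_  = 0;

        // Components are allocated only when the write position reaches them, so a
        // half-filled block costs roughly half the memory of one contiguous buffer.
        size_t append(const uint8_t* data, size_t len) {
            size_t written = 0;
            while (written < len && pos_ < size_t(NCOMPONENTS) * COMPONENT_SIZE) {
                size_t ix   = pos_ / COMPONENT_SIZE;
                size_t off  = pos_ % COMPONENT_SIZE;
                size_t n    = std::min(size_t(COMPONENT_SIZE) - off, len - written);
                auto&  comp = components_[ix];
                comp.resize(off + n);                       // grows lazily, per component
                std::memcpy(comp.data() + off, data + written, n);
                written += n;
                pos_    += n;
            }
            return written;
        }

        size_t         get_size(int i)  const { return components_[i].size(); }
        const uint8_t* get_cdata(int i) const { return components_[i].data(); }
        void           set_addr(LogicAddr a)  { addr_ = a; }
    };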
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8)

set(APP_VERSION_MAJOR "0")
set(APP_VERSION_MINOR "8")
set(APP_VERSION_PATCH "65")
set(APP_VERSION_PATCH "66")

set(APP_VERSION "${APP_VERSION_MAJOR}.${APP_VERSION_MINOR}.${APP_VERSION_PATCH}")
add_definitions(-DAKU_VERSION="${APP_VERSION}")
6 changes: 4 additions & 2 deletions README.md
@@ -41,7 +41,7 @@ Features
|Incremental backup |- |+ |
|Clustering |- |+ |
|Replication |- |+ |
|ARM support |- |+ |
|ARM support |+ |+ |
|Windows support |- |+ |

|Query language features |Current version|Future versions|
@@ -51,7 +51,7 @@ Features
|Aggregate series |+ |+ |
|Merge & aggregate |+ |+ |
|Group-aggregate |+ |+ |
|Group-aggregate & merge |- |+ |
|Group-aggregate & merge |+ |+ |
|Join |+ |+ |
|Join & merge |- |+ |
|Join & group-aggregate |- |+ |
@@ -75,6 +75,7 @@ are available via packagecloud:

* Ubuntu 14.04
* Ubuntu 16.04
* Ubuntu 18.04
* Debian Jessie
* Debian Stretch
* CentOS 7
@@ -87,5 +88,6 @@ Tools for monitoring
Akumuli supports the OpenTSDB telnet-style API for writing. This means that many collectors work with it
without any trouble, for instance `netdata`, `collectd`, and `tcollector`. A Grafana
[datasource](https://github.com/akumuli/akumuli-datasource) plugin is available as well.
Akumuli can be used as long-term storage for Prometheus using [akumuli-prometheus-adapter](https://github.com/akumuli/akumuli-prometheus-adapter).

[Google group](https://groups.google.com/forum/#!forum/akumuli)
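For reference, the OpenTSDB telnet-style write mentioned in the README text above is one plain-text `put` line per sample sent over TCP; the metric names, timestamps, values, and tags below are made-up examples, not part of this commit:

    put cpu.user 1561540800 42.5 host=web01 region=eu
    put mem.free 1561540800 901120 host=web01 region=eu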
18 changes: 9 additions & 9 deletions libakumuli/storage2.cpp
@@ -900,18 +900,18 @@ void dump_tree(std::ostream &stream,
};

if (type == StackItemType::NORMAL || type == StackItemType::RECOVERY) {
std::shared_ptr<Block> block;
std::unique_ptr<IOVecBlock> block;
aku_Status status;
std::tie(status, block) = bstore->read_block(curr);
std::tie(status, block) = bstore->read_iovec_block(curr);
if (status != AKU_SUCCESS) {
stream << _tag("addr") << afmt(curr) << "</addr>" << std::endl;
stream << _tag("fail") << StatusUtil::c_str(status) << "</fail>" << std::endl;
continue;
}
auto subtreeref = reinterpret_cast<const SubtreeRef*>(block->get_cdata());
auto subtreeref = block->get_cheader<SubtreeRef>();
if (subtreeref->type == NBTreeBlockType::LEAF) {
// Dump leaf node's content
NBTreeLeaf leaf(block);
IOVecLeaf leaf(std::move(block));
SubtreeRef const* ref = leaf.get_leafmeta();
stream << _tag("type") << "Leaf" << "</type>\n";
stream << _tag("addr") << afmt(curr) << "</addr>\n";
@@ -940,19 +940,19 @@ void dump_tree(std::ostream &stream,
stack.push(std::make_tuple(EMPTY_ADDR, indent + 1, StackItemType::CLOSE_NODE));
stack.push(std::make_tuple(prev, indent + 2, StackItemType::NORMAL));
stack.push(std::make_tuple(EMPTY_ADDR, indent + 1, StackItemType::OPEN_NODE));
std::tie(status, block) = bstore->read_block(prev);
std::tie(status, block) = bstore->read_iovec_block(prev);
if (status != AKU_SUCCESS) {
// Block was deleted but it should be on the stack anyway
break;
}
NBTreeLeaf lnext(block);
IOVecLeaf lnext(std::move(block));
prev = lnext.get_prev_addr();
}
stack.push(std::make_tuple(EMPTY_ADDR, indent, StackItemType::OPEN_FANOUT));
}
} else {
// Dump inner node's content and children
NBTreeSuperblock sblock(block);
IOVecSuperblock sblock(std::move(block));
SubtreeRef const* ref = sblock.get_sblockmeta();
stream << _tag("addr") << afmt(curr) << "</addr>\n";
stream << _tag("type") << "Superblock" << "</type>\n";
@@ -981,12 +981,12 @@ void dump_tree(std::ostream &stream,
stack.push(std::make_tuple(EMPTY_ADDR, indent + 1, StackItemType::CLOSE_NODE));
stack.push(std::make_tuple(prev, indent + 2, StackItemType::NORMAL));
stack.push(std::make_tuple(EMPTY_ADDR, indent + 1, StackItemType::OPEN_NODE));
std::tie(status, block) = bstore->read_block(prev);
std::tie(status, block) = bstore->read_iovec_block(prev);
if (status != AKU_SUCCESS) {
// Block was deleted but it should be on the stack anyway
break;
}
NBTreeSuperblock sbnext(block);
IOVecSuperblock sbnext(std::move(block));
prev = sbnext.get_prev_addr();
}
stack.push(std::make_tuple(EMPTY_ADDR, indent, StackItemType::OPEN_FANOUT));
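Distilled from the hunks above, the read path in `dump_tree` now looks roughly like the fragment below. It is not standalone code: `IOVecBlock`, `IOVecLeaf`, `IOVecSuperblock`, `SubtreeRef`, `NBTreeBlockType`, and `bstore` all come from the Akumuli storage-engine headers, which are not part of this excerpt.

    std::unique_ptr<IOVecBlock> block;
    aku_Status status;
    std::tie(status, block) = bstore->read_iovec_block(curr);
    if (status == AKU_SUCCESS) {
        // The header is read through the typed accessor instead of a reinterpret_cast
        auto subtreeref = block->get_cheader<SubtreeRef>();
        if (subtreeref->type == NBTreeBlockType::LEAF) {
            IOVecLeaf leaf(std::move(block));          // wrapper takes sole ownership
            // ... leaf.get_leafmeta(), leaf.get_prev_addr(), etc.
        } else {
            IOVecSuperblock sblock(std::move(block));  // same for inner nodes
            // ... sblock.get_sblockmeta(), etc.
        }
    }

Because the block is now a `std::unique_ptr`, ownership is handed to the leaf or superblock wrapper with `std::move` rather than shared through a `std::shared_ptr`.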
206 changes: 12 additions & 194 deletions libakumuli/storage_engine/blockstore.cpp
@@ -100,55 +100,6 @@ BlockCache::PBlock BlockCache::loockup(LogicAddr addr) {
}


Block::Block(LogicAddr addr, std::vector<u8>&& data)
: data_(std::move(data))
, addr_(addr)
, zptr_(nullptr)
{
}

Block::Block(LogicAddr addr, const u8* ptr)
: addr_(addr)
, zptr_(ptr)
{
}

Block::Block()
: data_(static_cast<size_t>(AKU_BLOCK_SIZE), 0)
, addr_(EMPTY_ADDR)
, zptr_(nullptr)
{
}

const u8* Block::get_data() const {
return zptr_ ? zptr_ : data_.data();
}

const u8* Block::get_cdata() const {
return zptr_ ? zptr_ : data_.data();
}

bool Block::is_readonly() const {
return zptr_ != nullptr || addr_ != EMPTY_ADDR;
}

u8* Block::get_data() {
assert(is_readonly() == false);
return data_.data();
}

size_t Block::get_size() const {
return zptr_ ? AKU_BLOCK_SIZE : data_.size();
}

LogicAddr Block::get_addr() const {
return addr_;
}

void Block::set_addr(LogicAddr addr) {
addr_ = addr;
}



FileStorage::FileStorage(std::shared_ptr<VolumeRegistry> meta)
@@ -264,42 +215,20 @@ static LogicAddr make_logic(u32 gen, BlockAddr addr) {
return static_cast<u64>(gen) << 32 | addr;
}

std::tuple<aku_Status, LogicAddr> FileStorage::append_block(std::shared_ptr<Block> data) {
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
BlockAddr block_addr;
aku_Status status;
std::tie(status, block_addr) = volumes_[current_volume_]->append_block(data->get_data());
if (status == AKU_EOVERFLOW) {
// transition to new/next volume
handle_volume_transition();
std::tie(status, block_addr) = volumes_.at(current_volume_)->append_block(data->get_data());
if (status != AKU_SUCCESS) {
return std::make_tuple(status, 0ull);
}
}
data->set_addr(block_addr);
status = meta_->set_nblocks(current_volume_, block_addr + 1);
if (status != AKU_SUCCESS) {
AKU_PANIC("Invalid BlockStore state, " + StatusUtil::str(status));
}
dirty_[current_volume_]++;
return std::make_tuple(status, make_logic(current_gen_, block_addr));
}

std::tuple<aku_Status, LogicAddr> FileStorage::append_block(std::shared_ptr<IOVecBlock> data) {
std::tuple<aku_Status, LogicAddr> FileStorage::append_block(IOVecBlock& data) {
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
BlockAddr block_addr;
aku_Status status;
std::tie(status, block_addr) = volumes_[current_volume_]->append_block(data.get());
std::tie(status, block_addr) = volumes_[current_volume_]->append_block(&data);
if (status == AKU_EOVERFLOW) {
// transition to new/next volume
handle_volume_transition();
std::tie(status, block_addr) = volumes_.at(current_volume_)->append_block(data.get());
std::tie(status, block_addr) = volumes_.at(current_volume_)->append_block(&data);
if (status != AKU_SUCCESS) {
return std::make_tuple(status, 0ull);
}
}
data->set_addr(block_addr);
data.set_addr(block_addr);
status = meta_->set_nblocks(current_volume_, block_addr + 1);
if (status != AKU_SUCCESS) {
AKU_PANIC("Invalid BlockStore state, " + StatusUtil::str(status));
@@ -427,45 +356,7 @@ bool FixedSizeFileStorage::exists(LogicAddr addr) const {
return actual_gen == gen && vol < nblocks;
}

std::tuple<aku_Status, std::shared_ptr<Block>> FixedSizeFileStorage::read_block(LogicAddr addr) {
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
aku_Status status;
auto gen = extract_gen(addr);
auto vol = extract_vol(addr);
auto volix = gen % static_cast<u32>(volumes_.size());
u32 actual_gen;
u32 nblocks;
std::tie(status, actual_gen) = meta_->get_generation(volix);
if (status != AKU_SUCCESS) {
return std::make_tuple(AKU_EBAD_ARG, std::unique_ptr<Block>());
}
std::tie(status, nblocks) = meta_->get_nblocks(volix);
if (status != AKU_SUCCESS) {
return std::make_tuple(AKU_EBAD_ARG, std::unique_ptr<Block>());
}
if (actual_gen != gen || vol >= nblocks) {
return std::make_tuple(AKU_EUNAVAILABLE, std::unique_ptr<Block>());
}
// Try to use zero-copy if possible
const u8* mptr;
std::tie(status, mptr) = volumes_[volix]->read_block_zero_copy(vol);
if (status == AKU_SUCCESS) {
std::shared_ptr<Block> zblock = std::make_shared<Block>(addr, mptr);
return std::make_tuple(status, std::move(zblock));
} else if (status == AKU_EUNAVAILABLE) {
// Fallback to copying if not possible
std::vector<u8> dest(AKU_BLOCK_SIZE, 0);
status = volumes_[volix]->read_block(vol, dest.data());
if (status != AKU_SUCCESS) {
return std::make_tuple(status, std::unique_ptr<Block>());
}
auto block = std::make_shared<Block>(addr, std::move(dest));
return std::make_tuple(status, std::move(block));
}
return std::make_tuple(status, std::unique_ptr<Block>());
}

std::tuple<aku_Status, std::shared_ptr<IOVecBlock>> FixedSizeFileStorage::read_iovec_block(LogicAddr addr) {
std::tuple<aku_Status, std::unique_ptr<IOVecBlock>> FixedSizeFileStorage::read_iovec_block(LogicAddr addr) {
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
aku_Status status;
auto gen = extract_gen(addr);
@@ -529,44 +420,7 @@ bool ExpandableFileStorage::exists(LogicAddr addr) const {
return actual_gen == gen && vol < nblocks;
}

std::tuple<aku_Status, std::shared_ptr<Block>> ExpandableFileStorage::read_block(LogicAddr addr) {
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
aku_Status status;
auto gen = extract_gen(addr);
auto vol = extract_vol(addr);
u32 actual_gen;
u32 nblocks;
std::tie(status, actual_gen) = meta_->get_generation(gen);
if (status != AKU_SUCCESS) {
return std::make_tuple(AKU_EBAD_ARG, std::unique_ptr<Block>());
}
std::tie(status, nblocks) = meta_->get_nblocks(gen);
if (status != AKU_SUCCESS) {
return std::make_tuple(AKU_EBAD_ARG, std::unique_ptr<Block>());
}
if (actual_gen != gen || vol >= nblocks) {
return std::make_tuple(AKU_EUNAVAILABLE, std::unique_ptr<Block>());
}
// Try to use zero-copy if possible
const u8* mptr;
std::tie(status, mptr) = volumes_[gen]->read_block_zero_copy(vol);
if (status == AKU_SUCCESS) {
std::shared_ptr<Block> zblock = std::make_shared<Block>(addr, mptr);
return std::make_tuple(status, std::move(zblock));
} else if (status == AKU_EUNAVAILABLE) {
// Fallback to copying if not possible
std::vector<u8> dest(AKU_BLOCK_SIZE, 0);
status = volumes_[gen]->read_block(vol, dest.data());
if (status != AKU_SUCCESS) {
return std::make_tuple(status, std::unique_ptr<Block>());
}
auto block = std::make_shared<Block>(addr, std::move(dest));
return std::make_tuple(status, std::move(block));
}
return std::make_tuple(status, std::unique_ptr<Block>());
}

std::tuple<aku_Status, std::shared_ptr<IOVecBlock>> ExpandableFileStorage::read_iovec_block(LogicAddr addr) {
std::tuple<aku_Status, std::unique_ptr<IOVecBlock>> ExpandableFileStorage::read_iovec_block(LogicAddr addr) {
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
aku_Status status;
auto gen = extract_gen(addr);
@@ -586,7 +440,7 @@ std::tuple<aku_Status, std::shared_ptr<IOVecBlock>> ExpandableFileStorage::read_
}
// Read the volume
std::unique_ptr<IOVecBlock> block;
std::tie(status, block) = volumes_[vol]->read_block(vol);
std::tie(status, block) = volumes_[gen]->read_block(vol);
if (status == AKU_SUCCESS) {
return std::make_tuple(status, std::move(block));
}
@@ -676,30 +530,7 @@ u32 MemStore::checksum(const IOVecBlock& block, size_t offset , size_t size) con
return crc32;
}

std::tuple<aku_Status, std::shared_ptr<Block>> MemStore::read_block(LogicAddr addr) {
addr -= MEMSTORE_BASE;
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
std::shared_ptr<Block> block;
u32 offset = static_cast<u32>(AKU_BLOCK_SIZE * addr);
if (addr < removed_pos_) {
return std::make_tuple(AKU_EUNAVAILABLE, block);
}
if (buffer_.size() < (offset + AKU_BLOCK_SIZE)) {
return std::make_tuple(AKU_EBAD_ARG, block);
}
std::vector<u8> data;
data.reserve(AKU_BLOCK_SIZE);
auto begin = buffer_.begin() + offset;
auto end = begin + AKU_BLOCK_SIZE;
std::copy(begin, end, std::back_inserter(data));
block.reset(new Block(addr + MEMSTORE_BASE, std::move(data)));
if (read_callback_) {
read_callback_(addr);
}
return std::make_tuple(AKU_SUCCESS, block);
}

std::tuple<aku_Status, std::shared_ptr<IOVecBlock>> MemStore::read_iovec_block(LogicAddr addr) {
std::tuple<aku_Status, std::unique_ptr<IOVecBlock>> MemStore::read_iovec_block(LogicAddr addr) {
addr -= MEMSTORE_BASE;
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
u32 offset = static_cast<u32>(AKU_BLOCK_SIZE * addr);
@@ -722,24 +553,11 @@ std::tuple<aku_Status, std::shared_ptr<IOVecBlock>> MemStore::read_iovec_block(L
return std::make_tuple(AKU_SUCCESS, std::move(block));
}

std::tuple<aku_Status, LogicAddr> MemStore::append_block(std::shared_ptr<Block> data) {
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
assert(data->get_size() == AKU_BLOCK_SIZE);
std::copy(data->get_data(), data->get_data() + AKU_BLOCK_SIZE, std::back_inserter(buffer_));
if (append_callback_) {
append_callback_(write_pos_ + MEMSTORE_BASE);
}
auto addr = write_pos_++;
addr += MEMSTORE_BASE;
data->set_addr(addr);
return std::make_tuple(AKU_SUCCESS, addr);
}

std::tuple<aku_Status, LogicAddr> MemStore::append_block(std::shared_ptr<IOVecBlock> data) {
std::tuple<aku_Status, LogicAddr> MemStore::append_block(IOVecBlock &data) {
std::lock_guard<std::mutex> guard(lock_); AKU_UNUSED(guard);
for (int i = 0; i < IOVecBlock::NCOMPONENTS; i++) {
if (data->get_size(i) != 0) {
const u8* p = data->get_cdata(i);
if (data.get_size(i) != 0) {
const u8* p = data.get_cdata(i);
std::copy(p, p + IOVecBlock::COMPONENT_SIZE, std::back_inserter(buffer_));
} else {
std::fill_n(std::back_inserter(buffer_), IOVecBlock::COMPONENT_SIZE, 0);
@@ -750,7 +568,7 @@ std::tuple<aku_Status, LogicAddr> MemStore::append_block(std::shared_ptr<IOVecBl
}
auto addr = write_pos_++;
addr += MEMSTORE_BASE;
data->set_addr(addr);
data.set_addr(addr);
return std::make_tuple(AKU_SUCCESS, addr);
}

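Taken together, the changes in this file replace the `shared_ptr<Block>` based read/write entry points with a single pair of `IOVecBlock` methods. The declarations live in `blockstore.h`, which is not part of this excerpt, so the interface sketch below (including the name `BlockStoreIface`) is an assumption reconstructed from the definitions above, not quoted code:

    // Assumed shape of the narrowed BlockStore interface (reconstructed, not quoted).
    struct BlockStoreIface {
        // Write path: the caller keeps ownership and passes the block by reference;
        // the store fills in the logical address via IOVecBlock::set_addr().
        virtual std::tuple<aku_Status, LogicAddr>
        append_block(IOVecBlock& data) = 0;

        // Read path: the store hands back sole ownership of the block it just read.
        virtual std::tuple<aku_Status, std::unique_ptr<IOVecBlock>>
        read_iovec_block(LogicAddr addr) = 0;

        virtual ~BlockStoreIface() = default;
    };

Relative to the removed overloads, neither path wraps the block in `std::shared_ptr` any more, so ownership is explicit and no reference-counted control block is allocated per read or append.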
(Diffs for the remaining changed files were not loaded in this view.)
