From de060edab4aec29410b9e40238fcce516645855d Mon Sep 17 00:00:00 2001 From: Roy Guo Date: Mon, 18 Apr 2022 10:57:47 +0800 Subject: [PATCH] [WIP] LinkCompaction: Add LinkSstIterator --- db/compaction_job.cc | 31 +-- db/compaction_job.h | 32 +++ db/table_cache.cc | 52 ++-- db/version_edit.h | 3 + include/rocksdb/options.h | 1 + table/two_level_iterator.cc | 493 +++++++++++++++++++++++++++++++++++- table/two_level_iterator.h | 10 +- util/heap.h | 2 +- util/iterator_cache.h | 4 + 9 files changed, 575 insertions(+), 53 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 1a7b38bf79..9a95112df6 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1585,34 +1585,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { snapshot_checker_, compact_->compaction->level(), db_options_.statistics.get(), shutting_down_); - struct BuilderSeparateHelper : public SeparateHelper { - SeparateHelper* separate_helper = nullptr; - std::unique_ptr value_meta_extractor; - Status (*trans_to_separate_callback)(void* args, const Slice& key, - LazyBuffer& value) = nullptr; - void* trans_to_separate_callback_args = nullptr; - - Status TransToSeparate(const Slice& internal_key, LazyBuffer& value, - const Slice& meta, bool is_merge, - bool is_index) override { - return SeparateHelper::TransToSeparate( - internal_key, value, value.file_number(), meta, is_merge, is_index, - value_meta_extractor.get()); - } - - Status TransToSeparate(const Slice& key, LazyBuffer& value) override { - if (trans_to_separate_callback == nullptr) { - return Status::NotSupported(); - } - return trans_to_separate_callback(trans_to_separate_callback_args, key, - value); - } + BuilderSeparateHelper separate_helper; - LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence, - const LazyBuffer& value) const override { - return separate_helper->TransToCombined(user_key, sequence, value); - } - } separate_helper; if (compact_->compaction->immutable_cf_options() ->value_meta_extractor_factory != nullptr) { ValueExtractorContext context = {cfd->GetID()}; @@ -2048,7 +2022,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // TODO(guokuankuan@bytedance.com) void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) { - return; + assert(sub_compact != nullptr); + return ProcessKeyValueCompaction(sub_compact); } void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) { diff --git a/db/compaction_job.h b/db/compaction_job.h index dfe7dddaab..2a640d7876 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -64,6 +64,38 @@ class VersionEdit; class VersionSet; class CompactionJob { + public: + + class BuilderSeparateHelper : public SeparateHelper { + public: + SeparateHelper* separate_helper = nullptr; + std::unique_ptr value_meta_extractor; + Status (*trans_to_separate_callback)(void* args, const Slice& key, + LazyBuffer& value) = nullptr; + void* trans_to_separate_callback_args = nullptr; + + Status TransToSeparate(const Slice& internal_key, LazyBuffer& value, + const Slice& meta, bool is_merge, + bool is_index) override { + return SeparateHelper::TransToSeparate( + internal_key, value, value.file_number(), meta, is_merge, is_index, + value_meta_extractor.get()); + } + + Status TransToSeparate(const Slice& key, LazyBuffer& value) override { + if (trans_to_separate_callback == nullptr) { + return Status::NotSupported(); + } + return trans_to_separate_callback(trans_to_separate_callback_args, key, + value); + 
} + + LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence, + const LazyBuffer& value) const override { + return separate_helper->TransToCombined(user_key, sequence, value); + } + }; + public: CompactionJob(int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, diff --git a/db/table_cache.cc b/db/table_cache.cc index 75eee2d558..22de5b77f7 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -85,7 +85,7 @@ static bool InheritanceMismatch(const FileMetaData& sst_meta, return true; } -// Store params for create depend table iterator in future +// Store params for create dependency table iterator in future class LazyCreateIterator : public Snapshot { TableCache* table_cache_; ReadOptions options_; // deep copy @@ -292,7 +292,7 @@ InternalIterator* TableCache::NewIterator( } size_t readahead = 0; bool record_stats = !for_compaction; - if (file_meta.prop.is_map_sst()) { + if (file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) { record_stats = false; } else { // MapSST don't handle these @@ -340,15 +340,9 @@ InternalIterator* TableCache::NewIterator( } InternalIterator* result = nullptr; if (s.ok()) { - if (!file_meta.prop.is_map_sst()) { - if (options.table_filter && - !options.table_filter(*table_reader->GetTableProperties())) { - result = NewEmptyInternalIterator(arena); - } else { - result = table_reader->NewIterator(options, prefix_extractor, arena, - skip_filters, for_compaction); - } - } else { + // For map & linked SST, we should expand their underlying key value pairs, + // not simply iterate the input SST key values. + if(file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) { ReadOptions map_options = options; map_options.total_order_seek = true; map_options.readahead_size = 0; @@ -356,6 +350,7 @@ InternalIterator* TableCache::NewIterator( table_reader->NewIterator(map_options, prefix_extractor, arena, skip_filters, false /* for_compaction */); if (!dependence_map.empty()) { + // Map SST will handle range deletion internally, so we can skip here. bool ignore_range_deletions = options.ignore_range_deletions || file_meta.prop.map_handle_range_deletions(); @@ -365,33 +360,54 @@ InternalIterator* TableCache::NewIterator( lazy_create_iter = new (buffer) LazyCreateIterator( this, options, env_options, range_del_agg, prefix_extractor, for_compaction, skip_filters, ignore_range_deletions, level); - } else { lazy_create_iter = new LazyCreateIterator( this, options, env_options, range_del_agg, prefix_extractor, for_compaction, skip_filters, ignore_range_deletions, level); } - auto map_sst_iter = NewMapSstIterator( - &file_meta, result, dependence_map, ioptions_.internal_comparator, - lazy_create_iter, c_style_callback(*lazy_create_iter), arena); + + // For map & linked sst, we should expand their dependencies and merge + // all related iterators into one combined iterator for further reads. 
+ InternalIterator* sst_iter = nullptr; + + if(file_meta.prop.is_map_sst()) { + sst_iter = NewMapSstIterator( + &file_meta, result, dependence_map, ioptions_.internal_comparator, + lazy_create_iter, c_style_callback(*lazy_create_iter), arena); + } else { + assert(file_meta.prop.is_link_sst()); + sst_iter = NewLinkSstIterator( + &file_meta, result, dependence_map, ioptions_.internal_comparator, + lazy_create_iter, c_style_callback(*lazy_create_iter), arena); + } + if (arena != nullptr) { - map_sst_iter->RegisterCleanup( + sst_iter->RegisterCleanup( [](void* arg1, void* arg2) { static_cast(arg1)->~InternalIterator(); static_cast(arg2)->~LazyCreateIterator(); }, result, lazy_create_iter); } else { - map_sst_iter->RegisterCleanup( + sst_iter->RegisterCleanup( [](void* arg1, void* arg2) { delete static_cast(arg1); delete static_cast(arg2); }, result, lazy_create_iter); } - result = map_sst_iter; + result = sst_iter; + } + } else { + if (options.table_filter && + !options.table_filter(*table_reader->GetTableProperties())) { + result = NewEmptyInternalIterator(arena); + } else { + result = table_reader->NewIterator(options, prefix_extractor, arena, + skip_filters, for_compaction); } } + if (create_new_table_reader) { assert(handle == nullptr); result->RegisterCleanup(&DeleteTableReader, table_reader, diff --git a/db/version_edit.h b/db/version_edit.h index 86049f46ce..ff878e4745 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -98,8 +98,11 @@ struct TablePropertyCache { std::vector inheritance; // inheritance set uint64_t earliest_time_begin_compact = port::kMaxUint64; uint64_t latest_time_end_compact = port::kMaxUint64; + uint32_t lbr_hash_bits = 11; // hash bits for each LinkSST KV + uint32_t lbr_group_size = 16; // Group size for LinkBlockRecord bool is_map_sst() const { return purpose == kMapSst; } + bool is_link_sst() const {return purpose == kLinkSst; } bool has_range_deletions() const { return (flags & kNoRangeDeletions) == 0; } bool map_handle_range_deletions() const { return (flags & kMapHandleRangeDeletions) != 0; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 110f5ec52a..a8e546d56f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -88,6 +88,7 @@ enum SstPurpose { kEssenceSst, // Actual data storage sst kLogSst, // Log as sst kMapSst, // Dummy sst + kLinkSst, // Link SST }; struct Options; diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 50c000f5b0..72dba2dc73 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -214,21 +214,18 @@ class MapSstIterator final : public InternalIterator { int include_largest_; std::vector link_; struct HeapElement { - InternalIterator* iter; + InternalIterator* iter{}; Slice key; }; template class HeapComparator { public: HeapComparator(const InternalKeyComparator& comparator) : c_(comparator) {} - bool operator()(const HeapElement& a, const HeapElement& b) const { return is_less ? 
c_.Compare(a.key, b.key) < 0
                       : c_.Compare(a.key, b.key) > 0;
     }
-    const InternalKeyComparator& internal_comparator() const { return c_; }
-
    private:
     const InternalKeyComparator& c_;
   };
@@ -488,7 +485,7 @@ class MapSstIterator final : public InternalIterator {
     if (min_heap_.empty() ||
         icomp.Compare(min_heap_.top().key, largest_key_) >=
             include_largest_) {
-      // out of largest bound
+      // out of the largest bound
       first_level_value_.reset();
       first_level_iter_->Next();
       if (InitFirstLevelIter()) {
@@ -544,6 +541,475 @@ class MapSstIterator final : public InternalIterator {
   virtual Status status() const override { return status_; }
 };
 
+// Each LinkSST contains a list of LinkBlocks; each LinkBlock holds a fixed
+// number of file_numbers which indicate where the KV pairs are placed.
+class LinkBlockRecord {
+ public:
+  LinkBlockRecord(IteratorCache* iterator_cache, const Slice& key,
+                  int group_sz, int hash_bits)
+      : iterator_cache_(iterator_cache),
+        max_key_(key),
+        group_sz_(group_sz),
+        hash_bits_(hash_bits) {
+    key_buffer_.resize(group_sz);
+  }
+
+  // TODO(guokuankuan@bytedance.com)
+  // Encode current LinkBlock into a slice, so we can put it into a LinkSST.
+  // Format:
+  //   [file_numbers]  group_sz * uint64_t (varint)
+  //   [hash_values]   byte aligned (group_sz * hash_bits)
+  //   [smallest key]  length prefixed slice
+  // (A standalone encode/decode sketch is appended after this patch.)
+  void Encode() {}
+
+  // Decode from a LinkSST's value.
+  bool DecodeFrom(Slice& input) {
+    // Decode all file numbers for each KV pair
+    for (int i = 0; i < group_sz_; ++i) {
+      uint64_t file_number = 0;
+      if (!GetVarint64(&input, &file_number)) {
+        return false;
+      }
+      file_numbers_.emplace_back(file_number);
+    }
+    assert(file_numbers_.size() == static_cast<size_t>(group_sz_));
+    // Decode the hash values. They are packed MSB-first into a byte aligned
+    // buffer; keep this in sync with Encode().
+    int total_bits = group_sz_ * hash_bits_;
+    int total_bytes = (total_bits + 7) / 8;
+    if (input.size() < static_cast<size_t>(total_bytes)) {
+      return false;
+    }
+    assert(hash_bits_ <= 32);
+    const char* hash_data = input.data();
+    for (int i = 0; i < group_sz_; ++i) {
+      // TODO(guokuankuan@bytedance.com) Add some UT for this function.
+      // Convert the packed bit representation into uint32 hash values.
+      uint32_t hash = 0;
+      for (int bit = 0; bit < hash_bits_; ++bit) {
+        int pos = i * hash_bits_ + bit;
+        uint8_t byte = static_cast<uint8_t>(hash_data[pos / 8]);
+        hash = (hash << 1) | ((byte >> (7 - pos % 8)) & 1);
+      }
+      hash_values_.emplace_back(hash);
+    }
+    input.remove_prefix(total_bytes);
+    // Decode the optional smallest key
+    if (!GetLengthPrefixedSlice(&input, &smallest_key_)) {
+      return false;
+    }
+    return true;
+  }
+
+  // There should be only one active LinkBlock during the Link SST iteration.
+  // Here we load all underlying iterators within the current LinkBlock and
+  // reset iter_idx for further use.
+  bool ActiveLinkBlock() {
+    for (int i = 0; i < group_sz_; ++i) {
+      auto it = iterator_cache_->GetIterator(file_numbers_[i]);
+      if (!it->status().ok()) {
+        return false;
+      }
+    }
+    iter_idx = 0;
+    return true;
+  }
+
+  // Seek all iterators to their first item.
+  Status SeekToFirst() {
+    iter_idx = 0;
+    for (int i = 0; i < group_sz_; ++i) {
+      auto it = iterator_cache_->GetIterator(file_numbers_[i]);
+      it->SeekToFirst();
+      if (!it->status().ok()) {
+        return it->status();
+      }
+    }
+    return Status::OK();
+  }
+
+  // Seek all iterators to their last item.
+  Status SeekToLast() {
+    assert(file_numbers_.size() == static_cast<size_t>(group_sz_));
+    iter_idx = group_sz_ - 1;
+    for (int i = 0; i < group_sz_; ++i) {
+      auto it = iterator_cache_->GetIterator(file_numbers_[i]);
+      it->SeekToLast();
+      if (!it->status().ok()) {
+        return it->status();
+      }
+    }
+    return Status::OK();
+  }
+
+  Status Seek(const Slice& target, const InternalKeyComparator& icomp) {
+    uint32_t target_hash = hash(target, hash_bits_);
+    (void)target_hash;  // Reserved for the hash check discussed below.
+    // Lower bound search for the target key
+    uint32_t left = 0;
+    uint32_t right = group_sz_;
+    while (left < right) {
+      uint32_t mid = left + (right - left) / 2;
+      auto key = buffered_key(mid, file_numbers_[mid]);
+      // TODO(guokuankuan@bytedance.com)
+      // Shall we also check the key's hash value (target_hash) here?
+      if (icomp.Compare(key, target) >= 0) {
+        right = mid;
+      } else {
+        left = mid + 1;
+      }
+    }
+
+    if (left < static_cast<uint32_t>(group_sz_)) {
+      iter_idx = left;
+      // Prepare the target SST's iterator for further use.
+      // TODO Shall we position all other iterators to the right place as well,
+      // so we can reuse them in later Next()/Prev()?
+      auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]);
+      it->Seek(target);
+      return Status::OK();
+    } else {
+      iter_idx = -1;
+      return Status::Corruption();
+    }
+  }
+
+  // Move the current iterator to the next position, skipping all invalid
+  // records (hash == INVALID_ITEM_HASH).
+  // If all subsequent items are invalid, return an error status.
+  Status Next() {
+    // Find the next valid position of the current LinkBlock
+    int next_iter_idx = iter_idx + 1;
+    while (next_iter_idx < static_cast<int>(hash_values_.size())) {
+      // Move forward the target iterator
+      auto it = iterator_cache_->GetIterator(file_numbers_[next_iter_idx]);
+      it->Next();
+      assert(it->status().ok());
+      if (hash_values_[next_iter_idx] != INVALID_ITEM_HASH) {
+        break;
+      }
+      next_iter_idx++;
+    }
+
+    // We exceeded the max boundary, so we should try the next LinkBlock.
+    // iter_idx is now meaningless, since there should be only one LinkBlock
+    // active at a time during iteration.
+    if (next_iter_idx == static_cast<int>(hash_values_.size())) {
+      return Status::NotFound("Exceed LinkBlock's max boundary");
+    }
+
+    // The current LinkBlock is still in use, update iter_idx.
+    iter_idx = next_iter_idx;
+    return Status::OK();
+  }
+
+  // See the comment on `Next()`; the `Prev()` implementation is almost the
+  // same except for the iteration direction.
+  Status Prev() {
+    // Find the previous valid position of the current LinkBlock
+    int prev_iter_idx = iter_idx - 1;
+    while (prev_iter_idx >= 0) {
+      // Move backward
+      auto it = iterator_cache_->GetIterator(file_numbers_[prev_iter_idx]);
+      it->Prev();
+      assert(it->status().ok());
+      if (hash_values_[prev_iter_idx] != INVALID_ITEM_HASH) {
+        break;
+      }
+      prev_iter_idx--;
+    }
+
+    // We exceeded the smallest boundary
+    if (prev_iter_idx == -1) {
+      return Status::NotFound("Exceed LinkBlock's smallest boundary");
+    }
+
+    // The current LinkBlock is still in use, update iter_idx.
+    iter_idx = prev_iter_idx;
+
+    return Status::OK();
+  }
+
+  // Extract the key from the underlying SST iterator
+  Slice CurrentKey() const {
+    auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]);
+    return it->key();
+  }
+
+  // Extract the value from the underlying SST iterator
+  LazyBuffer CurrentValue() const {
+    auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]);
+    return it->value();
+  }
+
+  // Return the max key of the current LinkBlock
+  Slice MaxKey() const { return max_key_; }
+
+  // If we have a non-empty `smallest_key_`, we should re-init all underlying
+  // iterators (default: empty)
+  Slice SmallestKey() const { return smallest_key_; }
+
+  bool HasSmallestKey() const { return smallest_key_.valid(); }
+
+ private:
+  // If a key's hash is marked as `INVALID_ITEM_HASH`, the item will be removed
+  // in the next compaction, thus we shouldn't read from it.
+  static constexpr uint32_t INVALID_ITEM_HASH = 0;
+
+  // TODO(guokuankuan@bytedance.com)
+  // Hash a user key into an integer, limited to `max_bits` bits.
+  // (A possible implementation is sketched after this patch.)
+  uint32_t hash(const Slice& user_key, int max_bits) {
+    (void)user_key;
+    (void)max_bits;
+    return 0;
+  }
+
+  Slice buffered_key(uint32_t idx, uint64_t file_number) {
+    if (!key_buffer_[idx].valid()) {
+      // Find the occurrences of `file_number` from `idx` to the last position
+      // of the current group, e.g. for [0, 1, 10, 11, 9, 10, 2] and `idx` = 2
+      // the occurrences are {2, 5}.
+      std::vector<uint32_t> occurrence = {idx};
+      for (uint32_t i = idx + 1; i < static_cast<uint32_t>(group_sz_); ++i) {
+        if (file_numbers_[i] == file_number) {
+          occurrence.emplace_back(i);
+        }
+      }
+
+      // Seek to the last position of the current file_number and `Prev` back
+      // to position `idx`, filling all touched keys into the key buffer.
+      auto it = iterator_cache_->GetIterator(file_number);
+      it->SeekForPrev(max_key_);
+      assert(it->status().ok());
+      for (size_t i = occurrence.size(); i > 0; --i) {
+        uint32_t pos = occurrence[i - 1];
+        if (!key_buffer_[pos].valid()) {
+          key_buffer_[pos] = it->key();
+        }
+        it->Prev();
+      }
+    }
+
+    assert(!key_buffer_[idx].empty());
+    return key_buffer_[idx];
+  }
+
+ private:
+  IteratorCache* iterator_cache_;
+
+  // The end/max key of the current LinkBlock
+  Slice max_key_;
+  // How many KV pairs we group into one LinkBlock.
+  int group_sz_ = 8;
+  // Number of hash bits stored for each KV pair
+  int hash_bits_ = 11;
+  // Cache of touched keys while iterating
+  std::vector<Slice> key_buffer_;
+  // Indicates which SST each KV pair belongs to.
+  // file_numbers_.size() == group_sz_
+  std::vector<uint64_t> file_numbers_;
+  // Each KV pair has a hash value (hash(user_key))
+  std::vector<uint32_t> hash_values_;
+  // Current iteration index of this LinkBlock.
+  int iter_idx = 0;
+  // Optional. If smallest_key_ exists, one of the underlying iterators has
+  // expired, and we should seek all iterators to the target key again before
+  // further iteration.
+  Slice smallest_key_;
+};
+
+// TODO(guokuankuan@bytedance.com)
+// A LinkSstIterator is almost the same as MapSstIterator.
+class LinkSstIterator : public InternalIterator {
+ private:
+  const FileMetaData* file_meta_;
+  InternalIterator* link_sst_iter_{};
+  InternalKeyComparator icomp_;
+  // The smallest key of the current Link SST
+  // Slice smallest_key;
+  // The largest key of the current Link SST
+  // Slice largest_key;
+
+  IteratorCache iterator_cache_;
+  std::vector<LinkBlockRecord> lbr_list_;
+
+  Status status_;
+  uint32_t cur_lbr_idx_{};
+
+  // BinaryHeap, HeapVectorType> min_heap_;
+
+ public:
+  LinkSstIterator(const FileMetaData* file_meta, InternalIterator* iter,
+                  const DependenceMap& dependence_map,
+                  const InternalKeyComparator& icomp, void* create_arg,
+                  const IteratorCache::CreateIterCallback& create)
+      : file_meta_(file_meta),
+        link_sst_iter_(iter),
+        icomp_(icomp),
+        iterator_cache_(dependence_map, create_arg, create) {
+    if (file_meta != nullptr && !file_meta_->prop.is_link_sst()) {
+      abort();
+    }
+  }
+
+  ~LinkSstIterator() override = default;
+
+ private:
+  // Init all LBRs by decoding every value of the LinkSST.
+  bool InitLinkBlockRecords() {
+    lbr_list_.clear();
+    LazyBuffer current_value;
+    link_sst_iter_->SeekToFirst();
+    while (link_sst_iter_->Valid()) {
+      current_value = link_sst_iter_->value();
+      status_ = current_value.fetch();
+      if (!status_.ok()) {
+        status_ = Status::Corruption("Failed to fetch lazy buffer");
+        return false;
+      }
+      Slice input = current_value.slice();
+      lbr_list_.emplace_back(&iterator_cache_, link_sst_iter_->key(),
+                             file_meta_->prop.lbr_group_size,
+                             file_meta_->prop.lbr_hash_bits);
+      if (!lbr_list_.back().DecodeFrom(input)) {
+        status_ = Status::Corruption("Cannot decode Link SST");
+        return false;
+      }
+      link_sst_iter_->Next();
+    }
+    current_value.reset();
+    return true;
+  }
+
+  // We assume there could be a lot of underlying SSTs for each LinkSST, so we
+  // simply initialize all SST iterators before any iteration.
+  bool InitSecondLevelIterators() {
+    for (auto& lb : lbr_list_) {
+      if (!lb.ActiveLinkBlock()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+ public:
+  bool Valid() const override { return !lbr_list_.empty(); }
+
+  void SeekToFirst() override {
+    if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) {
+      status_ = Status::Corruption();
+      return;
+    }
+    assert(!lbr_list_.empty());
+    cur_lbr_idx_ = 0;
+    status_ = lbr_list_[cur_lbr_idx_].SeekToFirst();
+  }
+
+  void SeekToLast() override {
+    if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) {
+      status_ = Status::Corruption();
+      return;
+    }
+    assert(!lbr_list_.empty());
+    cur_lbr_idx_ = static_cast<uint32_t>(lbr_list_.size() - 1);
+    status_ = lbr_list_[cur_lbr_idx_].SeekToLast();
+  }
+
+  // TODO(guokuankuan@bytedance.com)
+  // Is the input target an InternalKey? Then what is the default sequence#?
+  void Seek(const Slice& target) override {
+    if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) {
+      status_ = Status::Corruption();
+      return;
+    }
+    // Find the target LinkBlock's position
+    auto it = std::lower_bound(
+        lbr_list_.begin(), lbr_list_.end(), target,
+        [&](const LinkBlockRecord& lbr, const Slice& key) {
+          return icomp_.Compare(lbr.MaxKey(), key) < 0;
+        });
+    if (it == lbr_list_.end()) {
+      status_ = Status::NotFound();
+      return;
+    }
+    cur_lbr_idx_ = static_cast<uint32_t>(it - lbr_list_.begin());
+    // Do the Seek
+    status_ = lbr_list_[cur_lbr_idx_].Seek(target, icomp_);
+  }
+
+  // Position at the last key that is at or before the target.
+  void SeekForPrev(const Slice& target) override {
+    if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) {
+      status_ = Status::Corruption();
+      return;
+    }
+
+    // We adopt Seek & Prev semantics here.
+    Seek(target);
+
+    // If the position's key is equal to the target, we are good to go.
+    if (status_.ok() && key() == target) {
+      return;
+    }
+
+    // If the key is greater than the target, we need to `Prev` back to the
+    // right place.
+    while (status_.ok() && key().compare(target) > 0) {
+      Prev();
+    }
+  }
+
+  void Next() override {
+    while (cur_lbr_idx_ < lbr_list_.size()) {
+      auto s = lbr_list_[cur_lbr_idx_].Next();
+      if (s.ok()) {
+        break;
+      }
+
+      // If we cannot `Next()` the current LBR properly, try the next one.
+      cur_lbr_idx_++;
+      if (cur_lbr_idx_ == lbr_list_.size()) {
+        break;
+      }
+
+      assert(cur_lbr_idx_ < lbr_list_.size());
+      // If the next LBR has a valid smallest key, we should re-seek all its
+      // iterators (which means the iterators' continuity may break).
+      auto& lbr = lbr_list_[cur_lbr_idx_];
+      if (lbr.HasSmallestKey()) {
+        lbr.Seek(lbr.SmallestKey(), icomp_);
+      }
+    }
+
+    // No valid position found
+    if (cur_lbr_idx_ == lbr_list_.size()) {
+      status_ = Status::NotFound("End of iterator exceeded");
+      return;
+    }
+  }
+
+  void Prev() override {
+    while (true) {
+      auto s = lbr_list_[cur_lbr_idx_].Prev();
+      if (s.ok()) {
+        break;
+      }
+      // All items were consumed, exit.
+      if (cur_lbr_idx_ == 0) {
+        status_ = Status::NotFound("No more previous items!");
+        return;
+      }
+
+      // If we cannot `Prev()` the current LBR, try the previous one. Note that
+      // if the current LBR has a valid smallest key, we should re-seek the
+      // previous LBR.
+      cur_lbr_idx_--;
+      auto& curr_lbr = lbr_list_[cur_lbr_idx_ + 1];
+      auto& prev_lbr = lbr_list_[cur_lbr_idx_];
+      if (curr_lbr.HasSmallestKey()) {
+        prev_lbr.Seek(prev_lbr.MaxKey(), icomp_);
+      }
+    }
+  }
+
+  Slice key() const override {
+    assert(Valid());
+    return lbr_list_[cur_lbr_idx_].CurrentKey();
+  }
+
+  LazyBuffer value() const override {
+    assert(Valid());
+    return lbr_list_[cur_lbr_idx_].CurrentValue();
+  }
+
+  Status status() const override { return status_; }
+};
 }  // namespace
 
 InternalIteratorBase* NewTwoLevelIterator(
@@ -552,6 +1018,23 @@ InternalIteratorBase* NewTwoLevelIterator(
   return new TwoLevelIndexIterator(state, first_level_iter);
 }
 
+InternalIterator* NewLinkSstIterator(
+    const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
+    const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
+    void* callback_arg, const IteratorCache::CreateIterCallback& create_iter,
+    Arena* arena) {
+  assert(file_meta == nullptr || file_meta->prop.is_link_sst());
+  if (arena == nullptr) {
+    return new LinkSstIterator(file_meta, mediate_sst_iter, dependence_map,
+                               icomp, callback_arg, create_iter);
+  } else {
+    void* buffer = arena->AllocateAligned(sizeof(LinkSstIterator));
+    return new (buffer)
+        LinkSstIterator(file_meta, mediate_sst_iter, dependence_map, icomp,
+                        callback_arg, create_iter);
+  }
+}
+
 InternalIterator* NewMapSstIterator(
     const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
     const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h
index 3a967bc3ea..0286fbf153 100644
--- a/table/two_level_iterator.h
+++ b/table/two_level_iterator.h
@@ -42,12 +42,20 @@ extern InternalIteratorBase* NewTwoLevelIterator(
     TwoLevelIteratorState* state,
     InternalIteratorBase* first_level_iter);
 
-// Retuan a two level iterator. for unroll map sst
-// keep all params lifecycle please
+// @return a two-level iterator for unrolling a map SST.
+// Please keep all params alive during the whole lifetime of the iterator.
 extern InternalIterator* NewMapSstIterator(
     const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
     const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
     void* callback_arg, const IteratorCache::CreateIterCallback& create_iter,
     Arena* arena = nullptr);
 
+// A link SST iterator expands all of its dependencies for the caller.
+// @return a LinkSstIterator
+extern InternalIterator* NewLinkSstIterator(
+    const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
+    const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
+    void* callback_arg, const IteratorCache::CreateIterCallback& create_iter,
+    Arena* arena = nullptr);
+
 }  // namespace TERARKDB_NAMESPACE
diff --git a/util/heap.h b/util/heap.h
index a21015998c..96b34c8087 100644
--- a/util/heap.h
+++ b/util/heap.h
@@ -129,7 +129,7 @@ class BinaryHeap {
     T v = std::move(data_[index]);
     size_t picked_child = port::kMaxSizet;
-    while (1) {
+    while (true) {
       const size_t left_child = get_left(index);
       if (get_left(index) >= data_.size()) {
         break;
diff --git a/util/iterator_cache.h b/util/iterator_cache.h
index 011f2dde99..bef49e698a 100644
--- a/util/iterator_cache.h
+++ b/util/iterator_cache.h
@@ -19,6 +19,10 @@ class RangeDelAggregator;
 class TableReader;
 
 // <file_number, FileMetaData>
+//
+// TODO(guokuankuan@bytedance.com)
+// Shall we change this type name to `FileMetaMap`? This map simply maps a file
+// number to its related file metadata.
 typedef chash_map DependenceMap;
 class IteratorCache {
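
--

The following standalone sketch (not part of the patch) illustrates the
[file_numbers][hash_values][smallest key] layout described in
LinkBlockRecord::Encode(), assuming varint-encoded file numbers and MSB-first,
byte-aligned hash packing as in the DecodeFrom() above. The helper names
(PackHashes, UnpackHash, and the local varint routines) are hypothetical; a
real implementation would reuse util/coding.h instead.

#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Minimal varint helpers, stand-ins for util/coding.h.
static void PutVarint64(std::string* dst, uint64_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

static bool GetVarint64(const char** p, const char* limit, uint64_t* v) {
  uint64_t result = 0;
  for (int shift = 0; shift <= 63 && *p < limit; shift += 7) {
    uint64_t byte = static_cast<uint8_t>(**p);
    ++*p;
    result |= (byte & 0x7f) << shift;
    if (!(byte & 0x80)) {
      *v = result;
      return true;
    }
  }
  return false;
}

// Pack `hash_bits`-wide values MSB-first into a byte aligned buffer,
// mirroring the [hash_values] section of a LinkBlockRecord.
static std::string PackHashes(const std::vector<uint32_t>& hashes,
                              int hash_bits) {
  std::string out((hashes.size() * hash_bits + 7) / 8, '\0');
  size_t bit = 0;
  for (uint32_t h : hashes) {
    for (int i = hash_bits - 1; i >= 0; --i, ++bit) {
      if ((h >> i) & 1) out[bit / 8] |= static_cast<char>(0x80 >> (bit % 8));
    }
  }
  return out;
}

static uint32_t UnpackHash(const std::string& buf, size_t idx, int hash_bits) {
  uint32_t h = 0;
  size_t start = idx * hash_bits;
  for (int i = 0; i < hash_bits; ++i) {
    size_t bit = start + i;
    h = (h << 1) | ((static_cast<uint8_t>(buf[bit / 8]) >> (7 - bit % 8)) & 1);
  }
  return h;
}

int main() {
  const int hash_bits = 11;
  std::vector<uint64_t> file_numbers = {7, 7, 42, 9};
  std::vector<uint32_t> hashes = {0x5a3, 0x001, 0x7ff, 0x000};  // 0 == invalid

  // Encode: [varint file numbers][packed hashes][length-prefixed smallest key]
  std::string block;
  for (uint64_t fn : file_numbers) PutVarint64(&block, fn);
  block += PackHashes(hashes, hash_bits);
  std::string smallest_key = "user_key_0";
  PutVarint64(&block, smallest_key.size());
  block += smallest_key;

  // Decode and verify the file numbers and hashes round-trip.
  const char* p = block.data();
  const char* limit = block.data() + block.size();
  for (uint64_t fn : file_numbers) {
    uint64_t got = 0;
    bool ok = GetVarint64(&p, limit, &got);
    assert(ok && got == fn);
    (void)ok;
  }
  std::string packed(p, (hashes.size() * hash_bits + 7) / 8);
  for (size_t i = 0; i < hashes.size(); ++i) {
    assert(UnpackHash(packed, i, hash_bits) == hashes[i]);
  }
  return 0;
}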
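LinkBlockRecord::hash() is still a stub in this patch. One possible shape,
shown only as an illustration, is an FNV-1a hash of the user key truncated to
`max_bits`, with 0 remapped so it can never collide with INVALID_ITEM_HASH.
The function name is hypothetical; TerarkDB might instead reuse an existing
hash utility.

#include <cstddef>
#include <cstdint>

// Hash a user key into at most `max_bits` bits, reserving 0 for invalid slots.
inline uint32_t LinkBlockKeyHash(const char* data, size_t n, int max_bits) {
  uint32_t h = 2166136261u;  // FNV offset basis
  for (size_t i = 0; i < n; ++i) {
    h ^= static_cast<uint8_t>(data[i]);
    h *= 16777619u;  // FNV prime
  }
  h &= (max_bits >= 32) ? 0xffffffffu : ((1u << max_bits) - 1);
  return h == 0 ? 1u : h;  // keep 0 reserved as the "deleted/invalid" marker
}

// Usage inside LinkBlockRecord::hash() could then be:
//   return LinkBlockKeyHash(user_key.data(), user_key.size(), max_bits);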