Skip to content

Commit

Permalink
[WIP] LinkCompaction: Add LinkSstIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
royguo committed Apr 19, 2022
1 parent c9dd2ff commit de060ed
Show file tree
Hide file tree
Showing 9 changed files with 575 additions and 53 deletions.
31 changes: 3 additions & 28 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1585,34 +1585,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
snapshot_checker_, compact_->compaction->level(),
db_options_.statistics.get(), shutting_down_);

struct BuilderSeparateHelper : public SeparateHelper {
SeparateHelper* separate_helper = nullptr;
std::unique_ptr<ValueExtractor> value_meta_extractor;
Status (*trans_to_separate_callback)(void* args, const Slice& key,
LazyBuffer& value) = nullptr;
void* trans_to_separate_callback_args = nullptr;

Status TransToSeparate(const Slice& internal_key, LazyBuffer& value,
const Slice& meta, bool is_merge,
bool is_index) override {
return SeparateHelper::TransToSeparate(
internal_key, value, value.file_number(), meta, is_merge, is_index,
value_meta_extractor.get());
}

Status TransToSeparate(const Slice& key, LazyBuffer& value) override {
if (trans_to_separate_callback == nullptr) {
return Status::NotSupported();
}
return trans_to_separate_callback(trans_to_separate_callback_args, key,
value);
}
BuilderSeparateHelper separate_helper;

LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence,
const LazyBuffer& value) const override {
return separate_helper->TransToCombined(user_key, sequence, value);
}
} separate_helper;
if (compact_->compaction->immutable_cf_options()
->value_meta_extractor_factory != nullptr) {
ValueExtractorContext context = {cfd->GetID()};
Expand Down Expand Up @@ -2048,7 +2022,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {

// TODO([email protected])
void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) {
return;
assert(sub_compact != nullptr);
return ProcessKeyValueCompaction(sub_compact);
}

void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) {
Expand Down
32 changes: 32 additions & 0 deletions db/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,38 @@ class VersionEdit;
class VersionSet;

class CompactionJob {
public:

class BuilderSeparateHelper : public SeparateHelper {
public:
SeparateHelper* separate_helper = nullptr;
std::unique_ptr<ValueExtractor> value_meta_extractor;
Status (*trans_to_separate_callback)(void* args, const Slice& key,
LazyBuffer& value) = nullptr;
void* trans_to_separate_callback_args = nullptr;

Status TransToSeparate(const Slice& internal_key, LazyBuffer& value,
const Slice& meta, bool is_merge,
bool is_index) override {
return SeparateHelper::TransToSeparate(
internal_key, value, value.file_number(), meta, is_merge, is_index,
value_meta_extractor.get());
}

Status TransToSeparate(const Slice& key, LazyBuffer& value) override {
if (trans_to_separate_callback == nullptr) {
return Status::NotSupported();
}
return trans_to_separate_callback(trans_to_separate_callback_args, key,
value);
}

LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence,
const LazyBuffer& value) const override {
return separate_helper->TransToCombined(user_key, sequence, value);
}
};

public:
CompactionJob(int job_id, Compaction* compaction,
const ImmutableDBOptions& db_options,
Expand Down
52 changes: 34 additions & 18 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static bool InheritanceMismatch(const FileMetaData& sst_meta,
return true;
}

// Store params for create depend table iterator in future
// Store params for create dependency table iterator in future
class LazyCreateIterator : public Snapshot {
TableCache* table_cache_;
ReadOptions options_; // deep copy
Expand Down Expand Up @@ -292,7 +292,7 @@ InternalIterator* TableCache::NewIterator(
}
size_t readahead = 0;
bool record_stats = !for_compaction;
if (file_meta.prop.is_map_sst()) {
if (file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) {
record_stats = false;
} else {
// MapSST don't handle these
Expand Down Expand Up @@ -340,22 +340,17 @@ InternalIterator* TableCache::NewIterator(
}
InternalIterator* result = nullptr;
if (s.ok()) {
if (!file_meta.prop.is_map_sst()) {
if (options.table_filter &&
!options.table_filter(*table_reader->GetTableProperties())) {
result = NewEmptyInternalIterator<LazyBuffer>(arena);
} else {
result = table_reader->NewIterator(options, prefix_extractor, arena,
skip_filters, for_compaction);
}
} else {
// For map & linked SST, we should expand their underlying key value pairs,
// not simply iterate the input SST key values.
if(file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) {
ReadOptions map_options = options;
map_options.total_order_seek = true;
map_options.readahead_size = 0;
result =
table_reader->NewIterator(map_options, prefix_extractor, arena,
skip_filters, false /* for_compaction */);
if (!dependence_map.empty()) {
// Map SST will handle range deletion internally, so we can skip here.
bool ignore_range_deletions =
options.ignore_range_deletions ||
file_meta.prop.map_handle_range_deletions();
Expand All @@ -365,33 +360,54 @@ InternalIterator* TableCache::NewIterator(
lazy_create_iter = new (buffer) LazyCreateIterator(
this, options, env_options, range_del_agg, prefix_extractor,
for_compaction, skip_filters, ignore_range_deletions, level);

} else {
lazy_create_iter = new LazyCreateIterator(
this, options, env_options, range_del_agg, prefix_extractor,
for_compaction, skip_filters, ignore_range_deletions, level);
}
auto map_sst_iter = NewMapSstIterator(
&file_meta, result, dependence_map, ioptions_.internal_comparator,
lazy_create_iter, c_style_callback(*lazy_create_iter), arena);

// For map & linked sst, we should expand their dependencies and merge
// all related iterators into one combined iterator for further reads.
InternalIterator* sst_iter = nullptr;

if(file_meta.prop.is_map_sst()) {
sst_iter = NewMapSstIterator(
&file_meta, result, dependence_map, ioptions_.internal_comparator,
lazy_create_iter, c_style_callback(*lazy_create_iter), arena);
} else {
assert(file_meta.prop.is_link_sst());
sst_iter = NewLinkSstIterator(
&file_meta, result, dependence_map, ioptions_.internal_comparator,
lazy_create_iter, c_style_callback(*lazy_create_iter), arena);
}

if (arena != nullptr) {
map_sst_iter->RegisterCleanup(
sst_iter->RegisterCleanup(
[](void* arg1, void* arg2) {
static_cast<InternalIterator*>(arg1)->~InternalIterator();
static_cast<LazyCreateIterator*>(arg2)->~LazyCreateIterator();
},
result, lazy_create_iter);
} else {
map_sst_iter->RegisterCleanup(
sst_iter->RegisterCleanup(
[](void* arg1, void* arg2) {
delete static_cast<InternalIterator*>(arg1);
delete static_cast<LazyCreateIterator*>(arg2);
},
result, lazy_create_iter);
}
result = map_sst_iter;
result = sst_iter;
}
} else {
if (options.table_filter &&
!options.table_filter(*table_reader->GetTableProperties())) {
result = NewEmptyInternalIterator<LazyBuffer>(arena);
} else {
result = table_reader->NewIterator(options, prefix_extractor, arena,
skip_filters, for_compaction);
}
}

if (create_new_table_reader) {
assert(handle == nullptr);
result->RegisterCleanup(&DeleteTableReader, table_reader,
Expand Down
3 changes: 3 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ struct TablePropertyCache {
std::vector<uint64_t> inheritance; // inheritance set
uint64_t earliest_time_begin_compact = port::kMaxUint64;
uint64_t latest_time_end_compact = port::kMaxUint64;
uint32_t lbr_hash_bits = 11; // hash bits for each LinkSST KV
uint32_t lbr_group_size = 16; // Group size for LinkBlockRecord

bool is_map_sst() const { return purpose == kMapSst; }
bool is_link_sst() const {return purpose == kLinkSst; }
bool has_range_deletions() const { return (flags & kNoRangeDeletions) == 0; }
bool map_handle_range_deletions() const {
return (flags & kMapHandleRangeDeletions) != 0;
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ enum SstPurpose {
kEssenceSst, // Actual data storage sst
kLogSst, // Log as sst
kMapSst, // Dummy sst
kLinkSst, // Link SST
};

struct Options;
Expand Down
Loading

0 comments on commit de060ed

Please sign in to comment.