Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LinkCompaction: Add Link Compaction related interfaces #232

Open
wants to merge 5 commits into
base: link-compaction
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ enum CompactionType {
kKeyValueCompaction = 0,
kMapCompaction = 1,
kGarbageCollection = 2,
kLinkCompaction = 3,
};

struct CompactionParams {
Expand Down
2 changes: 1 addition & 1 deletion db/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ CompactionIterator::CompactionIterator(
level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
}

if (snapshots_->size() == 0) {
if (snapshots_->empty()) {
// optimize for fast path if there are no snapshots
visible_at_tip_ = true;
earliest_snapshot_ = kMaxSequenceNumber;
Expand Down
44 changes: 16 additions & 28 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ struct CompactionJob::SubcompactionState {
}

struct RebuildBlobsInfo {
// File numbers
chash_set<uint64_t> blobs;
// pop_count = planned file count - actual used file count.
size_t pop_count;
};
struct BlobRefInfo {
Expand Down Expand Up @@ -1493,6 +1495,9 @@ void CompactionJob::ProcessCompaction(SubcompactionState* sub_compact) {
case kGarbageCollection:
ProcessGarbageCollection(sub_compact);
break;
case kLinkCompaction:
ProcessLinkCompaction(sub_compact);
break;
default:
assert(false);
break;
Expand Down Expand Up @@ -1580,34 +1585,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
snapshot_checker_, compact_->compaction->level(),
db_options_.statistics.get(), shutting_down_);

struct BuilderSeparateHelper : public SeparateHelper {
SeparateHelper* separate_helper = nullptr;
std::unique_ptr<ValueExtractor> value_meta_extractor;
Status (*trans_to_separate_callback)(void* args, const Slice& key,
LazyBuffer& value) = nullptr;
void* trans_to_separate_callback_args = nullptr;

Status TransToSeparate(const Slice& internal_key, LazyBuffer& value,
const Slice& meta, bool is_merge,
bool is_index) override {
return SeparateHelper::TransToSeparate(
internal_key, value, value.file_number(), meta, is_merge, is_index,
value_meta_extractor.get());
}
BuilderSeparateHelper separate_helper;

Status TransToSeparate(const Slice& key, LazyBuffer& value) override {
if (trans_to_separate_callback == nullptr) {
return Status::NotSupported();
}
return trans_to_separate_callback(trans_to_separate_callback_args, key,
value);
}

LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence,
const LazyBuffer& value) const override {
return separate_helper->TransToCombined(user_key, sequence, value);
}
} separate_helper;
if (compact_->compaction->immutable_cf_options()
->value_meta_extractor_factory != nullptr) {
ValueExtractorContext context = {cfd->GetID()};
Expand Down Expand Up @@ -1765,6 +1744,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (!sub_compact->compaction->partial_compaction()) {
dict_sample_data.reserve(kSampleBytes);
}

// Represents how many records in target blob SST that are needed by the key
// SST
std::unordered_map<uint64_t, uint64_t> dependence;

size_t yield_count = 0;
Expand Down Expand Up @@ -2036,7 +2018,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->c_iter.reset();
input.reset();
sub_compact->status = status;
} // namespace TERARKDB_NAMESPACE
}

// TODO([email protected])
void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);
return ProcessKeyValueCompaction(sub_compact);
}

void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);
Expand Down
33 changes: 33 additions & 0 deletions db/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,38 @@ class VersionEdit;
class VersionSet;

class CompactionJob {
public:

class BuilderSeparateHelper : public SeparateHelper {
public:
SeparateHelper* separate_helper = nullptr;
std::unique_ptr<ValueExtractor> value_meta_extractor;
Status (*trans_to_separate_callback)(void* args, const Slice& key,
LazyBuffer& value) = nullptr;
void* trans_to_separate_callback_args = nullptr;

Status TransToSeparate(const Slice& internal_key, LazyBuffer& value,
const Slice& meta, bool is_merge,
bool is_index) override {
return SeparateHelper::TransToSeparate(
internal_key, value, value.file_number(), meta, is_merge, is_index,
value_meta_extractor.get());
}

Status TransToSeparate(const Slice& key, LazyBuffer& value) override {
if (trans_to_separate_callback == nullptr) {
return Status::NotSupported();
}
return trans_to_separate_callback(trans_to_separate_callback_args, key,
value);
}

LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence,
const LazyBuffer& value) const override {
return separate_helper->TransToCombined(user_key, sequence, value);
}
};

public:
CompactionJob(int job_id, Compaction* compaction,
const ImmutableDBOptions& db_options,
Expand Down Expand Up @@ -123,6 +155,7 @@ class CompactionJob {
// kv-pairs
void ProcessCompaction(SubcompactionState* sub_compact);
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
void ProcessLinkCompaction(SubcompactionState* sub_compact);
void ProcessGarbageCollection(SubcompactionState* sub_compact);

Status FinishCompactionOutputFile(
Expand Down
23 changes: 21 additions & 2 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2200,6 +2200,13 @@ class LevelCompactionBuilder {
// Pick and return a compaction.
Compaction* PickCompaction();

// Gather target files (including blob SST(KV separated) and ordinary SST) and
// composite a compaction for future use.
// A Link Compaction contains two major objectives:
// (1) Link target SST files into a logical linked sst
// (2) GC old linked SST files
Compaction* PickLinkCompaction();

// Pick lazy compaction
Compaction* PickLazyCompaction(const std::vector<SequenceNumber>& snapshots);

Expand Down Expand Up @@ -2298,7 +2305,10 @@ void LevelCompactionBuilder::SetupInitialFiles() {
// In these cases, to reduce L0 file count and thus reduce likelihood
// of write stalls, we can attempt compacting a span of files within
// L0.
if (PickIntraL0Compaction()) {
//
// If link compaction is enabled, we should skip L0 internal compaction
// since the link should be finished quite fast.
if (!ioptions_.enable_link_compaction && PickIntraL0Compaction()) {
output_level_ = 0;
compaction_reason_ = CompactionReason::kLevelL0FilesNum;
break;
Expand Down Expand Up @@ -2431,6 +2441,13 @@ Compaction* LevelCompactionBuilder::PickCompaction() {
return c;
}

// TODO ([email protected])
// We could reuse the ordinary compaction picker at the moment, but sooner we should pick link compactions smarter.
Compaction* LevelCompactionBuilder::PickLinkCompaction() {
compaction_type_ = CompactionType::kLinkCompaction;
return PickCompaction();
}

Compaction* LevelCompactionBuilder::PickLazyCompaction(
const std::vector<SequenceNumber>& snapshots) {
using SortedRun = CompactionPicker::SortedRun;
Expand Down Expand Up @@ -2971,7 +2988,7 @@ bool LevelCompactionBuilder::PickFileToCompact() {
// store where to start the iteration in the next call to PickCompaction
vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);

return start_level_inputs_.size() > 0;
return !start_level_inputs_.empty();
}

bool LevelCompactionBuilder::PickIntraL0Compaction() {
Expand Down Expand Up @@ -2999,6 +3016,8 @@ Compaction* LevelCompactionPicker::PickCompaction(
mutable_cf_options, ioptions_);
if (ioptions_.enable_lazy_compaction) {
return builder.PickLazyCompaction(snapshots);
} else if (ioptions_.enable_link_compaction) {
return builder.PickLinkCompaction();
} else {
return builder.PickCompaction();
}
Expand Down
1 change: 1 addition & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,7 @@ class DBImpl : public DB {
void BackgroundCallGarbageCollection();
void BackgroundCallFlush();
void BackgroundCallPurge();

Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction);
Expand Down
9 changes: 9 additions & 0 deletions db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ enum ValueType : unsigned char {
// generated by WriteUnprepared write policy is not mistakenly read by
// another.
kTypeBeginUnprepareXID = 0x13, // WAL only.
// Similar to kTypeValueIndex, this means current value belongs to
// a LinkSST and pointed to the actual value file.
kTypeLinkIndex = 0x14, // LinkSST only
kMaxValue = 0x7F // Not used for storing records.
};

Expand Down Expand Up @@ -784,6 +787,12 @@ struct ParsedInternalKeyComparator {
const InternalKeyComparator* cmp;
};

// This interface is used for transferring data format between KV separated and
// combined kv pairs.
// We should add an implementation instance while processing key value pairs
// during compactions.
// Note that, we also port `Version` to this interface since we may need to fetch
// value from any version that has separated values.
class SeparateHelper {
public:
virtual ~SeparateHelper() = default;
Expand Down
52 changes: 34 additions & 18 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static bool InheritanceMismatch(const FileMetaData& sst_meta,
return true;
}

// Store params for create depend table iterator in future
// Store params for create dependency table iterator in future
class LazyCreateIterator : public Snapshot {
TableCache* table_cache_;
ReadOptions options_; // deep copy
Expand Down Expand Up @@ -292,7 +292,7 @@ InternalIterator* TableCache::NewIterator(
}
size_t readahead = 0;
bool record_stats = !for_compaction;
if (file_meta.prop.is_map_sst()) {
if (file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) {
record_stats = false;
} else {
// MapSST don't handle these
Expand Down Expand Up @@ -340,22 +340,17 @@ InternalIterator* TableCache::NewIterator(
}
InternalIterator* result = nullptr;
if (s.ok()) {
if (!file_meta.prop.is_map_sst()) {
if (options.table_filter &&
!options.table_filter(*table_reader->GetTableProperties())) {
result = NewEmptyInternalIterator<LazyBuffer>(arena);
} else {
result = table_reader->NewIterator(options, prefix_extractor, arena,
skip_filters, for_compaction);
}
} else {
// For map & linked SST, we should expand their underlying key value pairs,
// not simply iterate the input SST key values.
if(file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) {
ReadOptions map_options = options;
map_options.total_order_seek = true;
map_options.readahead_size = 0;
result =
table_reader->NewIterator(map_options, prefix_extractor, arena,
skip_filters, false /* for_compaction */);
if (!dependence_map.empty()) {
// Map SST will handle range deletion internally, so we can skip here.
bool ignore_range_deletions =
options.ignore_range_deletions ||
file_meta.prop.map_handle_range_deletions();
Expand All @@ -365,33 +360,54 @@ InternalIterator* TableCache::NewIterator(
lazy_create_iter = new (buffer) LazyCreateIterator(
this, options, env_options, range_del_agg, prefix_extractor,
for_compaction, skip_filters, ignore_range_deletions, level);

} else {
lazy_create_iter = new LazyCreateIterator(
this, options, env_options, range_del_agg, prefix_extractor,
for_compaction, skip_filters, ignore_range_deletions, level);
}
auto map_sst_iter = NewMapSstIterator(
&file_meta, result, dependence_map, ioptions_.internal_comparator,
lazy_create_iter, c_style_callback(*lazy_create_iter), arena);

// For map & linked sst, we should expand their dependencies and merge
// all related iterators into one combined iterator for further reads.
InternalIterator* sst_iter = nullptr;

if(file_meta.prop.is_map_sst()) {
sst_iter = NewMapSstIterator(
&file_meta, result, dependence_map, ioptions_.internal_comparator,
lazy_create_iter, c_style_callback(*lazy_create_iter), arena);
} else {
assert(file_meta.prop.is_link_sst());
sst_iter = NewLinkSstIterator(
&file_meta, result, dependence_map, ioptions_.internal_comparator,
lazy_create_iter, c_style_callback(*lazy_create_iter), arena);
}

if (arena != nullptr) {
map_sst_iter->RegisterCleanup(
sst_iter->RegisterCleanup(
[](void* arg1, void* arg2) {
static_cast<InternalIterator*>(arg1)->~InternalIterator();
static_cast<LazyCreateIterator*>(arg2)->~LazyCreateIterator();
},
result, lazy_create_iter);
} else {
map_sst_iter->RegisterCleanup(
sst_iter->RegisterCleanup(
[](void* arg1, void* arg2) {
delete static_cast<InternalIterator*>(arg1);
delete static_cast<LazyCreateIterator*>(arg2);
},
result, lazy_create_iter);
}
result = map_sst_iter;
result = sst_iter;
}
} else {
if (options.table_filter &&
!options.table_filter(*table_reader->GetTableProperties())) {
result = NewEmptyInternalIterator<LazyBuffer>(arena);
} else {
result = table_reader->NewIterator(options, prefix_extractor, arena,
skip_filters, for_compaction);
}
}

if (create_new_table_reader) {
assert(handle == nullptr);
result->RegisterCleanup(&DeleteTableReader, table_reader,
Expand Down
3 changes: 3 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ struct TablePropertyCache {
std::vector<uint64_t> inheritance; // inheritance set
uint64_t earliest_time_begin_compact = port::kMaxUint64;
uint64_t latest_time_end_compact = port::kMaxUint64;
uint32_t lbr_hash_bits = 11; // hash bits for each LinkSST KV
uint32_t lbr_group_size = 16; // Group size for LinkBlockRecord

bool is_map_sst() const { return purpose == kMapSst; }
bool is_link_sst() const {return purpose == kLinkSst; }
bool has_range_deletions() const { return (flags & kNoRangeDeletions) == 0; }
bool map_handle_range_deletions() const {
return (flags & kMapHandleRangeDeletions) != 0;
Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/advanced_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ struct AdvancedColumnFamilyOptions {
// LazyCompaction, default false
bool enable_lazy_compaction = false;

// Link Compaciton is a replacement policy to general compactions.
// It will not physically compacts target SST files immediately but instead link them logically.
// The actually physical compaction will be triggered in the background.
bool enable_link_compaction = false;

// Read TableProperties from file if false
bool pin_table_properties_in_reader = true;

Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ class CompactionFilter

// Determines whether value changed by compaction filter were stable.
// Default as false, which means stability of outcome is not promised.
// "Stable" means the changed value will not change after the same
// operation is applied multiple times.
// Creators of the compaction filter should override this function, or
// the behavior of the stability checking is undefined.
virtual bool IsStableChangeValue() const { return false; }

// Returns a name that identifies this compaction filter.
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ enum SstPurpose {
kEssenceSst, // Actual data storage sst
kLogSst, // Log as sst
kMapSst, // Dummy sst
kLinkSst, // Link SST
};

struct Options;
Expand Down
2 changes: 2 additions & 0 deletions options/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ struct ImmutableCFOptions {

bool enable_lazy_compaction;

bool enable_link_compaction;

bool pin_table_properties_in_reader;

bool inplace_update_support;
Expand Down
Loading