feat(tiering): Faster small bins serialization #3340

Closed · wants to merge 1 commit into from
src/server/snapshot.cc: 84 additions & 23 deletions

@@ -4,6 +4,7 @@

 #include "server/snapshot.h"

+#include <absl/container/btree_set.h>
 #include <absl/functional/bind_front.h>
 #include <absl/strings/match.h>
 #include <absl/strings/str_cat.h>
@@ -37,8 +38,80 @@ size_t SliceSnapshot::DbRecord::size() const {
   return HeapSize(value);
 }

+struct SliceSnapshot::TieredSerializer {
+  struct ValueBase {
+    DbIndex dbid;
+    CompactObj key;
+    time_t expire;
+  };
+
+  using PendingValue = std::pair<ValueBase, util::fb2::Future<PrimeValue>>;
+  using DelayedValue = std::pair<ValueBase, PrimeValue /* stores segment and encoding flags */>;
+
+  static const int kMaxPageAccum = 100000;
+
+  void Save(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv, time_t expire_time) {
+    ValueBase base{db_indx, PrimeKey(pk.ToString()), expire_time};
+
+    // Delay serialization of small values to possibly find more for the same page,
+    // reducing the total number of issued io requests.
+    if (auto [offset, size] = pv.GetExternalSlice(); size < TieredStorage::kMinOccupancySize) {
+      PrimeValue pv_copy;
+      pv_copy.ImportExternal(pv);
+      size_t page = offset / tiering::kPageSize;
+      auto& entries = delayed_[page];
+      {
+        delayed_sizes_.erase({entries.size(), page});
+        entries.emplace_back(std::move(base), std::move(pv_copy));
+        delayed_sizes_.insert({entries.size(), page});
+      }
+    } else {
+      pending_.push_back(Read(std::move(base), pv));

    Collaborator: nit: pending_reads_

+    }
+  }
+
+  void FlushDelayed(bool force) {
+    // Flush pages with most records accumulated first, or all, if forced.
+    // It's enough just to issue reads, because they are collapsed by the tiered storage internally.
+    while ((force && !delayed_.empty()) || delayed_.size() > kMaxPageAccum) {

    Contributor Author (on lines +73 to +76): We need to choose some kind of limit here... by the number of pages? by the total sum of entries? 🤷🏻‍♂️ We have to take into account that once we flush all of those, we can get a memory spike.
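
    (For scale, assuming the usual 4 KiB small-bin pages, which the PR does not restate: a forced flush with delayed_ at its kMaxPageAccum = 100000-page cap can pull up to 100000 × 4096 B, roughly 390 MiB, back through reads at once.)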

+      DCHECK(!delayed_sizes_.empty());
+      auto [size, page] = delayed_sizes_.extract(delayed_sizes_.begin()).value();
+      auto entries = delayed_.extract(page);
+      for (auto& [base, value] : entries.mapped()) {
+        DCHECK(value.IsExternal());
+        pending_.push_back(Read(std::move(base), value));

    Collaborator: what guarantees that by the time you flush delayed, their segments are still correct? Maybe the pages were freed and repurposed for other values?

    Contributor Author: Inside tiered storage, I don't delete small bin pages while we're serializing.

+      }
+    }
+  }
+
+  void Serialize(RdbSerializer* serializer) {
+    for (auto& [base, value] : pending_)
+      serializer->SaveEntry(base.key, value.Get(), base.expire, base.dbid);
+    pending_.clear();
+  }
+
+ private:
+  static PendingValue Read(ValueBase value, const PrimeValue& pv) {
+    auto* ts = EngineShard::tlocal()->tiered_storage();
+    util::fb2::Future<PrimeValue> future;  // store PrimeValue directly to avoid further copies
+    ts->Read(value.dbid, value.key.ToString(), pv,
+             [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
+    return PendingValue{std::move(value), std::move(future)};
+  }
+
+  std::vector<PendingValue> pending_;
+
+  // Small values with delayed serialization
+  absl::flat_hash_map<size_t /* page */, std::vector<DelayedValue>> delayed_;
+  // Largest entries in delayed map
+  absl::btree_set<std::pair<size_t /* size */, size_t /* page */>, std::greater<>> delayed_sizes_;
+};
+
 SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode)
-    : db_slice_(slice), dest_(dest), compression_mode_(compression_mode) {
+    : db_slice_(slice),
+      dest_(dest),
+      tiered_serializer_(new TieredSerializer{}),
+      compression_mode_(compression_mode) {
   db_array_ = slice->databases();
   tl_slice_snapshots.insert(this);
 }

    Collaborator (on the tiered_serializer_ initializer): do you want to initialize it unconditionally?

    Contributor Author: 🤷🏻‍♂️ I don't think it's expensive
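
For readers skimming the diff, the core of TieredSerializer is the two-container bookkeeping: a page-to-parked-entries map plus an ordered (count, page) index, so FlushDelayed can drain the fullest pages first and one page read can serve many small values. Below is a minimal, self-contained sketch of that pattern using standard containers; all names, types, and sizes are illustrative stand-ins, not Dragonfly's.

    #include <cstddef>
    #include <iostream>
    #include <set>
    #include <string>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    constexpr size_t kPageSize = 4096;  // illustrative stand-in for tiering::kPageSize

    // page -> entries parked on that page (models delayed_)
    std::unordered_map<size_t, std::vector<std::string>> delayed;
    // (entry count, page), largest count first (models delayed_sizes_)
    std::set<std::pair<size_t, size_t>, std::greater<>> delayed_sizes;

    void Park(const std::string& key, size_t disk_offset) {
      size_t page = disk_offset / kPageSize;
      auto& entries = delayed[page];
      delayed_sizes.erase({entries.size(), page});   // drop the stale count for this page
      entries.push_back(key);
      delayed_sizes.insert({entries.size(), page});  // re-index under the new count
    }

    // Pop the page with the most parked entries; one page read now serves all of them.
    std::vector<std::string> FlushFullest() {
      if (delayed_sizes.empty())
        return {};
      auto [count, page] = *delayed_sizes.begin();
      delayed_sizes.erase(delayed_sizes.begin());
      return std::move(delayed.extract(page).mapped());
    }

    int main() {
      Park("a", 100);
      Park("b", 200);   // lands on the same page as "a"
      Park("c", 9000);  // a different page
      std::cout << FlushFullest().size() << '\n';  // prints 2: the two-entry page drains first
    }

In the PR the parked payload is a (ValueBase, PrimeValue) pair rather than a key, and draining issues tiered reads (collapsed per page inside the tiered storage) instead of returning values, but the erase/insert dance around the size index is the same.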
@@ -308,12 +381,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
   }

   if (pv.IsExternal()) {
-    // We can't block, so we just schedule a tiered read and append it to the delayed entries
-    util::fb2::Future<PrimeValue> future;
-    EngineShard::tlocal()->tiered_storage()->Read(
-        db_indx, pk.ToString(), pv,
-        [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
-    delayed_entries_.push_back({db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time});
+    tiered_serializer_->Save(db_indx, pk, pv, expire_time);
     ++type_freq_map_[RDB_TYPE_STRING];
   } else {
     io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx);
@@ -335,29 +403,19 @@ size_t SliceSnapshot::Serialize() {
   DbRecord db_rec{.id = id, .value = std::move(sfile.val)};

   dest_->Push(std::move(db_rec));
-  if (serialized != 0) {
-    VLOG(2) << "Pushed with Serialize() " << serialized << " bytes";
-  }

+  VLOG_IF(2, serialized != 0) << "Pushed with Serialize() " << serialized << " bytes";
   return serialized;
 }

 bool SliceSnapshot::PushSerializedToChannel(bool force) {
+  tiered_serializer_->FlushDelayed(force);
+  tiered_serializer_->Serialize(serializer_.get());
+
   if (!force && serializer_->SerializedLen() < 4096)
     return false;

-  // Flush any of the leftovers to avoid interleavings
-  const auto serialized = Serialize();
-
-  // Bucket serialization might have accumulated some delayed values.
-  // Because we can finally block in this function, we'll await and serialize them
-  while (!delayed_entries_.empty()) {
-    auto& entry = delayed_entries_.back();
-    serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid);
-    delayed_entries_.pop_back();
-  }
-
-  const auto total_serialized = Serialize() + serialized;
-  return total_serialized > 0;
+  return Serialize() > 0;
 }

 void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
Expand All @@ -377,6 +435,9 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
       stats_.side_saved += SerializeBucket(db_index, it);
     });
   }
+
+  // Flush tiered delayed entries to avoid reordering with journal
+  tiered_serializer_->FlushDelayed(true);
 }

    Contributor Author (on lines 437 to 441): Ideally we'd pass the value with ChangeReq& and find out whether it was tiered; only if it's tiered and part of a relevant page should we flush it.
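
    (A concrete reading of the invariant below, which the diff states in one line: if a small tiered value is parked in delayed_ and its key is then overwritten, the resulting journal entry must not reach the replica before the parked RDB record, or the replica would apply the update and then have it clobbered by the stale snapshot value. Forcing FlushDelayed(true) here moves the parked entries into the pending queue before the bucket change is journaled.)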


 // For any key any journal entry must arrive at the replica strictly after its first original rdb
src/server/snapshot.h: 3 additions & 9 deletions

@@ -91,6 +91,8 @@ class SliceSnapshot {
   void Cancel();

  private:
+  struct TieredSerializer;
+
   // Main fiber that iterates over all buckets in the db slice
   // and submits them to SerializeBucket.
   void IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut);
@@ -140,14 +142,6 @@
   RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;

  private:
-  // An entry whose value must be awaited
-  struct DelayedEntry {
-    DbIndex dbid;
-    CompactObj key;
-    util::fb2::Future<PrimeValue> value;
-    time_t expire;
-  };
-
   DbSlice* db_slice_;
   DbTableArray db_array_;

@@ -157,7 +151,7 @@
   DbIndex current_db_;

   std::unique_ptr<RdbSerializer> serializer_;
-  std::vector<DelayedEntry> delayed_entries_;  // collected during atomic bucket traversal
+  std::unique_ptr<TieredSerializer> tiered_serializer_;

   // Used for sanity checks.
   bool serialize_bucket_running_ = false;
src/server/tiered_storage.cc: 14 additions & 0 deletions

@@ -158,7 +158,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     return db_slice_.memory_budget() - memory_margin_ - value_len > 0;
   }

+  void ClearDelayed() {
+    for (auto segment : delayed_deletes_)
+      OpManager::DeleteOffloaded(segment);
+    delayed_deletes_.clear();
+  }
+
   int64_t memory_margin_ = 0;
+  std::vector<tiering::DiskSegment> delayed_deletes_;

    Contributor Author: todo: better naming

   struct {
     size_t total_stashes = 0, total_cancels = 0, total_fetches = 0;

@@ -228,6 +235,11 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {

   auto bin = ts_->bins_->Delete(segment);
   if (bin.empty) {
+    if (SliceSnapshot::IsSnaphotInProgress()) {
+      delayed_deletes_.push_back(segment);
+      return false;
+    }
+
     return true;
   }
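
This hunk is the guarantee the author invokes earlier ("I don't delete small bin pages while we're serializing"): a page that empties out mid-snapshot is parked rather than freed, so delayed snapshot reads still point at valid data. A stripped-down, self-contained model of the handshake follows; the flow mirrors the diff, but the types and driver are illustrative, not the real classes.

    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct DiskSegment { uint64_t offset = 0, length = 0; };  // stand-in for tiering::DiskSegment

    bool snapshot_in_progress = true;           // models SliceSnapshot::IsSnaphotInProgress()
    std::vector<DiskSegment> delayed_deletes;   // models ShardOpManager::delayed_deletes_

    // Models NotifyDelete(): returns true if the segment may be freed right away.
    bool NotifyDelete(DiskSegment segment) {
      if (snapshot_in_progress) {
        delayed_deletes.push_back(segment);  // keep the page intact for pending snapshot reads
        return false;
      }
      return true;
    }

    // Models ClearDelayed(), which RunOffloading calls once no snapshot is active.
    void ClearDelayed() {
      for (const auto& seg : delayed_deletes)
        std::cout << "now freeing segment @" << seg.offset << '\n';
      delayed_deletes.clear();
    }

    int main() {
      NotifyDelete({4096, 4096});  // parked, not freed: snapshot still running
      snapshot_in_progress = false;
      ClearDelayed();              // snapshot done: the parked segment is released
    }

RunOffloading (next hunk) is a natural place to drain the parked list, since it already early-returns while a snapshot is in progress.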

@@ -422,6 +434,8 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
   if (SliceSnapshot::IsSnaphotInProgress())
     return;

+  op_manager_->ClearDelayed();
+
   // Don't run offloading if there's only very little space left
   auto disk_stats = op_manager_->GetStats().disk_stats;
   if (disk_stats.allocated_bytes + kMaxIterations / 2 * tiering::kPageSize >
tests/dragonfly/snapshot_test.py: 4 additions & 4 deletions

@@ -430,10 +430,10 @@ async def test_bgsave_and_save(async_client: aioredis.Redis):
 async def test_tiered_entries(async_client: aioredis.Redis):
     """This test makes sure tiered entries are correctly persisted"""

-    # With variance 4: 512 - 8192 we include small and large values
-    await StaticSeeder(key_target=5000, data_size=1024, variance=4, types=["STRING"]).run(
-        async_client
-    )
+    # With variance 8: 128 - 8192 we include small and large values
+    await StaticSeeder(
+        key_target=6000, data_size=1024, variance=8, types=["STRING"], samples=30
+    ).run(async_client)

     # Compute the capture, this brings all items back to memory... so we'll wait for offloading
     start_capture = await StaticSeeder.capture(async_client)