diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index f3036005453e..09f7e6ffbd5b 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -4,6 +4,7 @@
 
 #include "server/snapshot.h"
 
+#include <absl/container/btree_set.h>
 #include <absl/functional/bind_front.h>
 #include <absl/strings/match.h>
 #include <absl/strings/str_cat.h>
@@ -37,8 +38,80 @@ size_t SliceSnapshot::DbRecord::size() const {
   return HeapSize(value);
 }
 
+struct SliceSnapshot::TieredSerializer {
+  struct ValueBase {
+    DbIndex dbid;
+    CompactObj key;
+    time_t expire;
+  };
+
+  using PendingValue = std::pair<ValueBase, util::fb2::Future<PrimeValue>>;
+  using DelayedValue = std::pair<ValueBase, PrimeValue>;
+
+  static const int kMaxPageAccum = 100000;
+
+  void Save(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv, time_t expire_time) {
+    ValueBase base{db_indx, PrimeKey(pk.ToString()), expire_time};
+
+    // Delay serialization of small values to possibly find more for the same page,
+    // reducing the total number of issued io requests.
+    if (auto [offset, size] = pv.GetExternalSlice(); size < TieredStorage::kMinOccupancySize) {
+      PrimeValue pv_copy;
+      pv_copy.ImportExternal(pv);
+      size_t page = offset / tiering::kPageSize;
+      auto& entries = delayed_[page];
+      {
+        delayed_sizes_.erase({entries.size(), page});
+        entries.emplace_back(std::move(base), std::move(pv_copy));
+        delayed_sizes_.insert({entries.size(), page});
+      }
+    } else {
+      pending_.push_back(Read(std::move(base), pv));
+    }
+  }
+
+  void FlushDelayed(bool force) {
+    // Flush pages with most records accumulated first, or all, if forced.
+    // It's enough just to issue reads, because they are collapsed by the tiered storage internally
+    while ((force && !delayed_.empty()) || delayed_.size() > kMaxPageAccum) {
+      DCHECK(!delayed_sizes_.empty());
+      auto [size, page] = delayed_sizes_.extract(delayed_sizes_.begin()).value();
+      auto entries = delayed_.extract(page);
+      for (auto& [base, value] : entries.mapped()) {
+        DCHECK(value.IsExternal());
+        pending_.push_back(Read(std::move(base), value));
+      }
+    }
+  }
+
+  void Serialize(RdbSerializer* serializer) {
+    for (auto& [base, value] : pending_)
+      serializer->SaveEntry(base.key, value.Get(), base.expire, base.dbid);
+    pending_.clear();
+  }
+
+ private:
+  static PendingValue Read(ValueBase value, const PrimeValue& pv) {
+    auto* ts = EngineShard::tlocal()->tiered_storage();
+    util::fb2::Future<PrimeValue> future;  // store PrimeValue directly to avoid further copies
+    ts->Read(value.dbid, value.key.ToString(), pv,
+             [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
+    return PendingValue{std::move(value), std::move(future)};
+  }
+
+  std::vector<PendingValue> pending_;
+
+  // Small values with delayed serialization
+  absl::flat_hash_map<size_t, std::vector<DelayedValue>> delayed_;
+  // Largest entries in delayed map
+  absl::btree_set<std::pair<size_t, size_t>, std::greater<>> delayed_sizes_;
+};
+
 SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode)
-    : db_slice_(slice), dest_(dest), compression_mode_(compression_mode) {
+    : db_slice_(slice),
+      dest_(dest),
+      tiered_serializer_(new TieredSerializer{}),
+      compression_mode_(compression_mode) {
   db_array_ = slice->databases();
   tl_slice_snapshots.insert(this);
 }
@@ -308,12 +381,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
   }
 
   if (pv.IsExternal()) {
-    // We can't block, so we just schedule a tiered read and append it to the delayed entries
-    util::fb2::Future<PrimeValue> future;
-    EngineShard::tlocal()->tiered_storage()->Read(
-        db_indx, pk.ToString(), pv,
-        [future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); });
-    delayed_entries_.push_back({db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time});
+    tiered_serializer_->Save(db_indx, pk, pv, expire_time);
     ++type_freq_map_[RDB_TYPE_STRING];
   } else {
     io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx);
@@ -335,29 +403,19 @@ size_t SliceSnapshot::Serialize() {
 
   DbRecord db_rec{.id = id, .value = std::move(sfile.val)};
   dest_->Push(std::move(db_rec));
-  if (serialized != 0) {
-    VLOG(2) << "Pushed with Serialize() " << serialized << " bytes";
-  }
+
+  VLOG_IF(2, serialized != 0) << "Pushed with Serialize() " << serialized << " bytes";
   return serialized;
 }
 
 bool SliceSnapshot::PushSerializedToChannel(bool force) {
+  tiered_serializer_->FlushDelayed(force);
+  tiered_serializer_->Serialize(serializer_.get());
+
   if (!force && serializer_->SerializedLen() < 4096)
     return false;
 
-  // Flush any of the leftovers to avoid interleavings
-  const auto serialized = Serialize();
-
-  // Bucket serialization might have accumulated some delayed values.
-  // Because we can finally block in this function, we'll await and serialize them
-  while (!delayed_entries_.empty()) {
-    auto& entry = delayed_entries_.back();
-    serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid);
-    delayed_entries_.pop_back();
-  }
-
-  const auto total_serialized = Serialize() + serialized;
-  return total_serialized > 0;
+  return Serialize() > 0;
 }
 
 void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
@@ -377,6 +435,9 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
       stats_.side_saved += SerializeBucket(db_index, it);
     });
   }
+
+  // Flush tiered delayed entries to avoid reordering with journal
+  tiered_serializer_->FlushDelayed(true);
 }
 
 // For any key any journal entry must arrive at the replica strictly after its first original rdb
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index 95529dca3beb..a42b178a7379 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -91,6 +91,8 @@ class SliceSnapshot {
   void Cancel();
 
  private:
+  struct TieredSerializer;
+
   // Main fiber that iterates over all buckets in the db slice
   // and submits them to SerializeBucket.
   void IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut);
@@ -140,14 +142,6 @@ class SliceSnapshot {
   RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;
 
  private:
-  // An entry whose value must be awaited
-  struct DelayedEntry {
-    DbIndex dbid;
-    CompactObj key;
-    util::fb2::Future<PrimeValue> value;
-    time_t expire;
-  };
-
   DbSlice* db_slice_;
   DbTableArray db_array_;
 
@@ -157,7 +151,7 @@ class SliceSnapshot {
   DbIndex current_db_;
 
   std::unique_ptr<RdbSerializer> serializer_;
-  std::vector<DelayedEntry> delayed_entries_;  // collected during atomic bucket traversal
+  std::unique_ptr<TieredSerializer> tiered_serializer_;
 
   // Used for sanity checks.
   bool serialize_bucket_running_ = false;
diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc
index 984c3dc73778..272021a4671f 100644
--- a/src/server/tiered_storage.cc
+++ b/src/server/tiered_storage.cc
@@ -158,7 +158,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     return db_slice_.memory_budget() - memory_margin_ - value_len > 0;
   }
 
+  void ClearDelayed() {
+    for (auto segment : delayed_deletes_)
+      OpManager::DeleteOffloaded(segment);
+    delayed_deletes_.clear();
+  }
+
   int64_t memory_margin_ = 0;
+  std::vector<tiering::DiskSegment> delayed_deletes_;
 
   struct {
     size_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
@@ -228,6 +235,11 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
 
   auto bin = ts_->bins_->Delete(segment);
   if (bin.empty) {
+    if (SliceSnapshot::IsSnaphotInProgress()) {
+      delayed_deletes_.push_back(segment);
+      return false;
+    }
+
     return true;
   }
 
@@ -422,6 +434,8 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
   if (SliceSnapshot::IsSnaphotInProgress())
     return;
 
+  op_manager_->ClearDelayed();
+
   // Don't run offloading if there's only very little space left
   auto disk_stats = op_manager_->GetStats().disk_stats;
   if (disk_stats.allocated_bytes + kMaxIterations / 2 * tiering::kPageSize >
diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py
index ca6f04c9885f..8d9880faf6f6 100644
--- a/tests/dragonfly/snapshot_test.py
+++ b/tests/dragonfly/snapshot_test.py
@@ -430,10 +430,10 @@ async def test_bgsave_and_save(async_client: aioredis.Redis):
 async def test_tiered_entries(async_client: aioredis.Redis):
     """This test makes sure tieried entries are correctly persisted"""
 
-    # With variance 4: 512 - 8192 we include small and large values
-    await StaticSeeder(key_target=5000, data_size=1024, variance=4, types=["STRING"]).run(
-        async_client
-    )
+    # With variance 8: 128 - 8192 we include small and large values
+    await StaticSeeder(
+        key_target=6000, data_size=1024, variance=8, types=["STRING"], samples=30
+    ).run(async_client)
 
     # Compute the capture, this brings all items back to memory... so we'll wait for offloading
     start_capture = await StaticSeeder.capture(async_client)
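
The heart of the new TieredSerializer is its page-batching strategy: small offloaded values are grouped by the disk page they live on, and the pages that accumulated the most entries are flushed first, so one page read serves many small values. The code below is a minimal, self-contained C++17 sketch of that idea only; it is not Dragonfly code, the names (PageBatcher, Entry, kPageSize, max_pages) are hypothetical, and std::map/std::set stand in for the absl containers used in the patch.

// Standalone sketch: group small reads by page, flush the fullest pages first.
#include <cstddef>
#include <functional>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

constexpr size_t kPageSize = 4096;  // assumed page granularity of the backing file

struct Entry {
  std::string key;
  size_t offset;  // byte offset of the serialized value on disk
};

class PageBatcher {
 public:
  explicit PageBatcher(size_t max_pages) : max_pages_(max_pages) {}

  // Queue a small value; entries that land on the same page are grouped together.
  void Add(Entry e) {
    size_t page = e.offset / kPageSize;
    auto& bucket = delayed_[page];
    sizes_.erase({bucket.size(), page});
    bucket.push_back(std::move(e));
    sizes_.insert({bucket.size(), page});
  }

  // Flush the fullest pages first; if force is set, flush everything.
  void Flush(bool force, const std::function<void(size_t, std::vector<Entry>)>& issue_read) {
    while ((force && !delayed_.empty()) || delayed_.size() > max_pages_) {
      auto it = sizes_.begin();  // largest (count, page) pair due to std::greater ordering
      size_t page = it->second;
      sizes_.erase(it);
      auto node = delayed_.extract(page);
      issue_read(page, std::move(node.mapped()));  // one read covers all entries of this page
    }
  }

 private:
  size_t max_pages_;
  std::map<size_t, std::vector<Entry>> delayed_;               // page -> queued entries
  std::set<std::pair<size_t, size_t>, std::greater<>> sizes_;  // (entry count, page)
};

int main() {
  PageBatcher batcher(/*max_pages=*/2);
  batcher.Add({"a", 100});
  batcher.Add({"b", 300});   // same page as "a"
  batcher.Add({"c", 5000});  // second page
  batcher.Add({"d", 9000});  // third page, exceeds max_pages
  auto print = [](size_t page, std::vector<Entry> entries) {
    std::cout << "read page " << page << " for " << entries.size() << " entries\n";
  };
  batcher.Flush(/*force=*/false, print);  // drains only the fullest page
  batcher.Flush(/*force=*/true, print);   // drains the rest
}

Keeping the ordered (count, page) set next to the map is what makes "pick the fullest page" cheap; it mirrors the delayed_sizes_ btree_set maintained by TieredSerializer::Save and consumed by FlushDelayed in the patch above.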
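
The pending_ half of the flow can be illustrated the same way: while buckets are traversed the snapshot fiber must not block, so it only issues reads and keeps futures, and PushSerializedToChannel awaits them later where blocking is allowed. A hedged sketch of that pattern with std::future standing in for util::fb2::Future (names hypothetical, not the Dragonfly API):

#include <future>
#include <iostream>
#include <string>
#include <vector>

struct PendingRead {
  std::string key;
  std::future<std::string> value;  // resolved once the (simulated) disk read completes
};

int main() {
  std::vector<PendingRead> pending;

  // Traversal phase: schedule reads without blocking the caller.
  for (std::string key : {"k1", "k2", "k3"}) {
    pending.push_back(PendingRead{key, std::async(std::launch::async, [key] {
                                    return "value-of-" + key;  // stands in for a tiered read
                                  })});
  }

  // Flush phase: blocking is allowed here, so await and "serialize" each value in order.
  for (auto& p : pending)
    std::cout << p.key << " -> " << p.value.get() << "\n";
  pending.clear();
}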