diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 7353cacd9e12..6e5400dd8ae7 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -386,28 +386,53 @@ void EngineShard::Shutdown() { queue_.Shutdown(); queue2_.Shutdown(); - DCHECK(!fiber_periodic_.IsJoinable()); + DCHECK(!fiber_heartbeat_periodic_.IsJoinable()); + DCHECK(!fiber_shard_handler_periodic_.IsJoinable()); ProactorBase::me()->RemoveOnIdleTask(defrag_task_); } void EngineShard::StopPeriodicFiber() { - fiber_periodic_done_.Notify(); - if (fiber_periodic_.IsJoinable()) { - fiber_periodic_.Join(); + fiber_heartbeat_periodic_done_.Notify(); + if (fiber_heartbeat_periodic_.IsJoinable()) { + fiber_heartbeat_periodic_.Join(); + } + fiber_shard_handler_periodic_done_.Notify(); + if (fiber_shard_handler_periodic_.IsJoinable()) { + fiber_shard_handler_periodic_.Join(); } } -void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function global_handler) { +void EngineShard::StartPeriodicFiberImpl(util::ProactorBase* pb, + std::function shard_handler, bool heartbeat) { uint32_t clock_cycle_ms = 1000 / std::max(1, GetFlag(FLAGS_hz)); if (clock_cycle_ms == 0) clock_cycle_ms = 1; - fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms, - handler = std::move(global_handler)] { - ThisFiber::SetName(absl::StrCat("shard_periodic", index)); - RunPeriodic(std::chrono::milliseconds(period_ms), std::move(handler)); - }); + if (heartbeat) { + fiber_heartbeat_periodic_ = + MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms, + handler = std::move(shard_handler)]() mutable { + ThisFiber::SetName(absl::StrCat("heartbeat_periodic", index)); + RunHeartbeatPeriodic(std::chrono::milliseconds(period_ms), std::move(handler)); + }); + } else { + fiber_shard_handler_periodic_ = + MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms, + handler = std::move(shard_handler)]() mutable { + ThisFiber::SetName(absl::StrCat("shard_handler_periodic", index)); + RunShardHandlerPeriodic(std::chrono::milliseconds(period_ms), std::move(handler)); + }); + } +} + +void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function shard_handler) { + StartPeriodicFiberImpl(pb, std::move(shard_handler), true); +} + +void EngineShard::StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb, + std::function shard_handler) { + StartPeriodicFiberImpl(pb, std::move(shard_handler), false); } void EngineShard::InitThreadLocal(ProactorBase* pb) { @@ -689,15 +714,15 @@ void EngineShard::RetireExpiredAndEvict() { } } -void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms, - std::function shard_handler) { +void EngineShard::RunHeartbeatPeriodic(std::chrono::milliseconds period_ms, + std::function shard_handler) { VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms"; int64_t last_heartbeat_ms = INT64_MAX; int64_t last_handler_ms = 0; while (true) { - if (fiber_periodic_done_.WaitFor(period_ms)) { + if (fiber_heartbeat_periodic_done_.WaitFor(period_ms)) { VLOG(2) << "finished running engine shard periodic task"; return; } @@ -715,6 +740,28 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms, } } +void EngineShard::RunShardHandlerPeriodic(std::chrono::milliseconds period_ms, + std::function shard_handler) { + VLOG(1) << "RunShardHandlerPeriodic with period " << period_ms.count() << "ms"; + + int64_t last_handler_ms = INT64_MAX; + + while (true) { + if (fiber_shard_handler_periodic_done_.WaitFor(period_ms)) { + VLOG(2) << "finished running engine shard periodic task"; + return; + } + + int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000; + if (now_ms - 5 * period_ms.count() > last_handler_ms) { + VLOG(1) << "This shard handler/sleep without heartbeat took " << now_ms - last_handler_ms + << "ms"; + } + last_handler_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000; + shard_handler(); + } +} + void EngineShard::CacheStats() { uint64_t now = fb2::ProactorBase::GetMonotonicTimeNs(); if (cache_stats_time_ + 1000000 > now) // 1ms diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h index 262914b2cb4b..5a115e996af6 100644 --- a/src/server/engine_shard.h +++ b/src/server/engine_shard.h @@ -201,11 +201,18 @@ class EngineShard { void Shutdown(); // called before destructing EngineShard. void StartPeriodicFiber(util::ProactorBase* pb, std::function shard_handler); + void StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb, + std::function shard_handler); + void StartPeriodicFiberImpl(util::ProactorBase* pb, std::function shard_handler, + bool heartbeat); void Heartbeat(); void RetireExpiredAndEvict(); - void RunPeriodic(std::chrono::milliseconds period_ms, std::function shard_handler); + void RunHeartbeatPeriodic(std::chrono::milliseconds period_ms, + std::function shard_handler); + void RunShardHandlerPeriodic(std::chrono::milliseconds period_ms, + std::function shard_handler); void CacheStats(); @@ -247,8 +254,11 @@ class EngineShard { IntentLock shard_lock_; uint32_t defrag_task_ = 0; - util::fb2::Fiber fiber_periodic_; - util::fb2::Done fiber_periodic_done_; + util::fb2::Fiber fiber_heartbeat_periodic_; + util::fb2::Done fiber_heartbeat_periodic_done_; + + util::fb2::Fiber fiber_shard_handler_periodic_; + util::fb2::Done fiber_shard_handler_periodic_done_; DefragTaskState defrag_state_; std::unique_ptr tiered_storage_; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 0d6fa5c13c8b..c35cc753cc1d 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -121,7 +121,8 @@ void EngineShardSet::Init(uint32_t sz, std::function shard_handler) { // Must be last, as it accesses objects initialized above. // We can not move shard_handler because this code is called multiple times. - shard->StartPeriodicFiber(pb, shard_handler); + shard->StartPeriodicFiber(pb, {}); + shard->StartPeriodicFiberWithoutHeartbeat(pb, shard_handler); } }); } diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index a49f70b487d1..cc6123d0bb19 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -114,7 +114,6 @@ class EngineShardSet { private: void InitThreadLocal(util::ProactorBase* pb); - util::ProactorPool* pp_; std::unique_ptr shards_; uint32_t size_ = 0; diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 924c75f3ee9b..71242c663ca1 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2322,7 +2322,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, await c_replica.execute_command( "debug replica pause" - ) # puase replica to trigger reconnect on master + ) # pause replica to trigger reconnect on master await asyncio.sleep(1) @@ -2631,3 +2631,52 @@ async def test_replica_of_replica(df_factory): await c_replica2.execute_command(f"REPLICAOF localhost {replica.port}") assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK" + + +@pytest.mark.asyncio +async def test_replication_timeout_on_full_sync_heartbeat_expiry( + df_factory: DflyInstanceFactory, df_seeder_factory +): + # setting replication_timeout to a very small value to force the replica to timeout + master = df_factory.create( + proactor_threads=2, replication_timeout=100, vmodule="replica=2,dflycmd=2" + ) + replica = df_factory.create() + + df_factory.start_all([master, replica]) + + c_master = master.client() + c_replica = replica.client() + + await c_master.execute_command("debug", "populate", "200000", "foo", "5000") + seeder = df_seeder_factory.create(port=master.port) + seeder_task = asyncio.create_task(seeder.run()) + + await asyncio.sleep(0.5) # wait for seeder running + + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + + # wait for full sync + async with async_timeout.timeout(3): + await wait_for_replicas_state(c_replica, state="full_sync", timeout=0.05) + + for i in range(1, 10000): + await c_master.execute_command(f"SET key{i} val{i} EX 2") + + await c_replica.execute_command("debug replica pause") + + # Dragonfly will get stack here. The journal writes to a channel which will block on write. + # Hearbeat will be called and will get stack while it tries to evict an expired item (because + # it will try to write that item to the journal which in turn will block while it writes to + # the channel). BreakStalledFlows() will never be called and the reconnect count will stay 0. + + await asyncio.sleep(2) + + await c_replica.execute_command("debug replica resume") # resume replication + + await asyncio.sleep(1) # replica will start resync + seeder.stop() + await seeder_task + + await check_all_replicas_finished([c_replica], c_master) + await assert_replica_reconnections(replica, 0)