Skip to content

Commit

Permalink
fix: separate Heartbeat and ShardHandler to fibers
Browse files Browse the repository at this point in the history
Signed-off-by: kostas <[email protected]>
  • Loading branch information
kostasrim committed Oct 16, 2024
1 parent fbfe5d5 commit 0e8405e
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 19 deletions.
73 changes: 60 additions & 13 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,28 +386,53 @@ void EngineShard::Shutdown() {

queue_.Shutdown();
queue2_.Shutdown();
DCHECK(!fiber_periodic_.IsJoinable());
DCHECK(!fiber_heartbeat_periodic_.IsJoinable());
DCHECK(!fiber_shard_handler_periodic_.IsJoinable());

ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}

void EngineShard::StopPeriodicFiber() {
fiber_periodic_done_.Notify();
if (fiber_periodic_.IsJoinable()) {
fiber_periodic_.Join();
fiber_heartbeat_periodic_done_.Notify();
if (fiber_heartbeat_periodic_.IsJoinable()) {
fiber_heartbeat_periodic_.Join();
}
fiber_shard_handler_periodic_done_.Notify();
if (fiber_shard_handler_periodic_.IsJoinable()) {
fiber_shard_handler_periodic_.Join();
}
}

void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler) {
void EngineShard::StartPeriodicFiberImpl(util::ProactorBase* pb,
std::function<void()> shard_handler, bool heartbeat) {
uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
if (clock_cycle_ms == 0)
clock_cycle_ms = 1;

fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
handler = std::move(global_handler)] {
ThisFiber::SetName(absl::StrCat("shard_periodic", index));
RunPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
});
if (heartbeat) {
fiber_heartbeat_periodic_ =
MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
handler = std::move(shard_handler)]() mutable {
ThisFiber::SetName(absl::StrCat("heartbeat_periodic", index));
RunHeartbeatPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
});
} else {
fiber_shard_handler_periodic_ =
MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
handler = std::move(shard_handler)]() mutable {
ThisFiber::SetName(absl::StrCat("shard_handler_periodic", index));
RunShardHandlerPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
});
}
}

void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> shard_handler) {
StartPeriodicFiberImpl(pb, std::move(shard_handler), true);
}

void EngineShard::StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb,
std::function<void()> shard_handler) {
StartPeriodicFiberImpl(pb, std::move(shard_handler), false);
}

void EngineShard::InitThreadLocal(ProactorBase* pb) {
Expand Down Expand Up @@ -689,15 +714,15 @@ void EngineShard::RetireExpiredAndEvict() {
}
}

void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler) {
void EngineShard::RunHeartbeatPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler) {
VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms";

int64_t last_heartbeat_ms = INT64_MAX;
int64_t last_handler_ms = 0;

while (true) {
if (fiber_periodic_done_.WaitFor(period_ms)) {
if (fiber_heartbeat_periodic_done_.WaitFor(period_ms)) {
VLOG(2) << "finished running engine shard periodic task";
return;
}
Expand All @@ -715,6 +740,28 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
}
}

void EngineShard::RunShardHandlerPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler) {
VLOG(1) << "RunShardHandlerPeriodic with period " << period_ms.count() << "ms";

int64_t last_handler_ms = INT64_MAX;

while (true) {
if (fiber_shard_handler_periodic_done_.WaitFor(period_ms)) {
VLOG(2) << "finished running engine shard periodic task";
return;
}

int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
if (now_ms - 5 * period_ms.count() > last_handler_ms) {
VLOG(1) << "This shard handler/sleep without heartbeat took " << now_ms - last_handler_ms
<< "ms";
}
last_handler_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
shard_handler();
}
}

void EngineShard::CacheStats() {
uint64_t now = fb2::ProactorBase::GetMonotonicTimeNs();
if (cache_stats_time_ + 1000000 > now) // 1ms
Expand Down
16 changes: 13 additions & 3 deletions src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,18 @@ class EngineShard {
void Shutdown(); // called before destructing EngineShard.

void StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> shard_handler);
void StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb,
std::function<void()> shard_handler);
void StartPeriodicFiberImpl(util::ProactorBase* pb, std::function<void()> shard_handler,
bool heartbeat);

void Heartbeat();
void RetireExpiredAndEvict();

void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> shard_handler);
void RunHeartbeatPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler);
void RunShardHandlerPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler);

void CacheStats();

Expand Down Expand Up @@ -247,8 +254,11 @@ class EngineShard {
IntentLock shard_lock_;

uint32_t defrag_task_ = 0;
util::fb2::Fiber fiber_periodic_;
util::fb2::Done fiber_periodic_done_;
util::fb2::Fiber fiber_heartbeat_periodic_;
util::fb2::Done fiber_heartbeat_periodic_done_;

util::fb2::Fiber fiber_shard_handler_periodic_;
util::fb2::Done fiber_shard_handler_periodic_done_;

DefragTaskState defrag_state_;
std::unique_ptr<TieredStorage> tiered_storage_;
Expand Down
3 changes: 2 additions & 1 deletion src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ void EngineShardSet::Init(uint32_t sz, std::function<void()> shard_handler) {

// Must be last, as it accesses objects initialized above.
// We can not move shard_handler because this code is called multiple times.
shard->StartPeriodicFiber(pb, shard_handler);
shard->StartPeriodicFiber(pb, {});
shard->StartPeriodicFiberWithoutHeartbeat(pb, shard_handler);
}
});
}
Expand Down
1 change: 0 additions & 1 deletion src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class EngineShardSet {

private:
void InitThreadLocal(util::ProactorBase* pb);

util::ProactorPool* pp_;
std::unique_ptr<EngineShard*[]> shards_;
uint32_t size_ = 0;
Expand Down
51 changes: 50 additions & 1 deletion tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2322,7 +2322,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory,

await c_replica.execute_command(
"debug replica pause"
) # puase replica to trigger reconnect on master
) # pause replica to trigger reconnect on master

await asyncio.sleep(1)

Expand Down Expand Up @@ -2631,3 +2631,52 @@ async def test_replica_of_replica(df_factory):
await c_replica2.execute_command(f"REPLICAOF localhost {replica.port}")

assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK"


@pytest.mark.asyncio
async def test_replication_timeout_on_full_sync_heartbeat_expiry(
df_factory: DflyInstanceFactory, df_seeder_factory
):
# setting replication_timeout to a very small value to force the replica to timeout
master = df_factory.create(
proactor_threads=2, replication_timeout=100, vmodule="replica=2,dflycmd=2"
)
replica = df_factory.create()

df_factory.start_all([master, replica])

c_master = master.client()
c_replica = replica.client()

await c_master.execute_command("debug", "populate", "200000", "foo", "5000")
seeder = df_seeder_factory.create(port=master.port)
seeder_task = asyncio.create_task(seeder.run())

await asyncio.sleep(0.5) # wait for seeder running

await c_replica.execute_command(f"REPLICAOF localhost {master.port}")

# wait for full sync
async with async_timeout.timeout(3):
await wait_for_replicas_state(c_replica, state="full_sync", timeout=0.05)

for i in range(1, 10000):
await c_master.execute_command(f"SET key{i} val{i} EX 2")

await c_replica.execute_command("debug replica pause")

# Dragonfly will get stack here. The journal writes to a channel which will block on write.
# Hearbeat will be called and will get stack while it tries to evict an expired item (because
# it will try to write that item to the journal which in turn will block while it writes to
# the channel). BreakStalledFlows() will never be called and the reconnect count will stay 0.

await asyncio.sleep(2)

await c_replica.execute_command("debug replica resume") # resume replication

await asyncio.sleep(1) # replica will start resync
seeder.stop()
await seeder_task

await check_all_replicas_finished([c_replica], c_master)
await assert_replica_reconnections(replica, 0)

0 comments on commit 0e8405e

Please sign in to comment.