
fix: separate Heartbeat and ShardHandler to fibers #3936

Open · wants to merge 6 commits into main
Changes from 3 commits
76 changes: 63 additions & 13 deletions src/server/engine_shard.cc
@@ -391,28 +391,53 @@ void EngineShard::Shutdown() {

queue_.Shutdown();
queue2_.Shutdown();
DCHECK(!fiber_periodic_.IsJoinable());
DCHECK(!fiber_heartbeat_periodic_.IsJoinable());
DCHECK(!fiber_shard_handler_periodic_.IsJoinable());

ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}

void EngineShard::StopPeriodicFiber() {
fiber_periodic_done_.Notify();
if (fiber_periodic_.IsJoinable()) {
fiber_periodic_.Join();
fiber_heartbeat_periodic_done_.Notify();
if (fiber_heartbeat_periodic_.IsJoinable()) {
fiber_heartbeat_periodic_.Join();
}
fiber_shard_handler_periodic_done_.Notify();
if (fiber_shard_handler_periodic_.IsJoinable()) {
fiber_shard_handler_periodic_.Join();
}
}

void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler) {
void EngineShard::StartPeriodicFiberImpl(util::ProactorBase* pb,
std::function<void()> shard_handler, bool heartbeat) {
uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
if (clock_cycle_ms == 0)
clock_cycle_ms = 1;

fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
handler = std::move(global_handler)] {
ThisFiber::SetName(absl::StrCat("shard_periodic", index));
RunPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
});
if (heartbeat) {
fiber_heartbeat_periodic_ =
MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
handler = std::move(shard_handler)]() mutable {
ThisFiber::SetName(absl::StrCat("heartbeat_periodic", index));
RunHeartbeatPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
});
} else {
fiber_shard_handler_periodic_ =
MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
Collaborator:
See the condition in the original RunPeriodic that runs the shard handler:
if (shard_handler && last_handler_ms + 100 < last_heartbeat_ms)
We run shard_handler every 100 ms; this is unrelated to FLAGS_hz. I believe this function StartPeriodicFiberImpl is redundant.

Contributor Author:
It depends on the condition implicitly when the period is greater than 100 ms, because we always wait period_ms between runs. So if, say, period_ms=500, the shard handler will run every 500 ms; that 100 ms is really a minimum. I refactored this a little to mirror that behaviour.
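The point being discussed, that the 100 ms gate only matters when the loop period is shorter than it, can be sketched as a tiny helper (hypothetical, not part of the PR):

```python
def effective_handler_interval_ms(period_ms: int, min_gap_ms: int = 100) -> int:
    """The shard handler runs at most once per loop iteration (every period_ms),
    and the 100 ms gate from the original RunPeriodic only kicks in when
    period_ms is below that floor."""
    return max(period_ms, min_gap_ms)
```

So with period_ms=500 the gate is always satisfied and the handler runs every 500 ms, while with a small period the handler is throttled to roughly once per 100 ms.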

handler = std::move(shard_handler)]() mutable {
ThisFiber::SetName(absl::StrCat("shard_handler_periodic", index));
RunShardHandlerPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
});
}
}

void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> shard_handler) {
StartPeriodicFiberImpl(pb, std::move(shard_handler), true);
}

void EngineShard::StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb,
std::function<void()> shard_handler) {
StartPeriodicFiberImpl(pb, std::move(shard_handler), false);
}
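The split above gives each periodic loop its own fiber and its own Done event, so stopping one loop can no longer be delayed by the other. An asyncio analogue of the same shape (illustrative names only, not the fb2 fiber API):

```python
import asyncio

async def periodic(period_s: float, done: asyncio.Event, handler):
    # Mirrors Run*Periodic: wait up to period_s for "done"; a timeout is a tick.
    while True:
        try:
            await asyncio.wait_for(done.wait(), timeout=period_s)
            return  # analogue of Done::Notify() observed -> fiber exits
        except asyncio.TimeoutError:
            handler()  # run the periodic work, then loop again

async def main():
    ticks = {"heartbeat": 0, "shard": 0}

    def tick(name):
        ticks[name] += 1

    hb_done, sh_done = asyncio.Event(), asyncio.Event()
    hb = asyncio.create_task(periodic(0.01, hb_done, lambda: tick("heartbeat")))
    sh = asyncio.create_task(periodic(0.01, sh_done, lambda: tick("shard")))
    await asyncio.sleep(0.05)
    # StopPeriodicFiber analogue: notify each Done, then join each task.
    hb_done.set()
    await hb
    sh_done.set()
    await sh
    return ticks
```

Because each loop owns its Done event, a handler that blocks in one loop cannot prevent the other loop from being joined, which is the motivation for the separation in this PR.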

void EngineShard::InitThreadLocal(ProactorBase* pb) {
@@ -694,15 +719,15 @@ void EngineShard::RetireExpiredAndEvict() {
}
}

void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler) {
void EngineShard::RunHeartbeatPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler) {
VLOG(1) << "RunHeartbeatPeriodic with period " << period_ms.count() << "ms";

int64_t last_heartbeat_ms = INT64_MAX;
int64_t last_handler_ms = 0;

while (true) {
if (fiber_periodic_done_.WaitFor(period_ms)) {
if (fiber_heartbeat_periodic_done_.WaitFor(period_ms)) {
VLOG(2) << "finished running engine shard periodic task";
return;
}
@@ -720,6 +745,31 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms,
}
}

void EngineShard::RunShardHandlerPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler) {
VLOG(1) << "RunShardHandlerPeriodic with period " << period_ms.count() << "ms";

int64_t last_handler_ms = INT64_MAX;

while (true) {
if (fiber_shard_handler_periodic_done_.WaitFor(period_ms)) {
VLOG(2) << "finished running shard handler periodic task";
return;
}

int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
if (now_ms - 5 * period_ms.count() > last_handler_ms) {
VLOG(1) << "Shard handler/sleep without heartbeat took " << now_ms - last_handler_ms
<< "ms";
}
last_handler_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
// We need to check because some tests pass an empty shard_handler
if (shard_handler) {
shard_handler();
}
}
}
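The overrun check in the loop above flags an iteration whenever more than five periods have elapsed since the last handler run. The predicate can be isolated as a small sketch (hypothetical helper, mirroring the `now_ms - 5 * period_ms.count() > last_handler_ms` condition):

```python
def handler_stalled(now_ms: int, last_handler_ms: int, period_ms: int,
                    factor: int = 5) -> bool:
    """True when the gap since the last handler run exceeds `factor` periods,
    i.e. the handler (or the sleep around it) took far longer than expected."""
    return now_ms - factor * period_ms > last_handler_ms
```

With a 100 ms period, a 300 ms gap passes quietly, while a 600 ms gap trips the log line.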

void EngineShard::CacheStats() {
uint64_t now = fb2::ProactorBase::GetMonotonicTimeNs();
if (cache_stats_time_ + 1000000 > now) // 1ms
16 changes: 13 additions & 3 deletions src/server/engine_shard.h
@@ -207,11 +207,18 @@ class EngineShard {
void Shutdown(); // called before destructing EngineShard.

void StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> shard_handler);
void StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb,
std::function<void()> shard_handler);
void StartPeriodicFiberImpl(util::ProactorBase* pb, std::function<void()> shard_handler,
bool heartbeat);

void Heartbeat();
void RetireExpiredAndEvict();

void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> shard_handler);
void RunHeartbeatPeriodic(std::chrono::milliseconds period_ms,
Collaborator:
Why does heartbeat still get the shard handler?

Contributor Author:
I wanted to keep the callback in case we want to use it in the future, but I agree it's redundant. I will remove it.

std::function<void()> shard_handler);
void RunShardHandlerPeriodic(std::chrono::milliseconds period_ms,
std::function<void()> shard_handler);

void CacheStats();

@@ -253,8 +260,11 @@
IntentLock shard_lock_;

uint32_t defrag_task_ = 0;
util::fb2::Fiber fiber_periodic_;
util::fb2::Done fiber_periodic_done_;
util::fb2::Fiber fiber_heartbeat_periodic_;
util::fb2::Done fiber_heartbeat_periodic_done_;

util::fb2::Fiber fiber_shard_handler_periodic_;
util::fb2::Done fiber_shard_handler_periodic_done_;

DefragTaskState defrag_state_;
std::unique_ptr<TieredStorage> tiered_storage_;
3 changes: 2 additions & 1 deletion src/server/engine_shard_set.cc
@@ -121,7 +121,8 @@ void EngineShardSet::Init(uint32_t sz, std::function<void()> shard_handler) {

// Must be last, as it accesses objects initialized above.
// We can not move shard_handler because this code is called multiple times.
shard->StartPeriodicFiber(pb, shard_handler);
shard->StartPeriodicFiber(pb, {});
shard->StartPeriodicFiberWithoutHeartbeat(pb, shard_handler);
}
});
}
1 change: 0 additions & 1 deletion src/server/engine_shard_set.h
@@ -114,7 +114,6 @@ class EngineShardSet {

private:
void InitThreadLocal(util::ProactorBase* pb);

util::ProactorPool* pp_;
std::unique_ptr<EngineShard*[]> shards_;
uint32_t size_ = 0;
78 changes: 75 additions & 3 deletions tests/dragonfly/replication_test.py
@@ -2295,7 +2295,9 @@ async def test_announce_ip_port(df_factory):
@pytest.mark.asyncio
async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
# setting replication_timeout to a very small value to force the replica to timeout
master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2")
master = df_factory.create(
proactor_threads=2, replication_timeout=100, vmodule="replica=2,dflycmd=2"
Contributor Author:
same here, no need for proactor_threads=2
)
replica = df_factory.create()

df_factory.start_all([master, replica])
@@ -2317,9 +2319,9 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory,

await c_replica.execute_command(
"debug replica pause"
) # puase replica to trigger reconnect on master
) # pause replica to trigger reconnect on master

await asyncio.sleep(1)
await asyncio.sleep(10)
Collaborator:
is this change needed?

Contributor Author:
nope, it was accidental


await c_replica.execute_command("debug replica resume") # resume replication

@@ -2626,3 +2628,73 @@ async def test_replica_of_replica(df_factory):
await c_replica2.execute_command(f"REPLICAOF localhost {replica.port}")

assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK"


@pytest.mark.asyncio
async def test_replication_timeout_on_full_sync_heartbeat_expiry(
df_factory: DflyInstanceFactory, df_seeder_factory
):
# Timeout set to 3 seconds because we must first saturate the socket such that subsequent
# writes block. Otherwise, we will break the flows before Heartbeat actually deadlocks.
master = df_factory.create(
proactor_threads=2, replication_timeout=3000, vmodule="replica=2,dflycmd=2"
Collaborator:
why specify proactor_threads=2?

Contributor Author:
Because it was easier to reproduce with 2 threads. I haven't tried with more, so maybe the test requires tuning.
)
replica = df_factory.create()

df_factory.start_all([master, replica])

c_master = master.client()
c_replica = replica.client()

await c_master.execute_command("debug", "populate", "100000", "foo", "5000")

class ExpirySeeder:
Collaborator:
put in utility.py

def __init__(self):
self.stop_flag = False
self.i = 0

async def run(self, client):
while not self.stop_flag:
await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4")
Collaborator:
use a pipeline to speed things up

self.i = self.i + 1

async def wait_until(self, count):
Collaborator:
wait_until_n_inserts or something like this

while not self.i > count:
await asyncio.sleep(0.5)

def stop(self):
self.stop_flag = True
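Following the reviewer's pipeline suggestion, the per-key round trip in `run` could be replaced by batched writes. A sketch of the batching side (the `expiry_batch` helper is hypothetical; the commented redis-py usage assumes a `redis.asyncio` client):

```python
def expiry_batch(start: int, count: int, ttl_s: int = 4):
    """Build one pipeline's worth of SET commands: tmp{i} -> bar{i} EX ttl_s."""
    return [("SET", f"tmp{i}", f"bar{i}", "EX", ttl_s)
            for i in range(start, start + count)]

# With redis.asyncio, one batch would be flushed in a single round trip, e.g.:
#   pipe = client.pipeline(transaction=False)
#   for cmd in expiry_batch(self.i, 100):
#       pipe.execute_command(*cmd)
#   await pipe.execute()
#   self.i += 100
```

Batching keeps the seeder from being bottlenecked on per-command latency, so the test reaches the 50000-key threshold much faster.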

c_master = master.client()
c_replica = replica.client()

seeder = ExpirySeeder()
seeder_task = asyncio.create_task(seeder.run(c_master))
await seeder.wait_until(50000)
Collaborator:
if the expiry seeder stops setting here, wouldn't the bug reproduce as well? Do we need to keep sending commands to hit the bug?


await c_replica.execute_command(f"REPLICAOF localhost {master.port}")

# wait for full sync
async with async_timeout.timeout(3):
await wait_for_replicas_state(c_replica, state="full_sync", timeout=0.05)

await c_replica.execute_command("debug replica pause")

# Dragonfly will get stuck here. The journal writes to a channel which will block on write.
# Heartbeat() will be called and will get stuck while it tries to evict an expired item (because

Collaborator:
a. please write "expires" and not "evicts" an expired item
b. I would not write "blocked while it pushes to a channel", as this is a very specific implementation detail which is also going to change. The reason we get stuck is that we block on the write to the socket, because the replica does not read from it.
c. please don't write that BreakStalledFlows() will never be called, as this is your bug fix. I would describe the bug and these details in the GitHub bug issue.

# it will try to write that item to the journal which in turn will block while it pushes to
# the channel). BreakStalledFlows() will never be called and the reconnect count will stay 0.
# Furthermore, that's why we pick 3 seconds for the replica to timeout. If it was less,
# we would not reach this state because the flow would break before the channel gets filled
# (even though the write to sink from the channel is blocked).

await asyncio.sleep(6)

await c_replica.execute_command("debug replica resume") # resume replication

await asyncio.sleep(1) # replica will start resync
seeder.stop()
await seeder_task

await check_all_replicas_finished([c_replica], c_master)
await assert_replica_reconnections(replica, 0)