fix: separate Heartbeat and ShardHandler to fibers #3936
base: main
Changes from 3 commits
4c3baeb
8aa1430
f08414f
b21f01d
6b4233f
9fb76ac
@@ -207,11 +207,18 @@ class EngineShard {
  void Shutdown();  // called before destructing EngineShard.

  void StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> shard_handler);
  void StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb,
                                          std::function<void()> shard_handler);
  void StartPeriodicFiberImpl(util::ProactorBase* pb, std::function<void()> shard_handler,
                              bool heartbeat);

  void Heartbeat();
  void RetireExpiredAndEvict();

  void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> shard_handler);
  void RunHeartbeatPeriodic(std::chrono::milliseconds period_ms,
Review comment: why does heartbeat still get a shard handler?
Reply: I wanted to keep the callback in case we want to use it in the future, but I agree it is redundant. I will remove it.
                            std::function<void()> shard_handler);
  void RunShardHandlerPeriodic(std::chrono::milliseconds period_ms,
                               std::function<void()> shard_handler);

  void CacheStats();
@@ -253,8 +260,11 @@ class EngineShard {
  IntentLock shard_lock_;

  uint32_t defrag_task_ = 0;
  util::fb2::Fiber fiber_periodic_;
  util::fb2::Done fiber_periodic_done_;
  util::fb2::Fiber fiber_heartbeat_periodic_;
  util::fb2::Done fiber_heartbeat_periodic_done_;

  util::fb2::Fiber fiber_shard_handler_periodic_;
  util::fb2::Done fiber_shard_handler_periodic_done_;

  DefragTaskState defrag_state_;
  std::unique_ptr<TieredStorage> tiered_storage_;
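To make the intent of this header change concrete, below is a minimal standalone sketch of the split: one loop runs only the heartbeat at the configured period, while an independent loop runs the shard handler, so a blocked heartbeat can no longer starve the handler. This is not the PR's actual implementation; std::thread and std::atomic stand in for util::fb2::Fiber and util::fb2::Done, and the PeriodicPair name is made up for illustration.

#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

// Hypothetical stand-in for the two periodic fibers added by this PR
// (fiber_heartbeat_periodic_ and fiber_shard_handler_periodic_), each with
// its own stop flag, mirroring the two Done members in the diff above.
class PeriodicPair {
 public:
  void Start(std::chrono::milliseconds period, std::function<void()> heartbeat,
             std::function<void()> shard_handler) {
    heartbeat_thread_ = std::thread([this, period, heartbeat] {
      while (!done_.load()) {
        heartbeat();  // may block; no longer delays the shard handler
        std::this_thread::sleep_for(period);
      }
    });
    handler_thread_ = std::thread([this, period, shard_handler] {
      while (!done_.load()) {
        shard_handler();  // runs on its own schedule
        std::this_thread::sleep_for(period);
      }
    });
  }

  void Stop() {
    done_.store(true);
    heartbeat_thread_.join();
    handler_thread_.join();
  }

 private:
  std::atomic<bool> done_{false};
  std::thread heartbeat_thread_;
  std::thread handler_thread_;
};

int main() {
  PeriodicPair p;
  p.Start(std::chrono::milliseconds(100), [] { std::cout << "heartbeat\n"; },
          [] { std::cout << "shard handler\n"; });
  std::this_thread::sleep_for(std::chrono::milliseconds(350));
  p.Stop();
}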
@@ -2295,7 +2295,9 @@ async def test_announce_ip_port(df_factory):
@pytest.mark.asyncio
async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
    # setting replication_timeout to a very small value to force the replica to timeout
    master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2")
    master = df_factory.create(
        proactor_threads=2, replication_timeout=100, vmodule="replica=2,dflycmd=2"
Review comment: same here, no need of proactor_threads=2.
    )
    replica = df_factory.create()

    df_factory.start_all([master, replica])

@@ -2317,9 +2319,9 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory,

    await c_replica.execute_command(
        "debug replica pause"
    )  # puase replica to trigger reconnect on master
    )  # pause replica to trigger reconnect on master

    await asyncio.sleep(1)
    await asyncio.sleep(10)
Review comment: is this change needed?
Reply: nope, it was accidental.

    await c_replica.execute_command("debug replica resume")  # resume replication

@@ -2626,3 +2628,73 @@ async def test_replica_of_replica(df_factory):
    await c_replica2.execute_command(f"REPLICAOF localhost {replica.port}")

    assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK"


@pytest.mark.asyncio
async def test_replication_timeout_on_full_sync_heartbeat_expiry(
    df_factory: DflyInstanceFactory, df_seeder_factory
):
    # Timeout set to 3 seconds because we must first saturate the socket such that subsequent
    # writes block. Otherwise, we will break the flows before Heartbeat actually deadlocks.
    master = df_factory.create(
        proactor_threads=2, replication_timeout=3000, vmodule="replica=2,dflycmd=2"
Review comment: why specify proactor_threads=2?
Reply: Because it was easier to reproduce with 2 threads. I haven't tried with more, so maybe the test requires tuning.
    )
    replica = df_factory.create()

    df_factory.start_all([master, replica])

    c_master = master.client()
    c_replica = replica.client()

    await c_master.execute_command("debug", "populate", "100000", "foo", "5000")

    class ExpirySeeder:
Review comment: put in utility.py
        def __init__(self):
            self.stop_flag = False
            self.i = 0

        async def run(self, client):
            while not self.stop_flag:
                await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4")
Review comment: use pipeline to speed up things
                self.i = self.i + 1

        async def wait_until(self, count):
Review comment: name it wait_untill_n_inserts or something like this
            while not self.i > count:
                await asyncio.sleep(0.5)

        def stop(self):
            self.stop_flag = True

    c_master = master.client()
    c_replica = replica.client()

    seeder = ExpirySeeder()
    seeder_task = asyncio.create_task(seeder.run(c_master))
    await seeder.wait_until(50000)
Review comment: if the expiry seeder stops setting keys here, wouldn't the bug still reproduce?

    await c_replica.execute_command(f"REPLICAOF localhost {master.port}")

    # wait for full sync
    async with async_timeout.timeout(3):
        await wait_for_replicas_state(c_replica, state="full_sync", timeout=0.05)

    await c_replica.execute_command("debug replica pause")

    # Dragonfly will get stuck here. The journal writes to a channel which will block on write.
    # Hearbeat() will be called and will get stuck while it tries to evict an expired item (because
Review comment: a. please write "expires" and not "evict" for an expired item
    # it will try to write that item to the journal which in turn will block while it pushes to
    # the channel). BreakStalledFlows() will never be called and the reconnect count will stay 0.
    # Furthermore, that's why we pick 3 seconds for the replica to timeout. If it was less,
    # we would not reach this state because the flow would break before the channel gets filled
    # (even though the write to sink from the channel is blocked).

    await asyncio.sleep(6)

    await c_replica.execute_command("debug replica resume")  # resume replication

    await asyncio.sleep(1)  # replica will start resync
    seeder.stop()
    await seeder_task

    await check_all_replicas_finished([c_replica], c_master)
    await assert_replica_reconnections(replica, 0)
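The test's comment block describes a producer/consumer stall: once the replica is paused, nothing drains the journal channel, so a write into the full channel blocks, and Heartbeat blocks behind it when it expires a key. The sketch below is a minimal, self-contained illustration of that mechanism only; BoundedChannel and its timed push are hypothetical names, not Dragonfly code (which blocks outright rather than timing out).

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>

// Hypothetical bounded channel: pushing into a full channel waits for space.
class BoundedChannel {
 public:
  explicit BoundedChannel(size_t cap) : cap_(cap) {}

  // Returns false if the item could not be pushed within the timeout,
  // i.e. no consumer ever drained the channel.
  bool TryPushFor(std::string item, std::chrono::milliseconds timeout) {
    std::unique_lock lk(mu_);
    if (!not_full_.wait_for(lk, timeout, [&] { return q_.size() < cap_; }))
      return false;
    q_.push(std::move(item));
    return true;
  }

 private:
  size_t cap_;
  std::mutex mu_;
  std::condition_variable not_full_;
  std::queue<std::string> q_;
};

int main() {
  BoundedChannel journal(2);  // tiny capacity so it saturates quickly

  // Replica is "paused": nobody pops from the channel, so it fills up.
  journal.TryPushFor("record-1", std::chrono::milliseconds(10));
  journal.TryPushFor("record-2", std::chrono::milliseconds(10));

  // A heartbeat that expires a key would also write to the journal; with the
  // channel full and no consumer, that write never completes. This is the
  // stall the test reproduces.
  bool ok = journal.TryPushFor("expired-key", std::chrono::milliseconds(200));
  std::cout << "expiry journal write completed: " << std::boolalpha << ok << "\n";
}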
Review comment: see the condition in the original RunPeriodic code that runs the shard handler:
if (shard_handler && last_handler_ms + 100 < last_heartbeat_ms)
We run shard_handler every 100 ms; this is unrelated to FLAGS_hz. I believe this function StartPeriodicFiberImpl is redundant.
Reply: It depends on the condition implicitly when the period is greater than 100 ms, because we wait period_ms. So if, let's say, period_ms=500, the shard handler will run every 500 ms (we always wait period_ms between runs), so that 100 is really a minimum. I refactored this a little bit to mirror this behaviour.
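To illustrate the point being discussed: in a single loop that sleeps period_ms between iterations, the quoted condition makes 100 ms a floor, and whenever period_ms exceeds 100 ms the handler effectively runs once per period_ms. The following is a rough, simplified sketch of that pre-PR loop shape under those assumptions (RunPeriodicSketch is an illustrative name, not the actual Dragonfly function).

#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Simplified sketch of the single periodic loop described above.
// With period_ms = 500 the condition is true on every iteration, so the
// shard handler runs every ~500 ms; the 100 ms only matters when period_ms
// is smaller than 100 ms.
void RunPeriodicSketch(std::chrono::milliseconds period_ms,
                       std::function<void()> heartbeat,
                       std::function<void()> shard_handler, const bool& stop) {
  uint64_t last_handler_ms = 0;
  uint64_t last_heartbeat_ms = 0;
  auto now_ms = [] {
    return std::chrono::duration_cast<std::chrono::milliseconds>(
               std::chrono::steady_clock::now().time_since_epoch())
        .count();
  };

  while (!stop) {
    heartbeat();
    last_heartbeat_ms = now_ms();

    // Gating condition quoted in the review comment above.
    if (shard_handler && last_handler_ms + 100 < last_heartbeat_ms) {
      shard_handler();
      last_handler_ms = now_ms();
    }

    std::this_thread::sleep_for(period_ms);  // we always wait period_ms between runs
  }
}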