fix: separate Heartbeat and ShardHandler to fibers #3936

Open
wants to merge 6 commits into base: main

Conversation

@kostasrim (Contributor) commented Oct 16, 2024

The issue surfaced when the test started failing in #3891. Heartbeat will block indefinitely if TriggerJournalWriteToSink can't push the serialized data to the channel (for example, when the replica becomes unresponsive). The consequence is that the shard_handler, and therefore BreakStalledFlows, is never called and Dragonfly stalls. The solution is to split the blocking/preemptive flow (Heartbeat) and BreakStalledFlows into separate fibers.

  • separate shard_handler from Heartbeat
  • add test

Resolves: #3937
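For illustration only, here is a minimal standalone sketch of the idea, using plain std::thread and std::atomic in place of the project's fibers (the diff itself uses MakeFiber); the point is that the heartbeat loop and the shard-handler loop no longer share a single loop, so a blocked Heartbeat cannot starve BreakStalledFlows. All names in the sketch are illustrative, not the PR's actual code.

#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Conceptual stand-ins for the real EngineShard members; names are illustrative only.
std::atomic<bool> stop_periodic{false};

void StartPeriodicLoops(std::chrono::milliseconds period,
                        std::function<void()> heartbeat,        // may block if the replica stalls
                        std::function<void()> shard_handler) {  // e.g. BreakStalledFlows
  // One loop per concern: if heartbeat() blocks, the shard_handler loop keeps running.
  std::thread heartbeat_loop([=] {
    while (!stop_periodic) {
      heartbeat();
      std::this_thread::sleep_for(period);
    }
  });
  std::thread handler_loop([=] {
    while (!stop_periodic) {
      shard_handler();
      std::this_thread::sleep_for(period);
    }
  });
  // The real code keeps fiber handles (e.g. fiber_shard_handler_periodic_) and joins them on shutdown.
  heartbeat_loop.detach();
  handler_loop.detach();
}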


void Heartbeat();
void RetireExpiredAndEvict();

void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> shard_handler);
void RunHeartbeatPeriodic(std::chrono::milliseconds period_ms,
Collaborator:

Why does Heartbeat still get the shard handler?

Contributor Author:

I wanted to keep the callback in case we want to use it in the future, but I agree it's redundant. I will remove it.

});
} else {
fiber_shard_handler_periodic_ =
MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
Collaborator:

See the condition in the original RunPeriodic code that decides when to run the shard handler:
if (shard_handler && last_handler_ms + 100 < last_heartbeat_ms)

We run shard_handler every 100 ms; this is unrelated to FLAGS_hz. I believe this function StartPeriodicFiberImpl is redundant.

Contributor Author:

It depends on the condition implicitly when the period is greater than 100 ms, because we always wait period_ms between runs. So if, say, period_ms=500, the shard handler will run every 500 ms, which means the 100 ms is really just a minimum. I refactored this a little to mirror that behaviour.
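A small sketch of the behaviour described above (editor's illustration, not the PR code): with a dedicated loop that always waits period_ms between runs, the 100 ms from the old RunPeriodic condition acts as a lower bound on the effective shard-handler period.

#include <algorithm>
#include <chrono>

// Effective interval between shard_handler runs, per the discussion above: the old
// condition (last_handler_ms + 100 < last_heartbeat_ms) enforced a 100 ms floor,
// while the dedicated loop always waits period_ms between runs.
std::chrono::milliseconds EffectiveHandlerPeriod(std::chrono::milliseconds period_ms) {
  return std::max(period_ms, std::chrono::milliseconds(100));
}

// EffectiveHandlerPeriod(std::chrono::milliseconds(500)) -> 500 ms (period dominates)
// EffectiveHandlerPeriod(std::chrono::milliseconds(10))  -> 100 ms (floor dominates)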


await asyncio.sleep(1)
await asyncio.sleep(10)
Collaborator:

is this change needed?

Contributor Author:

nope, it was accidental


await c_master.execute_command("debug", "populate", "100000", "foo", "5000")

class ExpirySeeder:
Collaborator:

put in utility.py


seeder = ExpirySeeder()
seeder_task = asyncio.create_task(seeder.run(c_master))
await seeder.wait_until(50000)
Collaborator:

If the expiry seeder stops setting keys here, wouldn't the bug reproduce as well? Do we need to keep sending commands to hit the bug?


async def run(self, client):
while not self.stop_flag:
await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4")
Collaborator:

Use a pipeline to speed things up.

await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4")
self.i = self.i + 1

async def wait_until(self, count):
Collaborator:

wait_until_n_inserts or something like this

await c_replica.execute_command("debug replica pause")

# Dragonfly will get stuck here. The journal writes to a channel which will block on write.
# Hearbeat() will be called and will get stuck while it tries to evict an expired item (because
Collaborator:

a. Please write "expires" rather than "evicts" an expired item.
b. I would not write that we block while pushing to the channel, as this is a very specific implementation detail that is also going to change. The reason we get stuck is that we block on the write to the socket, since the replica does not read from it.
c. Please don't write that BreakStalledFlows() will never be called, as that is your bug fix; I would describe the bug and these details in the GitHub bug issue.

# Timeout set to 3 seconds because we must first saturate the socket such that subsequent
# writes block. Otherwise, we will break the flows before Heartbeat actually deadlocks.
master = df_factory.create(
proactor_threads=2, replication_timeout=3000, vmodule="replica=2,dflycmd=2"
Collaborator:

Why specify proactor_threads=2?

Contributor Author:

Because it was easier to reproduce with 2 threads. I haven't tried with more, so the test may require tuning.

@@ -2295,7 +2295,9 @@ async def test_announce_ip_port(df_factory):
@pytest.mark.asyncio
async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
# setting replication_timeout to a very small value to force the replica to timeout
master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2")
master = df_factory.create(
proactor_threads=2, replication_timeout=100, vmodule="replica=2,dflycmd=2"
Contributor Author:

Same here, no need for proactor_threads=2.

