fix cluster: migration crash fix #4508

Merged 2 commits on Jan 27, 2025

1 change: 1 addition & 0 deletions src/facade/reply_capture.cc
@@ -65,6 +65,7 @@ CapturingReplyBuilder::Payload CapturingReplyBuilder::Take() {
CHECK(stack_.empty());
Payload pl = std::move(current_);
current_ = monostate{};
ConsumeLastError();
return pl;
}
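
A hedged reading of the one-line addition above, sketched as a self-contained mock (MockCapturingBuilder is invented for this illustration and is not Dragonfly's real CapturingReplyBuilder): Take() already moves the captured payload out and resets current_, and calling ConsumeLastError() at the same point appears to drain any error recorded during the capture, so no stale error state is left on the builder once the payload has been handed off.

#include <optional>
#include <string>
#include <utility>
#include <variant>

// Mock only: sketches the "hand off payload + drain pending error" pattern.
class MockCapturingBuilder {
 public:
  using Payload = std::variant<std::monostate, std::string>;

  // Records an error message instead of sending it to a client.
  void SendError(std::string msg) { last_error_ = std::move(msg); }

  // Captures a simple-string reply as the current payload.
  void SendSimpleString(std::string s) { current_ = std::move(s); }

  // Returns and clears the last recorded error, mimicking ConsumeLastError().
  std::optional<std::string> ConsumeLastError() {
    std::optional<std::string> err = std::move(last_error_);
    last_error_.reset();
    return err;
  }

  // Mirrors the fixed Take(): move the payload out, reset it, and also drain
  // the pending error so the builder starts clean for the next capture.
  Payload Take() {
    Payload pl = std::move(current_);
    current_ = std::monostate{};
    ConsumeLastError();
    return pl;
  }

 private:
  Payload current_;
  std::optional<std::string> last_error_;
};

int main() {
  MockCapturingBuilder builder;
  builder.SendSimpleString("OK");
  builder.SendError("hypothetical error recorded during capture");
  MockCapturingBuilder::Payload pl = builder.Take();  // error drained together with payload
  return std::holds_alternative<std::string>(pl) ? 0 : 1;
}

Whatever the precise crash path was, Take() is a natural place for the call because it is the single point where ownership of the captured result leaves the builder.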

3 changes: 2 additions & 1 deletion src/server/db_slice.cc
@@ -716,14 +716,15 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
PrimeTable* table = GetTables(db_index).first;

auto iterate_bucket = [&](DbIndex db_index, PrimeTable::bucket_iterator it) {
it.AdvanceIfNotOccupied();
Review comment (Collaborator) on the line above:
Have you considered fixing it in dash code? @adiholden
It seems weird that callbacks are called with invalid iterators and you need to check them or advance them yourself.

while (!it.is_done()) {
del_entry_cb(it);
++it;
}
};

if (const PrimeTable::bucket_iterator* bit = req.update()) {
if (bit->GetVersion() < next_version) {
if (!bit->is_done() && bit->GetVersion() < next_version) {
iterate_bucket(db_index, *bit);
}
} else {
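The review comment above asks why change callbacks can be handed invalid iterators at all. Below is a minimal, self-contained sketch of the defensive pattern this PR applies here and, via the added is_done() guards, in the streamer.cc and snapshot.cc hunks further down; MockBucketIterator and IterateBucket are invented for illustration and are not Dragonfly's real PrimeTable API.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// Mock bucket iterator, invented for this sketch; the real PrimeTable API differs.
struct MockBucketIterator {
  std::vector<std::optional<int>>* slots = nullptr;  // nullptr models an invalidated iterator
  std::size_t pos = 0;
  uint64_t version = 0;

  bool is_done() const { return slots == nullptr || pos >= slots->size(); }
  uint64_t GetVersion() const { return version; }

  // Skips forward until the iterator points at an occupied slot (or is done).
  void AdvanceIfNotOccupied() {
    while (!is_done() && !(*slots)[pos].has_value()) ++pos;
  }

  MockBucketIterator& operator++() {
    ++pos;
    AdvanceIfNotOccupied();
    return *this;
  }

  int value() const { return *(*slots)[pos]; }
};

// Shape of the guarded iteration from FlushSlotsFb / OnDbChange / WriteBucket:
// an already-done iterator is rejected before the version check, and the initial
// position is advanced to an occupied slot before the per-entry callback runs.
void IterateBucket(MockBucketIterator it, uint64_t next_version) {
  if (it.is_done() || it.GetVersion() >= next_version) return;
  it.AdvanceIfNotOccupied();
  while (!it.is_done()) {
    std::cout << "entry " << it.value() << '\n';  // stand-in for del_entry_cb / SerializeBucket
    ++it;
  }
}

int main() {
  std::vector<std::optional<int>> bucket = {std::nullopt, 7, std::nullopt, 42};
  IterateBucket(MockBucketIterator{&bucket, 0, /*version=*/0}, /*next_version=*/1);  // prints 7, 42
  IterateBucket(MockBucketIterator{}, /*next_version=*/1);  // already-done iterator: safe no-op
  return 0;
}

The alternative the reviewer hints at would be to fix this inside the dash-table code itself, so that callbacks are never invoked with an already-done iterator and call sites would not need these guards.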
2 changes: 1 addition & 1 deletion src/server/journal/streamer.cc
@@ -297,7 +297,7 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
bool written = false;

if (it.GetVersion() < snapshot_version_) {
if (!it.is_done() && it.GetVersion() < snapshot_version_) {
stats_.buckets_written++;

it.SetVersion(snapshot_version_);
2 changes: 1 addition & 1 deletion src/server/snapshot.cc
@@ -390,7 +390,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
const PrimeTable::bucket_iterator* bit = req.update();

if (bit) {
if (bit->GetVersion() < snapshot_version_) {
if (!bit->is_done() && bit->GetVersion() < snapshot_version_) {
stats_.side_saved += SerializeBucket(db_index, *bit);
}
} else {
98 changes: 98 additions & 0 deletions tests/dragonfly/cluster_test.py
@@ -2763,3 +2763,101 @@ async def test_migration_one_after_another(df_factory: DflyInstanceFactory, df_s
dbsize_node2 = await nodes[2].client.dbsize()
assert dbsize_node1 + dbsize_node2 == dbsize_node0
assert dbsize_node2 > 0 and dbsize_node1 > 0


"""
Test one cluster node distributing its slots to 3 other nodes.
The slot ranges migrated to each node are randomized.
For each migration we start it, wait for it to finish, and once it has finished we push the migration finalization config.
"""


@pytest.mark.slow
@pytest.mark.exclude_epoll
@pytest.mark.asyncio
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_migration_rebalance_node(df_factory: DflyInstanceFactory, df_seeder_factory):
    # 1. Create a cluster of 4 nodes, with all slots allocated to the first node.
instances = [
df_factory.create(
port=next(next_port),
admin_port=next(next_port),
vmodule="outgoing_slot_migration=2,cluster_family=2,incoming_slot_migration=2,streamer=2",
)
for i in range(4)
]
df_factory.start_all(instances)

def create_random_ranges():
# Generate 2 random breakpoints within the range
breakpoints = sorted(random.sample(range(1, 16382), 2))
ranges = [
(0, breakpoints[0] - 1),
(breakpoints[0], breakpoints[1] - 1),
(breakpoints[1], 16383),
]
return ranges

# Create 3 random ranges from 0 to 16383
random_ranges = create_random_ranges()

nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = random_ranges
nodes[1].slots = []
nodes[2].slots = []
nodes[3].slots = []
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

key_num = 100000
logging.debug(f"DEBUG POPULATE first node with number of keys: {key_num}")
await StaticSeeder(key_target=key_num, data_size=100).run(nodes[0].client)
dbsize_node0 = await nodes[0].client.dbsize()
assert dbsize_node0 > (key_num * 0.95)

logging.debug("start seeding")
    # Running the seeder in pipeline mode while finalizing migrations leads to errors.
    # TODO: I believe that changing the seeder to generate pipelined commands only for a specific slot will fix the problem.
seeder = df_seeder_factory.create(
keys=50_000, port=instances[0].port, cluster_mode=True, pipeline=False
)
await seeder.run(target_deviation=0.1)
seed = asyncio.create_task(seeder.run())

migration_info = [
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [random_ranges[0]], nodes[1].id),
MigrationInfo("127.0.0.1", nodes[2].instance.admin_port, [random_ranges[1]], nodes[2].id),
MigrationInfo("127.0.0.1", nodes[3].instance.admin_port, [random_ranges[2]], nodes[3].id),
]

nodes_lock = asyncio.Lock()

async def do_migration(index):
await asyncio.sleep(random.randint(1, 10) / 5)
async with nodes_lock:
logging.debug(f"Start migration from node {index}")
nodes[0].migrations.append(migration_info[index - 1])
await push_config(
json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]
)

logging.debug(f"wait migration from node {index}")
await wait_for_status(nodes[0].admin_client, nodes[index].id, "FINISHED", timeout=50)
await wait_for_status(nodes[index].admin_client, nodes[0].id, "FINISHED", timeout=50)
logging.debug(f"finished migration from node {index}")
await asyncio.sleep(random.randint(1, 5) / 5)
async with nodes_lock:
logging.debug(f"Finalize migration from node {index}")
nodes[index].slots = migration_info[index - 1].slots
nodes[0].slots.remove(migration_info[index - 1].slots[0])
nodes[0].migrations.remove(migration_info[index - 1])
await push_config(
json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]
)

all_migrations = [asyncio.create_task(do_migration(i)) for i in range(1, 4)]
for migration in all_migrations:
await migration

logging.debug("stop seeding")
seeder.stop()
await seed
14 changes: 9 additions & 5 deletions tests/dragonfly/utility.py
@@ -425,6 +425,7 @@ def __init__(
stop_on_failure=True,
cluster_mode=False,
mirror_to_fake_redis=False,
pipeline=True,
):
if cluster_mode:
max_multikey = 1
@@ -439,6 +440,7 @@ def __init__(
self.stop_flag = False
self.stop_on_failure = stop_on_failure
self.fake_redis = None
self.use_pipeline = pipeline

self.log_file = log_file
if self.log_file is not None:
Expand All @@ -447,6 +449,7 @@ def __init__(
if mirror_to_fake_redis:
logging.debug("Creating FakeRedis instance")
self.fake_redis = fakeredis.FakeAsyncRedis()
self.use_pipeline = False

async def run(self, target_ops=None, target_deviation=None):
"""
@@ -604,18 +607,19 @@ async def _executor_task(self, db, queue):
break

try:
if self.fake_redis is None:
if self.use_pipeline:
pipe = client.pipeline(transaction=tx_data[1])
for cmd in tx_data[0]:
pipe.execute_command(*cmd)
await pipe.execute()
else:
# To mirror consistently to Fake Redis we must only send to it successful
# commands. We can't use pipes because they might succeed partially.
for cmd in tx_data[0]:
dfly_resp = await client.execute_command(*cmd)
fake_resp = await self.fake_redis.execute_command(*cmd)
assert dfly_resp == fake_resp
# To mirror consistently to Fake Redis we must only send to it successful
# commands. We can't use pipes because they might succeed partially.
if self.fake_redis is not None:
fake_resp = await self.fake_redis.execute_command(*cmd)
assert dfly_resp == fake_resp
except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
if self.stop_on_failure:
await self._close_client(client)