Skip to content

Commit

Permalink
IGNITE-24209 Added tests for filters and storage profiles awareness o…
Browse files Browse the repository at this point in the history
…n partition reset (#5064)
  • Loading branch information
kgusakov authored Jan 23, 2025
1 parent 9eda3c6 commit 9acce8b
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@

package org.apache.ignite.internal.table.distributed.disaster;

import static java.util.Collections.emptySet;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
import static org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.RECOVERY_TRIGGER_KEY;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -38,8 +43,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
Expand All @@ -50,9 +57,12 @@
import org.apache.ignite.internal.distributionzones.Node;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.versioned.VersionedSerialization;

Expand Down Expand Up @@ -132,10 +142,89 @@ private void waitAndAssertStableAssignmentsOfPartitionEqualTo(
.collect(Collectors.toUnmodifiableSet()));
}

/**
* Wait for the 2 facts simultaneously.
*
* <ul>
* <li>All planned rebalances have finished (pending and planned keys is empty).</li>
* <li>Stable assignments is equal to expected</li>
* </ul>
*
* @param gatewayNode Node for communication with cluster and components.
* @param tableName Table name.
* @param partitionIds Set of target partition ids to check.
* @param nodes Expected set of nodes in stable assignments.
*/
final void waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpected(
IgniteImpl gatewayNode, String tableName, Set<Integer> partitionIds, Set<String> nodes) {
partitionIds.forEach(p -> {
try {
waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpectedForPartition(gatewayNode, tableName, p, nodes);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}

/**
* The same as the previous one, but for concrete partition.
*
* @see AbstractHighAvailablePartitionsRecoveryTest#waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpected
*/
private void waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpectedForPartition(
IgniteImpl gatewayNode, String tableName, int partNum, Set<String> nodes
) throws InterruptedException {
AtomicReference<Set<String>> stableNodes = new AtomicReference<>();
AtomicReference<Set<String>> pendingNodes = new AtomicReference<>();
AtomicReference<Set<String>> plannedNodes = new AtomicReference<>();

assertTrue(
waitForCondition(() -> {
Integer tableId = getTableId(gatewayNode.catalogManager(), tableName, clock.nowLong());

assert tableId != null;

TablePartitionId tablePartitionId = new TablePartitionId(tableId, partNum);

ByteArray stableKey = stablePartAssignmentsKey(tablePartitionId);
ByteArray pendingKey = pendingPartAssignmentsKey(tablePartitionId);
ByteArray plannedKey = plannedPartAssignmentsKey(tablePartitionId);

Map<ByteArray, Entry> results = await(gatewayNode.metaStorageManager()
.getAll(Set.of(stableKey, pendingKey, plannedKey)), 1, TimeUnit.SECONDS);

boolean isStableAsExpected = nodes.equals(assignmentsFromEntry(results.get(stableKey)));
boolean isPendingEmpty = results.get(pendingKey).value() == null;
boolean isPlannedEmpty = results.get(plannedKey).value() == null;

stableNodes.set(assignmentsFromEntry(results.get(stableKey)));
pendingNodes.set(assignmentsFromEntry(results.get(pendingKey)));
plannedNodes.set(assignmentsFromEntry(results.get(plannedKey)));

return isStableAsExpected && isPendingEmpty && isPlannedEmpty;
},
500,
30_000
), IgniteStringFormatter.format(
"Expected: (stable: {}, pending: [], planned: []), but actual: (stable: {}, pending: {}, planned: {})",
nodes, stableNodes, pendingNodes, plannedNodes
)
);
}

static Entry getRecoveryTriggerKey(IgniteImpl node) {
return node.metaStorageManager().getLocally(RECOVERY_TRIGGER_KEY, Long.MAX_VALUE);
}

private static Set<String> assignmentsFromEntry(Entry entry) {
return (entry.value() != null)
? Assignments.fromBytes(entry.value()).nodes()
.stream()
.map(Assignment::consistentId)
.collect(Collectors.toUnmodifiableSet())
: emptySet();
}

private Set<Assignment> getPartitionClusterNodes(IgniteImpl node, String tableName, int partNum) {
return Optional.ofNullable(getTableId(node.catalogManager(), tableName, clock.nowLong()))
.map(tableId -> partitionAssignments(node.metaStorageManager(), tableId, partNum).join())
Expand All @@ -148,10 +237,16 @@ final int zoneIdByName(CatalogService catalogService, String zoneName) {

private void createHaZoneWithTables(
String zoneName, List<String> tableNames, String filter, Set<String> targetNodes) throws InterruptedException {
createHaZoneWithTables(zoneName, tableNames, filter, DEFAULT_STORAGE_PROFILE, targetNodes);
}

private void createHaZoneWithTables(
String zoneName, List<String> tableNames, String filter, String storageProfiles, Set<String> targetNodes
) throws InterruptedException {
executeSql(String.format(
"CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, STORAGE_PROFILES='%s', "
+ "CONSISTENCY_MODE='HIGH_AVAILABILITY', DATA_NODES_FILTER='%s'",
zoneName, targetNodes.size(), PARTITIONS_NUMBER, DEFAULT_STORAGE_PROFILE, filter
zoneName, targetNodes.size(), PARTITIONS_NUMBER, storageProfiles, filter
));

Set<Integer> tableIds = new HashSet<>();
Expand Down Expand Up @@ -186,10 +281,23 @@ final void createHaZoneWithTable(String zoneName, String tableName) throws Inter
createHaZoneWithTables(zoneName, List.of(tableName));
}

final void createHaZoneWithTable(int replicas, String filter, Set<String> targetNodes) throws InterruptedException {
createHaZoneWithTables(
HA_ZONE_NAME, List.of(HA_TABLE_NAME), filter, targetNodes
);
}

final void createHaZoneWithTable() throws InterruptedException {
createHaZoneWithTable(HA_ZONE_NAME, HA_TABLE_NAME);
}

final void createHaZoneWithTableWithStorageProfile(int replicas, String storageProfiles, Set<String> targetNodes)
throws InterruptedException {
createHaZoneWithTables(
HA_ZONE_NAME, List.of(HA_TABLE_NAME), DEFAULT_FILTER, storageProfiles, targetNodes
);
}

final void createScZoneWithTable() {
executeSql(String.format(
"CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, STORAGE_PROFILES='%s', CONSISTENCY_MODE='STRONG_CONSISTENCY'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@
import java.util.Set;
import org.apache.ignite.internal.app.IgniteImpl;
import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;

/** Test suite for the cases with a recovery of the group replication factor after reset by zone filter update. */
public class ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends AbstractHighAvailablePartitionsRecoveryTest {
private static final String GLOBAL_EU_NODES_CONFIG = nodeConfig("{region = EU, zone = global}");
private static final String GLOBAL_EU_NODES_CONFIG =
nodeConfig("{region = EU, zone = global}", "{segmented_aipersist.engine = aipersist}");

private static final String EU_ONLY_NODES_CONFIG = nodeConfig("{region = EU}");
private static final String EU_ONLY_NODES_CONFIG = nodeConfig("{region = EU}", null);

private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone = global}");
private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone = global}", null);

private static final String ROCKS_NODES_CONFIG = nodeConfig(null, "{lru_rocks.engine = rocksdb}");

private static final String AIPERSIST_NODES_CONFIG = nodeConfig(null, "{segmented_aipersist.engine = aipersist}");

@Override
protected int initialNodes() {
Expand Down Expand Up @@ -72,13 +78,61 @@ void testScaleUpAfterZoneFilterUpdate() throws InterruptedException {
waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, globalNodes);
}

@Test
void testThatPartitionResetZoneFilterAware() throws InterruptedException {
startNode(1, EU_ONLY_NODES_CONFIG);
startNode(2, GLOBAL_NODES_CONFIG);

String euFilter = "$[?(@.region == \"EU\")]";

Set<String> euNodes = nodeNames(0, 1);

createHaZoneWithTable(2, euFilter, euNodes);

IgniteImpl node = igniteImpl(0);

waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, euNodes);

assertRecoveryKeyIsEmpty(node);

stopNodes(1);

// Due to the fact that only one [0] node is suitable according to filter:
waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpected(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0));
}

@Test
void testThatPartitionResetZoneStorageProfileFilterAware() throws InterruptedException {
startNode(1, AIPERSIST_NODES_CONFIG);
startNode(2, ROCKS_NODES_CONFIG);

Set<String> nodesWithAiProfile = nodeNames(0, 1);

createHaZoneWithTableWithStorageProfile(2, "segmented_aipersist", nodesWithAiProfile);

IgniteImpl node = igniteImpl(0);

waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, nodesWithAiProfile);

assertRecoveryKeyIsEmpty(node);

stopNodes(1);

// Due to the fact that only one [0] node is suitable according to storage profiles:
waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpected(node, HA_TABLE_NAME, PARTITION_IDS, nodeNames(0));
}

private void alterZoneSql(String filter, String zoneName) {
executeSql(String.format("ALTER ZONE \"%s\" SET \"DATA_NODES_FILTER\" = '%s'", zoneName, filter));
}

private static String nodeConfig(@Language("HOCON") String nodeAtrributes) {
private static String nodeConfig(
@Nullable @Language("HOCON") String nodeAtrributes,
@Nullable @Language("HOCON") String storageProfiles
) {
return "ignite {\n"
+ " nodeAttributes.nodeAttributes: " + nodeAtrributes + ",\n"
+ " storage.profiles: " + storageProfiles + ",\n"
+ " network: {\n"
+ " port: {},\n"
+ " nodeFinder: {\n"
Expand Down

0 comments on commit 9acce8b

Please sign in to comment.