Skip to content

Commit

Permalink
IGNITE-24047 Sql. Fixed flaky test CatalogCompactionRunnerSelfTest.re…
Browse files Browse the repository at this point in the history
…balancePreventsCompaction (#4932)
  • Loading branch information
xtern authored Dec 19, 2024
1 parent ddacc96 commit ac717ce
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 28 deletions.
1 change: 1 addition & 0 deletions modules/catalog-compaction/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dependencies {
testImplementation libs.mockito.junit
testImplementation libs.mockito.core
testImplementation libs.hamcrest.core
testImplementation libs.awaitility

integrationTestImplementation libs.fastutil.core
integrationTestImplementation libs.awaitility
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
Expand Down Expand Up @@ -112,6 +114,8 @@
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Assertions;
Expand All @@ -121,6 +125,8 @@
* Tests for class {@link CatalogCompactionRunner}.
*/
public class CatalogCompactionRunnerSelfTest extends AbstractCatalogCompactionTest {
private static final Duration BUSY_WAIT_TIMEOUT = Duration.of(3, ChronoUnit.SECONDS);

private static final LogicalNode NODE1 = new LogicalNode(nodeId(1), "node1", new NetworkAddress("localhost", 123));

private static final LogicalNode NODE2 = new LogicalNode(nodeId(2), "node2", new NetworkAddress("localhost", 123));
Expand Down Expand Up @@ -150,7 +156,7 @@ private static UUID nodeId(int id) {
}

@Test
public void routineSucceedOnCoordinator() throws InterruptedException {
public void routineSucceedOnCoordinator() {
CatalogCommand createTable = CreateTableCommand.builder()
.tableName("TEST")
.schemaName("PUBLIC")
Expand Down Expand Up @@ -187,8 +193,7 @@ public void routineSucceedOnCoordinator() throws InterruptedException {

int expectedEarliestCatalogVersion = catalog1.version() - 1;

boolean done = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 3_000);
assertTrue(done, "Compaction should have been triggered");
expectEarliestVersion("Compaction should have been triggered", is(expectedEarliestCatalogVersion));

verify(messagingService, times(logicalNodes.size() - 1))
.invoke(any(ClusterNode.class), any(CatalogCompactionMinimumTimesRequest.class), anyLong());
Expand All @@ -206,7 +211,7 @@ public void routineSucceedOnCoordinator() throws InterruptedException {
}

@Test
public void mustTriggerWhenRequiredPartitionsAreSomeSubSetOfAvailablePartitions() throws InterruptedException {
public void mustTriggerWhenRequiredPartitionsAreSomeSubSetOfAvailablePartitions() {
CatalogCommand createTable = CreateTableCommand.builder()
.tableName("TEST")
.schemaName("PUBLIC")
Expand Down Expand Up @@ -251,12 +256,11 @@ public void mustTriggerWhenRequiredPartitionsAreSomeSubSetOfAvailablePartitions(
HybridTimestamp now = clockService.now();
compactionRunner.onLowWatermarkChanged(now);

boolean done = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 3_000);
assertTrue(done, "Compaction should have been triggered");
expectEarliestVersion("Compaction should have been triggered", is(expectedEarliestCatalogVersion));
}

@Test
public void mustTriggerWhenAvailablePartitionsHaveMoreTablesThenRequired() throws InterruptedException {
public void mustTriggerWhenAvailablePartitionsHaveMoreTablesThenRequired() {
CatalogCommand createTable = CreateTableCommand.builder()
.tableName("TEST")
.schemaName("PUBLIC")
Expand Down Expand Up @@ -299,12 +303,11 @@ public void mustTriggerWhenAvailablePartitionsHaveMoreTablesThenRequired() throw
HybridTimestamp now = clockService.now();
compactionRunner.onLowWatermarkChanged(now);

boolean done = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 3_000);
assertTrue(done, "Compaction should have been triggered");
expectEarliestVersion("Compaction should have been triggered", is(expectedEarliestCatalogVersion));
}

@Test
public void mustTriggerWheLogicalTopologyHasMoreNodesThenRequired() throws InterruptedException {
public void mustTriggerWheLogicalTopologyHasMoreNodesThenRequired() {
CatalogCommand createTable = CreateTableCommand.builder()
.tableName("TEST")
.schemaName("PUBLIC")
Expand Down Expand Up @@ -343,8 +346,7 @@ public void mustTriggerWheLogicalTopologyHasMoreNodesThenRequired() throws Inter
HybridTimestamp now = clockService.now();
compactionRunner.onLowWatermarkChanged(now);

boolean done = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 3_000);
assertTrue(done, "Compaction should have been triggered");
expectEarliestVersion("Compaction should have been triggered", is(expectedEarliestCatalogVersion));
}

@Test
Expand Down Expand Up @@ -402,7 +404,7 @@ public void mustNotTriggerCompactionWhenLowWaterMarkIsNotAvailable() {
}

@Test
public void mustNotTriggerCompactionWhenIndexBuildingIsTakingPlace() throws InterruptedException {
public void mustNotTriggerCompactionWhenIndexBuildingIsTakingPlace() {
CatalogCommand command = CreateTableCommand.builder()
.tableName("T1")
.schemaName("PUBLIC")
Expand Down Expand Up @@ -447,8 +449,7 @@ public void mustNotTriggerCompactionWhenIndexBuildingIsTakingPlace() throws Inte
assertThat(compactionRunner.onLowWatermarkChanged(clockService.now()), willBe(false));
assertThat(compactionRunner.lastRunFuture(), willCompleteSuccessfully());

boolean done = waitForCondition(() -> catalogManager.earliestCatalogVersion() > initialVersion, 3_000);
assertTrue(done, "Should have advanced catalog version after initial compaction");
expectEarliestVersion("Should have advanced catalog version after initial compaction", greaterThan(initialVersion));
}

// The first version after initial compaction.
Expand Down Expand Up @@ -484,8 +485,7 @@ public void mustNotTriggerCompactionWhenIndexBuildingIsTakingPlace() throws Inte
assertThat(compactionRunner.onLowWatermarkChanged(clockService.now()), willBe(false));
assertThat(compactionRunner.lastRunFuture(), willCompleteSuccessfully());

boolean done = waitForCondition(() -> catalogManager.earliestCatalogVersion() == firstVersion, 3_000);
assertTrue(done, "Index is being built but catalog compaction was triggered");
expectEarliestVersion("Index is being built but catalog compaction was triggered", is(firstVersion));
}

{
Expand All @@ -506,8 +506,7 @@ public void mustNotTriggerCompactionWhenIndexBuildingIsTakingPlace() throws Inte
assertThat(compactionRunner.onLowWatermarkChanged(clockService.now()), willBe(false));
assertThat(compactionRunner.lastRunFuture(), willCompleteSuccessfully());

boolean done = waitForCondition(() -> catalogManager.earliestCatalogVersion() == latestVersion - 1, 3_000);
assertTrue(done, "Index is available but compaction has not been triggered");
expectEarliestVersion("Index is available but compaction has not been triggered", is(latestVersion - 1));
}
}

Expand Down Expand Up @@ -573,7 +572,7 @@ public void mustNotStartWhenSomePartitionsOnAreMissingAfterValidation() throws I

int expectedEarliestCatalogVersion = catalog1.version() - 1;

boolean failed = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 3_000);
boolean failed = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 500);
assertFalse(failed, "Compaction should not have started");

assertEquals(firstVersion, catalogManager.earliestCatalogVersion());
Expand All @@ -591,8 +590,7 @@ public void mustNotStartWhenSomePartitionsOnAreMissingAfterValidation() throws I

int expectedEarliestCatalogVersion = catalog1.version() - 1;

boolean done = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 3_000);
assertTrue(done, "Compaction should have been triggered");
expectEarliestVersion("Compaction should have been triggered", is(expectedEarliestCatalogVersion));
}
}

Expand Down Expand Up @@ -639,7 +637,7 @@ public void mustNotStartWhenPartitionsOfEntireTableAreMissing() throws Interrupt

int expectedEarliestCatalogVersion = catalog1.version() - 1;

boolean failed = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 3_000);
boolean failed = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 500);
assertFalse(failed, "Compaction should not have started");

assertEquals(firstVersion, catalogManager.earliestCatalogVersion());
Expand All @@ -657,8 +655,7 @@ public void mustNotStartWhenPartitionsOfEntireTableAreMissing() throws Interrupt

int expectedEarliestCatalogVersion = catalog1.version() - 1;

boolean done = waitForCondition(() -> expectedEarliestCatalogVersion == catalogManager.earliestCatalogVersion(), 3_000);
assertTrue(done, "Compaction should have been triggered");
expectEarliestVersion("Compaction should have been triggered", is(expectedEarliestCatalogVersion));
}
}

Expand Down Expand Up @@ -743,9 +740,8 @@ public void mustNotPerformWhenAssignmentNodeIsMissing() throws InterruptedExcept

compactor.triggerCompaction(clockService.now());
assertThat(compactor.lastRunFuture(), willCompleteSuccessfully());
waitForCondition(() -> catalogManager.earliestCatalogVersion() == catalog.version() - 1, 1_000);

assertThat(catalogManager.earliestCatalogVersion(), is(catalog.version() - 1));
expectEarliestVersion("Compaction should have been triggered", is(catalog.version() - 1));
}
}

Expand Down Expand Up @@ -1036,10 +1032,16 @@ NODE1, NODE1, new MinTimeSupplier((n) -> clockService.nowLong(), null), logicalN
assertThat(compactionRunner.onLowWatermarkChanged(clockService.now()), willBe(false));
assertThat(compactionRunner.lastRunFuture(), willCompleteSuccessfully());

assertThat(catalogManager.earliestCatalogVersion(), is(catalogManager.latestCatalogVersion() - 1));
expectEarliestVersion("Compaction should have been successful", is(catalogManager.latestCatalogVersion() - 1));
}
}

private void expectEarliestVersion(String reason, Matcher<Integer> versionMatcher) {
Awaitility.await()
.timeout(BUSY_WAIT_TIMEOUT)
.untilAsserted(() -> assertThat(reason, catalogManager.earliestCatalogVersion(), versionMatcher));
}

private Catalog prepareCatalogWithTables() {
CreateTableCommandBuilder tableCmdBuilder = CreateTableCommand.builder()
.schemaName("PUBLIC")
Expand Down

0 comments on commit ac717ce

Please sign in to comment.