Skip to content

Commit

Permalink
[hotfix][tests][oceanbase] Fix oceanbase test failure, possibly cause…
Browse files Browse the repository at this point in the history
…d by some interactions among cases (#3712)
  • Loading branch information
yuxiqian authored Dec 2, 2024
1 parent 23e8149 commit 26f5880
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -47,6 +49,8 @@ public abstract class OceanBaseTestBase extends AbstractTestBase {

@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

public static final Duration FETCH_TIMEOUT = Duration.ofSeconds(60);

protected abstract OceanBaseCdcMetadata metadata();

protected String commonOptionsString() {
Expand Down Expand Up @@ -130,10 +134,19 @@ protected void initializeTable(String sqlFile) {
}

public static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
throws InterruptedException, TimeoutException {
long deadlineTimestamp = System.currentTimeMillis() + FETCH_TIMEOUT.toMillis();
while (System.currentTimeMillis() < deadlineTimestamp) {
if (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
} else {
return;
}
}
throw new TimeoutException(
String.format(
"Failed to fetch enough records in sink.\nExpected size: %d\nActual values: %s",
expectedSize, TestValuesTableFactory.getRawResults(sinkName)));
}

public static int sinkSize(String sinkName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@

package org.apache.flink.cdc.connectors.oceanbase.table;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestBase;
import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata;
import org.apache.flink.cdc.connectors.oceanbase.testutils.UniqueDatabase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;

import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -38,6 +41,8 @@
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer;
import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC;
Expand All @@ -64,6 +69,9 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
private static final OceanBaseCdcMetadata METADATA =
new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY);

private UniqueDatabase inventoryDatabase;
private UniqueDatabase columnTypesDatabase;

@Override
protected OceanBaseCdcMetadata metadata() {
return METADATA;
Expand All @@ -76,6 +84,19 @@ public void before() {
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
}

@After
public void after() {
if (inventoryDatabase != null) {
inventoryDatabase.dropDatabase();
inventoryDatabase = null;
}

if (columnTypesDatabase != null) {
columnTypesDatabase.dropDatabase();
columnTypesDatabase = null;
}
}

@Override
protected String logProxyOptionsString() {
return super.logProxyOptionsString()
Expand All @@ -85,8 +106,8 @@ protected String logProxyOptionsString() {

@Test
public void testTableList() throws Exception {
initializeTable("inventory");

inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory");
inventoryDatabase.createAndInitialize("mysql");
String sourceDDL =
String.format(
"CREATE TABLE ob_source ("
Expand All @@ -100,7 +121,7 @@ public void testTableList() throws Exception {
+ ", "
+ " 'table-list' = '%s'"
+ ")",
"inventory.products");
inventoryDatabase.getDatabaseName() + ".products");

String sinkDDL =
"CREATE TABLE sink ("
Expand All @@ -125,17 +146,18 @@ public void testTableList() throws Exception {

try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(String.format("USE %s;", inventoryDatabase.getDatabaseName()));
statement.execute(
"UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;");
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM inventory.products WHERE id=111;");
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}

waitForSinkSize("sink", snapshotSize + 7);
Expand Down Expand Up @@ -188,8 +210,8 @@ public void testTableList() throws Exception {

@Test
public void testMetadataColumns() throws Exception {
initializeTable("inventory");

inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory");
inventoryDatabase.createAndInitialize("mysql");
String sourceDDL =
String.format(
"CREATE TABLE ob_source ("
Expand All @@ -207,7 +229,7 @@ public void testMetadataColumns() throws Exception {
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
"^inventory$",
String.format("^%s$", inventoryDatabase.getDatabaseName()),
"^products$");

String sinkDDL =
Expand Down Expand Up @@ -236,46 +258,32 @@ public void testMetadataColumns() throws Exception {

try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(String.format("USE %s;", inventoryDatabase.getDatabaseName()));
statement.execute(
"UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;");
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
}

waitForSinkSize("sink", snapshotSize + 1);

String tenant = metadata().getTenantName();

List<String> expected =
Arrays.asList(
"+I("
+ tenant
+ ",inventory,products,101,scooter,Small 2-wheel scooter,3.1400000000)",
"+I("
+ tenant
+ ",inventory,products,102,car battery,12V car battery,8.1000000000)",
"+I("
+ tenant
+ ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)",
"+I("
+ tenant
+ ",inventory,products,104,hammer,12oz carpenter's hammer,0.7500000000)",
"+I("
+ tenant
+ ",inventory,products,105,hammer,14oz carpenter's hammer,0.8750000000)",
"+I("
+ tenant
+ ",inventory,products,106,hammer,16oz carpenter's hammer,1.0000000000)",
"+I("
+ tenant
+ ",inventory,products,107,rocks,box of assorted rocks,5.3000000000)",
"+I("
+ tenant
+ ",inventory,products,108,jacket,water resistent black wind breaker,0.1000000000)",
"+I("
+ tenant
+ ",inventory,products,109,spare tire,24 inch spare tire,22.2000000000)",
"+U("
+ tenant
+ ",inventory,products,106,hammer,18oz carpenter hammer,1.0000000000)");
Stream.of(
"+I(%s,%s,products,101,scooter,Small 2-wheel scooter,3.1400000000)",
"+I(%s,%s,products,102,car battery,12V car battery,8.1000000000)",
"+I(%s,%s,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)",
"+I(%s,%s,products,104,hammer,12oz carpenter's hammer,0.7500000000)",
"+I(%s,%s,products,105,hammer,14oz carpenter's hammer,0.8750000000)",
"+I(%s,%s,products,106,hammer,16oz carpenter's hammer,1.0000000000)",
"+I(%s,%s,products,107,rocks,box of assorted rocks,5.3000000000)",
"+I(%s,%s,products,108,jacket,water resistent black wind breaker,0.1000000000)",
"+I(%s,%s,products,109,spare tire,24 inch spare tire,22.2000000000)",
"+U(%s,%s,products,106,hammer,18oz carpenter hammer,1.0000000000)")
.map(
line ->
String.format(
line, tenant, inventoryDatabase.getDatabaseName()))
.collect(Collectors.toList());
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
Expand All @@ -287,7 +295,9 @@ public void testAllDataTypes() throws Exception {
setGlobalTimeZone(serverTimeZone);
tEnv.getConfig().setLocalTimeZone(ZoneId.of(serverTimeZone));

initializeTable("column_type_test");
columnTypesDatabase = new UniqueDatabase(OB_SERVER, "column_type_test");
columnTypesDatabase.createAndInitialize("mysql");

String sourceDDL =
String.format(
"CREATE TABLE ob_source (\n"
Expand Down Expand Up @@ -340,7 +350,7 @@ public void testAllDataTypes() throws Exception {
+ " 'table-name' = '%s',"
+ " 'server-time-zone' = '%s'"
+ ")",
"^column_type_test$",
String.format("^%s$", columnTypesDatabase.getDatabaseName()),
"^full_types$",
serverTimeZone);
String sinkDDL =
Expand Down Expand Up @@ -445,8 +455,9 @@ public void testAllDataTypes() throws Exception {

try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(String.format("USE %s;", columnTypesDatabase.getDatabaseName()));
statement.execute(
"UPDATE column_type_test.full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
}

waitForSinkSize("sink", snapshotSize + 1);
Expand Down Expand Up @@ -475,7 +486,9 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {
setGlobalTimeZone(serverTimeZone);
tEnv.getConfig().setLocalTimeZone(ZoneId.of(serverTimeZone));

initializeTable("column_type_test");
columnTypesDatabase = new UniqueDatabase(OB_SERVER, "column_type_test");
columnTypesDatabase.createAndInitialize("mysql");

String sourceDDL =
String.format(
"CREATE TABLE ob_source (\n"
Expand All @@ -493,7 +506,7 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {
+ " 'table-name' = '%s',"
+ " 'server-time-zone' = '%s'"
+ ")",
"column_type_test",
columnTypesDatabase.getDatabaseName(),
"full_types",
serverTimeZone);

Expand Down Expand Up @@ -525,8 +538,9 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {

try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(String.format("USE %s;", columnTypesDatabase.getDatabaseName()));
statement.execute(
"UPDATE column_type_test.full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
}

waitForSinkSize("sink", snapshotSize + 1);
Expand All @@ -543,7 +557,8 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {

@Test
public void testSnapshotOnly() throws Exception {
initializeTable("inventory");
inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory");
inventoryDatabase.createAndInitialize("mysql");

String sourceDDL =
String.format(
Expand All @@ -558,7 +573,7 @@ public void testSnapshotOnly() throws Exception {
+ ", "
+ " 'table-list' = '%s'"
+ ")",
"inventory.products");
inventoryDatabase.getDatabaseName() + ".products");

String sinkDDL =
"CREATE TABLE sink ("
Expand Down Expand Up @@ -593,5 +608,10 @@ public void testSnapshotOnly() throws Exception {
"+I(109,spare tire,24 inch spare tire,22.2000000000)");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
assertContainsInAnyOrder(expected, actual);

while (result.getJobClient().get().getJobStatus().get().equals(JobStatus.RUNNING)) {
Thread.sleep(100);
// Waiting for job to quit, in case if
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,16 @@ public String qualifiedTableName(final String tableName) {
return String.format("%s.%s", databaseName, tableName);
}

/** Creates the database and populates it with initialization SQL script. */
public void createAndInitialize() {
final String ddlFile = String.format("ddl/%s.sql", templateName);
createAndInitializeWithDdlFile(String.format("ddl/%s.sql", templateName));
}

public void createAndInitialize(String variant) {
createAndInitializeWithDdlFile(String.format("ddl/%s/%s.sql", variant, templateName));
}

/** Creates the database and populates it with initialization SQL script. */
public void createAndInitializeWithDdlFile(String ddlFile) {
final URL ddlTestFile = UniqueDatabase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
-- DATABASE: column_type_test
-- ----------------------------------------------------------------------------------------------------------------

CREATE DATABASE IF NOT EXISTS column_type_test;
USE column_type_test;

DROP TABLE IF EXISTS full_types;
CREATE TABLE full_types
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------

CREATE DATABASE IF NOT EXISTS inventory;
USE inventory;

DROP TABLE IF EXISTS products;
CREATE TABLE products
(
Expand Down

0 comments on commit 26f5880

Please sign in to comment.