diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnector.java b/src/main/java/io/debezium/connector/vitess/VitessConnector.java index 8043aedb..8d03ad3a 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnector.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnector.java @@ -14,7 +14,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; @@ -28,7 +27,7 @@ import io.debezium.connector.vitess.connection.VitessReplicationConnection; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; -import io.debezium.util.Strings; +import io.debezium.relational.Tables; import io.grpc.StatusRuntimeException; /** Vitess Connector entry point */ @@ -273,17 +272,15 @@ protected void validateConnection(Map configValues, Configu } } - public static List getIncludedTables(String keyspace, String tableIncludeList, List allTables) { + public static List getIncludedTables(VitessConnectorConfig connectorConfig, List allTables) { // table.include.list are list of patterns, filter all the tables in the keyspace through those patterns // to get the list of table names. - final List patterns = Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE); List includedTables = new ArrayList<>(); - for (String ksTable : allTables) { - for (Pattern pattern : patterns) { - if (pattern.asPredicate().test(String.format("%s.%s", keyspace, ksTable))) { - includedTables.add(ksTable); - break; - } + String keyspace = connectorConfig.getKeyspace(); + Tables.TableFilter filter = new Filters(connectorConfig).tableFilter(); + for (String table : allTables) { + if (filter.isIncluded(new TableId("", keyspace, table))) { + includedTables.add(table); } } return includedTables; @@ -330,8 +327,7 @@ public List getMatchingCollections(Configuration configuration) { VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(configuration); String keyspace = vitessConnectorConfig.getKeyspace(); List allTables = vitessMetadata.getTables(); - List includedTables = getIncludedTables(keyspace, - vitessConnectorConfig.tableIncludeList(), allTables); + List includedTables = getIncludedTables(vitessConnectorConfig, allTables); return includedTables.stream() .map(table -> new TableId(keyspace, null, table)) .collect(Collectors.toList()); diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java index 9bffb4d9..81369899 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java @@ -295,8 +295,7 @@ private void setError(String msg) { if (!Strings.isNullOrEmpty(config.tableIncludeList())) { final String keyspace = config.getKeyspace(); final List allTables = new VitessMetadata(config).getTables(); - List includedTables = VitessConnector.getIncludedTables(config.getKeyspace(), - config.tableIncludeList(), allTables); + List includedTables = VitessConnector.getIncludedTables(config, allTables); for (String table : includedTables) { String sql = "select * from `" + table + "`"; // See rule in: https://github.com/vitessio/vitess/blob/release-14.0/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go#L316 diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorTest.java index 4d5b3b6b..6ede5971 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorTest.java @@ -1646,11 +1646,29 @@ public void testTableIncludeList() { String keyspace = "ks"; List allTables = Arrays.asList("t1", "t22", "t3"); String tableIncludeList = new String("ks.t1,ks.t2.*"); - List includedTables = VitessConnector.getIncludedTables(keyspace, tableIncludeList, allTables); + Configuration config = Configuration.from(Map.of( + VitessConnectorConfig.TABLE_INCLUDE_LIST.name(), tableIncludeList, + VitessConnectorConfig.KEYSPACE.name(), keyspace)); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config); + List includedTables = VitessConnector.getIncludedTables(connectorConfig, allTables); List expectedTables = Arrays.asList("t1", "t22"); assertEquals(expectedTables, includedTables); } + @Test + public void testTableIncludeListShouldExcludeTablesWithSuffix() { + String keyspace = "ks"; + List allTables = Arrays.asList("t1", "t2", "t22", "t13"); + String tableIncludeList = new String("ks.t1,ks.t2"); + Configuration config = Configuration.from(Map.of( + VitessConnectorConfig.TABLE_INCLUDE_LIST.name(), tableIncludeList, + VitessConnectorConfig.KEYSPACE.name(), keyspace)); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config); + List includedTables = VitessConnector.getIncludedTables(connectorConfig, allTables); + List expectedTables = Arrays.asList("t1", "t2"); + assertEquals(expectedTables, includedTables); + } + private boolean isEmptyOffsets(Map offsets) { return offsets == null || offsets.isEmpty(); }