diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 9923226d121..55968f19a48 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -76,6 +76,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; @@ -140,6 +141,7 @@ public DataSource createDataSource(Context context) { boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED); boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN_ENABLED); + boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); Duration heartbeatInterval 
= config.get(HEARTBEAT_INTERVAL); Duration connectTimeout = config.get(CONNECT_TIMEOUT); @@ -201,7 +203,8 @@ public DataSource createDataSource(Context context) { .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges) .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean) - .useLegacyJsonFormat(useLegacyJsonFormat); + .useLegacyJsonFormat(useLegacyJsonFormat) + .skipSnapshotBackfill(skipSnapshotBackfill); List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); @@ -336,6 +339,7 @@ public Set<ConfigOption<?>> optionalOptions() { options.add(INCLUDE_COMMENTS_ENABLED); options.add(USE_LEGACY_JSON_FORMAT); options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED); + options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index b57ebe04456..c8585183378 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -313,4 +313,12 @@ public class MySqlDataSourceOptions { .defaultValue(true) .withDescription( "Whether to use legacy json format. 
The default value is true, which means there is no whitespace before value and after comma in json format."); + + @Experimental + public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP = + ConfigOptions.key("scan.incremental.snapshot.backfill.skip") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some change log events that happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index cd3c697e44b..5001a847044 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -89,6 +89,7 @@ public class BinlogSplitReader implements DebeziumReader(); this.isParsingOnLineSchemaChanges = statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges(); + this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill(); } public void submitSplit(MySqlSplit mySqlSplit) { @@ -267,6 +269,9 @@ private boolean
shouldEmit(SourceRecord sourceRecord) { Object[] chunkKey = RecordUtils.getSplitKey( splitKeyType, statefulTaskContext.getSchemaNameAdjuster(), target); + if (isBackfillSkipped) { + return true; + } for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) { if (RecordUtils.splitKeyRangeContains( chunkKey, splitInfo.getSplitStart(), splitInfo.getSplitEnd())