Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37128][pipeline-connector/mysql] Binlog reader skip shouldEmit filter when snapshot reader skip backfill #3874

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
Expand Down Expand Up @@ -140,6 +141,7 @@ public DataSource createDataSource(Context context) {
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);
boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);

Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
Expand Down Expand Up @@ -201,7 +203,8 @@ public DataSource createDataSource(Context context) {
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat);
.useLegacyJsonFormat(useLegacyJsonFormat)
.skipSnapshotBackfill(skipSnapshotBackfill);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

Expand Down Expand Up @@ -336,6 +339,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(INCLUDE_COMMENTS_ENABLED);
options.add(USE_LEGACY_JSON_FORMAT);
options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,4 +313,12 @@ public class MySqlDataSourceOptions {
.defaultValue(true)
.withDescription(
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");

/**
 * Experimental option: when {@code true}, the snapshot phase does not perform the backfill read
 * of change-log events for each chunk; those events are instead consumed later during the binlog
 * reading phase. Default is {@code false} (backfill enabled, exactly-once merge semantics).
 *
 * <p>NOTE(review): with backfill skipped only at-least-once semantics are promised — change-log
 * events that occurred during the snapshot phase may be replayed downstream.
 */
@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP =
        ConfigOptions.key("scan.incremental.snapshot.backfill.skip")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        // Fixed missing space after "snapshot." before "WARNING:" in the
                        // user-facing option description.
                        "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
private final StoppableChangeEventSourceContext changeEventSourceContext =
new StoppableChangeEventSourceContext();
private final boolean isParsingOnLineSchemaChanges;
private final boolean isBackfillSkipped;

private static final long READER_CLOSE_TIMEOUT = 30L;

Expand All @@ -110,6 +111,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
this.pureBinlogPhaseTables = new HashSet<>();
this.isParsingOnLineSchemaChanges =
statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
}

public void submitSplit(MySqlSplit mySqlSplit) {
Expand Down Expand Up @@ -267,6 +269,9 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
Object[] chunkKey =
RecordUtils.getSplitKey(
splitKeyType, statefulTaskContext.getSchemaNameAdjuster(), target);
if (isBackfillSkipped) {
return true;
}
for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
if (RecordUtils.splitKeyRangeContains(
chunkKey, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
Expand Down
Loading