diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/IncrementalSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/IncrementalSource.java index df7614defd6..dfee377562f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/IncrementalSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/IncrementalSource.java @@ -121,7 +121,6 @@ public IncrementalSourceReader createReader(SourceReaderContext readerCont final SourceReaderMetrics sourceReaderMetrics = new SourceReaderMetrics(readerContext.metricGroup()); - sourceReaderMetrics.registerMetrics(); IncrementalSourceReaderContext incrementalSourceReaderContext = new IncrementalSourceReaderContext(readerContext); Supplier> splitReaderSupplier = @@ -161,13 +160,16 @@ public SplitEnumerator createEnumerator( remainingTables, isTableIdCaseSensitive, dataSourceDialect, - offsetFactory); + offsetFactory, + enumContext); } catch (Exception e) { throw new FlinkRuntimeException( "Failed to discover captured tables for enumerator", e); } } else { - splitAssigner = new StreamSplitAssigner(sourceConfig, dataSourceDialect, offsetFactory); + splitAssigner = + new StreamSplitAssigner( + sourceConfig, dataSourceDialect, offsetFactory, enumContext); } return new IncrementalSourceEnumerator( @@ -187,14 +189,16 @@ public SplitEnumerator restoreEnumerator( enumContext.currentParallelism(), (HybridPendingSplitsState) checkpoint, dataSourceDialect, - offsetFactory); + offsetFactory, + enumContext); } else if (checkpoint instanceof StreamPendingSplitsState) { splitAssigner = new StreamSplitAssigner( sourceConfig, (StreamPendingSplitsState) checkpoint, dataSourceDialect, - offsetFactory); + offsetFactory, + enumContext); } else { throw new UnsupportedOperationException( "Unsupported restored PendingSplitsState: " + checkpoint); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java index 6764daabda6..2a084d6c2bc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.connectors.base.source.assigner; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.cdc.connectors.base.config.SourceConfig; import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect; import org.apache.flink.cdc.connectors.base.source.assigner.state.HybridPendingSplitsState; @@ -27,6 +29,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; +import 
org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics; import io.debezium.relational.TableId; import org.slf4j.Logger; @@ -61,13 +64,17 @@ public class HybridSplitAssigner implements SplitAssigne private final OffsetFactory offsetFactory; + private final SplitEnumeratorContext enumeratorContext; + private SourceEnumeratorMetrics enumeratorMetrics; + public HybridSplitAssigner( C sourceConfig, int currentParallelism, List remainingTables, boolean isTableIdCaseSensitive, DataSourceDialect dialect, - OffsetFactory offsetFactory) { + OffsetFactory offsetFactory, + SplitEnumeratorContext enumeratorContext) { this( sourceConfig, new SnapshotSplitAssigner<>( @@ -79,7 +86,8 @@ public HybridSplitAssigner( offsetFactory), false, sourceConfig.getSplitMetaGroupSize(), - offsetFactory); + offsetFactory, + enumeratorContext); } public HybridSplitAssigner( @@ -87,7 +95,8 @@ public HybridSplitAssigner( int currentParallelism, HybridPendingSplitsState checkpoint, DataSourceDialect dialect, - OffsetFactory offsetFactory) { + OffsetFactory offsetFactory, + SplitEnumeratorContext enumeratorContext) { this( sourceConfig, new SnapshotSplitAssigner<>( @@ -98,7 +107,8 @@ public HybridSplitAssigner( offsetFactory), checkpoint.isStreamSplitAssigned(), sourceConfig.getSplitMetaGroupSize(), - offsetFactory); + offsetFactory, + enumeratorContext); } private HybridSplitAssigner( @@ -106,17 +116,29 @@ private HybridSplitAssigner( SnapshotSplitAssigner snapshotSplitAssigner, boolean isStreamSplitAssigned, int splitMetaGroupSize, - OffsetFactory offsetFactory) { + OffsetFactory offsetFactory, + SplitEnumeratorContext enumeratorContext) { this.sourceConfig = sourceConfig; this.snapshotSplitAssigner = snapshotSplitAssigner; this.isStreamSplitAssigned = isStreamSplitAssigned; this.splitMetaGroupSize = splitMetaGroupSize; this.offsetFactory = offsetFactory; + this.enumeratorContext = enumeratorContext; } @Override public void open() { + this.enumeratorMetrics = new SourceEnumeratorMetrics(enumeratorContext.metricGroup()); + + if (isStreamSplitAssigned) { + enumeratorMetrics.enterStreamReading(); + } else { + enumeratorMetrics.exitStreamReading(); + } + snapshotSplitAssigner.open(); + // init enumerator metrics + snapshotSplitAssigner.initEnumeratorMetrics(enumeratorMetrics); } @Override @@ -126,6 +148,7 @@ public Optional getNext() { return Optional.empty(); } if (snapshotSplitAssigner.noMoreSplits()) { + enumeratorMetrics.exitSnapshotPhase(); // stream split assigning if (isStreamSplitAssigned) { // no more splits for the assigner @@ -137,6 +160,7 @@ public Optional getNext() { // assigning the stream split. Otherwise, records emitted from stream split // might be out-of-order in terms of same primary key with snapshot splits. 
isStreamSplitAssigned = true; + enumeratorMetrics.enterStreamReading(); StreamSplit streamSplit = createStreamSplit(); LOG.trace( "SnapshotSplitAssigner is finished: creating a new stream split {}", @@ -145,6 +169,7 @@ public Optional getNext() { } else if (isNewlyAddedAssigningFinished(snapshotSplitAssigner.getAssignerStatus())) { // do not need to create stream split, but send event to wake up the binlog reader isStreamSplitAssigned = true; + enumeratorMetrics.enterStreamReading(); return Optional.empty(); } else { // stream split is not ready by now @@ -184,6 +209,9 @@ public void addSplits(Collection splits) { isStreamSplitAssigned = false; } } + if (!snapshotSplits.isEmpty()) { + enumeratorMetrics.exitStreamReading(); + } snapshotSplitAssigner.addSplits(snapshotSplits); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java index d534aef632a..732e788219f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit; import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; +import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -49,6 +50,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus.INITIAL_ASSIGNING; @@ -81,6 +83,10 @@ public class SnapshotSplitAssigner implements SplitAssig private final DataSourceDialect dialect; private final OffsetFactory offsetFactory; + private SourceEnumeratorMetrics enumeratorMetrics; + private final Map splitFinishedCheckpointIds; + private static final long UNDEFINED_CHECKPOINT_ID = -1; + public SnapshotSplitAssigner( C sourceConfig, int currentParallelism, @@ -101,7 +107,8 @@ public SnapshotSplitAssigner( isTableIdCaseSensitive, true, dialect, - offsetFactory); + offsetFactory, + new ConcurrentHashMap<>()); } public SnapshotSplitAssigner( @@ -123,7 +130,8 @@ public SnapshotSplitAssigner( checkpoint.isTableIdCaseSensitive(), checkpoint.isRemainingTablesCheckpointed(), dialect, - offsetFactory); + offsetFactory, + new ConcurrentHashMap<>()); } private SnapshotSplitAssigner( @@ -139,7 +147,8 @@ private SnapshotSplitAssigner( boolean isTableIdCaseSensitive, boolean isRemainingTablesCheckpointed, DataSourceDialect dialect, - OffsetFactory offsetFactory) { + OffsetFactory offsetFactory, + Map splitFinishedCheckpointIds) { this.sourceConfig = sourceConfig; this.currentParallelism = currentParallelism; this.alreadyProcessedTables = alreadyProcessedTables; @@ -163,6 +172,7 @@ private SnapshotSplitAssigner( this.isTableIdCaseSensitive = isTableIdCaseSensitive; this.dialect = dialect; this.offsetFactory = offsetFactory; + 
this.splitFinishedCheckpointIds = splitFinishedCheckpointIds; } @Override @@ -269,6 +279,46 @@ private void captureNewlyAddedTables() { } } + /** This should be invoked after this class's open() method. */ + public void initEnumeratorMetrics(SourceEnumeratorMetrics enumeratorMetrics) { + this.enumeratorMetrics = enumeratorMetrics; + + this.enumeratorMetrics.enterSnapshotPhase(); + this.enumeratorMetrics.registerMetrics( + alreadyProcessedTables::size, assignedSplits::size, remainingSplits::size); + this.enumeratorMetrics.addNewTables(computeTablesPendingSnapshot()); + for (SchemalessSnapshotSplit snapshotSplit : remainingSplits) { + this.enumeratorMetrics + .getTableMetrics(snapshotSplit.getTableId()) + .addNewSplit(snapshotSplit.splitId()); + } + for (SchemalessSnapshotSplit snapshotSplit : assignedSplits.values()) { + this.enumeratorMetrics + .getTableMetrics(snapshotSplit.getTableId()) + .addProcessedSplit(snapshotSplit.splitId()); + } + for (String splitId : splitFinishedOffsets.keySet()) { + TableId tableId = SnapshotSplit.extractTableId(splitId); + this.enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId); + } + } + + // remainingTables + tables that have been split but not yet fully processed + private int computeTablesPendingSnapshot() { + int numTablesPendingSnapshot = remainingTables.size(); + Set<TableId> computedTables = new HashSet<>(); + for (SchemalessSnapshotSplit split : remainingSplits) { + TableId tableId = split.getTableId(); + if (!computedTables.contains(tableId) + && !alreadyProcessedTables.contains(tableId) + && !remainingTables.contains(tableId)) { + computedTables.add(tableId); + numTablesPendingSnapshot++; + } + } + return numTablesPendingSnapshot; + } + @Override public Optional<SourceSplitBase> getNext() { if (!remainingSplits.isEmpty()) { @@ -277,6 +327,9 @@ public Optional<SourceSplitBase> getNext() { SchemalessSnapshotSplit split = iterator.next(); iterator.remove(); assignedSplits.put(split.splitId(), split); + enumeratorMetrics + .getTableMetrics(split.getTableId()) + .finishProcessSplit(split.splitId()); return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId()))); } else { // it's turn for new table @@ -294,7 +347,15 @@ public Optional<SourceSplitBase> getNext() { .collect(Collectors.toList()); remainingSplits.addAll(schemalessSnapshotSplits); tableSchemas.putAll(tableSchema); + if (!alreadyProcessedTables.contains(nextTable)) { + enumeratorMetrics.startSnapshotTables(1); + } alreadyProcessedTables.add(nextTable); + List<String> splitIds = + schemalessSnapshotSplits.stream() + .map(SchemalessSnapshotSplit::splitId) + .collect(Collectors.toList()); + enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds); return getNext(); } else { return Optional.empty(); @@ -335,6 +396,12 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() { @Override public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) { this.splitFinishedOffsets.putAll(splitFinishedOffsets); + for (String splitId : splitFinishedOffsets.keySet()) { + splitFinishedCheckpointIds.put(splitId, UNDEFINED_CHECKPOINT_ID); + } + LOG.info( + "splitFinishedCheckpointIds size in onFinishedSplits: {}", + splitFinishedCheckpointIds == null ? 0 : splitFinishedCheckpointIds.size()); if (allSnapshotSplitsFinished() && isAssigningSnapshotSplits(assignerStatus)) { // Skip the waiting checkpoint when current parallelism is 1 which means we do not need // to care about the global output data order of snapshot splits and stream split.
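Note: the splitFinishedCheckpointIds map introduced above defers the table-level finished-split accounting until a checkpoint covering the finish actually completes. The following is a minimal standalone sketch of that lifecycle, not part of the patch; the split id "db.users:3" and checkpoint id 42 are invented for illustration.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SplitFinishedLifecycleSketch {
    private static final long UNDEFINED_CHECKPOINT_ID = -1;

    public static void main(String[] args) {
        Map<String, Long> splitFinishedCheckpointIds = new ConcurrentHashMap<>();

        // onFinishedSplits: the reader reports a finished split; no checkpoint has
        // covered it yet, so it is parked under the undefined marker.
        splitFinishedCheckpointIds.put("db.users:3", UNDEFINED_CHECKPOINT_ID);

        // snapshotState(42): pending entries are stamped with the first checkpoint
        // that includes them.
        long checkpointId = 42L;
        splitFinishedCheckpointIds.replaceAll(
                (splitId, ckp) -> ckp == UNDEFINED_CHECKPOINT_ID ? checkpointId : ckp);

        // notifyCheckpointComplete(42): only now would addFinishedSplit(...) run and
        // the entry be dropped, so the finished-splits gauge never counts work that a
        // restore from an older checkpoint could replay.
        splitFinishedCheckpointIds
                .entrySet()
                .removeIf(
                        entry ->
                                entry.getValue() != UNDEFINED_CHECKPOINT_ID
                                        && checkpointId >= entry.getValue());
    }
}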
@@ -359,11 +426,31 @@ public void addSplits(Collection<SourceSplitBase> splits) { // because they are failed assignedSplits.remove(split.splitId()); splitFinishedOffsets.remove(split.splitId()); + + TableId tableId = split.asSnapshotSplit().getTableId(); + enumeratorMetrics.getTableMetrics(tableId).reprocessSplit(split.splitId()); + enumeratorMetrics.getTableMetrics(tableId).removeFinishedSplit(split.splitId()); } } @Override public SnapshotPendingSplitsState snapshotState(long checkpointId) { + if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) { + for (Map.Entry<String, Long> splitFinishedCheckpointId : + splitFinishedCheckpointIds.entrySet()) { + if (splitFinishedCheckpointId.getValue() == UNDEFINED_CHECKPOINT_ID) { + splitFinishedCheckpointId.setValue(checkpointId); + } + } + LOG.info( + "SnapshotSplitAssigner snapshotState on checkpoint {} with splitFinishedCheckpointIds size {}.", + checkpointId, + splitFinishedCheckpointIds.size()); + } + SnapshotPendingSplitsState state = new SnapshotPendingSplitsState( alreadyProcessedTables, @@ -374,7 +461,8 @@ public SnapshotPendingSplitsState snapshotState(long checkpointId) { assignerStatus, remainingTables, isTableIdCaseSensitive, - true); + true, + splitFinishedCheckpointIds); // we need a complete checkpoint before mark this assigner to be finished, to wait for all // records of snapshot splits are completely processed if (checkpointIdToFinish == null @@ -397,6 +485,27 @@ && allSnapshotSplitsFinished()) { } LOG.info("Snapshot split assigner is turn into finished status."); } + + if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) { + Iterator<Map.Entry<String, Long>> iterator = + splitFinishedCheckpointIds.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, Long> splitFinishedCheckpointId = iterator.next(); + String splitId = splitFinishedCheckpointId.getKey(); + Long splitCheckpointId = splitFinishedCheckpointId.getValue(); + if (splitCheckpointId != UNDEFINED_CHECKPOINT_ID + && checkpointId >= splitCheckpointId) { + // record table-level splits metrics + TableId tableId = SnapshotSplit.extractTableId(splitId); + enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId); + iterator.remove(); + } + } + LOG.info( + "Checkpoint completed on checkpoint {} with splitFinishedCheckpointIds size {}.", + checkpointId, + splitFinishedCheckpointIds.size()); + } } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java index 1e7b2fa1ec2..cbb749932e6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.connectors.base.source.assigner; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.cdc.connectors.base.config.SourceConfig; import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect; import org.apache.flink.cdc.connectors.base.options.StartupOptions; @@ -27,6 +29,7 @@ import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; +import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics; import java.io.IOException; import java.util.ArrayList; @@ -49,32 +52,53 @@ public class StreamSplitAssigner implements SplitAssigner { private final DataSourceDialect dialect; private final OffsetFactory offsetFactory; + private final SplitEnumeratorContext enumeratorContext; + private SourceEnumeratorMetrics enumeratorMetrics; + public StreamSplitAssigner( - SourceConfig sourceConfig, DataSourceDialect dialect, OffsetFactory offsetFactory) { - this(sourceConfig, false, dialect, offsetFactory); + SourceConfig sourceConfig, + DataSourceDialect dialect, + OffsetFactory offsetFactory, + SplitEnumeratorContext enumeratorContext) { + this(sourceConfig, false, dialect, offsetFactory, enumeratorContext); } public StreamSplitAssigner( SourceConfig sourceConfig, StreamPendingSplitsState checkpoint, DataSourceDialect dialect, - OffsetFactory offsetFactory) { - this(sourceConfig, checkpoint.isStreamSplitAssigned(), dialect, offsetFactory); + OffsetFactory offsetFactory, + SplitEnumeratorContext enumeratorContext) { + this( + sourceConfig, + checkpoint.isStreamSplitAssigned(), + dialect, + offsetFactory, + enumeratorContext); } private StreamSplitAssigner( SourceConfig sourceConfig, boolean isStreamSplitAssigned, DataSourceDialect dialect, - OffsetFactory offsetFactory) { + OffsetFactory offsetFactory, + SplitEnumeratorContext enumeratorContext) { this.sourceConfig = sourceConfig; this.isStreamSplitAssigned = isStreamSplitAssigned; this.dialect = dialect; this.offsetFactory = offsetFactory; + this.enumeratorContext = enumeratorContext; } @Override - public void open() {} + public void open() { + this.enumeratorMetrics = new SourceEnumeratorMetrics(enumeratorContext.metricGroup()); + if (isStreamSplitAssigned) { + enumeratorMetrics.enterStreamReading(); + } else { + enumeratorMetrics.exitStreamReading(); + } + } @Override public Optional getNext() { @@ -82,6 +106,7 @@ public Optional getNext() { return Optional.empty(); } else { isStreamSplitAssigned = true; + enumeratorMetrics.enterStreamReading(); return Optional.of(createStreamSplit()); } } @@ -105,6 +130,7 @@ public void onFinishedSplits(Map splitFinishedOffsets) { public void addSplits(Collection splits) { // we don't store the split, but will re-create stream split later isStreamSplitAssigned = false; + enumeratorMetrics.exitStreamReading(); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java index 83dbdc0e8f9..cc5fb359217 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java @@ -53,7 +53,7 @@ */ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 6; + private 
static final int VERSION = 7; private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); @@ -114,6 +114,7 @@ public PendingSplitsState deserialize(int version, byte[] serialized) throws IOE case 4: case 5: case 6: + case 7: return deserializePendingSplitsState(version, serialized); default: throw new IOException("Unknown version: " + version); @@ -168,6 +169,8 @@ private void serializeSnapshotPendingSplitsState( writeTableIds(state.getRemainingTables(), out); out.writeBoolean(state.isTableIdCaseSensitive()); writeTableSchemas(state.getTableSchemas(), out); + + writeSplitFinishedCheckpointIds(state.getSplitFinishedCheckpointIds(), out); } private void serializeHybridPendingSplitsState( @@ -226,7 +229,8 @@ private SnapshotPendingSplitsState deserializeLegacySnapshotPendingSplitsState( assignerStatus, new ArrayList<>(), false, - false); + false, + new HashMap<>()); } private HybridPendingSplitsState deserializeLegacyHybridPendingSplitsState( @@ -277,6 +281,10 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState( if (version >= 4) { tableSchemas.putAll(readTableSchemas(splitVersion, in)); } + Map<String, Long> splitFinishedCheckpointIds = new HashMap<>(); + if (version >= 7) { + splitFinishedCheckpointIds = readSplitFinishedCheckpointIds(in); + } return new SnapshotPendingSplitsState( alreadyProcessedTables, remainingSchemalessSplits, @@ -286,7 +294,8 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState( assignerStatus, remainingTableIds, isTableIdCaseSensitive, - true); + true, + splitFinishedCheckpointIds); } private HybridPendingSplitsState deserializeHybridPendingSplitsState( @@ -306,6 +315,30 @@ private StreamPendingSplitsState deserializeStreamPendingSplitsState(DataInputDe // Utilities // ------------------------------------------------------------------------------------------ + private void writeSplitFinishedCheckpointIds( + Map<String, Long> splitFinishedCheckpointIds, DataOutputSerializer out) + throws IOException { + final int size = splitFinishedCheckpointIds.size(); + out.writeInt(size); + for (Map.Entry<String, Long> splitFinishedCheckpointId : + splitFinishedCheckpointIds.entrySet()) { + out.writeUTF(splitFinishedCheckpointId.getKey()); + out.writeLong(splitFinishedCheckpointId.getValue()); + } + } + + private Map<String, Long> readSplitFinishedCheckpointIds(DataInputDeserializer in) + throws IOException { + Map<String, Long> splitFinishedCheckpointIds = new HashMap<>(); + final int size = in.readInt(); + for (int i = 0; i < size; i++) { + String splitId = in.readUTF(); + Long checkpointId = in.readLong(); + splitFinishedCheckpointIds.put(splitId, checkpointId); + } + return splitFinishedCheckpointIds; + } + private void writeFinishedOffsets(Map<String, Offset> splitsInfo, DataOutputSerializer out) throws IOException { final int size = splitsInfo.size(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java index 42fb0e23dc2..0641f699cc1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java @@ -68,6 +68,9 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { private final Map tableSchemas; + /** Map from splitId to the id of the checkpoint that marked the split as finished. */ + private final Map<String, Long> splitFinishedCheckpointIds; + public SnapshotPendingSplitsState( List<TableId> alreadyProcessedTables, List<SchemalessSnapshotSplit> remainingSplits, @@ -77,7 +80,8 @@ public SnapshotPendingSplitsState( AssignerStatus assignerStatus, List<TableId> remainingTables, boolean isTableIdCaseSensitive, - boolean isRemainingTablesCheckpointed) { + boolean isRemainingTablesCheckpointed, + Map<String, Long> splitFinishedCheckpointIds) { this.alreadyProcessedTables = alreadyProcessedTables; this.remainingSplits = remainingSplits; this.assignedSplits = assignedSplits; @@ -87,6 +91,11 @@ public SnapshotPendingSplitsState( this.isTableIdCaseSensitive = isTableIdCaseSensitive; this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; this.tableSchemas = tableSchemas; + this.splitFinishedCheckpointIds = splitFinishedCheckpointIds; + } + + public Map<String, Long> getSplitFinishedCheckpointIds() { + return splitFinishedCheckpointIds; } public List<TableId> getAlreadyProcessedTables() { @@ -141,7 +150,8 @@ public boolean equals(Object o) { && Objects.equals(alreadyProcessedTables, that.alreadyProcessedTables) && Objects.equals(remainingSplits, that.remainingSplits) && Objects.equals(assignedSplits, that.assignedSplits) - && Objects.equals(splitFinishedOffsets, that.splitFinishedOffsets); + && Objects.equals(splitFinishedOffsets, that.splitFinishedOffsets) + && Objects.equals(splitFinishedCheckpointIds, that.splitFinishedCheckpointIds); } @Override @@ -154,7 +164,8 @@ public int hashCode() { splitFinishedOffsets, assignerStatus, isTableIdCaseSensitive, - isRemainingTablesCheckpointed); + isRemainingTablesCheckpointed, + splitFinishedCheckpointIds); } @Override @@ -176,6 +187,8 @@ public String toString() { + isTableIdCaseSensitive + ", isRemainingTablesCheckpointed=" + isRemainingTablesCheckpointed + + ", splitFinishedCheckpointIds=" + + splitFinishedCheckpointIds + '}'; } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceEnumeratorMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceEnumeratorMetrics.java new file mode 100644 index 00000000000..71d057735a4 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceEnumeratorMetrics.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.source.metrics; + +import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup; + +import io.debezium.relational.TableId; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** A collection class for handling metrics of the source split enumerator. */ +public class SourceEnumeratorMetrics { + private static final Logger LOGGER = LoggerFactory.getLogger(SourceEnumeratorMetrics.class); + // Constants + public static final int UNDEFINED = 0; + + // Metric names + public static final String IS_SNAPSHOTTING = "isSnapshotting"; + public static final String IS_STREAM_READING = "isStreamReading"; + public static final String NUM_TABLES_SNAPSHOTTED = "numTablesSnapshotted"; + public static final String NUM_TABLES_REMAINING = "numTablesRemaining"; + public static final String NUM_SNAPSHOT_SPLITS_PROCESSED = "numSnapshotSplitsProcessed"; + public static final String NUM_SNAPSHOT_SPLITS_REMAINING = "numSnapshotSplitsRemaining"; + public static final String NUM_SNAPSHOT_SPLITS_FINISHED = "numSnapshotSplitsFinished"; + public static final String SNAPSHOT_START_TIME = "snapshotStartTime"; + public static final String SNAPSHOT_END_TIME = "snapshotEndTime"; + public static final String NAMESPACE_GROUP_KEY = "namespace"; + public static final String SCHEMA_GROUP_KEY = "schema"; + public static final String TABLE_GROUP_KEY = "table"; + + private final SplitEnumeratorMetricGroup metricGroup; + + private volatile int isSnapshotting = UNDEFINED; + private volatile int isStreamReading = UNDEFINED; + private volatile int numTablesRemaining = 0; + + // Map for managing per-table metrics by table identifier + // Key: Identifier of the table + // Value: TableMetrics related to the table + private final Map<TableId, TableMetrics> tableMetricsMap = new ConcurrentHashMap<>(); + + public SourceEnumeratorMetrics(SplitEnumeratorMetricGroup metricGroup) { + this.metricGroup = metricGroup; + metricGroup.gauge(IS_SNAPSHOTTING, () -> isSnapshotting); + metricGroup.gauge(IS_STREAM_READING, () -> isStreamReading); + metricGroup.gauge(NUM_TABLES_REMAINING, () -> numTablesRemaining); + } + + public void enterSnapshotPhase() { + this.isSnapshotting = 1; + } + + public void exitSnapshotPhase() { + this.isSnapshotting = 0; + } + + public void enterStreamReading() { + this.isStreamReading = 1; + } + + public void exitStreamReading() { + this.isStreamReading = 0; + } + + public void registerMetrics( + Gauge<Integer> numTablesSnapshotted, + Gauge<Integer> numSnapshotSplitsProcessed, + Gauge<Integer> numSnapshotSplitsRemaining) { + metricGroup.gauge(NUM_TABLES_SNAPSHOTTED, numTablesSnapshotted); + metricGroup.gauge(NUM_SNAPSHOT_SPLITS_PROCESSED, numSnapshotSplitsProcessed); + metricGroup.gauge(NUM_SNAPSHOT_SPLITS_REMAINING, numSnapshotSplitsRemaining); + } + + public void addNewTables(int numNewTables) { + numTablesRemaining += numNewTables; + } + + public void startSnapshotTables(int numSnapshottedTables) { + numTablesRemaining -= numSnapshottedTables; + } + + public TableMetrics getTableMetrics(TableId 
tableId) { + return tableMetricsMap.computeIfAbsent( + tableId, + key -> new TableMetrics(key.catalog(), key.schema(), key.table(), metricGroup)); + } + + // ----------------------------------- Helper classes -------------------------------- + + /** + * Collection class for managing metrics of a table. + * + * <p>Table-level metrics are registered in the corresponding subgroup under the {@link + * SplitEnumeratorMetricGroup}. + */ + public static class TableMetrics { + private AtomicInteger numSnapshotSplitsProcessed = new AtomicInteger(0); + private AtomicInteger numSnapshotSplitsRemaining = new AtomicInteger(0); + private AtomicInteger numSnapshotSplitsFinished = new AtomicInteger(0); + private volatile long snapshotStartTime = UNDEFINED; + private volatile long snapshotEndTime = UNDEFINED; + + private Set<Integer> remainingSplitChunkIds = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + private Set<Integer> processedSplitChunkIds = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + private Set<Integer> finishedSplitChunkIds = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + + public TableMetrics( + String databaseName, String schemaName, String tableName, MetricGroup parentGroup) { + databaseName = processNull(databaseName); + schemaName = processNull(schemaName); + tableName = processNull(tableName); + MetricGroup metricGroup = + parentGroup + .addGroup(NAMESPACE_GROUP_KEY, databaseName) + .addGroup(SCHEMA_GROUP_KEY, schemaName) + .addGroup(TABLE_GROUP_KEY, tableName); + metricGroup.gauge( + NUM_SNAPSHOT_SPLITS_PROCESSED, () -> numSnapshotSplitsProcessed.intValue()); + metricGroup.gauge( + NUM_SNAPSHOT_SPLITS_REMAINING, () -> numSnapshotSplitsRemaining.intValue()); + metricGroup.gauge( + NUM_SNAPSHOT_SPLITS_FINISHED, () -> numSnapshotSplitsFinished.intValue()); + metricGroup.gauge(SNAPSHOT_START_TIME, () -> snapshotStartTime); + metricGroup.gauge(SNAPSHOT_END_TIME, () -> snapshotEndTime); + snapshotStartTime = System.currentTimeMillis(); + } + + private String processNull(String name) { + if (StringUtils.isBlank(name)) { + // Normalize null/blank names to an empty string + return ""; + } + return name; + } + + public void addNewSplit(String newSplitId) { + int chunkId = SnapshotSplit.extractChunkId(newSplitId); + if (!remainingSplitChunkIds.contains(chunkId)) { + remainingSplitChunkIds.add(chunkId); + numSnapshotSplitsRemaining.getAndAdd(1); + LOGGER.info("add remaining split: {}", newSplitId); + } + } + + public void addNewSplits(List<String> newSplitIds) { + if (newSplitIds != null) { + for (String newSplitId : newSplitIds) { + addNewSplit(newSplitId); + } + } + } + + public void removeRemainingSplit(String removeSplitId) { + int chunkId = SnapshotSplit.extractChunkId(removeSplitId); + if (remainingSplitChunkIds.contains(chunkId)) { + remainingSplitChunkIds.remove(chunkId); + numSnapshotSplitsRemaining.getAndUpdate(num -> num - 1); + LOGGER.info("remove remaining split: {}", removeSplitId); + } + } + + public void addProcessedSplit(String processedSplitId) { + int chunkId = SnapshotSplit.extractChunkId(processedSplitId); + if (!processedSplitChunkIds.contains(chunkId)) { + processedSplitChunkIds.add(chunkId); + numSnapshotSplitsProcessed.getAndAdd(1); + LOGGER.info("add processed split: {}", processedSplitId); + } + } + + public void removeProcessedSplit(String removeSplitId) { + int chunkId = SnapshotSplit.extractChunkId(removeSplitId); + if (processedSplitChunkIds.contains(chunkId)) { + processedSplitChunkIds.remove(chunkId); + numSnapshotSplitsProcessed.getAndUpdate(num -> num - 1); + LOGGER.info("remove processed split: {}", removeSplitId); + } + } + + public void reprocessSplit(String reprocessSplitId) { + addNewSplit(reprocessSplitId); + removeProcessedSplit(reprocessSplitId); + } + + public void finishProcessSplit(String processedSplitId) { + addProcessedSplit(processedSplitId); + 
removeRemainingSplit(processedSplitId); + } + + public void tryToMarkSnapshotEndTime() { + if (numSnapshotSplitsRemaining.get() == 0 + && (numSnapshotSplitsFinished.get() == numSnapshotSplitsProcessed.get())) { + // Mark the snapshot end time once no splits remain and all processed splits + // are finished + snapshotEndTime = System.currentTimeMillis(); + } + } + + public void addFinishedSplits(Set<String> finishedSplitIds) { + if (finishedSplitIds != null) { + for (String finishedSplitId : finishedSplitIds) { + addFinishedSplit(finishedSplitId); + } + } + } + + public void addFinishedSplit(String finishedSplitId) { + int chunkId = SnapshotSplit.extractChunkId(finishedSplitId); + if (!finishedSplitChunkIds.contains(chunkId)) { + finishedSplitChunkIds.add(chunkId); + numSnapshotSplitsFinished.getAndAdd(1); + tryToMarkSnapshotEndTime(); + LOGGER.info("add finished split: {}", finishedSplitId); + } + } + + public void removeFinishedSplit(String removeSplitId) { + int chunkId = SnapshotSplit.extractChunkId(removeSplitId); + if (finishedSplitChunkIds.contains(chunkId)) { + finishedSplitChunkIds.remove(chunkId); + numSnapshotSplitsFinished.getAndUpdate(num -> num - 1); + LOGGER.info("remove finished split: {}", removeSplitId); + } + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java index a1ead2f77c0..c597e6a0a39 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java @@ -20,16 +20,56 @@ import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.SourceReaderMetricGroup; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.util.clock.SystemClock; + +import io.debezium.data.Envelope; +import io.debezium.relational.TableId; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId; +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord; +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent; /** A collection class for handling metrics in {@link IncrementalSourceReader}. 
*/ public class SourceReaderMetrics { + private static final Logger LOG = LoggerFactory.getLogger(SourceReaderMetrics.class); + public static final long UNDEFINED = -1; + // Metric group keys + public static final String NAMESPACE_GROUP_KEY = "namespace"; + public static final String SCHEMA_GROUP_KEY = "schema"; + public static final String TABLE_GROUP_KEY = "table"; + + // Metric names + public static final String NUM_SNAPSHOT_RECORDS = "numSnapshotRecords"; + public static final String NUM_INSERT_DML_RECORDS = "numInsertDMLRecords"; + public static final String NUM_UPDATE_DML_RECORDS = "numUpdateDMLRecords"; + public static final String NUM_DELETE_DML_RECORDS = "numDeleteDMLRecords"; + public static final String NUM_DDL_RECORDS = "numDDLRecords"; + public static final String CURRENT_EVENT_TIME_LAG = "currentEventTimeLag"; + private final SourceReaderMetricGroup metricGroup; + // Reader-level metrics + private final Counter snapshotCounter; + private final Counter insertCounter; + private final Counter updateCounter; + private final Counter deleteCounter; + private final Counter schemaChangeCounter; + + private final Map<TableId, TableMetrics> tableMetricsMap = new HashMap<>(); + /** * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the * record fetched into the source operator. @@ -38,15 +78,22 @@ public class SourceReaderMetrics { /** The total number of record that failed to consume, process or emit. */ private final Counter numRecordsInErrorsCounter; + /** The timestamp of the last record received. */ + private volatile long lastReceivedEventTime = UNDEFINED; public SourceReaderMetrics(SourceReaderMetricGroup metricGroup) { this.metricGroup = metricGroup; this.numRecordsInErrorsCounter = metricGroup.getNumRecordsInErrorsCounter(); - } - public void registerMetrics() { metricGroup.gauge( MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, (Gauge<Long>) this::getFetchDelay); + metricGroup.gauge(CURRENT_EVENT_TIME_LAG, this::getCurrentEventTimeLag); + + snapshotCounter = metricGroup.counter(NUM_SNAPSHOT_RECORDS); + insertCounter = metricGroup.counter(NUM_INSERT_DML_RECORDS); + updateCounter = metricGroup.counter(NUM_UPDATE_DML_RECORDS); + deleteCounter = metricGroup.counter(NUM_DELETE_DML_RECORDS); + schemaChangeCounter = metricGroup.counter(NUM_DDL_RECORDS); } public long getFetchDelay() { @@ -60,4 +107,150 @@ public void recordFetchDelay(long fetchDelay) { public void addNumRecordsInErrors(long delta) { this.numRecordsInErrorsCounter.inc(delta); } + + public void updateLastReceivedEventTime(Long eventTimestamp) { + if (eventTimestamp != null && eventTimestamp > 0L) { + lastReceivedEventTime = eventTimestamp; + } + } + + public void markRecord() { + try { + metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc(); + } catch (Exception e) { + LOG.warn("Failed to update record counters.", e); + } + } + + public void updateRecordCounters(SourceRecord record) { + catchAndWarnLogAllExceptions( + () -> { + // Increase reader and table level input counters + if (isDataChangeRecord(record)) { + TableMetrics tableMetrics = getTableMetrics(getTableId(record)); + Envelope.Operation op = Envelope.operationFor(record); + switch (op) { + case READ: + snapshotCounter.inc(); + tableMetrics.markSnapshotRecord(); + break; + case CREATE: + insertCounter.inc(); + tableMetrics.markInsertRecord(); + break; + case DELETE: + deleteCounter.inc(); + tableMetrics.markDeleteRecord(); + break; + case UPDATE: + updateCounter.inc(); + tableMetrics.markUpdateRecord(); + break; + } + } else if 
(isSchemaChangeEvent(record)) { + schemaChangeCounter.inc(); + TableId tableId = getTableId(record); + if (tableId != null) { + getTableMetrics(tableId).markSchemaChangeRecord(); + } + } + }); + } + + private TableMetrics getTableMetrics(TableId tableId) { + return tableMetricsMap.computeIfAbsent( + tableId, + id -> new TableMetrics(id.catalog(), id.schema(), id.table(), metricGroup)); + } + + // ------------------------------- Helper functions ----------------------------- + + private void catchAndWarnLogAllExceptions(Runnable runnable) { + try { + runnable.run(); + } catch (Exception e) { + // Catch all exceptions as errors in metric handling should not fail the job + LOG.warn("Failed to update metrics", e); + } + } + + private long getCurrentEventTimeLag() { + if (lastReceivedEventTime == UNDEFINED) { + return UNDEFINED; + } + return SystemClock.getInstance().absoluteTimeMillis() - lastReceivedEventTime; + } + + // ----------------------------------- Helper classes -------------------------------- + + /** + * Collection class for managing metrics of a table. + * + * <p>Table-level metrics are registered in the corresponding subgroup under the {@link + * SourceReaderMetricGroup}. + */ + private static class TableMetrics { + // Snapshot + Stream + private final Counter recordsCounter; + + // Snapshot phase + private final Counter snapshotCounter; + + // Stream phase + private final Counter insertCounter; + private final Counter updateCounter; + private final Counter deleteCounter; + private final Counter schemaChangeCounter; + + public TableMetrics( + String databaseName, String schemaName, String tableName, MetricGroup parentGroup) { + databaseName = processNull(databaseName); + schemaName = processNull(schemaName); + tableName = processNull(tableName); + MetricGroup metricGroup = + parentGroup + .addGroup(NAMESPACE_GROUP_KEY, databaseName) + .addGroup(SCHEMA_GROUP_KEY, schemaName) + .addGroup(TABLE_GROUP_KEY, tableName); + recordsCounter = metricGroup.counter(MetricNames.IO_NUM_RECORDS_IN); + snapshotCounter = metricGroup.counter(NUM_SNAPSHOT_RECORDS); + insertCounter = metricGroup.counter(NUM_INSERT_DML_RECORDS); + updateCounter = metricGroup.counter(NUM_UPDATE_DML_RECORDS); + deleteCounter = metricGroup.counter(NUM_DELETE_DML_RECORDS); + schemaChangeCounter = metricGroup.counter(NUM_DDL_RECORDS); + } + + private String processNull(String name) { + if (StringUtils.isBlank(name)) { + // Normalize null/blank names to an empty string + return ""; + } + return name; + } + + public void markSnapshotRecord() { + recordsCounter.inc(); + snapshotCounter.inc(); + } + + public void markInsertRecord() { + recordsCounter.inc(); + insertCounter.inc(); + } + + public void markDeleteRecord() { + recordsCounter.inc(); + deleteCounter.inc(); + } + + public void markUpdateRecord() { + recordsCounter.inc(); + updateCounter.inc(); + } + + public void markSchemaChangeRecord() { + recordsCounter.inc(); + schemaChangeCounter.inc(); + } + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java index 0ca034bd1dd..25ebb8aaa45 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java @@ -152,6 +152,9 @@ public Offset getOffsetPosition(Map<String, ?> offset) { } protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception { + sourceReaderMetrics.markRecord(); + sourceReaderMetrics.updateRecordCounters(element); + outputCollector.output = output; outputCollector.currentMessageTimestamp = getMessageTimestamp(element); debeziumDeserializationSchema.deserialize(element, outputCollector); @@ -169,9 +172,10 @@ protected void reportMetrics(SourceRecord element) { } } - private static class OutputCollector<T> implements Collector<T> { - private SourceOutput<T> output; - private Long currentMessageTimestamp; + /** An adapter between {@link SourceOutput} and {@link Collector}. */ + protected static class OutputCollector<T> implements Collector<T> { + public SourceOutput<T> output; + public Long currentMessageTimestamp; @Override public void collect(T record) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java index 09f0f27c389..2755e0c3712 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java @@ -24,6 +24,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.history.HistoryRecord; import io.debezium.util.SchemaNameAdjuster; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -119,7 +120,11 @@ public static TableId getTableId(SourceRecord dataRecord) { Struct value = (Struct) dataRecord.value(); Struct source = value.getStruct(Envelope.FieldName.SOURCE); String dbName = source.getString(DATABASE_NAME_KEY); - String schemaName = source.getString(SCHEMA_NAME_KEY); + Field field = source.schema().field(SCHEMA_NAME_KEY); + String schemaName = null; + if (field != null) { + schemaName = source.getString(SCHEMA_NAME_KEY); + } String tableName = source.getString(TABLE_NAME_KEY); return new TableId(dbName, schemaName, tableName); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/MySqlSourceMetricsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/MySqlSourceMetricsTest.java new file mode 100644 index 00000000000..cbcebbc4626 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/MySqlSourceMetricsTest.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.base; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.connectors.base.experimental.MySqlSourceBuilder; +import org.apache.flink.cdc.connectors.base.source.MySqlEventDeserializer; +import org.apache.flink.cdc.connectors.base.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.base.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.base.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.conversion.RowRowConverter; +import org.apache.flink.table.types.DataType; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; + +import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.jdbc.JdbcConnection; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** MySQL Source Metrics Tests. 
*/ +public class MySqlSourceMetricsTest { + + private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceMetricsTest.class); + + private static final int DEFAULT_PARALLELISM = 4; + private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7); + protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics(); + + @Rule + public final MiniClusterWithClientResource miniClusterResource = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .setConfiguration( + metricReporter.addToConfiguration(new Configuration())) + .build()); + + @BeforeClass + public static void startContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + LOG.info("Containers are started."); + } + + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "metrics", "mysqluser", "mysqlpw"); + + @Test + public void testSourceMetrics() throws Exception { + final DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("age", DataTypes.INT())); + + inventoryDatabase.createAndInitialize(); + final String tableId = inventoryDatabase.getDatabaseName() + ".users"; + MySqlSourceBuilder.MySqlIncrementalSource<Event> mySqlChangeEventSource = + new MySqlSourceBuilder<Event>() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(tableId) + .username(inventoryDatabase.getUsername()) + .password(inventoryDatabase.getPassword()) + .serverId("5401-5404") + .deserializer(buildRowDataDebeziumDeserializeSchema()) + .includeSchemaChanges(true) // output the schema changes as well + .splitSize(2) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // enable checkpoint + env.enableCheckpointing(3000); + // set the source parallelism to 1 + CloseableIterator<Event> iterator = + env.fromSource( + mySqlChangeEventSource, + WatermarkStrategy.noWatermarks(), + "MySqlParallelSource") + .setParallelism(1) + .executeAndCollect(); // collect record + String[] snapshotExpectedRecords = + new String[] { + "+I[101, Tom, 3]", + "+I[102, Jack, 5]", + "+I[103, Allen, 10]", + "+I[104, Andrew, 13]", + "+I[105, Arnold, 15]", + "+I[106, Claud, 19]", + "+I[107, Howard, 37]", + "+I[108, Jacob, 46]", + "+I[109, Lionel, 58]" + }; + + // step-1: consume snapshot data + List<Event> snapshotRowDataList = new ArrayList<>(); + for (int i = 0; i < snapshotExpectedRecords.length && iterator.hasNext(); i++) { + snapshotRowDataList.add(iterator.next()); + } + + List<String> snapshotActualRecords = formatResult(snapshotRowDataList, dataType); + assertEqualsInAnyOrder(Arrays.asList(snapshotExpectedRecords), snapshotActualRecords); + + // step-2: make 6 change events in one MySQL transaction + makeBinlogEvents(getConnection(), tableId); + // mock ddl events + makeDdlEvents(getConnection(), tableId); + + String[] binlogExpectedRecords = + new String[] { + "-U[103, Allen, 10]", + "+U[103, Oswald, 10]", + "+I[110, Terence, 78]", + "-D[110, Terence, 78]", + "-U[103, Oswald, 10]", + "+U[103, Marry, 10]" + }; + + // step-3: consume binlog change events + List<Event> binlogRowDataList = new ArrayList<>(); + for (int i = 0; i < 4 && 
iterator.hasNext(); i++) { + binlogRowDataList.add(iterator.next()); + } + List<String> binlogActualRecords = formatResult(binlogRowDataList, dataType); + assertEqualsInAnyOrder(Arrays.asList(binlogExpectedRecords), binlogActualRecords); + + Set<MetricGroup> metricGroups = metricReporter.findGroups("users"); + for (MetricGroup enumeratorGroup : metricGroups) { + boolean isTableMetric = true; + for (String scopeComponent : enumeratorGroup.getScopeComponents()) { + if (scopeComponent.contains("enumerator")) { + isTableMetric = false; + break; + } + } + if (!isTableMetric) { + break; + } + Map<String, Metric> enumeratorMetrics = + metricReporter.getMetricsByGroup(enumeratorGroup); + Assert.assertEquals( + 1, ((Counter) enumeratorMetrics.get("numDeleteDMLRecords")).getCount()); + Assert.assertEquals( + 1, ((Counter) enumeratorMetrics.get("numInsertDMLRecords")).getCount()); + Assert.assertEquals( + 9, ((Counter) enumeratorMetrics.get("numSnapshotRecords")).getCount()); + // ddl events + Assert.assertEquals(1, ((Counter) enumeratorMetrics.get("numDDLRecords")).getCount()); + Assert.assertEquals(13, ((Counter) enumeratorMetrics.get("numRecordsIn")).getCount()); + Assert.assertEquals( + 2, ((Counter) enumeratorMetrics.get("numUpdateDMLRecords")).getCount()); + } + Set<MetricGroup> enumeratorGroups = metricReporter.findGroups("enumerator"); + for (MetricGroup enumeratorGroup : enumeratorGroups) { + boolean isTableMetric = false; + for (String scopeComponent : enumeratorGroup.getScopeComponents()) { + if (scopeComponent.contains("users")) { + isTableMetric = true; + break; + } + } + Map<String, Metric> enumeratorMetrics = + metricReporter.getMetricsByGroup(enumeratorGroup); + if (isTableMetric) { + Assert.assertEquals( + 0, + ((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsRemaining")) + .getValue() + .intValue()); + Assert.assertEquals( + 5, + ((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsProcessed")) + .getValue() + .intValue()); + Assert.assertEquals( + 5, + ((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsFinished")) + .getValue() + .intValue()); + Assert.assertTrue( + ((Gauge<Long>) enumeratorMetrics.get("snapshotEndTime")) + .getValue() + .longValue() + > 0); + Assert.assertTrue( + ((Gauge<Long>) enumeratorMetrics.get("snapshotStartTime")) + .getValue() + .longValue() + > 0); + } else { + Assert.assertEquals( + 0, + ((Gauge<Integer>) enumeratorMetrics.get("isSnapshotting")) + .getValue() + .intValue()); + Assert.assertEquals( + 1, + ((Gauge<Integer>) enumeratorMetrics.get("isStreamReading")) + .getValue() + .intValue()); + Assert.assertEquals( + 1, + ((Gauge<Integer>) enumeratorMetrics.get("numTablesSnapshotted")) + .getValue() + .intValue()); + Assert.assertEquals( + 0, + ((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsRemaining")) + .getValue() + .intValue()); + Assert.assertEquals( + 5, + ((Gauge<Integer>) enumeratorMetrics.get("numSnapshotSplitsProcessed")) + .getValue() + .intValue()); + } + } + // stop the worker + iterator.close(); + } + + private MySqlEventDeserializer buildRowDataDebeziumDeserializeSchema() { + MySqlEventDeserializer deserializer = + new MySqlEventDeserializer(DebeziumChangelogMode.ALL, true); + return deserializer; + } + + private List<String> formatResult(List<Event> records, DataType dataType) { + RowRowConverter rowRowConverter = RowRowConverter.create(dataType); + rowRowConverter.open(Thread.currentThread().getContextClassLoader()); + return records.stream() + .flatMap( + item -> { + DataChangeEvent changeEvent = ((DataChangeEvent) item); + RecordData before = changeEvent.before(); + RecordData after = changeEvent.after(); + + switch (changeEvent.op()) { + case INSERT: 
GenericRowData insertData = new GenericRowData(3); + insertData.setRowKind(RowKind.INSERT); + convertData(after, insertData); + return Arrays.stream(new GenericRowData[] {insertData}); + case DELETE: + GenericRowData deleteData = new GenericRowData(3); + deleteData.setRowKind(RowKind.DELETE); + convertData(before, deleteData); + return Arrays.stream(new GenericRowData[] {deleteData}); + case UPDATE: + case REPLACE: + GenericRowData beforeData = new GenericRowData(3); + beforeData.setRowKind(RowKind.UPDATE_BEFORE); + convertData(before, beforeData); + + GenericRowData afterData = new GenericRowData(3); + afterData.setRowKind(RowKind.UPDATE_AFTER); + convertData(after, afterData); + return Stream.of(beforeData, afterData) + .filter(row -> row != null); + } + return Stream.empty(); + }) + .map(rowRowConverter::toExternal) + .map(Object::toString) + .collect(Collectors.toList()); + } + + private void convertData(RecordData inputData, GenericRowData outputData) { + outputData.setField(0, inputData.getLong(0)); + outputData.setField(1, StringData.fromString(inputData.getString(1).toString())); + outputData.setField(2, inputData.getInt(2)); + } + + private MySqlConnection getConnection() { + Map properties = new HashMap<>(); + properties.put("database.hostname", MYSQL_CONTAINER.getHost()); + properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + properties.put("database.user", inventoryDatabase.getUsername()); + properties.put("database.password", inventoryDatabase.getPassword()); + properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); + io.debezium.config.Configuration configuration = + io.debezium.config.Configuration.from(properties); + return new MySqlConnection(new MySqlConnection.MySqlConnectionConfiguration(configuration)); + } + + private void makeBinlogEvents(JdbcConnection connection, String tableId) throws SQLException { + try { + connection.setAutoCommit(false); + + // make binlog events + connection.execute( + "UPDATE " + tableId + " SET name = 'Oswald' where id = 103", + "INSERT INTO " + tableId + " VALUES(110,'Terence',78)", + "DELETE FROM " + tableId + " where id = 110", + "UPDATE " + tableId + " SET name = 'Marry' where id = 103"); + connection.commit(); + } finally { + connection.close(); + } + } + + private void makeDdlEvents(JdbcConnection connection, String tableId) throws SQLException { + try { + connection.setAutoCommit(false); + // make ddl events + connection.execute("alter table " + tableId + " add test_add_col int null"); + connection.commit(); + } finally { + connection.close(); + } + } + + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + } + + private static MySqlContainer createMySqlContainer(MySqlVersion version) { + return new MySqlContainer(version) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + 
.withLogConsumer(new Slf4jLogConsumer(LOG)); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/MySqlEventDeserializer.java new file mode 100644 index 00000000000..fa241a46c47 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/MySqlEventDeserializer.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.source; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.connectors.base.source.parser.CustomMySqlAntlrDdlParser; +import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; +import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.debezium.data.Envelope; +import io.debezium.relational.Tables; +import io.debezium.relational.history.HistoryRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord; + +/** Event deserializer for {@link MySqlDataSource}. 
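+ * + * <p>A minimal construction sketch, mirroring the metrics test above ({@code DebeziumChangelogMode.ALL} emits full before/after changelog rows; the boolean flag controls whether DDL statements are forwarded as schema-change events): + * + * <pre>{@code + * MySqlEventDeserializer deserializer = + *         new MySqlEventDeserializer(DebeziumChangelogMode.ALL, true); + * }</pre>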
*/ +@Internal +public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { + + private static final long serialVersionUID = 1L; + + public static final String SCHEMA_CHANGE_EVENT_KEY_NAME = + "io.debezium.connector.mysql.SchemaChangeKey"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final boolean includeSchemaChanges; + + private transient Tables tables; + private transient CustomMySqlAntlrDdlParser customParser; + + public MySqlEventDeserializer( + DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) { + super(new MySqlSchemaDataTypeInference(), changelogMode); + this.includeSchemaChanges = includeSchemaChanges; + } + + @Override + protected List deserializeSchemaChangeRecord(SourceRecord record) { + if (includeSchemaChanges) { + if (customParser == null) { + customParser = new CustomMySqlAntlrDdlParser(); + tables = new Tables(); + } + + try { + HistoryRecord historyRecord = getHistoryRecord(record); + + String databaseName = + historyRecord.document().getString(HistoryRecord.Fields.DATABASE_NAME); + String ddl = + historyRecord.document().getString(HistoryRecord.Fields.DDL_STATEMENTS); + customParser.setCurrentDatabase(databaseName); + customParser.parse(ddl, tables); + return customParser.getAndClearParsedEvents(); + } catch (IOException e) { + throw new IllegalStateException("Failed to parse the schema change : " + record, e); + } + } + return Collections.emptyList(); + } + + @Override + protected boolean isDataChangeRecord(SourceRecord record) { + Schema valueSchema = record.valueSchema(); + Struct value = (Struct) record.value(); + return value != null + && valueSchema != null + && valueSchema.field(Envelope.FieldName.OPERATION) != null + && value.getString(Envelope.FieldName.OPERATION) != null; + } + + @Override + protected boolean isSchemaChangeRecord(SourceRecord record) { + Schema keySchema = record.keySchema(); + return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()); + } + + @Override + protected TableId getTableId(SourceRecord record) { + String[] parts = record.topic().split("\\."); + return TableId.tableId(parts[1], parts[2]); + } + + @Override + protected Map getMetadata(SourceRecord record) { + return Collections.emptyMap(); + } + + @Override + protected Object convertToString(Object dbzObj, Schema schema) { + return BinaryStringData.fromString(dbzObj.toString()); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/MySqlSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/MySqlSchemaDataTypeInference.java new file mode 100644 index 00000000000..3042a61e4ea --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/MySqlSchemaDataTypeInference.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.source; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference; + +import io.debezium.data.geometry.Geometry; +import io.debezium.data.geometry.Point; +import org.apache.kafka.connect.data.Schema; + +/** {@link DataType} inference for MySQL debezium {@link Schema}. */ +@Internal +public class MySqlSchemaDataTypeInference extends DebeziumSchemaDataTypeInference { + + private static final long serialVersionUID = 1L; + + protected DataType inferStruct(Object value, Schema schema) { + // the Geometry datatype in MySQL will be converted to + // a String with Json format + if (Point.LOGICAL_NAME.equals(schema.name()) + || Geometry.LOGICAL_NAME.equals(schema.name())) { + return DataTypes.STRING(); + } else { + return super.inferStruct(value, schema); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java index 33852f99797..0ef107a2ec1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java @@ -21,6 +21,9 @@ import org.apache.flink.cdc.connectors.base.source.assigner.state.version5.HybridPendingSplitsStateVersion5; import org.apache.flink.cdc.connectors.base.source.assigner.state.version5.PendingSplitsStateSerializerVersion5; import org.apache.flink.cdc.connectors.base.source.assigner.state.version5.SnapshotPendingSplitsStateVersion5; +import org.apache.flink.cdc.connectors.base.source.assigner.state.version6.HybridPendingSplitsStateVersion6; +import org.apache.flink.cdc.connectors.base.source.assigner.state.version6.PendingSplitsStateSerializerVersion6; +import org.apache.flink.cdc.connectors.base.source.assigner.state.version6.SnapshotPendingSplitsStateVersion6; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit; @@ -57,14 +60,14 @@ public void testPendingSplitsStateSerializerAndDeserialize() throws IOException new PendingSplitsStateSerializer(constructSourceSplitSerializer()); PendingSplitsState streamSplitsStateAfter = pendingSplitsStateSerializer.deserializePendingSplitsState( - 6, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore)); + 7, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore)); 
Assert.assertEquals(streamPendingSplitsStateBefore, streamSplitsStateAfter); SnapshotPendingSplitsState snapshotPendingSplitsStateBefore = constructSnapshotPendingSplitsState(AssignerStatus.NEWLY_ADDED_ASSIGNING); PendingSplitsState snapshotPendingSplitsStateAfter = pendingSplitsStateSerializer.deserializePendingSplitsState( - 6, + 7, pendingSplitsStateSerializer.serialize(snapshotPendingSplitsStateBefore)); Assert.assertEquals(snapshotPendingSplitsStateBefore, snapshotPendingSplitsStateAfter); @@ -72,12 +75,12 @@ public void testPendingSplitsStateSerializerAndDeserialize() throws IOException new HybridPendingSplitsState(snapshotPendingSplitsStateBefore, false); PendingSplitsState hybridPendingSplitsStateAfter = pendingSplitsStateSerializer.deserializePendingSplitsState( - 6, pendingSplitsStateSerializer.serialize(hybridPendingSplitsStateBefore)); + 7, pendingSplitsStateSerializer.serialize(hybridPendingSplitsStateBefore)); Assert.assertEquals(hybridPendingSplitsStateBefore, hybridPendingSplitsStateAfter); } @Test - public void testPendingSplitsStateSerializerCompatibility() throws IOException { + public void testPendingSplitsStateSerializerCompatibilityVersion5() throws IOException { StreamPendingSplitsState streamPendingSplitsStateBefore = new StreamPendingSplitsState(true); PendingSplitsStateSerializer pendingSplitsStateSerializer = @@ -95,7 +98,7 @@ public void testPendingSplitsStateSerializerCompatibility() throws IOException { pendingSplitsStateSerializer.deserializePendingSplitsState( 5, PendingSplitsStateSerializerVersion5.serialize( - constructSnapshotPendingSplitsStateVersion4(false))); + constructSnapshotPendingSplitsStateVersion5(false))); Assert.assertEquals(expectedSnapshotSplitsState, snapshotPendingSplitsStateAfter); HybridPendingSplitsState expectedHybridPendingSplitsState = @@ -108,7 +111,46 @@ public void testPendingSplitsStateSerializerCompatibility() throws IOException { 5, PendingSplitsStateSerializerVersion5.serialize( new HybridPendingSplitsStateVersion5( - constructSnapshotPendingSplitsStateVersion4(true), false))); + constructSnapshotPendingSplitsStateVersion5(true), false))); + Assert.assertEquals(expectedHybridPendingSplitsState, hybridPendingSplitsStateAfter); + } + + @Test + public void testPendingSplitsStateSerializerCompatibilityVersion6() throws IOException { + StreamPendingSplitsState streamPendingSplitsStateBefore = + new StreamPendingSplitsState(true); + PendingSplitsStateSerializer pendingSplitsStateSerializer = + new PendingSplitsStateSerializer(constructSourceSplitSerializer()); + PendingSplitsState streamSplitsStateAfter = + pendingSplitsStateSerializer.deserializePendingSplitsState( + 6, + PendingSplitsStateSerializerVersion6.serialize( + streamPendingSplitsStateBefore)); + Assert.assertEquals(streamPendingSplitsStateBefore, streamSplitsStateAfter); + + SnapshotPendingSplitsState expectedSnapshotSplitsState = + constructSnapshotPendingSplitsState(AssignerStatus.INITIAL_ASSIGNING); + PendingSplitsState snapshotPendingSplitsStateAfter = + pendingSplitsStateSerializer.deserializePendingSplitsState( + 6, + PendingSplitsStateSerializerVersion6.serialize( + constructSnapshotPendingSplitsStateVersion6( + AssignerStatus.INITIAL_ASSIGNING))); + Assert.assertEquals(expectedSnapshotSplitsState, snapshotPendingSplitsStateAfter); + + HybridPendingSplitsState expectedHybridPendingSplitsState = + new HybridPendingSplitsState( + constructSnapshotPendingSplitsState( + AssignerStatus.INITIAL_ASSIGNING_FINISHED), + false); + PendingSplitsState 
hybridPendingSplitsStateAfter = + pendingSplitsStateSerializer.deserializePendingSplitsState( + 6, + PendingSplitsStateSerializerVersion6.serialize( + new HybridPendingSplitsStateVersion6( + constructSnapshotPendingSplitsStateVersion6( + AssignerStatus.INITIAL_ASSIGNING_FINISHED), + false))); Assert.assertEquals(expectedHybridPendingSplitsState, hybridPendingSplitsStateAfter); } @@ -181,10 +223,11 @@ private SnapshotPendingSplitsState constructSnapshotPendingSplitsState( assignerStatus, Arrays.asList(TableId.parse("catalog2.schema2.table2")), true, - true); + true, + new HashMap<>()); } - private SnapshotPendingSplitsStateVersion5 constructSnapshotPendingSplitsStateVersion4( + private SnapshotPendingSplitsStateVersion5 constructSnapshotPendingSplitsStateVersion5( boolean isAssignerFinished) { SchemalessSnapshotSplit schemalessSnapshotSplit = constuctSchemalessSnapshotSplit(); Map assignedSplits = new HashMap<>(); @@ -206,6 +249,28 @@ private SnapshotPendingSplitsStateVersion5 constructSnapshotPendingSplitsStateVe true); } + private SnapshotPendingSplitsStateVersion6 constructSnapshotPendingSplitsStateVersion6( + AssignerStatus assignerStatus) { + SchemalessSnapshotSplit schemalessSnapshotSplit = constuctSchemalessSnapshotSplit(); + Map assignedSplits = new HashMap<>(); + assignedSplits.put(tableId.toQuotedString('`'), schemalessSnapshotSplit); + Map tableSchemas = new HashMap<>(); + tableSchemas.put( + tableId, + new TableChanges.TableChange( + TableChanges.TableChangeType.CREATE, createTable(tableId))); + return new SnapshotPendingSplitsStateVersion6( + Arrays.asList(tableId), + Arrays.asList(schemalessSnapshotSplit), + assignedSplits, + tableSchemas, + new HashMap<>(), + assignerStatus, + Arrays.asList(TableId.parse("catalog2.schema2.table2")), + true, + true); + } + private static Table createTable(TableId id) { TableEditor editor = Table.editor().tableId(id).setDefaultCharsetName("UTF8"); editor.setComment("comment"); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/version6/HybridPendingSplitsStateVersion6.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/version6/HybridPendingSplitsStateVersion6.java new file mode 100644 index 00000000000..de39156c4d8 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/version6/HybridPendingSplitsStateVersion6.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.base.source.assigner.state.version6; + +import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsState; + +/** The 6th version of HybridPendingSplitsState. */ +public class HybridPendingSplitsStateVersion6 extends PendingSplitsState { + private final SnapshotPendingSplitsStateVersion6 snapshotPendingSplits; + private final boolean isStreamSplitAssigned; + + public HybridPendingSplitsStateVersion6( + SnapshotPendingSplitsStateVersion6 snapshotPendingSplits, + boolean isStreamSplitAssigned) { + this.snapshotPendingSplits = snapshotPendingSplits; + this.isStreamSplitAssigned = isStreamSplitAssigned; + } + + public SnapshotPendingSplitsStateVersion6 getSnapshotPendingSplits() { + return snapshotPendingSplits; + } + + public boolean isStreamSplitAssigned() { + return isStreamSplitAssigned; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/version6/PendingSplitsStateSerializerVersion6.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/version6/PendingSplitsStateSerializerVersion6.java new file mode 100644 index 00000000000..1e75e0543df --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/version6/PendingSplitsStateSerializerVersion6.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.source.assigner.state.version6; + +import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsState; +import org.apache.flink.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState; +import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit; +import org.apache.flink.cdc.connectors.base.source.meta.split.version4.LegacySourceSplitSerializierVersion4; +import org.apache.flink.cdc.connectors.base.utils.SerializerUtils; +import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import io.debezium.document.DocumentWriter; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** The 6th version of PendingSplitsStateSerializer. 
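+ * + * <p>A round-trip sketch as exercised by the compatibility test above (variable names are illustrative): this helper freezes the version-6 wire layout, and the current {@code PendingSplitsStateSerializer} must still restore it when handed version code 6: + * + * <pre>{@code + * byte[] bytes = PendingSplitsStateSerializerVersion6.serialize(stateInVersion6Layout); + * PendingSplitsState restored = + *         pendingSplitsStateSerializer.deserializePendingSplitsState(6, bytes); + * }</pre>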
*/ +public class PendingSplitsStateSerializerVersion6 { + + private static final ThreadLocal SERIALIZER_CACHE = + ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); + + private static final int SNAPSHOT_PENDING_SPLITS_STATE_FLAG = 1; + private static final int STREAM_PENDING_SPLITS_STATE_FLAG = 2; + private static final int HYBRID_PENDING_SPLITS_STATE_FLAG = 3; + + public static byte[] serialize(PendingSplitsState state) throws IOException { + final DataOutputSerializer out = SERIALIZER_CACHE.get(); + out.writeInt(5); + + if (state instanceof SnapshotPendingSplitsStateVersion6) { + out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG); + serializeSnapshotPendingSplitsState((SnapshotPendingSplitsStateVersion6) state, out); + } else if (state instanceof HybridPendingSplitsStateVersion6) { + out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG); + serializeHybridPendingSplitsState((HybridPendingSplitsStateVersion6) state, out); + } else if (state instanceof StreamPendingSplitsState) { + out.writeInt(STREAM_PENDING_SPLITS_STATE_FLAG); + serializeStreamPendingSplitsState((StreamPendingSplitsState) state, out); + } + + final byte[] result = out.getCopyOfBuffer(); + out.clear(); + return result; + } + + private static void serializeHybridPendingSplitsState( + HybridPendingSplitsStateVersion6 state, DataOutputSerializer out) throws IOException { + serializeSnapshotPendingSplitsState(state.getSnapshotPendingSplits(), out); + out.writeBoolean(state.isStreamSplitAssigned()); + } + + private static void serializeSnapshotPendingSplitsState( + SnapshotPendingSplitsStateVersion6 state, DataOutputSerializer out) throws IOException { + writeTableIds(state.getAlreadyProcessedTables(), out); + writeRemainingSplits(state.getRemainingSplits(), out); + writeAssignedSnapshotSplits(state.getAssignedSplits(), out); + writeFinishedOffsets(state.getSplitFinishedOffsets(), out); + out.writeInt(state.getSnapshotAssignerStatus().getStatusCode()); + writeTableIds(state.getRemainingTables(), out); + out.writeBoolean(state.isTableIdCaseSensitive()); + writeTableSchemas(state.getTableSchemas(), out); + } + + private static void serializeStreamPendingSplitsState( + StreamPendingSplitsState state, DataOutputSerializer out) throws IOException { + out.writeBoolean(state.isStreamSplitAssigned()); + } + + private static void writeTableIds(Collection tableIds, DataOutputSerializer out) + throws IOException { + final int size = tableIds.size(); + out.writeInt(size); + for (TableId tableId : tableIds) { + boolean useCatalogBeforeSchema = SerializerUtils.shouldUseCatalogBeforeSchema(tableId); + out.writeBoolean(useCatalogBeforeSchema); + out.writeUTF(tableId.toString()); + } + } + + private static void writeRemainingSplits( + List remainingSplits, DataOutputSerializer out) + throws IOException { + final int size = remainingSplits.size(); + out.writeInt(size); + for (SchemalessSnapshotSplit split : remainingSplits) { + byte[] splitBytes = LegacySourceSplitSerializierVersion4.serialize(split); + out.writeInt(splitBytes.length); + out.write(splitBytes); + } + } + + private static void writeAssignedSnapshotSplits( + Map assignedSplits, DataOutputSerializer out) + throws IOException { + final int size = assignedSplits.size(); + out.writeInt(size); + for (Map.Entry entry : assignedSplits.entrySet()) { + out.writeUTF(entry.getKey()); + byte[] splitBytes = LegacySourceSplitSerializierVersion4.serialize(entry.getValue()); + out.writeInt(splitBytes.length); + out.write(splitBytes); + } + } + + private static void writeFinishedOffsets( + Map 
splitsInfo, DataOutputSerializer out) throws IOException { + final int size = splitsInfo.size(); + out.writeInt(size); + for (Map.Entry splitInfo : splitsInfo.entrySet()) { + out.writeUTF(splitInfo.getKey()); + LegacySourceSplitSerializierVersion4.writeOffsetPosition(splitInfo.getValue(), out); + } + } + + private static void writeTableSchemas( + Map tableSchemas, DataOutputSerializer out) + throws IOException { + FlinkJsonTableChangeSerializer jsonSerializer = new FlinkJsonTableChangeSerializer(); + DocumentWriter documentWriter = DocumentWriter.defaultWriter(); + final int size = tableSchemas.size(); + out.writeInt(size); + for (Map.Entry entry : tableSchemas.entrySet()) { + boolean useCatalogBeforeSchema = + SerializerUtils.shouldUseCatalogBeforeSchema(entry.getKey()); + out.writeBoolean(useCatalogBeforeSchema); + out.writeUTF(entry.getKey().toString()); + final String tableChangeStr = + documentWriter.write(jsonSerializer.toDocument(entry.getValue())); + final byte[] tableChangeBytes = tableChangeStr.getBytes(StandardCharsets.UTF_8); + out.writeInt(tableChangeBytes.length); + out.write(tableChangeBytes); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/version6/SnapshotPendingSplitsStateVersion6.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/version6/SnapshotPendingSplitsStateVersion6.java new file mode 100644 index 00000000000..0ac12e44e29 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/version6/SnapshotPendingSplitsStateVersion6.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.source.assigner.state.version6; + +import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus; +import org.apache.flink.cdc.connectors.base.source.assigner.state.PendingSplitsState; +import org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator; +import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import org.apache.flink.cdc.connectors.base.source.meta.split.SchemalessSnapshotSplit; +import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader; + +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; + +import java.util.List; +import java.util.Map; + +/** + * The 6th version of SnapshotPendingSplitsState. The 7th version adds the + * splitFinishedCheckpointIds field. 
+ */ +public class SnapshotPendingSplitsStateVersion6 extends PendingSplitsState { + + /** The tables in the checkpoint. */ + private final List remainingTables; + + /** + * The tables that are no longer in the enumerator checkpoint, but have been processed before and + * should therefore be ignored. Relevant only for sources in continuous monitoring mode. + */ + private final List alreadyProcessedTables; + + /** The splits in the checkpoint. */ + private final List remainingSplits; + + /** + * The snapshot splits that the {@link IncrementalSourceEnumerator} has assigned to {@link + * IncrementalSourceSplitReader}s. + */ + private final Map assignedSplits; + + /** The {@link AssignerStatus} that indicates the snapshot assigner status. */ + private final AssignerStatus assignerStatus; + + /** + * The offsets of finished (snapshot) splits that the {@link IncrementalSourceEnumerator} has + * received from {@link IncrementalSourceSplitReader}s. + */ + private final Map splitFinishedOffsets; + + /** Whether the table identifier is case sensitive. */ + private final boolean isTableIdCaseSensitive; + + /** Whether the remaining tables are kept when the state is snapshotted. */ + private final boolean isRemainingTablesCheckpointed; + + private final Map tableSchemas; + + public SnapshotPendingSplitsStateVersion6( + List alreadyProcessedTables, + List remainingSplits, + Map assignedSplits, + Map tableSchemas, + Map splitFinishedOffsets, + AssignerStatus assignerStatus, + List remainingTables, + boolean isTableIdCaseSensitive, + boolean isRemainingTablesCheckpointed) { + this.alreadyProcessedTables = alreadyProcessedTables; + this.remainingSplits = remainingSplits; + this.assignedSplits = assignedSplits; + this.splitFinishedOffsets = splitFinishedOffsets; + this.assignerStatus = assignerStatus; + this.remainingTables = remainingTables; + this.isTableIdCaseSensitive = isTableIdCaseSensitive; + this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; + this.tableSchemas = tableSchemas; + } + + public List getAlreadyProcessedTables() { + return alreadyProcessedTables; + } + + public List getRemainingSplits() { + return remainingSplits; + } + + public Map getAssignedSplits() { + return assignedSplits; + } + + public Map getTableSchemas() { + return tableSchemas; + } + + public Map getSplitFinishedOffsets() { + return splitFinishedOffsets; + } + + public List getRemainingTables() { + return remainingTables; + } + + public boolean isTableIdCaseSensitive() { + return isTableIdCaseSensitive; + } + + public boolean isRemainingTablesCheckpointed() { + return isRemainingTablesCheckpointed; + } + + public AssignerStatus getSnapshotAssignerStatus() { + return assignerStatus; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomAlterTableParserListener.java new file mode 100644 index 00000000000..ac051724738 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomAlterTableParserListener.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.source.parser; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.connector.mysql.antlr.listener.AlterTableParserListener; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.TableEditor; +import io.debezium.relational.TableId; +import org.antlr.v4.runtime.tree.ParseTreeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.connectors.base.experimental.utils.MySqlTypeUtils.fromDbzColumn; + +/** Copied from {@link AlterTableParserListener} in Debezium 1.9.8.Final. 
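+ * + * <p>The listener is not invoked directly; {@link CustomMySqlAntlrDdlParser} registers it and collects the produced events. A minimal sketch (the DDL string is illustrative and echoes the statement issued in the metrics test): + * + * <pre>{@code + * CustomMySqlAntlrDdlParser parser = new CustomMySqlAntlrDdlParser(); + * parser.setCurrentDatabase("metrics"); + * parser.parse("ALTER TABLE users ADD test_add_col INT NULL", new Tables()); + * List<SchemaChangeEvent> events = parser.getAndClearParsedEvents(); // one AddColumnEvent + * }</pre>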
*/ +public class CustomAlterTableParserListener extends MySqlParserBaseListener { + + private static final int STARTING_INDEX = 1; + + private static final Logger LOG = LoggerFactory.getLogger(CustomAlterTableParserListener.class); + + private final MySqlAntlrDdlParser parser; + private final List listeners; + private final LinkedList changes; + private org.apache.flink.cdc.common.event.TableId currentTable; + private List columnEditors; + private CustomColumnDefinitionParserListener columnDefinitionListener; + private TableEditor tableEditor; + + private int parsingColumnIndex = STARTING_INDEX; + + public CustomAlterTableParserListener( + MySqlAntlrDdlParser parser, + List listeners, + LinkedList changes) { + this.parser = parser; + this.listeners = listeners; + this.changes = changes; + } + + @Override + public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) { + TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId()); + if (parser.databaseTables().forTable(tableId) == null) { + tableEditor = parser.databaseTables().editOrCreateTable(tableId); + } + super.enterColumnCreateTable(ctx); + } + + @Override + public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) { + parser.runIfNotNull( + () -> { + // Make sure that the table's character set has been set ... + if (!tableEditor.hasDefaultCharsetName()) { + tableEditor.setDefaultCharsetName( + parser.charsetForTable(tableEditor.tableId())); + } + listeners.remove(columnDefinitionListener); + columnDefinitionListener = null; + // remove column definition parser listener + final String defaultCharsetName = tableEditor.create().defaultCharsetName(); + tableEditor.setColumns( + tableEditor.columns().stream() + .map( + column -> { + final ColumnEditor columnEditor = column.edit(); + if (columnEditor.charsetNameOfTable() == null) { + columnEditor.charsetNameOfTable( + defaultCharsetName); + } + return columnEditor; + }) + .map(ColumnEditor::create) + .collect(Collectors.toList())); + parser.databaseTables().overwriteTable(tableEditor.create()); + parser.signalCreateTable(tableEditor.tableId(), ctx); + + Schema.Builder builder = Schema.newBuilder(); + tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column))); + if (tableEditor.hasPrimaryKey()) { + builder.primaryKey(tableEditor.primaryKeyColumnNames()); + } + changes.add( + new CreateTableEvent( + toCdcTableId(tableEditor.tableId()), builder.build())); + }, + tableEditor); + super.exitColumnCreateTable(ctx); + } + + @Override + public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) { + parser.runIfNotNull( + () -> { + String columnName = parser.parseName(ctx.uid()); + ColumnEditor columnEditor = Column.editor().name(columnName); + if (columnDefinitionListener == null) { + columnDefinitionListener = + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + } else { + columnDefinitionListener.setColumnEditor(columnEditor); + } + }, + tableEditor); + super.enterColumnDeclaration(ctx); + } + + @Override + public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) { + parser.runIfNotNull( + () -> { + tableEditor.addColumn(columnDefinitionListener.getColumn()); + }, + tableEditor, + columnDefinitionListener); + super.exitColumnDeclaration(ctx); + } + + @Override + public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext ctx) { + parser.runIfNotNull( + () -> { + 
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor); + }, + tableEditor); + super.enterPrimaryKeyTableConstraint(ctx); + } + + @Override + public void enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) { + parser.runIfNotNull( + () -> { + if (!tableEditor.hasPrimaryKey()) { + parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor); + } + }, + tableEditor); + super.enterUniqueKeyTableConstraint(ctx); + } + + @Override + public void enterAlterTable(MySqlParser.AlterTableContext ctx) { + this.currentTable = toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId())); + super.enterAlterTable(ctx); + } + + @Override + public void exitAlterTable(MySqlParser.AlterTableContext ctx) { + listeners.remove(columnDefinitionListener); + super.exitAlterTable(ctx); + this.currentTable = null; + } + + @Override + public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { + String columnName = parser.parseName(ctx.uid(0)); + ColumnEditor columnEditor = Column.editor().name(columnName); + columnDefinitionListener = + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByAddColumn(ctx); + } + + @Override + public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + if (ctx.FIRST() != null) { + changes.add( + new AddColumnEvent( + currentTable, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + toCdcColumn(column), + AddColumnEvent.ColumnPosition.FIRST, + null)))); + } else if (ctx.AFTER() != null) { + String afterColumn = parser.parseName(ctx.uid(1)); + changes.add( + new AddColumnEvent( + currentTable, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + toCdcColumn(column), + AddColumnEvent.ColumnPosition.AFTER, + afterColumn)))); + } else { + changes.add( + new AddColumnEvent( + currentTable, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + toCdcColumn(column))))); + } + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByAddColumn(ctx); + } + + @Override + public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) { + // multiple columns are added. 
Initialize a list of column editors for them + columnEditors = new ArrayList<>(ctx.uid().size()); + for (MySqlParser.UidContext uidContext : ctx.uid()) { + String columnName = parser.parseName(uidContext); + columnEditors.add(Column.editor().name(columnName)); + } + columnDefinitionListener = + new CustomColumnDefinitionParserListener( + tableEditor, columnEditors.get(0), parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByAddColumns(ctx); + } + + @Override + public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { + parser.runIfNotNull( + () -> { + if (columnEditors != null) { + // column editor list is not null when multiple columns are parsed in one + // statement + if (columnEditors.size() > parsingColumnIndex) { + // assign next column editor to parse another column definition + columnDefinitionListener.setColumnEditor( + columnEditors.get(parsingColumnIndex++)); + } + } + }, + columnEditors); + super.exitColumnDefinition(ctx); + } + + @Override + public void exitAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) { + parser.runIfNotNull( + () -> { + List addedColumns = new ArrayList<>(); + columnEditors.forEach( + columnEditor -> { + Column column = columnEditor.create(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition(toCdcColumn(column))); + }); + changes.add(new AddColumnEvent(currentTable, addedColumns)); + listeners.remove(columnDefinitionListener); + columnEditors = null; + parsingColumnIndex = STARTING_INDEX; + }, + columnEditors); + super.exitAlterByAddColumns(ctx); + } + + @Override + public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) { + String oldColumnName = parser.parseName(ctx.oldColumn); + ColumnEditor columnEditor = Column.editor().name(oldColumnName); + columnEditor.unsetDefaultValueExpression(); + + columnDefinitionListener = + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByChangeColumn(ctx); + } + + @Override + public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + String newColumnName = parser.parseName(ctx.newColumn); + + Map typeMapping = new HashMap<>(); + typeMapping.put(column.name(), convertDataType(fromDbzColumn(column))); + changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); + + if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) { + Map renameMap = new HashMap<>(); + renameMap.put(column.name(), newColumnName); + changes.add(new RenameColumnEvent(currentTable, renameMap)); + } + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByChangeColumn(ctx); + } + + private DataType convertDataType(org.apache.flink.table.types.DataType dataType) { + if (dataType.getLogicalType().is(LogicalTypeRoot.INTEGER)) { + return DataTypes.INT(); + } + if (dataType.getLogicalType().is(LogicalTypeRoot.BIGINT)) { + return DataTypes.BIGINT(); + } + if (dataType.getLogicalType().is(LogicalTypeRoot.FLOAT)) { + return DataTypes.FLOAT(); + } + if (dataType.getLogicalType().is(LogicalTypeRoot.DATE)) { + return DataTypes.DATE(); + } + return DataTypes.STRING(); + } + + @Override + public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) { + String removedColName = parser.parseName(ctx.uid()); + changes.add(new DropColumnEvent(currentTable, 
Collections.singletonList(removedColName))); + super.enterAlterByDropColumn(ctx); + } + + @Override + public void enterAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) { + String oldColumnName = parser.parseName(ctx.oldColumn); + ColumnEditor columnEditor = Column.editor().name(oldColumnName); + columnDefinitionListener = + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByRenameColumn(ctx); + } + + @Override + public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + String oldColumnName = parser.parseName(ctx.uid(0)); + ColumnEditor columnEditor = Column.editor().name(oldColumnName); + columnEditor.unsetDefaultValueExpression(); + + columnDefinitionListener = + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByModifyColumn(ctx); + } + + @Override + public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + Map typeMapping = new HashMap<>(); + typeMapping.put(column.name(), convertDataType(fromDbzColumn(column))); + changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByModifyColumn(ctx); + } + + @Override + public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + String newColumnName = parser.parseName(ctx.newColumn); + if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) { + Map renameMap = new HashMap<>(); + renameMap.put(column.name(), newColumnName); + changes.add(new RenameColumnEvent(currentTable, renameMap)); + } + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByRenameColumn(ctx); + } + + @Override + public void exitTruncateTable(MySqlParser.TruncateTableContext ctx) { + TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId()); + changes.add(new TruncateTableEvent(toCdcTableId(tableId))); + super.exitTruncateTable(ctx); + } + + @Override + public void exitDropTable(MySqlParser.DropTableContext ctx) { + ctx.tables() + .tableName() + .forEach( + evt -> { + TableId tableId = parser.parseQualifiedTableId(evt.fullId()); + changes.add(new DropTableEvent(toCdcTableId(tableId))); + }); + super.exitDropTable(ctx); + } + + private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) { + return org.apache.flink.cdc.common.schema.Column.physicalColumn( + dbzColumn.name(), + convertDataType(fromDbzColumn(dbzColumn)), + dbzColumn.comment(), + dbzColumn.defaultValueExpression().orElse(null)); + } + + private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId dbzTableId) { + return org.apache.flink.cdc.common.event.TableId.tableId( + dbzTableId.catalog(), dbzTableId.table()); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomColumnDefinitionParserListener.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomColumnDefinitionParserListener.java new file mode 100644 index 00000000000..1c7045d2934 --- /dev/null +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomColumnDefinitionParserListener.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.source.parser; + +import io.debezium.antlr.AntlrDdlParser; +import io.debezium.antlr.DataTypeResolver; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.connector.mysql.antlr.listener.DefaultValueParserListener; +import io.debezium.ddl.parser.mysql.generated.MySqlParser; +import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.TableEditor; +import io.debezium.relational.ddl.DataType; +import io.debezium.util.Strings; +import org.antlr.v4.runtime.tree.ParseTreeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Types; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** Parser listener that is parsing column definition part of MySQL statements. 
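+ * + * <p>Instances are created and re-used by {@link CustomAlterTableParserListener} for each column clause; a wiring sketch copied from that listener: + * + * <pre>{@code + * ColumnEditor columnEditor = Column.editor().name(columnName); + * columnDefinitionListener = + *         new CustomColumnDefinitionParserListener(tableEditor, columnEditor, parser, listeners); + * listeners.add(columnDefinitionListener); + * }</pre>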
*/ +public class CustomColumnDefinitionParserListener extends MySqlParserBaseListener { + + private static final Logger LOGGER = + LoggerFactory.getLogger(CustomColumnDefinitionParserListener.class); + + private static final Pattern DOT = Pattern.compile("\\."); + private final MySqlAntlrDdlParser parser; + private final DataTypeResolver dataTypeResolver; + private ColumnEditor columnEditor; + private boolean uniqueColumn; + private AtomicReference optionalColumn = new AtomicReference<>(); + private DefaultValueParserListener defaultValueListener; + private final TableEditor tableEditor; + + private final List listeners; + + public CustomColumnDefinitionParserListener( + TableEditor tableEditor, + ColumnEditor columnEditor, + MySqlAntlrDdlParser parser, + List listeners) { + this.tableEditor = tableEditor; + this.columnEditor = columnEditor; + this.parser = parser; + this.dataTypeResolver = parser.dataTypeResolver(); + this.listeners = listeners; + } + + public void setColumnEditor(ColumnEditor columnEditor) { + this.columnEditor = columnEditor; + } + + public ColumnEditor getColumnEditor() { + return columnEditor; + } + + public Column getColumn() { + return columnEditor.create(); + } + + @Override + public void enterColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { + uniqueColumn = false; + optionalColumn = new AtomicReference<>(); + resolveColumnDataType(ctx.dataType()); + defaultValueListener = new DefaultValueParserListener(columnEditor, optionalColumn); + listeners.add(defaultValueListener); + super.enterColumnDefinition(ctx); + } + + @Override + public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { + if (optionalColumn.get() != null) { + columnEditor.optional(optionalColumn.get().booleanValue()); + } + defaultValueListener.exitDefaultValue(false); + listeners.remove(defaultValueListener); + super.exitColumnDefinition(ctx); + } + + @Override + public void enterUniqueKeyColumnConstraint(MySqlParser.UniqueKeyColumnConstraintContext ctx) { + uniqueColumn = true; + super.enterUniqueKeyColumnConstraint(ctx); + } + + @Override + public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstraintContext ctx) { + // this rule will be parsed only if no primary key is set in a table + // otherwise the statement can't be executed due to multiple primary key error + optionalColumn.set(Boolean.FALSE); + tableEditor.addColumn(columnEditor.create()); + tableEditor.setPrimaryKeyNames(columnEditor.name()); + super.enterPrimaryKeyColumnConstraint(ctx); + } + + @Override + public void enterCommentColumnConstraint(MySqlParser.CommentColumnConstraintContext ctx) { + if (!parser.skipComments()) { + if (ctx.STRING_LITERAL() != null) { + columnEditor.comment(parser.withoutQuotes(ctx.STRING_LITERAL().getText())); + } + } + super.enterCommentColumnConstraint(ctx); + } + + @Override + public void enterNullNotnull(MySqlParser.NullNotnullContext ctx) { + optionalColumn.set(Boolean.valueOf(ctx.NOT() == null)); + super.enterNullNotnull(ctx); + } + + @Override + public void enterAutoIncrementColumnConstraint( + MySqlParser.AutoIncrementColumnConstraintContext ctx) { + columnEditor.autoIncremented(true); + columnEditor.generated(true); + super.enterAutoIncrementColumnConstraint(ctx); + } + + @Override + public void enterSerialDefaultColumnConstraint( + MySqlParser.SerialDefaultColumnConstraintContext ctx) { + serialColumn(); + super.enterSerialDefaultColumnConstraint(ctx); + } + + private void resolveColumnDataType(MySqlParser.DataTypeContext dataTypeContext) { + String 
charsetName = null; + DataType dataType = dataTypeResolver.resolveDataType(dataTypeContext); + + if (dataTypeContext instanceof MySqlParser.StringDataTypeContext) { + // Same as LongVarcharDataTypeContext but with dimension handling + MySqlParser.StringDataTypeContext stringDataTypeContext = + (MySqlParser.StringDataTypeContext) dataTypeContext; + + if (stringDataTypeContext.lengthOneDimension() != null) { + Integer length = + parseLength( + stringDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + columnEditor.length(length); + } + + charsetName = + parser.extractCharset( + stringDataTypeContext.charsetName(), + stringDataTypeContext.collationName()); + } else if (dataTypeContext instanceof MySqlParser.LongVarcharDataTypeContext) { + // Same as StringDataTypeContext but without dimension handling + MySqlParser.LongVarcharDataTypeContext longVarcharTypeContext = + (MySqlParser.LongVarcharDataTypeContext) dataTypeContext; + + charsetName = + parser.extractCharset( + longVarcharTypeContext.charsetName(), + longVarcharTypeContext.collationName()); + } else if (dataTypeContext instanceof MySqlParser.NationalStringDataTypeContext) { + MySqlParser.NationalStringDataTypeContext nationalStringDataTypeContext = + (MySqlParser.NationalStringDataTypeContext) dataTypeContext; + + if (nationalStringDataTypeContext.lengthOneDimension() != null) { + Integer length = + parseLength( + nationalStringDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + columnEditor.length(length); + } + } else if (dataTypeContext instanceof MySqlParser.NationalVaryingStringDataTypeContext) { + MySqlParser.NationalVaryingStringDataTypeContext nationalVaryingStringDataTypeContext = + (MySqlParser.NationalVaryingStringDataTypeContext) dataTypeContext; + + if (nationalVaryingStringDataTypeContext.lengthOneDimension() != null) { + Integer length = + parseLength( + nationalVaryingStringDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + columnEditor.length(length); + } + } else if (dataTypeContext instanceof MySqlParser.DimensionDataTypeContext) { + MySqlParser.DimensionDataTypeContext dimensionDataTypeContext = + (MySqlParser.DimensionDataTypeContext) dataTypeContext; + + Integer length = null; + Integer scale = null; + if (dimensionDataTypeContext.lengthOneDimension() != null) { + length = + parseLength( + dimensionDataTypeContext + .lengthOneDimension() + .decimalLiteral() + .getText()); + } + + if (dimensionDataTypeContext.lengthTwoDimension() != null) { + List decimalLiterals = + dimensionDataTypeContext.lengthTwoDimension().decimalLiteral(); + length = parseLength(decimalLiterals.get(0).getText()); + scale = Integer.valueOf(decimalLiterals.get(1).getText()); + } + + if (dimensionDataTypeContext.lengthTwoOptionalDimension() != null) { + List decimalLiterals = + dimensionDataTypeContext.lengthTwoOptionalDimension().decimalLiteral(); + if (decimalLiterals.get(0).REAL_LITERAL() != null) { + String[] digits = DOT.split(decimalLiterals.get(0).getText()); + if (Strings.isNullOrEmpty(digits[0]) || Integer.valueOf(digits[0]) == 0) { + // Set default value 10 according mysql engine + length = 10; + } else { + length = parseLength(digits[0]); + } + } else { + length = parseLength(decimalLiterals.get(0).getText()); + } + + if (decimalLiterals.size() > 1) { + scale = Integer.valueOf(decimalLiterals.get(1).getText()); + } + } + if (length != null) { + columnEditor.length(length); + } + if (scale != null) { + columnEditor.scale(scale); + } + } else if 
 (dataTypeContext instanceof MySqlParser.CollectionDataTypeContext) {
+            MySqlParser.CollectionDataTypeContext collectionDataTypeContext =
+                    (MySqlParser.CollectionDataTypeContext) dataTypeContext;
+            if (collectionDataTypeContext.charsetName() != null) {
+                charsetName = collectionDataTypeContext.charsetName().getText();
+            }
+
+            if (dataType.name().equalsIgnoreCase("SET")) {
+                // After DBZ-132, it will always be comma separated
+                int optionsSize =
+                        collectionDataTypeContext.collectionOptions().collectionOption().size();
+                columnEditor.length(
+                        Math.max(0, optionsSize * 2 - 1)); // number of options + number of commas
+            } else {
+                columnEditor.length(1);
+            }
+        }
+
+        String dataTypeName = dataType.name().toUpperCase();
+
+        if (dataTypeName.equals("ENUM") || dataTypeName.equals("SET")) {
+            // the type expression has to be set because the value converter needs to know the
+            // ENUM or SET options
+            MySqlParser.CollectionDataTypeContext collectionDataTypeContext =
+                    (MySqlParser.CollectionDataTypeContext) dataTypeContext;
+
+            List<String> collectionOptions =
+                    collectionDataTypeContext.collectionOptions().collectionOption().stream()
+                            .map(AntlrDdlParser::getText)
+                            .collect(Collectors.toList());
+
+            columnEditor.type(dataTypeName);
+            columnEditor.enumValues(collectionOptions);
+        } else if (dataTypeName.equals("SERIAL")) {
+            // SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE
+            columnEditor.type("BIGINT UNSIGNED");
+            serialColumn();
+        } else {
+            columnEditor.type(dataTypeName);
+        }
+
+        int jdbcDataType = dataType.jdbcType();
+        columnEditor.jdbcType(jdbcDataType);
+
+        if (columnEditor.length() == -1) {
+            columnEditor.length((int) dataType.length());
+        }
+        if (!columnEditor.scale().isPresent() && dataType.scale() != Column.UNSET_INT_VALUE) {
+            columnEditor.scale(dataType.scale());
+        }
+        if (Types.NCHAR == jdbcDataType || Types.NVARCHAR == jdbcDataType) {
+            // NCHAR and NVARCHAR columns always use utf8 as their charset
+            columnEditor.charsetName("utf8");
+        } else {
+            columnEditor.charsetName(charsetName);
+        }
+    }
+
+    private Integer parseLength(String lengthStr) {
+        Long length = Long.parseLong(lengthStr);
+        if (length > Integer.MAX_VALUE) {
+            LOGGER.warn(
+                    "The length '{}' of the column `{}` is too large to be supported, truncating it to '{}'",
+                    length,
+                    columnEditor.name(),
+                    Integer.MAX_VALUE);
+            length = (long) Integer.MAX_VALUE;
+        }
+        return length.intValue();
+    }
+
+    private void serialColumn() {
+        if (optionalColumn.get() == null) {
+            optionalColumn.set(Boolean.FALSE);
+        }
+        uniqueColumn = true;
+        columnEditor.autoIncremented(true);
+        columnEditor.generated(true);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomMySqlAntlrDdlParser.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomMySqlAntlrDdlParser.java
new file mode 100644
index 00000000000..bf6564ded28
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomMySqlAntlrDdlParser.java
@@ -0,0 +1,288 @@
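The listener above encodes three sizing rules: a SET column's length counts its options plus the separating commas, a real-literal precision such as `DECIMAL(6.2)` keeps only the integer part (falling back to MySQL's default precision of 10 when it is empty or zero), and any length beyond `Integer.MAX_VALUE` is clamped. A self-contained sketch of those rules; the class and method names are illustrative, not connector code:

```java
import java.util.Arrays;
import java.util.List;

public class ColumnLengthSketch {

    // SET('a','b','c') is serialized as "a,b,c": n options plus n - 1 commas
    static int setTypeLength(List<String> options) {
        return Math.max(0, options.size() * 2 - 1);
    }

    // a real-literal first dimension such as "6.2" keeps only the integer part;
    // an empty or zero integer part falls back to MySQL's default precision of 10
    static int decimalPrecision(String firstDimension) {
        String[] digits = firstDimension.split("\\.");
        if (digits[0].isEmpty() || Integer.parseInt(digits[0]) == 0) {
            return 10;
        }
        return clampLength(digits[0]);
    }

    // lengths beyond Integer.MAX_VALUE are clamped, as in parseLength(...) above
    static int clampLength(String lengthLiteral) {
        long length = Long.parseLong(lengthLiteral);
        return length > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) length;
    }

    public static void main(String[] args) {
        System.out.println(setTypeLength(Arrays.asList("a", "b", "c"))); // 5
        System.out.println(decimalPrecision("6.2"));                     // 6
        System.out.println(clampLength("4294967295"));                   // 2147483647
    }
}
```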
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.parser;
+
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+
+import io.debezium.antlr.AntlrDdlParserListener;
+import io.debezium.antlr.DataTypeResolver;
+import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
+import io.debezium.ddl.parser.mysql.generated.MySqlParser;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/** A DDL parser that uses the custom listeners below. */
+public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
+
+    private final LinkedList<SchemaChangeEvent> parsedEvents;
+
+    public CustomMySqlAntlrDdlParser() {
+        super();
+        this.parsedEvents = new LinkedList<>();
+    }
+
+    // Overriding this method because the BIT type requires a default length dimension of 1.
+    // Remove this override once Debezium fixes the issue.
+    @Override
+    protected DataTypeResolver initializeDataTypeResolver() {
+        DataTypeResolver.Builder dataTypeResolverBuilder = new DataTypeResolver.Builder();
+
+        dataTypeResolverBuilder.registerDataTypes(
+                MySqlParser.StringDataTypeContext.class.getCanonicalName(),
+                Arrays.asList(
+                        new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHAR),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.VARCHAR, MySqlParser.CHAR, MySqlParser.VARYING),
+                        new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.VARCHAR),
+                        new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TINYTEXT),
+                        new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TEXT),
+                        new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.MEDIUMTEXT),
+                        new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONGTEXT),
+                        new DataTypeResolver.DataTypeEntry(Types.NCHAR, MySqlParser.NCHAR),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARYING),
+                        new DataTypeResolver.DataTypeEntry(Types.NVARCHAR, MySqlParser.NVARCHAR),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.CHAR, MySqlParser.CHAR, MySqlParser.BINARY),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.VARCHAR, MySqlParser.VARCHAR, MySqlParser.BINARY),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.VARCHAR, MySqlParser.TINYTEXT, MySqlParser.BINARY),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.VARCHAR, MySqlParser.TEXT, MySqlParser.BINARY),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.VARCHAR, MySqlParser.MEDIUMTEXT, MySqlParser.BINARY),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.VARCHAR, MySqlParser.LONGTEXT, MySqlParser.BINARY),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.NCHAR, MySqlParser.NCHAR, MySqlParser.BINARY),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.NVARCHAR, MySqlParser.NVARCHAR, MySqlParser.BINARY),
+                        new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHARACTER),
+                        new DataTypeResolver.DataTypeEntry(
+                                Types.VARCHAR, MySqlParser.CHARACTER, MySqlParser.VARYING)));
+        dataTypeResolverBuilder.registerDataTypes(
MySqlParser.NationalStringDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NATIONAL, MySqlParser.VARCHAR) + .setSuffixTokens(MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NCHAR, MySqlParser.NATIONAL, MySqlParser.CHARACTER) + .setSuffixTokens(MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARCHAR) + .setSuffixTokens(MySqlParser.BINARY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.NationalVaryingStringDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, + MySqlParser.NATIONAL, + MySqlParser.CHAR, + MySqlParser.VARYING), + new DataTypeResolver.DataTypeEntry( + Types.NVARCHAR, + MySqlParser.NATIONAL, + MySqlParser.CHARACTER, + MySqlParser.VARYING))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.DimensionDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.TINYINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT1) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.SMALLINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT2) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MEDIUMINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT3) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MIDDLEINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INTEGER) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT4) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.BIGINT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.INT8) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.REAL, MySqlParser.REAL) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.DOUBLE) + .setSuffixTokens( + MySqlParser.PRECISION, + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.FLOAT8) + .setSuffixTokens( + MySqlParser.PRECISION, + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new 
DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT4) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL), + new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DECIMAL) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DEC) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.FIXED) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.NUMERIC, MySqlParser.NUMERIC) + .setSuffixTokens( + MySqlParser.SIGNED, + MySqlParser.UNSIGNED, + MySqlParser.ZEROFILL) + .setDefaultLengthScaleDimension(10, 0), + new DataTypeResolver.DataTypeEntry(Types.BIT, MySqlParser.BIT) + .setDefaultLengthDimension(1), + new DataTypeResolver.DataTypeEntry(Types.TIME, MySqlParser.TIME), + new DataTypeResolver.DataTypeEntry( + Types.TIMESTAMP_WITH_TIMEZONE, MySqlParser.TIMESTAMP), + new DataTypeResolver.DataTypeEntry(Types.TIMESTAMP, MySqlParser.DATETIME), + new DataTypeResolver.DataTypeEntry(Types.BINARY, MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry(Types.VARBINARY, MySqlParser.VARBINARY), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.BLOB), + new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.YEAR))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.SimpleDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.DATE, MySqlParser.DATE), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.TINYBLOB), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.MEDIUMBLOB), + new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONGBLOB), + new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOL), + new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOLEAN), + new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.SERIAL))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.CollectionDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.ENUM) + .setSuffixTokens(MySqlParser.BINARY), + new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.SET) + .setSuffixTokens(MySqlParser.BINARY))); + dataTypeResolverBuilder.registerDataTypes( + MySqlParser.SpatialDataTypeContext.class.getCanonicalName(), + Arrays.asList( + new DataTypeResolver.DataTypeEntry( + Types.OTHER, MySqlParser.GEOMETRYCOLLECTION), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMCOLLECTION), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.LINESTRING), + new DataTypeResolver.DataTypeEntry( + Types.OTHER, MySqlParser.MULTILINESTRING), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOINT), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOLYGON), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POINT), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POLYGON), + new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.JSON), + new 
DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMETRY)));
+        dataTypeResolverBuilder.registerDataTypes(
+                MySqlParser.LongVarbinaryDataTypeContext.class.getCanonicalName(),
+                Arrays.asList(
+                        new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONG)
+                                .setSuffixTokens(MySqlParser.VARBINARY)));
+        dataTypeResolverBuilder.registerDataTypes(
+                MySqlParser.LongVarcharDataTypeContext.class.getCanonicalName(),
+                Arrays.asList(
+                        new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONG)
+                                .setSuffixTokens(MySqlParser.VARCHAR)));
+
+        return dataTypeResolverBuilder.build();
+    }
+
+    @Override
+    protected AntlrDdlParserListener createParseTreeWalkerListener() {
+        return new CustomMySqlAntlrDdlParserListener(this, parsedEvents);
+    }
+
+    public List<SchemaChangeEvent> getAndClearParsedEvents() {
+        List<SchemaChangeEvent> result = new ArrayList<>(parsedEvents);
+        parsedEvents.clear();
+        return result;
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomMySqlAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomMySqlAntlrDdlParserListener.java
new file mode 100644
index 00000000000..076ac06cd76
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/parser/CustomMySqlAntlrDdlParserListener.java
@@ -0,0 +1,143 @@
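Since only `getAndClearParsedEvents()` is specific to this class, driving it from a test follows the standard Debezium `DdlParser` flow. A plausible usage sketch, assuming the inherited `setCurrentDatabase(...)` and `parse(String, Tables)` API from Debezium's `AbstractDdlParser` applies:

```java
import io.debezium.relational.Tables;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;

import java.util.List;

public class ParserUsageSketch {
    public static void main(String[] args) {
        CustomMySqlAntlrDdlParser parser = new CustomMySqlAntlrDdlParser();
        Tables tables = new Tables();

        parser.setCurrentDatabase("metrics");
        parser.parse("ALTER TABLE users ADD COLUMN email VARCHAR(128) NULL;", tables);

        // each successfully parsed statement is buffered; draining also clears the buffer
        List<SchemaChangeEvent> events = parser.getAndClearParsedEvents();
        System.out.println(events);
    }
}
```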
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.parser;
+
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+
+import io.debezium.antlr.AntlrDdlParserListener;
+import io.debezium.antlr.ProxyParseTreeListenerUtil;
+import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
+import io.debezium.connector.mysql.antlr.listener.AlterViewParserListener;
+import io.debezium.connector.mysql.antlr.listener.CreateAndAlterDatabaseParserListener;
+import io.debezium.connector.mysql.antlr.listener.CreateTableParserListener;
+import io.debezium.connector.mysql.antlr.listener.CreateUniqueIndexParserListener;
+import io.debezium.connector.mysql.antlr.listener.CreateViewParserListener;
+import io.debezium.connector.mysql.antlr.listener.DropDatabaseParserListener;
+import io.debezium.connector.mysql.antlr.listener.DropTableParserListener;
+import io.debezium.connector.mysql.antlr.listener.DropViewParserListener;
+import io.debezium.connector.mysql.antlr.listener.MySqlAntlrDdlParserListener;
+import io.debezium.connector.mysql.antlr.listener.RenameTableParserListener;
+import io.debezium.connector.mysql.antlr.listener.SetStatementParserListener;
+import io.debezium.connector.mysql.antlr.listener.TruncateTableParserListener;
+import io.debezium.connector.mysql.antlr.listener.UseStatementParserListener;
+import io.debezium.ddl.parser.mysql.generated.MySqlParser;
+import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
+import io.debezium.text.ParsingException;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.tree.ErrorNode;
+import org.antlr.v4.runtime.tree.ParseTreeListener;
+import org.antlr.v4.runtime.tree.TerminalNode;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Copied from {@link MySqlAntlrDdlParserListener} in Debezium 1.9.8.Final.
+ *
+ * <p>This listener's constructor registers some modified listeners.
+ */
+public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener
+        implements AntlrDdlParserListener {
+
+    /** Collection of listeners for delegation of events. */
+    private final List<ParseTreeListener> listeners = new CopyOnWriteArrayList<>();
+
+    /** Flag for the skipping phase. */
+    private boolean skipNodes;
+
+    /**
+     * Count of skipped nodes. Each enter event during the skipping phase increases the counter
+     * and each exit event decreases it. When the counter drops back to 0, the skipping phase
+     * ends.
+     */
+    private int skippedNodesCount = 0;
+
+    /** Collection of caught exceptions. */
+    private final Collection<ParsingException> errors = new ArrayList<>();
+
+    public CustomMySqlAntlrDdlParserListener(
+            MySqlAntlrDdlParser parser, LinkedList<SchemaChangeEvent> parsedEvents) {
+        // initialize listeners
+        listeners.add(new CreateAndAlterDatabaseParserListener(parser));
+        listeners.add(new DropDatabaseParserListener(parser));
+        listeners.add(new CreateTableParserListener(parser, listeners));
+        listeners.add(new CustomAlterTableParserListener(parser, listeners, parsedEvents));
+        listeners.add(new DropTableParserListener(parser));
+        listeners.add(new RenameTableParserListener(parser));
+        listeners.add(new TruncateTableParserListener(parser));
+        listeners.add(new CreateViewParserListener(parser, listeners));
+        listeners.add(new AlterViewParserListener(parser, listeners));
+        listeners.add(new DropViewParserListener(parser));
+        listeners.add(new CreateUniqueIndexParserListener(parser));
+        listeners.add(new SetStatementParserListener(parser));
+        listeners.add(new UseStatementParserListener(parser));
+    }
+
+    /**
+     * Returns all errors caught during the tree walk.
+     *
+     * @return list of parsing exceptions
+     */
+    @Override
+    public Collection<ParsingException> getErrors() {
+        return errors;
+    }
+
+    @Override
+    public void enterEveryRule(ParserRuleContext ctx) {
+        if (skipNodes) {
+            skippedNodesCount++;
+        } else {
+            ProxyParseTreeListenerUtil.delegateEnterRule(ctx, listeners, errors);
+        }
+    }
+
+    @Override
+    public void exitEveryRule(ParserRuleContext ctx) {
+        if (skipNodes) {
+            if (skippedNodesCount == 0) {
+                // back in the node where skipping started
+                skipNodes = false;
+            } else {
+                // going up in the tree means decreasing the number of skipped nodes
+                skippedNodesCount--;
+            }
+        } else {
+            ProxyParseTreeListenerUtil.delegateExitRule(ctx, listeners, errors);
+        }
+    }
+
+    @Override
+    public void visitErrorNode(ErrorNode node) {
+        ProxyParseTreeListenerUtil.visitErrorNode(node, listeners, errors);
+    }
+
+    @Override
+    public void visitTerminal(TerminalNode node) {
+        ProxyParseTreeListenerUtil.visitTerminal(node, listeners, errors);
+    }
+
+    @Override
+    public void enterRoutineBody(MySqlParser.RoutineBodyContext ctx) {
+        // this is a grammar rule for the BEGIN ... END part of statements; skip it
+        skipNodes = true;
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/resources/ddl/metrics.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/resources/ddl/metrics.sql
new file mode 100644
index 00000000000..50823a1f032
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/resources/ddl/metrics.sql
@@ -0,0 +1,36 @@
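The `skipNodes`/`skippedNodesCount` pair above implements subtree skipping for `BEGIN ... END` routine bodies. A standalone sketch of that counting discipline, slightly simplified (in the real listener, `routineBody`'s own enter event is still delegated before the flag takes effect); the class and rule names are illustrative:

```java
public class SkipCounterSketch {
    private boolean skipNodes;
    private int skippedNodesCount;

    void enter(String rule) {
        if ("routineBody".equals(rule)) {
            skipNodes = true;        // start skipping at BEGIN ... END
        } else if (skipNodes) {
            skippedNodesCount++;     // nested rule inside the skipped subtree
        } else {
            System.out.println("delegate enter " + rule);
        }
    }

    void exit(String rule) {
        if (skipNodes) {
            if (skippedNodesCount == 0) {
                skipNodes = false;   // this exit belongs to routineBody itself
            } else {
                skippedNodesCount--;
            }
        } else {
            System.out.println("delegate exit " + rule);
        }
    }

    public static void main(String[] args) {
        SkipCounterSketch walker = new SkipCounterSketch();
        walker.enter("createProcedure");
        walker.enter("routineBody");
        walker.enter("stmt");
        walker.exit("stmt");
        walker.exit("routineBody");  // counter back at 0: skipping ends exactly here
        walker.exit("createProcedure"); // delegated again
    }
}
```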
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: metrics
+-- ----------------------------------------------------------------------------------------------------------------
+
+CREATE TABLE users (
+    id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    name VARCHAR(255),
+    age INTEGER
+);
+ALTER TABLE users AUTO_INCREMENT = 101;
+
+INSERT INTO users
+VALUES (default,"Tom",3),
+       (default,"Jack",5),
+       (default,"Allen",10),
+       (default,"Andrew",13),
+       (default,"Arnold",15),
+       (default,"Claud",19),
+       (default,"Howard",37),
+       (default,"Jacob",46),
+       (default,"Lionel",58);
\ No newline at end of file
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
index af102c146ab..deabf6469b1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.java
@@ -26,6 +26,7 @@
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
 import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
+import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -71,10 +72,14 @@ protected void processElement(
         } else if (isHeartbeatEvent(element)) {
             if (splitState.isStreamSplitState()) {
                 updatePositionForStreamSplit(element, splitState);
+                sourceReaderMetrics.updateLastReceivedEventTime(
+                        MongoRecordUtils.getMessageTimestamp(element));
             }
         } else if (isDataChangeRecord(element)) {
             if (splitState.isStreamSplitState()) {
                 updatePositionForStreamSplit(element, splitState);
+                sourceReaderMetrics.updateLastReceivedEventTime(
+                        MongoRecordUtils.getMessageTimestamp(element));
             }
             reportMetrics(element);
             emitElement(element, output);
@@ -94,6 +99,17 @@ private void updatePositionForStreamSplit(SourceRecord element, SourceSplitState
         splitState.asStreamSplitState().setStartingOffset(offset);
     }
 
+    @Override
+    protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
+        sourceReaderMetrics.markRecord();
+        sourceReaderMetrics.updateRecordCounters(element);
+
+        outputCollector.output = output;
+        // use the MongoDB timestamp as the current message timestamp
+        outputCollector.currentMessageTimestamp = MongoRecordUtils.getMessageTimestamp(element);
+        debeziumDeserializationSchema.deserialize(element, outputCollector);
+    }
+
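The override stamps each record with the change-stream timestamp before deserialization; together with the `updateLastReceivedEventTime` calls above, this lets the reader expose an event-time lag. A minimal sketch of that bookkeeping, with assumed metric semantics and illustrative names (not the connector's actual metric wiring):

```java
public class FetchDelaySketch {
    // timestamp last stamped via updateLastReceivedEventTime(...)
    private volatile long lastReceivedEventTimeMillis;

    void onEvent(long messageTimestampMillis) {
        this.lastReceivedEventTimeMillis = messageTimestampMillis;
    }

    // gauge value: how far the reader currently lags behind the change stream
    long currentFetchEventTimeLagMillis() {
        return System.currentTimeMillis() - lastReceivedEventTimeMillis;
    }
}
```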
 @Override
 protected void reportMetrics(SourceRecord element) {
     Long messageTimestamp = getMessageTimestamp(element);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index af220061a17..520511c061b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -321,14 +321,16 @@ public PostgresSourceEnumerator createEnumerator(
                             remainingTables,
                             isTableIdCaseSensitive,
                             dataSourceDialect,
-                            offsetFactory);
+                            offsetFactory,
+                            enumContext);
             } catch (Exception e) {
                 throw new FlinkRuntimeException(
                         "Failed to discover captured tables for enumerator", e);
             }
         } else {
             splitAssigner =
-                    new StreamSplitAssigner(sourceConfig, dataSourceDialect, offsetFactory);
+                    new StreamSplitAssigner(
+                            sourceConfig, dataSourceDialect, offsetFactory, enumContext);
         }
 
         return new PostgresSourceEnumerator(
@@ -352,14 +354,16 @@ public PostgresSourceEnumerator restoreEnumerator(
                             enumContext.currentParallelism(),
                             (HybridPendingSplitsState) checkpoint,
                             dataSourceDialect,
-                            offsetFactory);
+                            offsetFactory,
+                            enumContext);
         } else if (checkpoint instanceof StreamPendingSplitsState) {
             splitAssigner =
                     new StreamSplitAssigner(
                             sourceConfig,
                             (StreamPendingSplitsState) checkpoint,
                             dataSourceDialect,
-                            offsetFactory);
+                            offsetFactory,
+                            enumContext);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported restored PendingSplitsState: " + checkpoint);
@@ -385,7 +389,6 @@ public PostgresSourceReader createReader(SourceReaderContext readerContext)
         final SourceReaderMetrics sourceReaderMetrics =
                 new SourceReaderMetrics(readerContext.metricGroup());
-        sourceReaderMetrics.registerMetrics();
         IncrementalSourceReaderContext incrementalSourceReaderContext =
                 new IncrementalSourceReaderContext(readerContext);
         Supplier> splitReaderSupplier =