Skip to content

Commit

Permalink
DBZ-7700 Align snapshot modes
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale committed Apr 8, 2024
1 parent f6e06c2 commit 992dcc3
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,51 +261,65 @@ public static ConfigDef configDef() {
public enum SnapshotMode implements EnumeratedValue {

/**
* Perform a snapshot when it is needed.
* Performs a snapshot of data and schema upon each connector start.
*/
ALWAYS("always"),

/**
* Perform a snapshot of data and schema upon initial startup of a connector.
*/
WHEN_NEEDED("when_needed", true),
INITIAL("initial"),

/**
* Perform a snapshot only upon initial startup of a connector.
* Perform a snapshot of data and schema upon initial startup of a connector but does not transition to streaming.
*/
INITIAL("initial", true),
INITIAL_ONLY("initial_only"),

/**
* Perform a snapshot of only the database schemas (without data) and then begin
* reading the binlog. This should be used with care, but it is very useful when
* the change event consumers need only the changes from the point in time the
* snapshot is made (and doesn't care about any state or changes prior to this
* point).
* Perform a snapshot of the schema but no data upon initial startup of a connector.
* @deprecated to be removed in Debezium 3.0, replaced by {{@link #NO_DATA}}
*/
SCHEMA_ONLY("schema_only", false),
SCHEMA_ONLY("schema_only"),

/**
* Never perform a snapshot and only read the binlog. This assumes the binlog
* contains all the history of those databases and tables that will be captured.
* Perform a snapshot of the schema but no data upon initial startup of a connector.
*/
NO_DATA("no_data"),

/**
* Perform a snapshot of only the database schemas (without data) and then begin reading the redo log at the current redo log position.
* This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted).
* This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped,
* otherwise some events during the gap may be processed with an incorrect schema and corrupted.
*/
RECOVERY("recovery"),

/**
* Perform a snapshot when it is needed.
*/
NEVER("never", false);
WHEN_NEEDED("when_needed"),

/**
* Allows over snapshots by setting connectors properties prefixed with 'snapshot.mode.configuration.based'.
*/
CONFIGURATION_BASED("configuration_based"),

/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/
CUSTOM("custom");

private final String value;
private final boolean includeData;

SnapshotMode(String value, boolean includeData) {
SnapshotMode(String value) {
this.value = value;
this.includeData = includeData;
}

@Override
public String getValue() {
return value;
}

/**
* Whether this snapshotting mode should include the actual data or just the
* schema of captured tables.
*/
public boolean includeData() {
return includeData;
}

/**
* Determine if the supplied value is one of the predefined options.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.processors.PostProcessorRegistryServiceProvider;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
Expand Down Expand Up @@ -76,10 +74,20 @@ protected ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> start
final CdcSourceTaskContext ctx = new CdcSourceTaskContext(connectorConfig.getContextName(),
connectorConfig.getLogicalName(), connectorConfig.getCustomMetricTags(), schema::tableIds);

final Offsets<As400Partition, As400OffsetContext> previousOffsetPartition = getPreviousOffsets(
new As400Partition.Provider(connectorConfig), new As400OffsetContext.Loader(connectorConfig));
As400OffsetContext previousOffset = previousOffsetPartition.getTheOnlyOffset();
if (previousOffset == null) {
LOGGER.info("previous offsets not found creating from config");
previousOffset = new As400OffsetContext(connectorConfig);
}

// Manual Bean Registration
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config);
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig);
connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, jdbcConnectionFactory.newConnection());
connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema);
connectorConfig.getBeanRegistry().add(StandardBeanNames.OFFSETS, previousOffsetPartition);

// Service providers
registerServiceProviders(connectorConfig.getServiceRegistry());
Expand All @@ -91,14 +99,6 @@ protected ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> start

final ErrorHandler errorHandler = new ErrorHandler(As400RpcConnector.class, connectorConfig, queue, null);

final Offsets<As400Partition, As400OffsetContext> previousOffsetPartition = getPreviousOffsets(
new As400Partition.Provider(connectorConfig), new As400OffsetContext.Loader(connectorConfig));
As400OffsetContext previousOffset = previousOffsetPartition.getTheOnlyOffset();
if (previousOffset == null) {
LOGGER.info("previous offsets not found creating from config");
previousOffset = new As400OffsetContext(connectorConfig);
}

final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);

final As400EventMetadataProvider metadataProvider = new As400EventMetadataProvider();
Expand All @@ -116,8 +116,11 @@ protected ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> start
final As400RpcConnection rpcConnection = new As400RpcConnection(connectorConfig, streamingMetrics,
shortIncludes);

validateAndLoadSchemaHistory(connectorConfig, rpcConnection::validateLogPosition, previousOffsetPartition, schema,
snapshotterService.getSnapshotter());

As400ConnectorConfig snapshotConnectorConfig = connectorConfig;
final Set<String> additionalTables = additionalTablesInConfigTables(connectorConfig, previousOffset, newConfig);
final Set<String> additionalTables = additionalTablesInConfigTables(previousOffset, newConfig);
if (!additionalTables.isEmpty()) {
final String newIncludes = String.join(",", additionalTables);
LOGGER.info("found new tables to stream {}", newIncludes);
Expand Down Expand Up @@ -160,8 +163,7 @@ protected ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> start
return coordinator;
}

private Set<String> additionalTablesInConfigTables(final As400ConnectorConfig connectorConfig,
As400OffsetContext previousOffset, As400ConnectorConfig newConfig) {
private Set<String> additionalTablesInConfigTables(As400OffsetContext previousOffset, As400ConnectorConfig newConfig) {
final String newInclude = newConfig.tableIncludeList();
final String oldInclude = previousOffset.getIncludeTables();
LOGGER.info("previous includes {} , new includes {}", oldInclude, newInclude);
Expand Down Expand Up @@ -193,11 +195,4 @@ protected void doStop() {
protected Iterable<Field> getAllConfigurationFields() {
return As400ConnectorConfig.ALL_FIELDS;
}

// TODO remove when DBZ-7700 is implemented
@Override
protected void registerServiceProviders(ServiceRegistry serviceRegistry) {

serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.ibm.as400.access.SecureAS400;
import com.ibm.as400.access.SocketProperties;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.db2as400.metrics.As400StreamingChangeEventSourceMetrics;
import io.debezium.ibmi.db2.journal.retrieve.Connect;
import io.debezium.ibmi.db2.journal.retrieve.FileFilter;
Expand All @@ -32,6 +33,7 @@
import io.debezium.ibmi.db2.journal.retrieve.rjne0200.EntryHeader;
import io.debezium.ibmi.db2.journal.retrieve.rnrn0200.DetailedJournalReceiver;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.pipeline.spi.OffsetContext;

public class As400RpcConnection implements AutoCloseable, Connect<AS400, IOException> {
private static Logger log = LoggerFactory.getLogger(As400RpcConnection.class);
Expand Down Expand Up @@ -185,6 +187,12 @@ private void logOffsets(JournalProcessedPosition position, boolean success) thro
}
}

public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) {

return true; // TODO add check to verify the min available position in the log

}

public interface BlockingReceiverConsumer {
void accept(BigInteger offset, RetrieveJournal r, EntryHeader eheader) throws RpcException, InterruptedException, IOException, SQLNonTransientConnectionException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.util.Clock;

public class As400SnapshotChangeEventSource
Expand Down Expand Up @@ -123,7 +124,7 @@ protected Set<TableId> getAllTableIds(RelationalSnapshotContext<As400Partition,
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<As400Partition, As400OffsetContext> snapshotContext)
throws Exception {
// TODO lock tables
// TODO lock tables snapshotterService.getSnapshotLock().tableLockingStatement
}

@Override
Expand Down Expand Up @@ -203,36 +204,48 @@ protected SchemaChangeEvent getCreateTableEvent(
protected Optional<String> getSnapshotSelect(
RelationalSnapshotContext<As400Partition, As400OffsetContext> snapshotContext, TableId tableId,
List<String> columns) {
return Optional.of(String.format("SELECT * FROM %s.%s", tableId.schema(), tableId.table()));

String fullTableName = String.format("%s.%s", tableId.schema(), tableId.table());
return snapshotterService.getSnapshotQuery().snapshotQuery(fullTableName, columns);
}

@Override
public SnapshottingTask getSnapshottingTask(As400Partition partition, As400OffsetContext previousOffset) {

final Snapshotter snapshotter = snapshotterService.getSnapshotter();
final List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
final Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable()
.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));

// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && previousOffset.isSnapshotCompplete()) {
// when control tables in place
if (!previousOffset.hasNewTables()) {
boolean offsetExists = previousOffset != null;
boolean snapshotInProgress = false;

if (offsetExists) {
snapshotInProgress = previousOffset.isSnapshotRunning();
}

if (offsetExists && !previousOffset.isSnapshotRunning()) {
if (!previousOffset.hasNewTables()) { // This is a special case for IBMi
log.info(
"A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
return new SnapshottingTask(false, false, dataCollectionsToBeSnapshotted,
snapshotSelectOverridesByTable, false);
}
log.info("A previous offset indicating a completed snapshot has been found.");
}

log.info("No previous offset has been found");
if (this.connectorConfig.getSnapshotMode().includeData()) {
log.info("According to the connector configuration both schema and data will be snapshotted");
boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress);
boolean shouldSnapshotData = snapshotter.shouldSnapshotData(offsetExists, snapshotInProgress);

if (shouldSnapshotData && shouldSnapshotSchema) {
log.info("According to the connector configuration both schema and data will be snapshot.");
}
else {
log.info("According to the connector configuration only schema will be snapshotted");
else if (shouldSnapshotSchema) {
log.info("According to the connector configuration only schema will be snapshot.");
}

return new SnapshottingTask(this.connectorConfig.getSnapshotMode().includeData(),
this.connectorConfig.getSnapshotMode().includeData(), dataCollectionsToBeSnapshotted,
return new SnapshottingTask(shouldSnapshotSchema,
shouldSnapshotData, dataCollectionsToBeSnapshotted,
snapshotSelectOverridesByTable, false);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.db2as400.snapshot.query;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.snapshot.spi.SnapshotQuery;

public class SelectAllSnapshotQuery implements SnapshotQuery {

@Override
public String name() {
return CommonConnectorConfig.SnapshotQueryMode.SELECT_ALL.getValue();
}

@Override
public void configure(Map<String, ?> properties) {

}

@Override
public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {

return Optional.of(String.format("SELECT * FROM %s", tableId));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.debezium.connector.db2as400.snapshot.query.SelectAllSnapshotQuery

0 comments on commit 992dcc3

Please sign in to comment.