Skip to content

Commit

Permalink
DBZ-7473 Defer transaction capture until the first DML event occurs
Browse files Browse the repository at this point in the history
Do not handle start event from V$LOGMNR_CONTENTS view if CLOB/BLOB support is disabled and memory buffer is used.
  • Loading branch information
jchipmunk authored and Naros committed Mar 4, 2024
1 parent cd1b8ce commit edac20f
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,13 @@ private static String getOperationCodePredicate(OracleConnectorConfig connectorC
operationInClause.withValues(OPERATION_CODES_LOB);
}
else {
operationInClause.withValues(OPERATION_CODES_NO_LOB);
final List<Integer> operationCodes = new ArrayList<>(OPERATION_CODES_NO_LOB);
// The transaction start event needs to be handled when a persistent buffer (Infinispan) is used
// because it is needed to reset the event id counter when re-mining transaction events.
if (connectorConfig.getLogMiningBufferType() == OracleConnectorConfig.LogMiningBufferType.MEMORY) {
operationCodes.removeIf(operationCode -> operationCode == 6);
}
operationInClause.withValues(operationCodes);
}
predicate.append("(").append(operationInClause.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,50 +475,45 @@ protected void handleCommit(OraclePartition partition, LogMinerEventRow row) thr

final T transaction = getAndRemoveTransactionFromCache(transactionId);
if (transaction == null) {
LOGGER.debug("Transaction {} not found in cache, no events to commit.", transactionId);
handleCommitNotFoundInBuffer(row);
LOGGER.debug("Transaction {} not found, commit skipped.", transactionId);
return;
}

// Calculate the smallest SCN that remains in the transaction cache
final Optional<T> oldestTransaction = getOldestTransactionInCache();
final Scn smallestScn;
if (oldestTransaction.isPresent()) {
smallestScn = oldestTransaction.get().getStartScn();
metrics.setOldestScnDetails(smallestScn, oldestTransaction.get().getChangeTime());
}
else {
smallestScn = Scn.NULL;
metrics.setOldestScnDetails(Scn.valueOf(-1), null);
}

final Scn smallestScn = calculateSmallestScn();
final Scn commitScn = row.getScn();
if (offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
if (transaction.getNumberOfEvents() > 0) {
final Scn lastCommittedScn = offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread());
LOGGER.debug("Transaction {} has already been processed. "
+ "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.",
transactionId, offsetContext.getCommitScn(), commitScn, lastCommittedScn);
if (transaction != null) {
if (transaction.getNumberOfEvents() > 0) {
final Scn lastCommittedScn = offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread());
LOGGER.debug("Transaction {} has already been processed. "
+ "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.",
transactionId, offsetContext.getCommitScn(), commitScn, lastCommittedScn);
}
cleanupAfterTransactionRemovedFromCache(transaction, false);
metrics.setActiveTransactionCount(getTransactionCache().size());
}
cleanupAfterTransactionRemovedFromCache(transaction, false);
metrics.setActiveTransactionCount(getTransactionCache().size());
return;
}

counters.commitCount++;

int numEvents = getTransactionEventCount(transaction);
int numEvents = (transaction == null) ? 0 : getTransactionEventCount(transaction);
LOGGER.debug("Committing transaction {} with {} events (scn: {}, oldest buffer scn: {}): {}",
transactionId, numEvents, row.getScn(), smallestScn, row);

final ZoneOffset databaseOffset = metrics.getDatabaseOffset();

final boolean skipExcludedUserName = isTransactionUserExcluded(transaction);
TransactionCommitConsumer.Handler<LogMinerEvent> delegate = new TransactionCommitConsumer.Handler<>() {
private int numEvents = getTransactionEventCount(transaction);
// When a COMMIT is received, regardless of the number of events it has, it still
// must be recorded in the commit scn for the node to guarantee updates to the
// offsets. This must be done prior to dispatching the transaction-commit or the
// heartbeat event that follows commit dispatch.
offsetContext.getCommitScn().recordCommit(row);

@Override
public void accept(LogMinerEvent event, long eventsProcessed) throws InterruptedException {
Instant start = Instant.now();
boolean dispatchTransactionCommittedEvent = false;
if (numEvents > 0) {
final boolean skipExcludedUserName = isTransactionUserExcluded(transaction);
dispatchTransactionCommittedEvent = !skipExcludedUserName;
final ZoneOffset databaseOffset = metrics.getDatabaseOffset();
TransactionCommitConsumer.Handler<LogMinerEvent> delegate = (event, eventsProcessed) -> {
// Update SCN in offset context only if processed SCN less than SCN of other transactions
if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) {
offsetContext.setScn(event.getScn());
Expand Down Expand Up @@ -571,20 +566,9 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted

// Clear redo SQL
offsetContext.setRedoSql(null);

}
};

// When a COMMIT is received, regardless of the number of events it has, it still
// must be recorded in the commit scn for the node to guarantee updates to the
// offsets. This must be done prior to dispatching the transaction-commit or the
// heartbeat event that follows commit dispatch.
offsetContext.getCommitScn().recordCommit(row);

Instant start = Instant.now();
int dispatchedEventCount = 0;
if (numEvents > 0) {
};
try (TransactionCommitConsumer commitConsumer = new TransactionCommitConsumer(delegate, connectorConfig, schema)) {
int dispatchedEventCount = 0;
final Iterator<LogMinerEvent> iterator = getTransactionEventIterator(transaction);
while (iterator.hasNext()) {
if (!context.isRunning()) {
Expand All @@ -601,7 +585,7 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
offsetContext.setEventScn(commitScn);
offsetContext.setRsId(row.getRsId());

if (getTransactionEventCount(transaction) > 0 && !skipExcludedUserName) {
if (dispatchTransactionCommittedEvent) {
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, transaction.getChangeTime());
}
else {
Expand All @@ -610,16 +594,37 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted

metrics.calculateLagFromSource(row.getChangeTime());

finalizeTransactionCommit(transactionId, commitScn);
cleanupAfterTransactionRemovedFromCache(transaction, false);
if (transaction != null) {
finalizeTransactionCommit(transactionId, commitScn);
cleanupAfterTransactionRemovedFromCache(transaction, false);
metrics.setActiveTransactionCount(getTransactionCache().size());
}

metrics.incrementCommittedTransactionCount();
metrics.setActiveTransactionCount(getTransactionCache().size());
metrics.setCommitScn(commitScn);
metrics.setOffsetScn(offsetContext.getScn());
metrics.setLastCommitDuration(Duration.between(start, Instant.now()));
}

/**
* Calculate the smallest SCN that remains in the transaction cache.
*
* @return the smallest SCN
*/
private Scn calculateSmallestScn() {
final Optional<T> oldestTransaction = getOldestTransactionInCache();
final Scn smallestScn;
if (oldestTransaction.isPresent()) {
smallestScn = oldestTransaction.get().getStartScn();
metrics.setOldestScnDetails(smallestScn, oldestTransaction.get().getChangeTime());
}
else {
smallestScn = Scn.NULL;
metrics.setOldestScnDetails(Scn.valueOf(-1), null);
}
return smallestScn;
}

/**
* Allow for post-processing of a transaction commit in the stream that was not found in the
* transaction buffer, perhaps because it aged out due to retention policies.
Expand Down Expand Up @@ -722,14 +727,14 @@ protected void handleRollback(LogMinerEventRow row) {
LOGGER.debug("Transaction {} was rolled back.", row.getTransactionId());
finalizeTransactionRollback(row.getTransactionId(), row.getScn());
metrics.setActiveTransactionCount(getTransactionCache().size());
metrics.incrementRolledBackTransactionCount();
metrics.addRolledBackTransactionId(row.getTransactionId());
counters.rollbackCount++;
}
else {
LOGGER.debug("Could not rollback transaction {}, was not found in cache.", row.getTransactionId());
LOGGER.debug("Transaction {} not found in cache, no events to rollback.", row.getTransactionId());
handleRollbackNotFoundInBuffer(row);
}
metrics.incrementRolledBackTransactionCount();
metrics.addRolledBackTransactionId(row.getTransactionId());
counters.rollbackCount++;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static io.debezium.config.CommonConnectorConfig.SIGNAL_DATA_COLLECTION;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOB_ENABLED;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_TYPE;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_USERNAME_EXCLUDE_LIST;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_USERNAME_INCLUDE_LIST;
Expand Down Expand Up @@ -61,7 +62,8 @@ public class LogMinerQueryBuilderTest {
private static final String PDB_PREDICATE = "SRC_CON_NAME = '${pdbName}'";

private static final String OPERATION_CODES_LOB_ENABLED = "1,2,3,6,7,9,10,11,29,34,36,68,70,71,255";
private static final String OPERATION_CODES_LOB_DISABLED = "1,2,3,6,7,34,36,255";
private static final String OPERATION_CODES_LOB_DISABLED = "1,2,3,7,34,36,255";
private static final String OPERATION_CODES_LOB_DISABLED_AND_PERSISTENT_BUFFER = "1,2,3,6,7,34,36,255";

private static final String OPERATION_CODES_PREDICATE = "(OPERATION_CODE IN (${operationCodes})${operationDdl})";

Expand Down Expand Up @@ -96,6 +98,24 @@ public void testLogMinerQueryWithLobDisabled() {
assertThat(result).isEqualTo(getQueryFromTemplate(connectorConfig));
}

@Test
@FixFor("DBZ-7473")
public void testLogMinerQueryWithLobDisabledAndPersistentBuffer() {
Configuration config = TestHelper.defaultConfig()
.with(LOG_MINING_BUFFER_TYPE, OracleConnectorConfig.LogMiningBufferType.INFINISPAN_EMBEDDED)
.build();
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);

String result = LogMinerQueryBuilder.build(connectorConfig);
assertThat(result).isEqualTo(getQueryFromTemplate(connectorConfig));

config = TestHelper.defaultConfig().with(PDB_NAME, "").build();
connectorConfig = new OracleConnectorConfig(config);

result = LogMinerQueryBuilder.build(connectorConfig);
assertThat(result).isEqualTo(getQueryFromTemplate(connectorConfig));
}

@Test
@FixFor("DBZ-5648")
public void testLogMinerQueryWithLobEnabled() {
Expand Down Expand Up @@ -191,7 +211,10 @@ private String getPdbPredicate(OracleConnectorConfig config) {
}

private String getOperationCodePredicate(OracleConnectorConfig config) {
final String codes = config.isLobEnabled() ? OPERATION_CODES_LOB_ENABLED : OPERATION_CODES_LOB_DISABLED;
final String codes = config.isLobEnabled() ? OPERATION_CODES_LOB_ENABLED
: (config.getLogMiningBufferType() == OracleConnectorConfig.LogMiningBufferType.MEMORY)
? OPERATION_CODES_LOB_DISABLED
: OPERATION_CODES_LOB_DISABLED_AND_PERSISTENT_BUFFER;
final String predicate = OPERATION_CODES_PREDICATE.replace("${operationCodes}", codes);
return predicate.replace("${operationDdl}", config.storeOnlyCapturedTables() ? getOperationDdlPredicate() : "");
}
Expand Down
Loading

0 comments on commit edac20f

Please sign in to comment.