Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sub-transaction positioning #2035

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ private void setupInitPosition(OptionSet options) {
}
}

this.initPosition = new Position(new BinlogPosition(pos, initPositionSplit[0]), lastHeartbeat);
this.initPosition = new Position(new BinlogPosition(pos, initPositionSplit[0]), lastHeartbeat, 0L);
}
}

Expand Down
13 changes: 9 additions & 4 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,26 @@ public Thread terminate(Exception error) {

if (taskManager.requestStop()) {
if (this.error == null && this.replicator != null) {
sendFinalHeartbeat();
stopReplicator();
}
this.terminationThread = spawnTerminateThread();
}
return this.terminationThread;
}


private void sendFinalHeartbeat() {
/**
* Attempt to gracefully terminate the binlog replicator by sending a heartbeat
* through the system for the replicator to stop at. If this fails we'll terminate
* more forcefully.
*/
private void stopReplicator() {
long heartbeat = System.currentTimeMillis();
LOGGER.info("Sending final heartbeat: " + heartbeat);
try {
this.replicator.stopAtHeartbeat(heartbeat);
this.positionStore.heartbeat(heartbeat);

long deadline = heartbeat + 5000L;
while (positionStoreThread.getPosition().getLastHeartbeatRead() < heartbeat) {
if (System.currentTimeMillis() > deadline) {
Expand Down Expand Up @@ -423,8 +429,7 @@ public RecoveryInfo getRecoveryInfo() throws SQLException {
* @param r A processed Rowmap
*/
public void setPosition(RowMap r) {
if ( r.isTXCommit() )
this.setPosition(r.getNextPosition());
this.setPosition(r.getNextPosition());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,8 @@ private void pushSkippedRows() throws Exception {
}
}

/**
 * Reports the stop priority of this task.
 * NOTE(review): presumably consulted when ordering task shutdown -- confirm against the caller.
 *
 * @return always {@code StopPriority.BINLOG}
 */
@Override
public StopPriority getStopPriority() {
return StopPriority.BINLOG;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,9 @@ public void requestStop() throws Exception {
/**
 * No-op: the body is empty, so this returns immediately without waiting.
 *
 * @param timeout not used by this implementation
 * @throws TimeoutException declared by the signature; this empty body never throws it
 */
@Override
public void awaitStop(Long timeout) throws TimeoutException {
}

/**
 * Reports the stop priority of this task.
 * NOTE(review): presumably consulted when ordering task shutdown -- confirm against the caller.
 *
 * @return always {@code StopPriority.SUPPORT}
 */
@Override
public StopPriority getStopPriority() {
return StopPriority.SUPPORT;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.zendesk.maxwell.producer;

import com.codahale.metrics.Gauge;
import com.zendesk.maxwell.MaxwellConfig;
import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.monitoring.Metrics;
import com.zendesk.maxwell.replication.Position;
Expand All @@ -16,21 +15,21 @@ public class CallbackCompleter {
private final MaxwellContext context;
private final int metricsAgeSloMs;
private final Position position;
private final boolean isTXCommit;
private final boolean shouldCommit;
private final long messageID;

public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) {
public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean shouldCommit, MaxwellContext context, long messageID) {
this.inflightMessages = inflightMessages;
this.context = context;
this.metricsAgeSloMs = context.getConfig().metricsAgeSlo * 1000;
this.position = position;
this.isTXCommit = isTXCommit;
this.shouldCommit = shouldCommit;
this.messageID = messageID;
}

public void markCompleted() {
inflightMessages.freeSlot(messageID);
if(isTXCommit) {
if(shouldCommit) {
InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);

if (message != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,12 @@ public void awaitStop(Long timeout) throws TimeoutException {
taskState.awaitStop(thread, timeout);
}

@Override
/**
 * Reports the stop priority of this producer task.
 * NOTE(review): presumably consulted when ordering task shutdown -- confirm against the caller.
 *
 * @return always {@code StopPriority.PRODUCER}
 */
@Override
public StopPriority getStopPriority() {
return StopPriority.PRODUCER;
}

@Override
public void run() {
this.thread = Thread.currentThread();
while (true) {
Expand Down Expand Up @@ -279,4 +284,4 @@ future, new BigQueryCallback(this, appendContext, cc, r.getNextPosition(),
this.context),
MoreExecutors.directExecutor());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ public void awaitStop(Long timeout) throws TimeoutException {
taskState.awaitStop(thread, timeout);
}

/**
 * Reports the stop priority of this producer task.
 * NOTE(review): presumably consulted when ordering task shutdown -- confirm against the caller.
 *
 * @return always {@code StopPriority.PRODUCER}
 */
@Override
public StopPriority getStopPriority() {
return StopPriority.PRODUCER;
}

// force-close for tests.
public void close() {
kafka.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,12 @@ public void awaitStop(Long timeout) throws TimeoutException {
taskState.awaitStop(thread, timeout);
}

@Override
/**
 * Reports the stop priority of this producer task.
 * NOTE(review): presumably consulted when ordering task shutdown -- confirm against the caller.
 *
 * @return always {@code StopPriority.PRODUCER}
 */
@Override
public StopPriority getStopPriority() {
return StopPriority.PRODUCER;
}

/**
 * Exposes this object as its own stoppable task.
 *
 * @return {@code this}
 */
@Override
public StoppableTask getStoppableTask() {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.util.TaskManager;
import com.zendesk.maxwell.util.TopicInterpolator;
import com.zendesk.maxwell.util.StoppableTask;
import org.slf4j.Logger;
Expand All @@ -20,7 +21,8 @@
import java.util.HashSet;
import java.util.Set;

public class MaxwellRedisProducer extends AbstractProducer implements StoppableTask {
public
class MaxwellRedisProducer extends AbstractProducer implements StoppableTask {
private static final Logger logger = LoggerFactory.getLogger(MaxwellRedisProducer.class);
private final String channel;
private final String redisType;
Expand Down Expand Up @@ -190,6 +192,11 @@ public void requestStop() {
/**
 * No-op: the body is empty, so this returns immediately without waiting.
 *
 * @param timeout not used by this implementation
 */
@Override
public void awaitStop(Long timeout) { }

/**
 * Reports the stop priority of this producer task.
 * NOTE(review): presumably consulted when ordering task shutdown -- confirm against the caller.
 *
 * @return always {@code StopPriority.PRODUCER}
 */
@Override
public StopPriority getStopPriority() {
return StopPriority.PRODUCER;
}

@Override
public StoppableTask getStoppableTask() {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public void push(RowMap r) throws Exception {
System.currentTimeMillis() - r.getTimestampMillis(),
TimeUnit.MILLISECONDS
);

this.context.setPosition(r);
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/zendesk/maxwell/recovery/Recovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public HeartbeatRowMap recover() throws Exception {
List<BinlogPosition> list = getBinlogInfo();
for ( int i = list.size() - 1; i >= 0 ; i-- ) {
BinlogPosition binlogPosition = list.get(i);
Position position = Position.valueOf(binlogPosition, recoveryInfo.getHeartbeat());
Position position = Position.valueOf(binlogPosition, recoveryInfo.getHeartbeat(), 0L);
Metrics metrics = new NoOpMetrics();

LOGGER.debug("scanning binlog: {}", binlogPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,24 @@ private RowMap buildRowMap(String type, Position position, Position nextPosition
return map;
}

public List<RowMap> jsonMaps(Table table, long lastHeartbeatRead, String rowQuery) throws ColumnDefCastException {
public List<RowMap> jsonMaps(Table table, long lastHeartbeatRead, String rowQuery, BinlogPosition lastTableMapPosition) throws ColumnDefCastException {
ArrayList<RowMap> list = new ArrayList<>();

Position position = Position.valueOf(this.position, lastHeartbeatRead);
Position nextPosition = Position.valueOf(this.nextPosition, lastHeartbeatRead);
Position position = Position.valueOf(this.position, lastHeartbeatRead, 0L);
/*
At the time our RowMaps are created we have no idea where we are
in a transaction; we may be the last row-event in the transaction,
in which case we're going to "commit" -- i.e. advance maxwell.positions --
or we may be in the middle of a transaction.

If we're in the middle of a transaction, we want to leave the binlog pointer
pointing at the table-map event that precedes this. The calling function
will additionally add a row-offset to the position of each row.

All this mechanism allows maxwell to stop and restart in the middle of very large
transactions.
*/
Position nextPosition = Position.valueOf(lastTableMapPosition, lastHeartbeatRead, 0L);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we fix it like this, to prevent a possible NullPointerException when lastTableMapPosition is null?
Position nextPosition = null;
if(lastTableMapPosition != null){
nextPosition = Position.valueOf(lastTableMapPosition, lastHeartbeatRead, 0L);
}else{
nextPosition = Position.valueOf(this.nextPosition, lastHeartbeatRead, 0L);
}


switch ( getType() ) {
case WRITE_ROWS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class BinlogConnectorReplicator extends RunLoopProcess implements Replica

private Position lastHeartbeatPosition;
private final HeartbeatNotifier heartbeatNotifier;
private Position startPosition;
private Long stopAtHeartbeat;
private Filter filter;
private Boolean ignoreMissingSchema;
Expand All @@ -86,6 +87,11 @@ public class BinlogConnectorReplicator extends RunLoopProcess implements Replica

private boolean isConnected = false;

/**
 * Reports the stop priority of this replicator.
 * NOTE(review): presumably consulted when ordering task shutdown -- confirm against the caller.
 *
 * @return always {@code StopPriority.BINLOG}
 */
@Override
public StopPriority getStopPriority() {
return StopPriority.BINLOG;
}

private class ClientReconnectedException extends Exception {}

public BinlogConnectorReplicator(
Expand Down Expand Up @@ -182,6 +188,7 @@ public BinlogConnectorReplicator(
this.client.setUseSendAnnotateRowsEvent(true);


this.startPosition = start;
BinlogPosition startBinlog = start.getBinlogPosition();
if (startBinlog.getGtidSetStr() != null) {
String gtidStr = startBinlog.getGtidSetStr();
Expand Down Expand Up @@ -408,8 +415,8 @@ private void processQueryEvent(BinlogConnectorEvent event) throws Exception {
data.getDatabase(),
data.getSql(),
this.schemaStore,
Position.valueOf(event.getPosition(), getLastHeartbeatRead()),
Position.valueOf(event.getNextPosition(), getLastHeartbeatRead()),
Position.valueOf(event.getPosition(), getLastHeartbeatRead(), 0L),
Position.valueOf(event.getNextPosition(), getLastHeartbeatRead(), 0L),
event.getEvent().getHeader().getTimestamp()
);
}
Expand Down Expand Up @@ -534,11 +541,13 @@ private void tryReconnect() throws TimeoutException {
* @return A RowMapBuffer of rows; either in-memory or on disk.
*/

private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws Exception {
private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent, long startTXOffset) throws Exception {
BinlogConnectorEvent event;
BinlogPosition lastTableMapPosition = null;
RowMapBuffer buffer = new RowMapBuffer(MAX_TX_ELEMENTS, this.bufferMemoryUsage);

String currentQuery = null;
long txOffset = 0;

while ( true ) {
event = pollEvent();
Expand All @@ -551,7 +560,9 @@ private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws
EventType eventType = event.getEvent().getHeader().getEventType();
if (event.isCommitEvent()) {
if (!buffer.isEmpty()) {
buffer.getLast().setTXCommit();
RowMap lastRow = buffer.getLast();
lastRow.setTXCommit();
lastRow.updateNextPosition(event.getNextPosition());
long timeSpent = buffer.getLast().getTimestampMillis() - beginEvent.getEvent().getHeader().getTimestamp();
transactionExecutionTime.update(timeSpent);
transactionRowCount.update(buffer.size());
Expand All @@ -574,22 +585,27 @@ private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws
if ( table != null && shouldOutputEvent(table.getDatabase(), table.getName(), filter, table.getColumnNames()) ) {
List<RowMap> rows;
try {
rows = event.jsonMaps(table, getLastHeartbeatRead(), currentQuery);
rows = event.jsonMaps(table, getLastHeartbeatRead(), currentQuery, lastTableMapPosition);
} catch ( ColumnDefCastException e ) {
logColumnDefCastException(table, e);

throw(e);
}

for ( RowMap r : rows )
if (shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter)) {
buffer.add(r);
for ( RowMap r : rows ) {
Copy link

@jackjoesh jackjoesh Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should prevent this logic from affecting rows that have different positions; such rows should not have any tx_offset logic applied:

                    if(rows != null){
                        for ( int i = 0; i < rows.size(); i++) {
                            RowMap r = rows.get(i);
                            if(i > 0 && rows.get(i-1).getPosition().equals(r.getPosition())){
                                //only batch insert uses tx offset
                                r.setXoffset(++txOffset);
                            }else{
                                txOffset = 0l;
                                r.setXoffset(0l);
                            }
                            if ( shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter) ) {
                                if ( startTXOffset <= r.getXoffset() ) {
                                    buffer.add(r);
                                }
                            }
                        }
                    }

r.setXoffset(txOffset++);
if ( shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter) ) {
if ( startTXOffset <= r.getXoffset() ) {
buffer.add(r);
}
}
}
}
break;
case TABLE_MAP:
TableMapEventData data = event.tableMapData();
tableCache.processEvent(getSchema(), this.filter, this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable());
lastTableMapPosition = event.getPosition();
break;
case ROWS_QUERY:
RowsQueryEventData rqed = event.getEvent().getData();
Expand Down Expand Up @@ -663,6 +679,7 @@ private void logColumnDefCastException(Table table, ColumnDefCastException e) {
* @return either a RowMap or null
*/
public RowMap getRow() throws Exception {
long txOffset;
BinlogConnectorEvent event;

if ( stopOnEOF && hitEOF )
Expand Down Expand Up @@ -699,29 +716,46 @@ public RowMap getRow() throws Exception {
}
}


switch (event.getType()) {
case WRITE_ROWS:
case EXT_WRITE_ROWS:
case UPDATE_ROWS:
case EXT_UPDATE_ROWS:
case DELETE_ROWS:
case EXT_DELETE_ROWS:
LOGGER.warn("Started replication stream inside a transaction. This shouldn't normally happen.");
LOGGER.warn("Assuming new transaction at unexpected event:" + event);
LOGGER.warn("Started replication stream inside a transaction, probably recovering from stopping inside transaction.");

queue.offerFirst(event);
rowBuffer = getTransactionRows(event);
rowBuffer = getTransactionRows(event, 0);
break;
case TABLE_MAP:
TableMapEventData data = event.tableMapData();
tableCache.processEvent(getSchema(), this.filter,this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable());

if ( startPosition != null && startPosition.getTXOffset() > 0 ) {
LOGGER.info("Restarting maxwell inside transaction: {}", startPosition);
queue.offerFirst(event); // put table map back on top of queue
rowBuffer = getTransactionRows(event, startPosition.getTXOffset());
}

startPosition = null;
break;
case QUERY:
txOffset = 0;
QueryEventData qe = event.queryData();
String sql = qe.getSql();

if ( startPosition != null && startPosition.getTXOffset() > 0 ) {
LOGGER.info("Restarting maxwell inside transaction: {}", startPosition);
txOffset = startPosition.getTXOffset();;
}

startPosition = null;

if (BinlogConnectorEvent.BEGIN.equals(sql)) {
try {
rowBuffer = getTransactionRows(event);
rowBuffer = getTransactionRows(event, txOffset);
} catch ( ClientReconnectedException e ) {
// rowBuffer should already be empty by the time we get to this switch
// statement, but we null it for clarity
Expand All @@ -739,8 +773,17 @@ public RowMap getRow() throws Exception {
// in mariaDB the GTID event supplants the normal BEGIN
MariadbGtidEventData g = event.mariaGtidData();
if ( (g.getFlags() & MariadbGtidEventData.FL_STANDALONE) == 0 ) {
txOffset = 0;

if ( startPosition != null && startPosition.getTXOffset() > 0 ) {
LOGGER.info("Restarting maxwell inside transaction: {}", startPosition);
txOffset = startPosition.getTXOffset();;
}

startPosition = null;

try {
rowBuffer = getTransactionRows(event);
rowBuffer = getTransactionRows(event, txOffset);
} catch ( ClientReconnectedException e ) {
// rowBuffer should already be empty by the time we get to this switch
// statement, but we null it for clarity
Expand Down
Loading