From e6249ea318b7f363eef24248d0492befd994f123 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Sun, 2 Jul 2023 00:31:42 -0700 Subject: [PATCH 1/5] support sub-transaction positioning When we end up crashing for reasons like we got throttled by the producer, this will allow us to continue to make progress. --- .../com/zendesk/maxwell/MaxwellConfig.java | 2 +- .../com/zendesk/maxwell/MaxwellContext.java | 13 +++-- .../bootstrap/BootstrapController.java | 4 ++ .../maxwell/monitoring/MaxwellHTTPServer.java | 5 ++ .../producer/AbstractAsyncProducer.java | 9 ++- .../producer/MaxwellBigQueryProducer.java | 9 ++- .../producer/MaxwellKafkaProducer.java | 5 ++ .../producer/MaxwellPubsubProducer.java | 7 ++- .../producer/MaxwellRedisProducer.java | 9 ++- .../maxwell/producer/StdoutProducer.java | 2 + .../zendesk/maxwell/recovery/Recovery.java | 2 +- .../replication/BinlogConnectorEvent.java | 14 ++++- .../BinlogConnectorReplicator.java | 58 ++++++++++++++----- .../zendesk/maxwell/replication/Position.java | 36 ++++++++---- .../java/com/zendesk/maxwell/row/RowMap.java | 5 ++ .../com/zendesk/maxwell/row/RowMapBuffer.java | 2 - .../maxwell/schema/MysqlPositionStore.java | 19 +++--- .../maxwell/schema/MysqlSavedSchema.java | 2 +- .../maxwell/schema/MysqlSchemaCompactor.java | 5 ++ .../maxwell/schema/PositionStoreThread.java | 8 ++- .../maxwell/schema/SchemaStoreSchema.java | 7 ++- .../zendesk/maxwell/util/StoppableTask.java | 7 +++ .../com/zendesk/maxwell/util/TaskManager.java | 29 ++++++---- src/main/resources/sql/maxwell_schema.sql | 1 + .../producer/BigQueryCallbackTest.java | 2 +- .../producer/InflightMessageListTest.java | 8 +-- .../producer/MaxwellKinesisProducerTest.java | 2 +- .../maxwell/producer/NatsProducerTest.java | 2 +- .../zendesk/maxwell/util/TaskManagerTest.java | 10 ++++ 29 files changed, 213 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index 12722b8a6..7d1bc773a 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -1271,7 +1271,7 @@ private void setupInitPosition(OptionSet options) { } } - this.initPosition = new Position(new BinlogPosition(pos, initPositionSplit[0]), lastHeartbeat); + this.initPosition = new Position(new BinlogPosition(pos, initPositionSplit[0]), lastHeartbeat, 0L); } } diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 1da30273f..65d9db3ba 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -256,7 +256,7 @@ public Thread terminate(Exception error) { if (taskManager.requestStop()) { if (this.error == null && this.replicator != null) { - sendFinalHeartbeat(); + stopReplicator(); } this.terminationThread = spawnTerminateThread(); } @@ -264,12 +264,18 @@ public Thread terminate(Exception error) { } - private void sendFinalHeartbeat() { + /** + * Attempt to gracefully terminate the binlog replicator by sending a heartbeat + * through the system for the replicator to stop at. If this fails we'll terminate + * more forcefully. + */ + private void stopReplicator() { long heartbeat = System.currentTimeMillis(); LOGGER.info("Sending final heartbeat: " + heartbeat); try { this.replicator.stopAtHeartbeat(heartbeat); this.positionStore.heartbeat(heartbeat); + long deadline = heartbeat + 5000L; while (positionStoreThread.getPosition().getLastHeartbeatRead() < heartbeat) { if (System.currentTimeMillis() > deadline) { @@ -423,8 +429,7 @@ public RecoveryInfo getRecoveryInfo() throws SQLException { * @param r A processed Rowmap */ public void setPosition(RowMap r) { - if ( r.isTXCommit() ) - this.setPosition(r.getNextPosition()); + this.setPosition(r.getNextPosition()); } /** diff --git a/src/main/java/com/zendesk/maxwell/bootstrap/BootstrapController.java b/src/main/java/com/zendesk/maxwell/bootstrap/BootstrapController.java index 59185b96f..5db6da0a5 100644 --- a/src/main/java/com/zendesk/maxwell/bootstrap/BootstrapController.java +++ b/src/main/java/com/zendesk/maxwell/bootstrap/BootstrapController.java @@ -165,4 +165,8 @@ private void pushSkippedRows() throws Exception { } } + @Override + public StopPriority getStopPriority() { + return StopPriority.BINLOG; + } } diff --git a/src/main/java/com/zendesk/maxwell/monitoring/MaxwellHTTPServer.java b/src/main/java/com/zendesk/maxwell/monitoring/MaxwellHTTPServer.java index ce57bdec6..75ee1f844 100644 --- a/src/main/java/com/zendesk/maxwell/monitoring/MaxwellHTTPServer.java +++ b/src/main/java/com/zendesk/maxwell/monitoring/MaxwellHTTPServer.java @@ -148,4 +148,9 @@ public void requestStop() throws Exception { @Override public void awaitStop(Long timeout) throws TimeoutException { } + + @Override + public StopPriority getStopPriority() { + return StopPriority.SUPPORT; + } } diff --git a/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java b/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java index 22fc194d1..54695146a 100644 --- a/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java @@ -1,7 +1,6 @@ package com.zendesk.maxwell.producer; import com.codahale.metrics.Gauge; -import com.zendesk.maxwell.MaxwellConfig; import com.zendesk.maxwell.MaxwellContext; import com.zendesk.maxwell.monitoring.Metrics; import com.zendesk.maxwell.replication.Position; @@ -16,21 +15,21 @@ public class CallbackCompleter { private final MaxwellContext context; private final int metricsAgeSloMs; private final Position position; - private final boolean isTXCommit; + private final boolean shouldCommit; private final long messageID; - public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) { + public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean shouldCommit, MaxwellContext context, long messageID) { this.inflightMessages = inflightMessages; this.context = context; this.metricsAgeSloMs = context.getConfig().metricsAgeSlo * 1000; this.position = position; - this.isTXCommit = isTXCommit; + this.shouldCommit = shouldCommit; this.messageID = messageID; } public void markCompleted() { inflightMessages.freeSlot(messageID); - if(isTXCommit) { + if(shouldCommit) { InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position); if (message != null) { diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 6629ddbe9..c4ecc3493 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -239,7 +239,12 @@ public void awaitStop(Long timeout) throws TimeoutException { taskState.awaitStop(thread, timeout); } - @Override + @Override + public StopPriority getStopPriority() { + return StopPriority.PRODUCER; + } + + @Override public void run() { this.thread = Thread.currentThread(); while (true) { @@ -279,4 +284,4 @@ future, new BigQueryCallback(this, appendContext, cc, r.getNextPosition(), this.context), MoreExecutors.directExecutor()); } -} \ No newline at end of file +} diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java index 470626c61..771de5530 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java @@ -304,6 +304,11 @@ public void awaitStop(Long timeout) throws TimeoutException { taskState.awaitStop(thread, timeout); } + @Override + public StopPriority getStopPriority() { + return StopPriority.PRODUCER; + } + // force-close for tests. public void close() { kafka.close(); diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellPubsubProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellPubsubProducer.java index 6f0040531..77dea550e 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellPubsubProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellPubsubProducer.java @@ -275,7 +275,12 @@ public void awaitStop(Long timeout) throws TimeoutException { taskState.awaitStop(thread, timeout); } - @Override + @Override + public StopPriority getStopPriority() { + return StopPriority.PRODUCER; + } + + @Override public StoppableTask getStoppableTask() { return this; } diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java index f46605a64..f767b484a 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellRedisProducer.java @@ -2,6 +2,7 @@ import com.zendesk.maxwell.MaxwellContext; import com.zendesk.maxwell.row.RowMap; +import com.zendesk.maxwell.util.TaskManager; import com.zendesk.maxwell.util.TopicInterpolator; import com.zendesk.maxwell.util.StoppableTask; import org.slf4j.Logger; @@ -20,7 +21,8 @@ import java.util.HashSet; import java.util.Set; -public class MaxwellRedisProducer extends AbstractProducer implements StoppableTask { +public +class MaxwellRedisProducer extends AbstractProducer implements StoppableTask { private static final Logger logger = LoggerFactory.getLogger(MaxwellRedisProducer.class); private final String channel; private final String redisType; @@ -190,6 +192,11 @@ public void requestStop() { @Override public void awaitStop(Long timeout) { } + @Override + public StopPriority getStopPriority() { + return StopPriority.PRODUCER; + } + @Override public StoppableTask getStoppableTask() { return this; diff --git a/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java b/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java index c9a45a278..3a632fddb 100644 --- a/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java @@ -21,6 +21,8 @@ public void push(RowMap r) throws Exception { System.currentTimeMillis() - r.getTimestampMillis(), TimeUnit.MILLISECONDS ); + this.context.setPosition(r); + Thread.sleep(1); } } diff --git a/src/main/java/com/zendesk/maxwell/recovery/Recovery.java b/src/main/java/com/zendesk/maxwell/recovery/Recovery.java index 16dc03a63..b73ae212f 100644 --- a/src/main/java/com/zendesk/maxwell/recovery/Recovery.java +++ b/src/main/java/com/zendesk/maxwell/recovery/Recovery.java @@ -56,7 +56,7 @@ public HeartbeatRowMap recover() throws Exception { List list = getBinlogInfo(); for ( int i = list.size() - 1; i >= 0 ; i-- ) { BinlogPosition binlogPosition = list.get(i); - Position position = Position.valueOf(binlogPosition, recoveryInfo.getHeartbeat()); + Position position = Position.valueOf(binlogPosition, recoveryInfo.getHeartbeat(), 0L); Metrics metrics = new NoOpMetrics(); LOGGER.debug("scanning binlog: {}", binlogPosition); diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEvent.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEvent.java index f068396ad..7b495b239 100644 --- a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEvent.java +++ b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEvent.java @@ -167,11 +167,19 @@ private RowMap buildRowMap(String type, Position position, Position nextPosition return map; } - public List jsonMaps(Table table, long lastHeartbeatRead, String rowQuery) throws ColumnDefCastException { + public List jsonMaps(Table table, long lastHeartbeatRead, String rowQuery, BinlogPosition lastTableMapPosition) throws ColumnDefCastException { ArrayList list = new ArrayList<>(); - Position position = Position.valueOf(this.position, lastHeartbeatRead); - Position nextPosition = Position.valueOf(this.nextPosition, lastHeartbeatRead); + Position position = Position.valueOf(this.position, lastHeartbeatRead, 0L); + /* + at creation time we don't know if we're a row event that wants to "commit" -- + ie advance the binlog position pointer. If we're not in a "commit this row" + + is the current position (with a skip-row offset in txOffset). + After we process the commit event we'll make the last row in the event bump the + binlog pointer. + */ + Position nextPosition = Position.valueOf(lastTableMapPosition, lastHeartbeatRead, 0L); switch ( getType() ) { case WRITE_ROWS: diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java index 27537c521..676e568ed 100644 --- a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java +++ b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java @@ -63,6 +63,7 @@ public class BinlogConnectorReplicator extends RunLoopProcess implements Replica private Position lastHeartbeatPosition; private final HeartbeatNotifier heartbeatNotifier; + private Position startPosition; private Long stopAtHeartbeat; private Filter filter; private Boolean ignoreMissingSchema; @@ -86,6 +87,11 @@ public class BinlogConnectorReplicator extends RunLoopProcess implements Replica private boolean isConnected = false; + @Override + public StopPriority getStopPriority() { + return StopPriority.BINLOG; + } + private class ClientReconnectedException extends Exception {} public BinlogConnectorReplicator( @@ -182,6 +188,7 @@ public BinlogConnectorReplicator( this.client.setUseSendAnnotateRowsEvent(true); + this.startPosition = start; BinlogPosition startBinlog = start.getBinlogPosition(); if (startBinlog.getGtidSetStr() != null) { String gtidStr = startBinlog.getGtidSetStr(); @@ -408,8 +415,8 @@ private void processQueryEvent(BinlogConnectorEvent event) throws Exception { data.getDatabase(), data.getSql(), this.schemaStore, - Position.valueOf(event.getPosition(), getLastHeartbeatRead()), - Position.valueOf(event.getNextPosition(), getLastHeartbeatRead()), + Position.valueOf(event.getPosition(), getLastHeartbeatRead(), 0L), + Position.valueOf(event.getNextPosition(), getLastHeartbeatRead(), 0L), event.getEvent().getHeader().getTimestamp() ); } @@ -522,6 +529,14 @@ private void tryReconnect() throws TimeoutException { throw new TimeoutException("Maximum reconnection attempts reached."); } + private long getTXOffset(BinlogConnectorEvent event) { + if ( event.getPosition() == startPosition.getBinlogPosition() ) { + return startPosition.getTXOffset(); + } else { + return 0; + } + } + /** * Get a batch of rows for the current transaction. * @@ -534,11 +549,13 @@ private void tryReconnect() throws TimeoutException { * @return A RowMapBuffer of rows; either in-memory or on disk. */ - private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws Exception { + private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent, long startTXOffset) throws Exception { BinlogConnectorEvent event; + BinlogPosition lastTableMapPosition = null; RowMapBuffer buffer = new RowMapBuffer(MAX_TX_ELEMENTS, this.bufferMemoryUsage); String currentQuery = null; + long txOffset = 0; while ( true ) { event = pollEvent(); @@ -551,7 +568,9 @@ private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws EventType eventType = event.getEvent().getHeader().getEventType(); if (event.isCommitEvent()) { if (!buffer.isEmpty()) { - buffer.getLast().setTXCommit(); + RowMap lastRow = buffer.getLast(); + lastRow.setTXCommit(); + lastRow.updateNextPosition(event.getNextPosition()); long timeSpent = buffer.getLast().getTimestampMillis() - beginEvent.getEvent().getHeader().getTimestamp(); transactionExecutionTime.update(timeSpent); transactionRowCount.update(buffer.size()); @@ -574,22 +593,27 @@ private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws if ( table != null && shouldOutputEvent(table.getDatabase(), table.getName(), filter, table.getColumnNames()) ) { List rows; try { - rows = event.jsonMaps(table, getLastHeartbeatRead(), currentQuery); + rows = event.jsonMaps(table, getLastHeartbeatRead(), currentQuery, lastTableMapPosition); } catch ( ColumnDefCastException e ) { logColumnDefCastException(table, e); throw(e); } - for ( RowMap r : rows ) - if (shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter)) { - buffer.add(r); + for ( RowMap r : rows ) { + r.setXoffset(txOffset++); + if ( shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter) ) { + if ( startTXOffset <= r.getXoffset() ) { + buffer.add(r); + } } + } } break; case TABLE_MAP: TableMapEventData data = event.tableMapData(); tableCache.processEvent(getSchema(), this.filter, this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable()); + lastTableMapPosition = event.getPosition(); break; case ROWS_QUERY: RowsQueryEventData rqed = event.getEvent().getData(); @@ -699,6 +723,7 @@ public RowMap getRow() throws Exception { } } + switch (event.getType()) { case WRITE_ROWS: case EXT_WRITE_ROWS: @@ -706,22 +731,29 @@ public RowMap getRow() throws Exception { case EXT_UPDATE_ROWS: case DELETE_ROWS: case EXT_DELETE_ROWS: - LOGGER.warn("Started replication stream inside a transaction. This shouldn't normally happen."); - LOGGER.warn("Assuming new transaction at unexpected event:" + event); + LOGGER.warn("Started replication stream inside a transaction, probably recovering from stopping inside transaction."); queue.offerFirst(event); - rowBuffer = getTransactionRows(event); + rowBuffer = getTransactionRows(event, getTXOffset(event)); break; case TABLE_MAP: TableMapEventData data = event.tableMapData(); tableCache.processEvent(getSchema(), this.filter,this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable()); + + /* starting inside a transaction, should be pointing at a table map event. */ + if ( startPosition != null && startPosition.getTXOffset() > 0 ) { + queue.offerFirst(event); + rowBuffer = getTransactionRows(event, startPosition.getTXOffset()); + } + + startPosition = null; break; case QUERY: QueryEventData qe = event.queryData(); String sql = qe.getSql(); if (BinlogConnectorEvent.BEGIN.equals(sql)) { try { - rowBuffer = getTransactionRows(event); + rowBuffer = getTransactionRows(event, getTXOffset(event)); } catch ( ClientReconnectedException e ) { // rowBuffer should already be empty by the time we get to this switch // statement, but we null it for clarity @@ -740,7 +772,7 @@ public RowMap getRow() throws Exception { MariadbGtidEventData g = event.mariaGtidData(); if ( (g.getFlags() & MariadbGtidEventData.FL_STANDALONE) == 0 ) { try { - rowBuffer = getTransactionRows(event); + rowBuffer = getTransactionRows(event, getTXOffset(event)); } catch ( ClientReconnectedException e ) { // rowBuffer should already be empty by the time we get to this switch // statement, but we null it for clarity diff --git a/src/main/java/com/zendesk/maxwell/replication/Position.java b/src/main/java/com/zendesk/maxwell/replication/Position.java index 3ed0a4c78..b534fbb5f 100644 --- a/src/main/java/com/zendesk/maxwell/replication/Position.java +++ b/src/main/java/com/zendesk/maxwell/replication/Position.java @@ -9,22 +9,28 @@ public class Position implements Serializable { // For a HeartbeatRow, it is the exact (new) heartbeat value for this position. private final long lastHeartbeatRead; private final BinlogPosition binlogPosition; + private final long txOffset; - public Position(BinlogPosition binlogPosition, long lastHeartbeatRead) { + public Position(BinlogPosition binlogPosition, long lastHeartbeatRead, long txOffset) { this.binlogPosition = binlogPosition; this.lastHeartbeatRead = lastHeartbeatRead; + this.txOffset = txOffset; } - public static Position valueOf(BinlogPosition binlogPosition, Long lastHeartbeatRead) { - return new Position(binlogPosition, lastHeartbeatRead); + public Position(BinlogPosition binlogPosition, long lastHeartbeatRead) { + this(binlogPosition, lastHeartbeatRead, 0L); } - public Position withHeartbeat(long lastHeartbeatRead) { - return new Position(getBinlogPosition(), lastHeartbeatRead); + public static Position valueOf(BinlogPosition binlogPosition, Long lastHeartbeatRead, long txOffset) { + return new Position(binlogPosition, lastHeartbeatRead, txOffset); } public static Position capture(Connection c, boolean gtidMode) throws SQLException { - return new Position(BinlogPosition.capture(c, gtidMode), 0L); + return new Position(BinlogPosition.capture(c, gtidMode), 0L, 0L); + } + + public Position withHeartbeat(long lastHeartbeatRead) { + return new Position(getBinlogPosition(), lastHeartbeatRead, txOffset); } public long getLastHeartbeatRead() { @@ -35,13 +41,17 @@ public BinlogPosition getBinlogPosition() { return binlogPosition; } - public Position addGtid(String gtid, long offset, String file) { - return new Position(binlogPosition.addGtid(gtid, offset, file), lastHeartbeatRead); + public long getTXOffset() { + return txOffset; } @Override public String toString() { - return "Position[" + binlogPosition + ", lastHeartbeat=" + lastHeartbeatRead + "]"; + String s = "Position[" + binlogPosition + ", lastHeartbeat=" + lastHeartbeatRead + "]"; + if ( txOffset > 0 ) { + s += "+" + txOffset; + } + return s; } public String toCommandline() { @@ -69,8 +79,14 @@ public int hashCode() { } public boolean newerThan(Position other) { + BinlogPosition ours = this.getBinlogPosition(); + BinlogPosition theirs = other.getBinlogPosition(); if ( other == null ) return true; - return this.getBinlogPosition().newerThan(other.getBinlogPosition()); + return ours.newerThan(theirs) || (ours.equals(theirs) && this.txOffset > other.txOffset); + } + + public Position withTXOffset(Long txOffset) { + return new Position(binlogPosition, lastHeartbeatRead, txOffset); } } diff --git a/src/main/java/com/zendesk/maxwell/row/RowMap.java b/src/main/java/com/zendesk/maxwell/row/RowMap.java index 37b3343b1..c7215ff41 100644 --- a/src/main/java/com/zendesk/maxwell/row/RowMap.java +++ b/src/main/java/com/zendesk/maxwell/row/RowMap.java @@ -340,12 +340,17 @@ public Long getXoffset() { public void setXoffset(Long xoffset) { this.xoffset = xoffset; + nextPosition = nextPosition.withTXOffset(xoffset + 1); } public void setTXCommit() { this.txCommit = true; } + public void updateNextPosition(BinlogPosition newPosition) { + nextPosition = Position.valueOf(newPosition, nextPosition.getLastHeartbeatRead(), 0L); + } + public boolean isTXCommit() { return this.txCommit; } diff --git a/src/main/java/com/zendesk/maxwell/row/RowMapBuffer.java b/src/main/java/com/zendesk/maxwell/row/RowMapBuffer.java index ad595550d..f1aeb3843 100644 --- a/src/main/java/com/zendesk/maxwell/row/RowMapBuffer.java +++ b/src/main/java/com/zendesk/maxwell/row/RowMapBuffer.java @@ -10,7 +10,6 @@ public class RowMapBuffer extends ListWithDiskBuffer { static final Logger LOGGER = LoggerFactory.getLogger(RowMapBuffer.class); private static long FlushOutputStreamBytes = 10000000; private Long xid; - private Long xoffset = 0L; private Long serverId; private Long threadId; private Long schemaId; @@ -65,7 +64,6 @@ protected RowMap evict() throws IOException { public RowMap removeFirst() throws IOException, ClassNotFoundException { RowMap r = super.removeFirst(RowMap.class); r.setXid(this.xid); - r.setXoffset(this.xoffset++); r.setServerId(this.serverId); r.setThreadId(this.threadId); r.setSchemaId(this.schemaId); diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java index b61aa271d..d1e3ce5c0 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java @@ -49,10 +49,11 @@ public void set(Position newPosition) throws SQLException, DuplicateProcessExcep + "binlog_file = ?, " + "binlog_position = ?, " + "last_heartbeat_read = ?, " - + "client_id = ? " + + "client_id = ?, " + + "tx_offset = ? " + "ON DUPLICATE KEY UPDATE " + "last_heartbeat_read = ?, " - + "gtid_set = ?, binlog_file = ?, binlog_position=?"; + + "gtid_set = ?, binlog_file = ?, binlog_position = ?, tx_offset = ?"; BinlogPosition binlogPosition = newPosition.getBinlogPosition(); connectionPool.withSQLRetry(1, (c) -> { @@ -65,10 +66,12 @@ public void set(Position newPosition) throws SQLException, DuplicateProcessExcep s.setLong(4, binlogPosition.getOffset()); s.setLong(5, heartbeat); s.setString(6, clientID); - s.setLong(7, heartbeat); - s.setString(8, binlogPosition.getGtidSetStr()); - s.setString(9, binlogPosition.getFile()); - s.setLong(10, binlogPosition.getOffset()); + s.setLong(7, newPosition.getTXOffset()); + s.setLong(8, heartbeat); + s.setString(9, binlogPosition.getGtidSetStr()); + s.setString(10, binlogPosition.getFile()); + s.setLong(11, binlogPosition.getOffset()); + s.setLong(12, newPosition.getTXOffset()); s.execute(); } @@ -173,7 +176,7 @@ public static Position positionFromResultSet(ResultSet rs, boolean gtidMode) thr rs.getString("binlog_file") ); - return new Position(pos, rs.getLong("last_heartbeat_read")); + return new Position(pos, rs.getLong("last_heartbeat_read"), rs.getLong("tx_offset")); } public Position getLatestFromAnyClient() throws SQLException { @@ -236,7 +239,7 @@ protected List getAllRecoveryInfos(Connection c) throws SQLExcepti BinlogPosition.at(gtid, rs.getLong("binlog_position"), rs.getString("binlog_file") - ), rs.getLong("last_heartbeat_read")); + ), rs.getLong("last_heartbeat_read"), 0L); if ( rs.wasNull() ) { LOGGER.warn("master recovery is ignoring position with NULL heartbeat"); diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java b/src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java index 6b864c20d..a97eb3080 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlSavedSchema.java @@ -423,7 +423,7 @@ private void restoreSchemaMetadata(Connection conn, Long schemaID) throws SQLExc null, schemaRS.getInt("binlog_position"), schemaRS.getString("binlog_file") - ), schemaRS.getLong("last_heartbeat_read") + ), schemaRS.getLong("last_heartbeat_read"), 0L )); LOGGER.info("Restoring schema id " + schemaRS.getLong("id") + " (last modified at " + this.position + ")"); diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlSchemaCompactor.java b/src/main/java/com/zendesk/maxwell/schema/MysqlSchemaCompactor.java index 4c326f990..aa205de51 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlSchemaCompactor.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlSchemaCompactor.java @@ -203,4 +203,9 @@ private void slowDeleteFrom(String table, Connection cx, long schemaID) throws S } } catch ( InterruptedException e ) {} } + + @Override + public StopPriority getStopPriority() { + return StopPriority.SUPPORT; + } } diff --git a/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java b/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java index a05ec0e9e..93e591c0b 100644 --- a/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java +++ b/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java @@ -59,7 +59,7 @@ protected void beforeStop() { } void storeFinalPosition() throws SQLException, DuplicateProcessException { - if ( position != null && !position.equals(storedPosition) ) { + if ( position != null && position.newerThan(storedPosition) ) { LOGGER.info("Storing final position: " + position); store.set(position); } @@ -94,6 +94,7 @@ boolean shouldHeartbeat(Position currentPosition) { public void work() throws Exception { Position newPosition = position; + LOGGER.info("new: {}, old: {}", newPosition, storedPosition); if ( newPosition != null && newPosition.newerThan(storedPosition) ) { store.set(newPosition); storedPosition = newPosition; @@ -133,5 +134,10 @@ public synchronized Position getPosition() throws SQLException { return position; } + + @Override + public StopPriority getStopPriority() { + return StopPriority.SUPPORT; // position store stops late to flush final rows + } } diff --git a/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java b/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java index ea602adc3..644adfb33 100644 --- a/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java +++ b/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java @@ -168,6 +168,10 @@ public static void upgradeSchemaStoreSchema(Connection c) throws SQLException, I performAlter(c, "alter table `positions` add column `last_heartbeat_read` bigint null default null"); } + if ( !getTableColumns("positions", c).containsKey("tx_offset") ) { + performAlter(c, "alter table `positions` add column `tx_offset` int not null default 0"); + } + if ( !getTableColumns("columns", c).containsKey("column_length") ) { performAlter(c, "alter table `columns` add column `column_length` tinyint unsigned"); } @@ -250,7 +254,8 @@ private static void backfillPositionSHAs(Connection c) throws SQLException { Long id = rs.getLong("id"); Position position = new Position( new BinlogPosition(rs.getLong("binlog_position"), rs.getString("binlog_file")), - rs.getLong("last_heartbeat_read") + rs.getLong("last_heartbeat_read"), + 0L ); String sha = MysqlSavedSchema.getSchemaPositionSHA(rs.getLong("server_id"), position); try ( Statement stmtUpdate = c.createStatement() ) { // statements cannot interleave ResultSets, so we need a new statement diff --git a/src/main/java/com/zendesk/maxwell/util/StoppableTask.java b/src/main/java/com/zendesk/maxwell/util/StoppableTask.java index ddec2d5bd..4b500df3e 100644 --- a/src/main/java/com/zendesk/maxwell/util/StoppableTask.java +++ b/src/main/java/com/zendesk/maxwell/util/StoppableTask.java @@ -3,6 +3,13 @@ import java.util.concurrent.TimeoutException; public interface StoppableTask { + public enum StopPriority { + BINLOG, + PRODUCER, + SUPPORT + } + void requestStop() throws Exception; void awaitStop(Long timeout) throws TimeoutException; + StopPriority getStopPriority(); } diff --git a/src/main/java/com/zendesk/maxwell/util/TaskManager.java b/src/main/java/com/zendesk/maxwell/util/TaskManager.java index ad873b03b..ac41444e1 100644 --- a/src/main/java/com/zendesk/maxwell/util/TaskManager.java +++ b/src/main/java/com/zendesk/maxwell/util/TaskManager.java @@ -42,17 +42,26 @@ public synchronized void stop(Exception error) throws Exception { LOGGER.error("cause: ", error); } - // tell everything to stop - for (StoppableTask task: this.tasks) { - LOGGER.info("Stopping: " + task); - task.requestStop(); - } + // tell everything to stop, binlog then producer then position store/etc + for ( StoppableTask.StopPriority priority : StoppableTask.StopPriority.values() ) { + for (StoppableTask task: this.tasks) { + if ( task.getStopPriority() != priority ) + continue; + + LOGGER.info("Stopping: " + task); + task.requestStop(); + } + + // then wait for everything to stop + Long timeout = 1000L; + for (StoppableTask task: this.tasks) { + if ( task.getStopPriority() != priority ) + continue; + + LOGGER.debug("Awaiting stop of: {}", task); + task.awaitStop(timeout); + } - // then wait for everything to stop - Long timeout = 1000L; - for (StoppableTask task: this.tasks) { - LOGGER.debug("Awaiting stop of: {}", task); - task.awaitStop(timeout); } this.state = RunState.STOPPED; diff --git a/src/main/resources/sql/maxwell_schema.sql b/src/main/resources/sql/maxwell_schema.sql index cb89699dd..f52c4da1a 100644 --- a/src/main/resources/sql/maxwell_schema.sql +++ b/src/main/resources/sql/maxwell_schema.sql @@ -55,5 +55,6 @@ CREATE TABLE IF NOT EXISTS `positions` ( client_id varchar(255) charset latin1 not null default 'maxwell', heartbeat_at bigint null default null, last_heartbeat_read bigint null default null, + tx_offset int not null default 0, primary key(server_id, client_id) ); diff --git a/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java index 5a32c9e38..b922e353b 100644 --- a/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java +++ b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java @@ -45,7 +45,7 @@ public void shouldIgnoreProducerErrorByDefault() throws JSONException, Exception ArrayBlockingQueue queue = new ArrayBlockingQueue(100); MaxwellBigQueryProducerWorker producerWorker = new MaxwellBigQueryProducerWorker(context, queue,"myproject", "mydataset", "mytable"); BigQueryCallback callback = new BigQueryCallback(producerWorker, appendContext, cc, - new Position(new BinlogPosition(1, "binlog-1"), 0L), + new Position(new BinlogPosition(1, "binlog-1"), 0L, 0L), new Counter(), new Counter(), new Meter(), new Meter(), context); Throwable t = new Throwable("error"); callback.onFailure(t); diff --git a/src/test/java/com/zendesk/maxwell/producer/InflightMessageListTest.java b/src/test/java/com/zendesk/maxwell/producer/InflightMessageListTest.java index ed6988ea2..0fb5d54d2 100644 --- a/src/test/java/com/zendesk/maxwell/producer/InflightMessageListTest.java +++ b/src/test/java/com/zendesk/maxwell/producer/InflightMessageListTest.java @@ -24,10 +24,10 @@ @RunWith(MockitoJUnitRunner.class) public class InflightMessageListTest { private static int capacity = 3; - private static Position p1 = new Position(BinlogPosition.at(1, "f"), 0L); - private static Position p2 = new Position(BinlogPosition.at(2, "f"), 0L); - private static Position p3 = new Position(BinlogPosition.at(3, "f"), 0L); - private static Position p4 = new Position(BinlogPosition.at(4, "f"), 0L); + private static Position p1 = new Position(BinlogPosition.at(1, "f"), 0L, 0L); + private static Position p2 = new Position(BinlogPosition.at(2, "f"), 0L, 0L); + private static Position p3 = new Position(BinlogPosition.at(3, "f"), 0L, 0L); + private static Position p4 = new Position(BinlogPosition.at(4, "f"), 0L, 0L); private InflightMessageList list; private MaxwellContext context; @Captor diff --git a/src/test/java/com/zendesk/maxwell/producer/MaxwellKinesisProducerTest.java b/src/test/java/com/zendesk/maxwell/producer/MaxwellKinesisProducerTest.java index 5666ee894..d0cf8a487 100644 --- a/src/test/java/com/zendesk/maxwell/producer/MaxwellKinesisProducerTest.java +++ b/src/test/java/com/zendesk/maxwell/producer/MaxwellKinesisProducerTest.java @@ -18,7 +18,7 @@ public class MaxwellKinesisProducerTest { private static final long TIMESTAMP_MILLISECONDS = 1496712943447L; - private static final Position POSITION = new Position(new BinlogPosition(1L, "binlog-0001"), 0L); + private static final Position POSITION = new Position(new BinlogPosition(1L, "binlog-0001"), 0L, 0L); @Test public void dealsWithTooLargeRecord() throws Exception { diff --git a/src/test/java/com/zendesk/maxwell/producer/NatsProducerTest.java b/src/test/java/com/zendesk/maxwell/producer/NatsProducerTest.java index 9a113014e..5334459ad 100644 --- a/src/test/java/com/zendesk/maxwell/producer/NatsProducerTest.java +++ b/src/test/java/com/zendesk/maxwell/producer/NatsProducerTest.java @@ -202,7 +202,7 @@ public void failToPushRowToSubject() throws Exception { } private RowMap newRowMap() { - return new RowMap("insert", "testDb", "testTable", System.currentTimeMillis(), Collections.emptyList(), new Position(new BinlogPosition(3, "mysql.1"), 0L)); + return new RowMap("insert", "testDb", "testTable", System.currentTimeMillis(), Collections.emptyList(), new Position(new BinlogPosition(3, "mysql.1"), 0L, 0L)); } } diff --git a/src/test/java/com/zendesk/maxwell/util/TaskManagerTest.java b/src/test/java/com/zendesk/maxwell/util/TaskManagerTest.java index ad065f0b4..4292ee0e1 100644 --- a/src/test/java/com/zendesk/maxwell/util/TaskManagerTest.java +++ b/src/test/java/com/zendesk/maxwell/util/TaskManagerTest.java @@ -31,6 +31,11 @@ public void requestStop() { public void awaitStop(Long timeout) throws TimeoutException { log.add(new Event(EventType.AWAIT_STOP, this.name)); } + + @Override + public StopPriority getStopPriority() { + return StopPriority.SUPPORT; + } } enum EventType { REQUEST_STOP, AWAIT_STOP }; @@ -46,6 +51,11 @@ public void requestStop() { public void awaitStop(Long timeout) throws TimeoutException { throw new TimeoutException("can't stop this"); } + + @Override + public StopPriority getStopPriority() { + return StopPriority.BINLOG; + } } class Event extends MutablePair { From 61c152ac53cf94ba1628dea6314bce942ae607bd Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Wed, 5 Jul 2023 09:01:35 +0200 Subject: [PATCH 2/5] remove thread.sleep --- src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java b/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java index 3a632fddb..f2745d142 100644 --- a/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java @@ -23,6 +23,5 @@ public void push(RowMap r) throws Exception { ); this.context.setPosition(r); - Thread.sleep(1); } } From 609aabbd219945add366142f388def2245204408 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Wed, 5 Jul 2023 09:08:40 +0200 Subject: [PATCH 3/5] update comment --- .../maxwell/replication/BinlogConnectorEvent.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEvent.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEvent.java index 7b495b239..f6464db7d 100644 --- a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEvent.java +++ b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEvent.java @@ -172,12 +172,17 @@ public List jsonMaps(Table table, long lastHeartbeatRead, String rowQuer Position position = Position.valueOf(this.position, lastHeartbeatRead, 0L); /* - at creation time we don't know if we're a row event that wants to "commit" -- - ie advance the binlog position pointer. If we're not in a "commit this row" + at the time our RowMaps are created we have no idea where we are + in a transaciton; we may be the last row-event in the transaction + in which case we're going to "commit" -- ie advance maxwell.positions, + or we may be in the middle of a transaction. - is the current position (with a skip-row offset in txOffset). - After we process the commit event we'll make the last row in the event bump the - binlog pointer. + If we're in the middle of a transaction, we want to leave the binlog pointer + pointing at the table-map event that precedes this. The calling function + will additionally add a row-offset to the position of each row. + + All this mechanism allows maxwell to stop and restart in the middle of very large + transactions. */ Position nextPosition = Position.valueOf(lastTableMapPosition, lastHeartbeatRead, 0L); From f8e0b8310fa00b0d0bcf3d0408ac90bc4dc937d0 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 18 Aug 2023 01:45:56 -0700 Subject: [PATCH 4/5] further on sub-tx positioning --- .../BinlogConnectorReplicator.java | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java index 676e568ed..ea3e1582b 100644 --- a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java +++ b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java @@ -529,14 +529,6 @@ private void tryReconnect() throws TimeoutException { throw new TimeoutException("Maximum reconnection attempts reached."); } - private long getTXOffset(BinlogConnectorEvent event) { - if ( event.getPosition() == startPosition.getBinlogPosition() ) { - return startPosition.getTXOffset(); - } else { - return 0; - } - } - /** * Get a batch of rows for the current transaction. * @@ -687,6 +679,7 @@ private void logColumnDefCastException(Table table, ColumnDefCastException e) { * @return either a RowMap or null */ public RowMap getRow() throws Exception { + long txOffset; BinlogConnectorEvent event; if ( stopOnEOF && hitEOF ) @@ -734,26 +727,35 @@ public RowMap getRow() throws Exception { LOGGER.warn("Started replication stream inside a transaction, probably recovering from stopping inside transaction."); queue.offerFirst(event); - rowBuffer = getTransactionRows(event, getTXOffset(event)); + rowBuffer = getTransactionRows(event, 0); break; case TABLE_MAP: TableMapEventData data = event.tableMapData(); tableCache.processEvent(getSchema(), this.filter,this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable()); - /* starting inside a transaction, should be pointing at a table map event. */ if ( startPosition != null && startPosition.getTXOffset() > 0 ) { - queue.offerFirst(event); + LOGGER.info("Restarting maxwell inside transaction: {}", startPosition); + queue.offerFirst(event); // put table map back on top of queue rowBuffer = getTransactionRows(event, startPosition.getTXOffset()); } startPosition = null; break; case QUERY: + txOffset = 0; QueryEventData qe = event.queryData(); String sql = qe.getSql(); + + if ( startPosition != null && startPosition.getTXOffset() > 0 ) { + LOGGER.info("Restarting maxwell inside transaction: {}", startPosition); + txOffset = startPosition.getTXOffset();; + } + + startPosition = null; + if (BinlogConnectorEvent.BEGIN.equals(sql)) { try { - rowBuffer = getTransactionRows(event, getTXOffset(event)); + rowBuffer = getTransactionRows(event, txOffset); } catch ( ClientReconnectedException e ) { // rowBuffer should already be empty by the time we get to this switch // statement, but we null it for clarity @@ -771,8 +773,17 @@ public RowMap getRow() throws Exception { // in mariaDB the GTID event supplants the normal BEGIN MariadbGtidEventData g = event.mariaGtidData(); if ( (g.getFlags() & MariadbGtidEventData.FL_STANDALONE) == 0 ) { + txOffset = 0; + + if ( startPosition != null && startPosition.getTXOffset() > 0 ) { + LOGGER.info("Restarting maxwell inside transaction: {}", startPosition); + txOffset = startPosition.getTXOffset();; + } + + startPosition = null; + try { - rowBuffer = getTransactionRows(event, getTXOffset(event)); + rowBuffer = getTransactionRows(event, txOffset); } catch ( ClientReconnectedException e ) { // rowBuffer should already be empty by the time we get to this switch // statement, but we null it for clarity From 75b47a0884c53e95474ef64203bb94966ffac288 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 18 Aug 2023 01:52:20 -0700 Subject: [PATCH 5/5] update tests --- .../maxwell/schema/MysqlSchemaCompactor.java | 2 +- .../zendesk/maxwell/row/RowMapBufferTest.java | 21 ------------------- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/schema/MysqlSchemaCompactor.java b/src/main/java/com/zendesk/maxwell/schema/MysqlSchemaCompactor.java index aa205de51..6fc5a6456 100644 --- a/src/main/java/com/zendesk/maxwell/schema/MysqlSchemaCompactor.java +++ b/src/main/java/com/zendesk/maxwell/schema/MysqlSchemaCompactor.java @@ -97,7 +97,7 @@ private Long chooseCompactedSchemaBase(Connection cx) throws SQLException { return null; } - String schemaSql = "select id, binlog_file, binlog_position, gtid_set, 0 as last_heartbeat_read " + String schemaSql = "select id, binlog_file, binlog_position, gtid_set, 0 as last_heartbeat_read, tx_offset " + " from `schemas` where `server_id` = " + this.serverID + " order by id desc limit 1"; final Long schemaID; diff --git a/src/test/java/com/zendesk/maxwell/row/RowMapBufferTest.java b/src/test/java/com/zendesk/maxwell/row/RowMapBufferTest.java index c2b8b2692..bf06c8c59 100644 --- a/src/test/java/com/zendesk/maxwell/row/RowMapBufferTest.java +++ b/src/test/java/com/zendesk/maxwell/row/RowMapBufferTest.java @@ -29,27 +29,6 @@ public void TestOverflowToDisk() throws Exception { assertThat(buffer.removeFirst().getTimestamp(), is(3L)); } - @Test - public void TestXOffsetIncrement() throws IOException, ClassNotFoundException { - RowMapBuffer buffer = new RowMapBuffer(100); - - buffer.add(new RowMap("insert", "foo", "bar", 1000L, new ArrayList(), new Position(new BinlogPosition(3, "mysql.1"), 0L))); - buffer.add(new RowMap("insert", "foo", "bar", 2000L, new ArrayList(), new Position(new BinlogPosition(3, "mysql.1"), 0L))); - buffer.add(new RowMap("insert", "foo", "bar", 3000L, new ArrayList(), new Position(new BinlogPosition(3, "mysql.1"), 0L))); - - assert buffer.removeFirst().getXoffset() == 0; - assert buffer.removeFirst().getXoffset() == 1; - assert buffer.removeFirst().getXoffset() == 2; - assert buffer.isEmpty(); - - buffer.add(new RowMap("insert", "foo", "bar", 3000L, new ArrayList(), new Position(new BinlogPosition(3, "mysql.1"), 0L))); - assert buffer.removeFirst().getXoffset() == 3; - - buffer = new RowMapBuffer(100); - buffer.add(new RowMap("insert", "foo", "bar", 1000L, new ArrayList(), new Position(new BinlogPosition(3, "mysql.1"), 0L))); - assert buffer.removeFirst().getXoffset() == 0; - } - // https://github.com/zendesk/maxwell/issues/996 @Test public void TestOverflowToDiskWithJson() throws Exception {