From e24489cd937a0e593372f3f9c3aa702d4559c6c8 Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Mon, 14 Oct 2024 13:06:14 +0200 Subject: [PATCH 1/2] DBZ-8194 Access gtidEnabled always in a thread safe manner --- .../shyiko/mysql/binlog/BinaryLogClient.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 24e8bc3a..10e43e1d 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -347,13 +347,13 @@ public void setGtidSet(String gtidStr) { if ( gtidStr == null ) return; - this.gtidEnabled = true; + synchronized (gtidSetAccessLock) { + this.gtidEnabled = true; - if (this.binlogFilename == null) { - this.binlogFilename = ""; - } + if (this.binlogFilename == null) { + this.binlogFilename = ""; + } - synchronized (gtidSetAccessLock) { if ( !gtidStr.equals("") ) { if ( MariadbGtidSet.isMariaGtidSet(gtidStr) ) { this.gtidSet = new MariadbGtidSet(gtidStr); @@ -1000,10 +1000,10 @@ private String fetchGtidPurged() throws IOException { } protected void setupGtidSet() throws IOException{ - if (!this.gtidEnabled) - return; - synchronized (gtidSetAccessLock) { + if (!this.gtidEnabled) + return; + if ( this.databaseVersion.isMariaDb() ) { if ( gtidSet == null ) { gtidSet = new MariadbGtidSet(""); From b6bd7e6c0e6953e2b9488b718becc84e5427db8c Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Mon, 14 Oct 2024 14:54:17 +0200 Subject: [PATCH 2/2] DBZ-8194 Add more GTID logging --- .../com/github/shyiko/mysql/binlog/BinaryLogClient.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 10e43e1d..49a9fe15 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -348,6 +348,7 @@ public void setGtidSet(String gtidStr) { return; synchronized (gtidSetAccessLock) { + logger.info("Enabling GTID"); this.gtidEnabled = true; if (this.binlogFilename == null) { @@ -569,6 +570,7 @@ public void setMariaDbSlaveCapability(int mariaDbSlaveCapability) { * @throws IllegalStateException if binary log client is already connected */ public void connect() throws IOException, IllegalStateException { + logger.fine("Trying to connect to " + hostname + ":" + port); if (!connectLock.tryLock()) { throw new IllegalStateException("BinaryLogClient is already connected"); } @@ -797,11 +799,13 @@ private void requestBinaryLogStreamMysql(long serverId) throws IOException { Command dumpBinaryLogCommand; synchronized (gtidSetAccessLock) { if (this.gtidEnabled) { + logger.info("Requesting streaming from position filename: " + binlogFilename + ", position: " + binlogPosition + ", GTID set: " + gtidSet); dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, useBinlogFilenamePositionInGtidMode ? binlogFilename : "", useBinlogFilenamePositionInGtidMode ? binlogPosition : 4, gtidSet); } else { + logger.info("Requesting streaming from position filename: " + binlogFilename + ", position: " + binlogPosition); dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition); } } @@ -819,7 +823,7 @@ protected void requestBinaryLogStreamMaria(long serverId) throws IOException { synchronized (gtidSetAccessLock) { if (this.gtidEnabled) { - logger.info(gtidSet.toString()); + logger.info("Requesting streaming from GTID set: " + gtidSet); channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'")); checkError(channel.read()); channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0")); @@ -828,6 +832,7 @@ protected void requestBinaryLogStreamMaria(long serverId) throws IOException { checkError(channel.read()); dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isUseSendAnnotateRowsEvent()); } else { + logger.info("Requesting streaming from position filename: " + binlogFilename + ", position: " + binlogPosition); dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition, isUseSendAnnotateRowsEvent()); } } @@ -1209,6 +1214,7 @@ protected void commitGtid(String sql) { private void commitGtid() { if (gtid != null) { synchronized (gtidSetAccessLock) { + logger.finest("Adding GTID " + gtid); gtidSet.addGtid(gtid); } } @@ -1320,6 +1326,7 @@ public void unregisterLifecycleListener(LifecycleListener eventListener) { * As the result following {@link #connect()} resumes client from where it left off. */ public void disconnect() throws IOException { + logger.fine("Disconnecting from " + hostname + ":" + port); terminateKeepAliveThread(); terminateConnect(); }