Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8194 Access gtidEnabled always in a thread safe manner and add more logging #6

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,14 @@ public void setGtidSet(String gtidStr) {
if ( gtidStr == null )
return;

this.gtidEnabled = true;
synchronized (gtidSetAccessLock) {
logger.info("Enabling GTID");
this.gtidEnabled = true;

if (this.binlogFilename == null) {
this.binlogFilename = "";
}
if (this.binlogFilename == null) {
this.binlogFilename = "";
}

synchronized (gtidSetAccessLock) {
if ( !gtidStr.equals("") ) {
if ( MariadbGtidSet.isMariaGtidSet(gtidStr) ) {
this.gtidSet = new MariadbGtidSet(gtidStr);
Expand Down Expand Up @@ -569,6 +570,7 @@ public void setMariaDbSlaveCapability(int mariaDbSlaveCapability) {
* @throws IllegalStateException if binary log client is already connected
*/
public void connect() throws IOException, IllegalStateException {
logger.fine("Trying to connect to " + hostname + ":" + port);
if (!connectLock.tryLock()) {
throw new IllegalStateException("BinaryLogClient is already connected");
}
Expand Down Expand Up @@ -797,11 +799,13 @@ private void requestBinaryLogStreamMysql(long serverId) throws IOException {
Command dumpBinaryLogCommand;
synchronized (gtidSetAccessLock) {
if (this.gtidEnabled) {
logger.info("Requesting streaming from position filename: " + binlogFilename + ", position: " + binlogPosition + ", GTID set: " + gtidSet);
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
gtidSet);
} else {
logger.info("Requesting streaming from position filename: " + binlogFilename + ", position: " + binlogPosition);
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
}
}
Expand All @@ -819,7 +823,7 @@ protected void requestBinaryLogStreamMaria(long serverId) throws IOException {

synchronized (gtidSetAccessLock) {
if (this.gtidEnabled) {
logger.info(gtidSet.toString());
logger.info("Requesting streaming from GTID set: " + gtidSet);
channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'"));
checkError(channel.read());
channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0"));
Expand All @@ -828,6 +832,7 @@ protected void requestBinaryLogStreamMaria(long serverId) throws IOException {
checkError(channel.read());
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isUseSendAnnotateRowsEvent());
} else {
logger.info("Requesting streaming from position filename: " + binlogFilename + ", position: " + binlogPosition);
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition, isUseSendAnnotateRowsEvent());
}
}
Expand Down Expand Up @@ -1000,10 +1005,10 @@ private String fetchGtidPurged() throws IOException {
}

protected void setupGtidSet() throws IOException{
if (!this.gtidEnabled)
return;

synchronized (gtidSetAccessLock) {
if (!this.gtidEnabled)
return;

if ( this.databaseVersion.isMariaDb() ) {
if ( gtidSet == null ) {
gtidSet = new MariadbGtidSet("");
Expand Down Expand Up @@ -1209,6 +1214,7 @@ protected void commitGtid(String sql) {
private void commitGtid() {
if (gtid != null) {
synchronized (gtidSetAccessLock) {
logger.finest("Adding GTID " + gtid);
gtidSet.addGtid(gtid);
}
}
Expand Down Expand Up @@ -1320,6 +1326,7 @@ public void unregisterLifecycleListener(LifecycleListener eventListener) {
* As the result following {@link #connect()} resumes client from where it left off.
*/
public void disconnect() throws IOException {
logger.fine("Disconnecting from " + hostname + ":" + port);
terminateKeepAliveThread();
terminateConnect();
}
Expand Down
Loading