Fix transaction coordinator client can't reconnect to the broker #1237
Conversation
This PR might be over-complicated.
IIUC, the root cause is that when RequestOnCnx returns an error indicating the connection is closed, that error is simply propagated to the caller. Here is a simple fix (without any backoff mechanism or timeout):
```go
func (tc *transactionCoordinatorClient) sendRequest(cnxIndex uint64, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) error {
	for {
		res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[cnxIndex], requestID, cmdType, message)
		if err != nil {
			// Errors other than "connection closed" are not retryable.
			if err.Error() != "connection closed" {
				return err
			}
			// The connection was closed: re-establish it and retry the request.
			if err = tc.grabConn(cnxIndex); err != nil {
				return err
			}
			continue
		}
		if res.Response.EndTxnResponse.Error != nil {
			return getErrorFromServerError(res.Response.EndTxnResponse.Error)
		}
		return nil
	}
}
```
Then replace the RequestOnCnx call like:
```go
err := tc.sendRequest(id.MostSigBits, requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
tc.semaphore.Release()
if err != nil {
	return err
}
return nil
```
The fix above requires far fewer code changes and keeps the code simpler.
This implementation misses some key details. For example, it lacks a backoff mechanism and connection timeout logic, as you said. Also, what happens if many goroutines call it concurrently?
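To make the concern concrete, here is a minimal sketch of the simple retry loop above extended with an exponential backoff and an overall deadline. The opTimeout constant and the retryDelay helper are assumptions introduced for the example (not existing client code), and the same imports as the snippet above plus the standard time package are assumed:

```go
// Sketch only: the simple retry loop, extended with an exponential backoff
// and an overall deadline. opTimeout and retryDelay are assumptions.
const opTimeout = 30 * time.Second

func (tc *transactionCoordinatorClient) sendRequestWithBackoff(cnxIndex, requestID uint64,
	cmdType pb.BaseCommand_Type, message proto.Message) error {
	deadline := time.Now().Add(opTimeout)
	for attempt := 0; ; attempt++ {
		res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[cnxIndex], requestID, cmdType, message)
		if err == nil {
			if res.Response.EndTxnResponse.Error != nil {
				return getErrorFromServerError(res.Response.EndTxnResponse.Error)
			}
			return nil
		}
		// Only "connection closed" is retryable, and only until the deadline.
		if err.Error() != "connection closed" || time.Now().After(deadline) {
			return err
		}
		time.Sleep(retryDelay(attempt, 100*time.Millisecond, 5*time.Second))
		if err = tc.grabConn(cnxIndex); err != nil {
			return err
		}
	}
}

// retryDelay doubles the delay on every attempt, capped at maxDelay.
func retryDelay(attempt int, initial, maxDelay time.Duration) time.Duration {
	d := initial << uint(attempt)
	if d <= 0 || d > maxDelay { // d <= 0 guards against shift overflow
		return maxDelay
	}
	return d
}
```

Even with this, it is not obvious how many concurrent callers racing into grabConn would be coordinated, which is part of what an event-loop design sidesteps.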
IIUC, the main simplification in this approach is removing the XXOp types and the event loop logic, right? The key difference appears to be the use of the event loop model. However, reducing that complexity doesn't seem very significant to me: both producers and consumers already use an event loop for handling requests (see pulsar-client-go/pulsar/consumer_partition.go, lines 1578 to 1582 at b0111a2). The Java implementation of TransactionMetaStoreHandler also uses something like an event loop, similar to the Go client. Moreover, without the event loop it seems challenging to implement asynchronous methods. This PR mainly adopts the event-loop pattern of the existing producer and consumer. Maybe it's better to start a separate discussion for this.
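For readers unfamiliar with the pattern, here is a rough sketch of the event-loop model being referred to; all type, field, and method names are illustrative and are not the actual code in this PR:

```go
// Illustrative sketch of the event-loop model, in the spirit of the existing
// producer/consumer loops. Names are made up for the example.
type endTxnOp struct {
	mostSigBits  uint64
	leastSigBits uint64
	commit       bool       // true to commit, false to abort
	errCh        chan error // completed asynchronously when the broker responds
}

type transactionHandler struct {
	opCh    chan *endTxnOp
	closeCh chan struct{}
}

func (h *transactionHandler) runEventLoop() {
	for {
		select {
		case op := <-h.opCh:
			// Every request is processed by this single goroutine, so a
			// reconnect triggered here cannot race with other in-flight requests.
			h.handleEndTxn(op)
		case <-h.closeCh:
			return
		}
	}
}

// handleEndTxn would send the END_TXN command on the current connection,
// reconnecting (with backoff) first if the connection has been closed, and
// finally complete op.errCh. Left as a stub in this sketch.
func (h *transactionHandler) handleEndTxn(op *endTxnOp) {
	op.errCh <- nil
}
```

Because a single goroutine owns the connection, reconnection and request handling are naturally serialized, and asynchronous APIs fall out of completing each operation's channel.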
I see, currently the
Fixes #1227
Motivation
There are some issues with the transactionCoordinatorClient in the Go client. When using transactions, if any reconnection happens during a transaction operation, the connection to the transaction coordinator is not re-established. This causes all subsequent operations to fail with the connection closed error.
Modifications
Added a transactionHandler to manage reconnections and handle requests.
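As a hedged sketch of the request path through such a handler, reusing the illustrative endTxnOp/transactionHandler types from the comment above (not the PR's actual API, and assuming the standard errors package is imported):

```go
// Sketch only: callers enqueue an operation on the handler's event loop and
// wait for the result, so they never touch the connection directly.
func (h *transactionHandler) endTxn(mostSigBits, leastSigBits uint64, commit bool) error {
	op := &endTxnOp{
		mostSigBits:  mostSigBits,
		leastSigBits: leastSigBits,
		commit:       commit,
		errCh:        make(chan error, 1),
	}
	// Hand the operation to the event loop.
	select {
	case h.opCh <- op:
	case <-h.closeCh:
		return errors.New("transaction handler closed")
	}
	// Wait for the loop to complete the operation; a caller could also
	// select on a timeout here instead of blocking indefinitely.
	return <-op.errCh
}
```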