Skip to content

Commit

Permalink
Handle the case where the broker has not enabled transactions.
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie committed Jul 8, 2024
1 parent dbfc38b commit f4b5bee
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
9 changes: 9 additions & 0 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ const (
// fenced. Applications are now supposed to close it and create a
// new producer
ProducerFenced
// MaxConcurrentOperationsReached indicates that the maximum number of concurrent operations
// has been reached. This means that no additional operations can be started until some
// of the current operations complete.
MaxConcurrentOperationsReached
// TransactionCoordinatorNotEnabled indicates that the transaction coordinator is not enabled.
// This error is returned when an operation that requires the transaction coordinator is attempted
// but the transaction coordinator feature is not enabled in the system or the transaction coordinator
// has not initialized
TransactionCoordinatorNotEnabled
)

// Error implement error interface, composed of two parts: msg and result.
Expand Down
12 changes: 9 additions & 3 deletions pulsar/transaction_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ const (
txnHandlerClosed
)

var ErrMaxConcurrentOpsReached = newError(MaxConcurrentOperationsReached, "Max concurrent operations reached")
var ErrTransactionCoordinatorNotEnabled = newError(TransactionCoordinatorNotEnabled, "The broker doesn't enable "+
"the transaction coordinator, or the transaction coordinator has not initialized")

func (t *transactionHandler) getState() txnHandlerState {
return txnHandlerState(t.state.Load())
}
Expand Down Expand Up @@ -357,6 +361,9 @@ func (tc *transactionCoordinatorClient) start() error {
}
tc.handlers = make([]*transactionHandler, r.Partitions)
//Get connections with all transaction_impl coordinators which is synchronized
if r.Partitions <= 0 {
return ErrTransactionCoordinatorNotEnabled
}
for i := 0; i < r.Partitions; i++ {
handler, err := tc.newTransactionHandler(uint64(i))
if err != nil {
Expand Down Expand Up @@ -458,12 +465,11 @@ func getTCAssignTopicName(partition uint64) string {

func (tc *transactionCoordinatorClient) canSendRequest() error {
if !tc.semaphore.Acquire(context.Background()) {
// What? This is not an Unknown Error. We need to fix that!
return newError(UnknownError, "Failed to acquire semaphore")
return ErrMaxConcurrentOpsReached
}
return nil
}

func (tc *transactionCoordinatorClient) nextTCNumber() uint64 {
return atomic.AddUint64(&tc.epoch, 1) % uint64(len(tc.handlers)) // TODO: The tc.cons may be empty
return atomic.AddUint64(&tc.epoch, 1) % uint64(len(tc.handlers))
}

0 comments on commit f4b5bee

Please sign in to comment.