Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix transaction coordinator client cann't reconnect to the broker #1237

Merged
merged 13 commits into from
Jul 11, 2024
5 changes: 5 additions & 0 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Connection interface {
ID() string
GetMaxMessageSize() int32
Close()
WaitForClose() <-chan interface{}
IsProxied() bool
}

Expand Down Expand Up @@ -995,6 +996,10 @@ func (c *connection) CheckIdle(maxIdleTime time.Duration) bool {
return time.Since(c.lastActive) > maxIdleTime
}

func (c *connection) WaitForClose() <-chan interface{} {
return c.closeCh
}

// Close closes the connection by
// closing underlying socket connection and closeCh.
// This also triggers callbacks to the ConnectionClosed listeners.
Expand Down
13 changes: 13 additions & 0 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)

// GetConnections get all connections in the pool.
GetConnections() map[string]Connection

// Close all the connections in the pool
Close()
}
Expand Down Expand Up @@ -124,6 +127,16 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
return conn, err
}

func (p *connectionPool) GetConnections() map[string]Connection {
p.Lock()
conns := make(map[string]Connection)
for k, c := range p.connections {
conns[k] = c
}
p.Unlock()
return conns
}

func (p *connectionPool) Close() {
p.Lock()
close(p.closeCh)
Expand Down
Loading
Loading