Skip to content

Commit

Permalink
flush connection channels after recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
jxsl13 committed Mar 15, 2024
1 parent 1613877 commit 8927bc5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
10 changes: 9 additions & 1 deletion pool/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ func (ch *Connection) Close() (err error) {
return nil
}

// flush all internal channels
func (ch *Connection) flush() {
flush(ch.errors)
flush(ch.blocking)
}

// Flag flags the connection as broken which must be recovered.
// A flagged connection implies a closed connection.
// Flagging of a connectioncan only be undone by Recover-ing the connection.
Expand Down Expand Up @@ -285,7 +291,7 @@ func (ch *Connection) Recover(ctx context.Context) error {
return ch.recover(ctx)
}

func (ch *Connection) recover(ctx context.Context) error {
func (ch *Connection) recover(ctx context.Context) (err error) {

select {
case <-ctx.Done():
Expand All @@ -301,6 +307,8 @@ func (ch *Connection) recover(ctx context.Context) error {
if healthy {
return nil
}
// flush all channels after recovery
defer ch.flush()

var (
timer = time.NewTimer(0)
Expand Down
2 changes: 1 addition & 1 deletion pool/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestManyNewConnectionWithDisconnect(t *testing.T) {

wait() // wait for connection to work again.

tctx, cancel := context.WithTimeout(ctx, 5*time.Second)
tctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
assert.NoError(t, c.Recover(tctx))
assert.NoError(t, c.Error())
Expand Down

0 comments on commit 8927bc5

Please sign in to comment.