diff --git a/pool/connection.go b/pool/connection.go index 0e5a39f..35f7b1f 100644 --- a/pool/connection.go +++ b/pool/connection.go @@ -154,6 +154,12 @@ func (ch *Connection) Close() (err error) { return nil } +// flush all internal channels +func (ch *Connection) flush() { + flush(ch.errors) + flush(ch.blocking) +} + // Flag flags the connection as broken which must be recovered. // A flagged connection implies a closed connection. // Flagging of a connectioncan only be undone by Recover-ing the connection. @@ -285,7 +291,7 @@ func (ch *Connection) Recover(ctx context.Context) error { return ch.recover(ctx) } -func (ch *Connection) recover(ctx context.Context) error { +func (ch *Connection) recover(ctx context.Context) (err error) { select { case <-ctx.Done(): @@ -301,6 +307,8 @@ func (ch *Connection) recover(ctx context.Context) error { if healthy { return nil } + // flush all channels after recovery + defer ch.flush() var ( timer = time.NewTimer(0) diff --git a/pool/connection_test.go b/pool/connection_test.go index 54f8a6d..2dbf46d 100644 --- a/pool/connection_test.go +++ b/pool/connection_test.go @@ -137,7 +137,7 @@ func TestManyNewConnectionWithDisconnect(t *testing.T) { wait() // wait for connection to work again. - tctx, cancel := context.WithTimeout(ctx, 5*time.Second) + tctx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() assert.NoError(t, c.Recover(tctx)) assert.NoError(t, c.Error())