diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3dd9cbd..18ccab9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+Release v1.2.13 (2023-07-13)
+===
+* On errors, close the single connection instead of destroying the whole pool
+* Increase the number of retries for health checks to 3
+* Fix an issue where an error could be sent to a closed channel
+
 Release v1.2.12 (2023-01-10)
 ===
 * Add support for healthcheck infrastructure
diff --git a/dax/internal/client/cluster.go b/dax/internal/client/cluster.go
index 50d6787..d62fa8a 100644
--- a/dax/internal/client/cluster.go
+++ b/dax/internal/client/cluster.go
@@ -642,7 +642,7 @@ func (c *cluster) update(config []serviceEndpoint) error {
     for ep, clicfg := range oldActive {
         _, isPartOfUpdatedEndpointsConfig := newEndpoints[ep]
         if !isPartOfUpdatedEndpointsConfig {
-            c.debugLog(fmt.Sprintf("Found updated endpoing configs, will close inactive endpoint client : %s", ep.host))
+            c.debugLog(fmt.Sprintf("Found updated endpoint configs, will close inactive endpoint client : %s", ep.host))
             toClose = append(toClose, clicfg)
         }
     }
diff --git a/dax/internal/client/single.go b/dax/internal/client/single.go
index a08dfa6..8ddeb26 100644
--- a/dax/internal/client/single.go
+++ b/dax/internal/client/single.go
@@ -169,7 +169,7 @@ func (client *SingleDaxClient) startHealthChecks(cc *cluster, host hostPort) {
     ctx, cfn := context.WithTimeout(aws.BackgroundContext(), 1*time.Second)
     defer cfn()
     var err error
-    _, err = client.endpoints(RequestOptions{MaxRetries: 2, Context: ctx})
+    _, err = client.endpoints(RequestOptions{MaxRetries: 3, Context: ctx})
     if err != nil {
         cc.debugLog(fmt.Sprintf("Health checks failed with error " + err.Error() + " for host :: " + host.host))
         cc.onHealthCheckFailed(host)
@@ -740,31 +740,41 @@ func (client *SingleDaxClient) executeWithContext(ctx aws.Context, op string, en
         return err
     }
     if err = client.pool.setDeadline(ctx, t); err != nil {
-        client.pool.discard(t)
+        // If the error is just a context cancellation or timeout,
+        // the tube is still usable because nothing has been written to it yet
+        if err == ctx.Err() {
+            client.pool.put(t)
+            return err
+        }
+        // If setting the deadline fails for any other reason,
+        // something is probably wrong with the tube
+        client.pool.closeTube(t)
         return err
     }

     if err = client.auth(t); err != nil {
-        client.pool.discard(t)
+        // The auth method writes to the tube, and on error there is
+        // no guarantee that what it wrote was drained completely
+        client.pool.closeTube(t)
         return err
     }

     writer := t.CborWriter()
     if err = encoder(writer); err != nil {
-        // Validation errors will cause pool to be discarded as there is no guarantee
+        // Validation errors will cause the connection to be closed as there is no guarantee
         // that the validation was performed before any data was written into tube
-        client.pool.discard(t)
+        client.pool.closeTube(t)
         return err
     }
     if err := writer.Flush(); err != nil {
-        client.pool.discard(t)
+        client.pool.closeTube(t)
         return err
     }

     reader := t.CborReader()
     ex, err := decodeError(reader)
-    if err != nil { // decode or network error
-        client.pool.discard(t)
+    if err != nil { // decode or network error: the tube may not be fully drained
+        client.pool.closeTube(t)
         return err
     }
     if ex != nil { // user or server error
@@ -774,7 +784,8 @@

     err = decoder(reader)
     if err != nil {
-        client.pool.discard(t)
+        // we were not able to completely drain the tube
+        client.pool.closeTube(t)
     } else {
         client.pool.put(t)
     }
@@ -809,7 +820,7 @@ func (client *SingleDaxClient) recycleTube(t tube, err error) {
     if recycle {
         client.pool.put(t)
     } else {
-        client.pool.discard(t)
+        client.pool.closeTube(t)
     }
 }

 func (client *SingleDaxClient) auth(t tube) error {
diff --git a/dax/internal/client/tube.go b/dax/internal/client/tube.go
index 2b20e0d..064f0c8 100644
--- a/dax/internal/client/tube.go
+++ b/dax/internal/client/tube.go
@@ -25,7 +25,7 @@ import (
 )

 const magic = "J7yne5G"
-const agent = "DaxGoClient-1.2.12"
+const agent = "DaxGoClient-1.2.13"

 var optional = map[string]string{"UserAgent": agent}

diff --git a/dax/internal/client/tubepool.go b/dax/internal/client/tubepool.go
index db13cdb..d0c0aee 100644
--- a/dax/internal/client/tubepool.go
+++ b/dax/internal/client/tubepool.go
@@ -190,14 +190,13 @@ func (p *tubePool) allocAndReleaseGate(session int64, done chan tube, releaseGat
         }
     } else {
         p.mutex.Lock()
-        cls := p.closed
-        p.mutex.Unlock()
-        if !cls {
+        if !p.closed {
             select {
             case p.errCh <- err:
             default:
             }
         }
+        p.mutex.Unlock()
     }
     if done != nil {
         close(done)
     }
 }
@@ -236,9 +235,10 @@ func (p *tubePool) put(t tube) {
     p.top = t
 }

-// Closes the specified tube, and if the tube is using the same version as the current session,
-// then also closes all other idle tubes and performs a version bump.
-func (p *tubePool) discard(t tube) {
+// Call closeTube when you are not sure that the tube is clean.
+// A tube is clean if nothing has been written into it, or if
+// everything written into it has been drained completely.
+func (p *tubePool) closeTube(t tube) {
     if t == nil {
         return
     }
@@ -249,30 +249,6 @@ func (p *tubePool) discard(t tube) {
             t.Close()
         }()
     }
-
-    p.mutex.Lock()
-
-    var head tube
-    if t.Session() == p.session {
-        p.sessionBump()
-        head = p.clearIdleConnections()
-    }
-
-    // Waiters enter the waiting queue when there's no existing tube
-    // or when they failed to acquire a permit to create a new tube.
-    // There's also a chance the newly created tube was stolen and
-    // the thief must return it back into the pool or discard it.
-    if p.waiters != nil {
-        select {
-        case p.waiters <- nil: // wake up a single waiter, if any
-            break
-        default:
-            close(p.waiters) // or unblock all future waiters who are yet to enter the waiters queue
-            p.waiters = nil
-        }
-    }
-    p.mutex.Unlock()
-    p.closeAll(head)
 }

 // Sets the deadline on the underlying net.Conn object
@@ -303,7 +279,7 @@ func (p *tubePool) Close() error {
             p.waiters = nil
         }
         close(p.errCh)
-        // cannot close(p.gate) as send on closed channel will panic. new connections will be closed immediately.
+        // cannot close(p.gate) here as a send on a closed channel would panic; new connections will be closed immediately.
     }
     p.mutex.Unlock()
     p.closeAll(head)
diff --git a/dax/internal/client/tubepool_test.go b/dax/internal/client/tubepool_test.go
index 1c3f11b..bec69cb 100644
--- a/dax/internal/client/tubepool_test.go
+++ b/dax/internal/client/tubepool_test.go
@@ -509,58 +509,16 @@ func countTubes(pool *tubePool) int {
     return count
 }

-func TestTubePool_DiscardBumpsSession(t *testing.T) {
+func TestTubePool_close(t *testing.T) {
     p := newTubePoolWithOptions(":1234", tubePoolOptions{1, 5 * time.Second, defaultDialer.DialContext}, connConfigData)
     origSession := p.session
+    p.closeTubeImmediately = true

     tt := &mockTube{}
-    tt.On("Session").Return(p.session).Once()
     tt.On("Close").Return(nil).Once()
-    p.discard(tt)
-
-    require.NotEqual(t, origSession, p.session)
-}
-
-func TestTubePool_DiscardWakesUpWaiters(t *testing.T) {
-
-    p := newTubePoolWithOptions(":1234", tubePoolOptions{1, 5 * time.Second, defaultDialer.DialContext}, connConfigData)
-    p.dialContext = func(ctx context.Context, a, n string) (net.Conn, error) {
-        return &mockConn{}, nil
-    }
-    // artificially enter the gate to prevent new connections
-    entered := p.gate.tryEnter()
-    require.True(t, entered)
-
-    var startedWg sync.WaitGroup
-    startedWg.Add(1)
-
-    ch := make(chan struct {
-        tube
-        error
-    })
-    go func() {
-        startedWg.Done()
-        t, err := p.get()
-        ch <- struct {
-            tube
-            error
-        }{t, err}
-    }()
-    startedWg.Wait()
-    // wait some extra time to make sure the caller has entered waiters queue
-    time.Sleep(2 * time.Second)
-
-    // release the gate to allow woken waiters to establish a new connection
-    p.gate.exit()
-    tt := &mockTube{}
-    tt.On("Session").Return(p.session).Once()
-    tt.On("Close").Return(nil).Once()
-
-    p.discard(tt)
-
-    result := <-ch
-    require.NoError(t, result.error)
-    require.NotNil(t, result.tube)
+    p.closeTube(tt)
+    require.Equal(t, origSession, p.session)
+    tt.AssertCalled(t, "Close")
 }

 func TestTubePool_PutClosesTubesIfPoolIsClosed(t *testing.T) {
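
Reviewer note (illustrative sketch, not part of the patch): the single.go and tubepool.go hunks above change the failure path so that a tube goes back into the pool only when it is known to be clean, and a tube in an unknown state is closed on its own instead of bumping the session and discarding every idle connection. The standalone Go program below models that decision. The names tube, pool, recycleOrClose, tracePool, fakeTube and the wroteToTube flag are hypothetical stand-ins invented for this example; only the method names put and closeTube and the err == ctx.Err() check come from the patch itself.

package main

import (
    "context"
    "fmt"
    "time"
)

// tube and pool are stand-ins for the real types in dax/internal/client;
// only the behavior needed for this sketch is modeled.
type tube interface{ ID() int }

type pool interface {
    put(t tube)       // return a clean tube to the idle stack for reuse
    closeTube(t tube) // close only this tube; the rest of the pool is untouched
}

// recycleOrClose mirrors the decision executeWithContext makes after the patch:
// reuse the tube only when nothing was written to it (or the response was fully
// drained); otherwise close that single connection.
func recycleOrClose(ctx context.Context, p pool, t tube, wroteToTube bool, err error) {
    switch {
    case err == nil:
        p.put(t) // success: response fully drained, safe to reuse
    case err == ctx.Err() && !wroteToTube:
        p.put(t) // context cancelled or timed out before any bytes were written
    default:
        p.closeTube(t) // stream state unknown: close this connection only
    }
}

// tracePool prints which branch was taken so the sketch can be run standalone.
type tracePool struct{}

func (tracePool) put(t tube)       { fmt.Printf("tube %d returned to pool\n", t.ID()) }
func (tracePool) closeTube(t tube) { fmt.Printf("tube %d closed\n", t.ID()) }

type fakeTube int

func (f fakeTube) ID() int { return int(f) }

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
    defer cancel()
    <-ctx.Done() // let the context expire so ctx.Err() is non-nil

    p := tracePool{}
    recycleOrClose(ctx, p, fakeTube(1), false, ctx.Err())                 // reused: nothing written yet
    recycleOrClose(ctx, p, fakeTube(2), true, fmt.Errorf("write failed")) // closed: tube may be dirty
    recycleOrClose(ctx, p, fakeTube(3), false, nil)                       // reused: clean round trip
}

Compared with the old discard path, which bumped the pool session and closed every idle tube on any error, this keeps healthy connections alive and only pays for the one connection whose stream state is in doubt.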