Fix "deadline exceeded" issues discovered in conformance tests #643

Merged: 12 commits, Dec 7, 2023
37 changes: 32 additions & 5 deletions .github/workflows/ci.yaml
@@ -15,6 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
# when editing this list, also update steps and jobs below
go-version: [1.19.x, 1.20.x, 1.21.x]
steps:
- name: Checkout Code
@@ -25,17 +26,43 @@
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
- name: Test
run: make test
- name: Unit Test
run: make shorttest
- name: Lint
# Often, lint & gofmt guidelines depend on the Go version. To prevent
# conflicting guidance, run only on the most recent supported version.
# For the same reason, only check generated code on the most recent
# supported version.
if: matrix.go-version == '1.21.x'
run: make checkgenerate && make lint
conformance:
runs-on: ubuntu-latest
strategy:
matrix:
# 1.19 is omitted because conformance test runner requires 1.20+
go-version: [1.20.x, 1.21.x]
steps:
- name: Checkout Code
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Install Go
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
- name: Run Conformance Tests
# A dependency of conformance tests (for testing http/3) requires
# Go 1.20. So we skip this step for Go 1.19.
if: matrix.go-version != '1.19.x'
run: make runconformance
slowtest:
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Install Go
uses: actions/setup-go@v4
with:
# only the latest
go-version: 1.21.x
- name: Run Slow Tests
run: make slowtest

Member

Talked to Josh live about this, but I'd love for this to be carved up a little differently. Locally, I'd really like make and make test to run these slower smoke tests. In CI, the highly-matrixed tests should run all the tests except the smoke tests. The new job (slowtest) should run the slow tests, but it would be okay with me if it also ran the faster tests.

I don't care that much whether we carve up the tests with an environment variable, testing.Short(), or with build tags.
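
For reference, a build-tag split might look roughly like this; a sketch only, with an illustrative tag and test name, not code from this PR:

//go:build slow

package example_test

import "testing"

// This file compiles only when the "slow" tag is supplied,
// e.g. "go test -tags slow ./...", so a default "go test"
// run never sees these tests.
func TestHeavyScenario(t *testing.T) {
	// long-running assertions would go here
}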


@jhump (Member Author), Dec 7, 2023

make test now runs both short and slow tests. The CI "Test" job only runs short tests, and there's a separate "Slow Tests" job for the others. This is done with go test -short and a check in the tests (if testing.Short() { t.Skip(...) }).

Also, while factoring out slow tests to a parallel job, I did the same for conformance tests, to shorten the critical path through the CI workflow.
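
A minimal sketch of that mechanism (the test name is illustrative, not from this change): the slow test skips itself when -short is set, so a short run and a full run differ only in the flag passed to go test.

package example_test

import "testing"

// Run with "go test -short ./..." to skip this test, or plain
// "go test ./..." to include it.
func TestSomethingSlow(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping slow test in -short mode")
	}
	// long-running assertions would go here
}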


@jhump (Member Author), Dec 7, 2023

Because the main CI job must re-build the code (to instrument with race detector) and also does linting, the new slowtest job is actually faster than that other job (for now at least). So this new job does not increase the total duration. The longest job is still Windows, by a good margin.

20 changes: 18 additions & 2 deletions Makefile
@@ -27,8 +27,24 @@ clean: ## Delete intermediate build artifacts
git clean -Xdf

.PHONY: test
test: build ## Run unit tests
go test -vet=off -race -cover ./...
test: shorttest slowtest

.PHONY: shorttest
shorttest: build ## Run unit tests
go test -vet=off -race -cover -short ./...

.PHONY: slowtest
# Runs all tests, including known long/slow ones. The
# race detector is not used for a few reasons:
# 1. Race coverage of the short tests should be
# adequate to catch race conditions.
# 2. It slows tests down, which is not good if we
# know these are already slow tests.
# 3. Some of the slow tests can't repro issues and
# find regressions as reliably with the race
# detector enabled.
slowtest: build
go test ./...

.PHONY: runconformance
runconformance: build ## Run conformance test suite
245 changes: 245 additions & 0 deletions client_ext_test.go
@@ -15,18 +15,28 @@
package connect_test

import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

connect "connectrpc.com/connect"
"connectrpc.com/connect/internal/assert"
pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
"connectrpc.com/connect/internal/memhttp/memhttptest"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
"google.golang.org/protobuf/types/dynamicpb"
@@ -421,6 +431,229 @@ func TestDynamicClient(t *testing.T) {
})
}

func TestClientDeadlineHandling(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping slow test")
}

// Note that these tests are not able to reproduce issues with the race
// detector enabled. That's partly why the makefile only runs "slow"
// tests with the race detector disabled.

_, handler := pingv1connect.NewPingServiceHandler(pingServer{})
svr := httptest.NewUnstartedServer(http.HandlerFunc(func(respWriter http.ResponseWriter, req *http.Request) {
if req.Context().Err() != nil {
return
}
handler.ServeHTTP(respWriter, req)
}))
svr.EnableHTTP2 = true
svr.StartTLS()
t.Cleanup(svr.Close)

// This case creates a new connection for each RPC to verify that timeouts during dialing
// won't cause issues. This is historically easier to reproduce, so it uses a smaller
// duration, no concurrency, and fewer iterations. This is important because if we used
// a new connection for each RPC in the bigger test scenario below, we'd encounter other
// issues related to overwhelming the loopback interface and exhausting ephemeral ports.
t.Run("dial", func(t *testing.T) {
t.Parallel()
transport, ok := svr.Client().Transport.(*http.Transport)
if !assert.True(t, ok) {
t.FailNow()
}
testClientDeadlineBruteForceLoop(t,
5*time.Second, 5, 1,
func(ctx context.Context) (string, rpcErrors) {
httpClient := &http.Client{
Transport: transport.Clone(),
}
client := pingv1connect.NewPingServiceClient(httpClient, svr.URL)
_, err := client.Ping(ctx, connect.NewRequest(&pingv1.PingRequest{Text: "foo"}))
// Close all connections and make sure to give a little time for the OS to
// release socket resources to prevent resource exhaustion (such as running
// out of ephemeral ports).
httpClient.CloseIdleConnections()
time.Sleep(time.Millisecond / 2)
return pingv1connect.PingServicePingProcedure, rpcErrors{recvErr: err}
},
)
})

// This case creates significantly more load than the above one, but uses a normal
// client so pools and re-uses connections. It also uses all stream types to send
// messages, to make sure that all stream implementations handle deadlines correctly.
// The I/O errors related to deadlines are historically harder to reproduce, so it
// throws a lot more effort into reproducing, particularly a longer duration for
// which it will run. It also uses larger messages (by packing requests with
// unrecognized fields) and compression, to make it more likely to encounter the
// deadline in the middle of read and write operations.
t.Run("read-write", func(t *testing.T) {
t.Parallel()

var extraField []byte
extraField = protowire.AppendTag(extraField, 999, protowire.BytesType)
extraData := make([]byte, 16*1024)
// use good random data so it's not very compressible
if _, err := rand.Read(extraData); err != nil {
t.Fatalf("failed to generate extra payload: %v", err)
return
}
extraField = protowire.AppendBytes(extraField, extraData)

clientConnect := pingv1connect.NewPingServiceClient(svr.Client(), svr.URL, connect.WithSendGzip())
clientGRPC := pingv1connect.NewPingServiceClient(svr.Client(), svr.URL, connect.WithSendGzip(), connect.WithGRPCWeb())
var count atomic.Int32
testClientDeadlineBruteForceLoop(t,
20*time.Second, 200, runtime.GOMAXPROCS(0),
func(ctx context.Context) (string, rpcErrors) {
var procedure string
var errs rpcErrors
rpcNum := count.Add(1)
var client pingv1connect.PingServiceClient
if rpcNum&4 == 0 {
client = clientConnect
} else {
client = clientGRPC
}
switch rpcNum & 3 {
case 0:
procedure = pingv1connect.PingServicePingProcedure
_, errs.recvErr = client.Ping(ctx, connect.NewRequest(addUnrecognizedBytes(&pingv1.PingRequest{Text: "foo"}, extraField)))
case 1:
procedure = pingv1connect.PingServiceSumProcedure
stream := client.Sum(ctx)
for i := 0; i < 3; i++ {
errs.sendErr = stream.Send(addUnrecognizedBytes(&pingv1.SumRequest{Number: 1}, extraField))
if errs.sendErr != nil {
break
}
}
_, errs.recvErr = stream.CloseAndReceive()
case 2:
procedure = pingv1connect.PingServiceCountUpProcedure
var stream *connect.ServerStreamForClient[pingv1.CountUpResponse]
stream, errs.recvErr = client.CountUp(ctx, connect.NewRequest(addUnrecognizedBytes(&pingv1.CountUpRequest{Number: 3}, extraField)))
if errs.recvErr == nil {
for stream.Receive() {
}
errs.recvErr = stream.Err()
errs.closeRecvErr = stream.Close()
}
case 3:
procedure = pingv1connect.PingServiceCumSumProcedure
stream := client.CumSum(ctx)
for i := 0; i < 3; i++ {
errs.sendErr = stream.Send(addUnrecognizedBytes(&pingv1.CumSumRequest{Number: 1}, extraField))
_, errs.recvErr = stream.Receive()
if errs.recvErr != nil {
break
}
}
errs.closeSendErr = stream.CloseRequest()
errs.closeRecvErr = stream.CloseResponse()
}
return procedure, errs
},
)
})
}

func testClientDeadlineBruteForceLoop(
t *testing.T,
duration time.Duration,
iterationsPerDeadline int,
parallelism int,
loopBody func(ctx context.Context) (string, rpcErrors),
) {
t.Helper()
testContext, testCancel := context.WithTimeout(context.Background(), duration)
defer testCancel()
var rpcCount atomic.Int64

var wg sync.WaitGroup
for goroutine := 0; goroutine < parallelism; goroutine++ {
goroutine := goroutine
wg.Add(1)
go func() {
defer wg.Done()
// We try a range of timeouts since the timing issue is sensitive
// to execution environment (e.g. CPU, memory, and network speeds).
// So the lower timeout values may be more likely to trigger an issue
// in faster environments; higher timeouts for slower environments.
const minTimeout = 10 * time.Microsecond
const maxTimeout = 2 * time.Millisecond
for {
for timeout := minTimeout; timeout <= maxTimeout; timeout += 10 * time.Microsecond {
for i := 0; i < iterationsPerDeadline; i++ {
if testContext.Err() != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
// We are intentionally not inheriting from testContext, which signals when the
// test loop should stop and return but need not influence the RPC deadline.
proc, errs := loopBody(ctx) //nolint:contextcheck
rpcCount.Add(1)
cancel()
type errCase struct {
err error
name string
allowEOF bool
}
errCases := []errCase{
{
err: errs.sendErr,
name: "send error",
allowEOF: true,
},
{
err: errs.recvErr,
name: "receive error",
},
{
err: errs.closeSendErr,
name: "close-send error",
},
{
err: errs.closeRecvErr,
name: "close-receive error",
},
}
for _, errCase := range errCases {
err := errCase.err
if err == nil {
// operation completed before timeout, try again
continue
}
if errCase.allowEOF && errors.Is(err, io.EOF) {
continue
}

if !assert.Equal(t, connect.CodeOf(err), connect.CodeDeadlineExceeded) {
var buf bytes.Buffer
_, _ = fmt.Fprintf(&buf, "actual %v from %s: %v\n%#v", errCase.name, proc, err, err)
for {
err = errors.Unwrap(err)
if err == nil {
break
}
_, _ = fmt.Fprintf(&buf, "\n caused by: %#v", err)
}
t.Log(buf.String())
testCancel()
}
}
}
}
t.Logf("goroutine %d: repeating duration loop", goroutine)
}
}()
}
wg.Wait()
t.Logf("Issued %d RPCs.", rpcCount.Load())
}

type notModifiedPingServer struct {
pingv1connect.UnimplementedPingServiceHandler

@@ -515,3 +748,15 @@ func (a *assertSchemaInterceptor) WrapStreamingHandler(next connect.StreamingHan
return next(ctx, conn)
}
}

type rpcErrors struct {
sendErr error
recvErr error
closeSendErr error
closeRecvErr error
}

func addUnrecognizedBytes[M proto.Message](msg M, data []byte) M {
msg.ProtoReflect().SetUnknown(data)
return msg
}
10 changes: 9 additions & 1 deletion compression.go
@@ -90,6 +90,10 @@ func (c *compressionPool) Decompress(dst *bytes.Buffer, src *bytes.Buffer, readM
bytesRead, err := dst.ReadFrom(reader)
if err != nil {
_ = c.putDecompressor(decompressor)
err = wrapIfContextError(err)
if connectErr, ok := asError(err); ok {
return connectErr
}
return errorf(CodeInvalidArgument, "decompress: %w", err)
}
if readMaxBytes > 0 && bytesRead > readMaxBytes {
@@ -111,8 +115,12 @@ func (c *compressionPool) Compress(dst *bytes.Buffer, src *bytes.Buffer) *Error
if err != nil {
return errorf(CodeUnknown, "get compressor: %w", err)
}
if _, err := io.Copy(compressor, src); err != nil {
if _, err := src.WriteTo(compressor); err != nil {

@jhump (Member Author)

This was just for consistency/symmetry with the decompress code above, which directly calls dst.ReadFrom instead of using io.Copy.
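
A small standalone sketch of that equivalence (hypothetical buffers, not the pool code itself): io.Copy already delegates to the source's WriteTo method when the source implements io.WriterTo, as *bytes.Buffer does, so calling WriteTo directly is the same work with one less hop.

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	var viaCopy, viaWriteTo bytes.Buffer

	// io.Copy notices that *bytes.Buffer implements io.WriterTo and
	// calls src.WriteTo(dst) under the hood.
	_, _ = io.Copy(&viaCopy, bytes.NewBufferString("payload"))

	// Calling WriteTo directly does the same thing explicitly,
	// mirroring dst.ReadFrom(src) on the decompress side.
	src := bytes.NewBufferString("payload")
	_, _ = src.WriteTo(&viaWriteTo)

	fmt.Println(viaCopy.String() == viaWriteTo.String()) // prints: true
}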

_ = c.putCompressor(compressor)
err = wrapIfContextError(err)
if connectErr, ok := asError(err); ok {
return connectErr
}
return errorf(CodeInternal, "compress: %w", err)
}
if err := c.putCompressor(compressor); err != nil {
2 changes: 1 addition & 1 deletion connect.go
@@ -377,7 +377,7 @@ func receiveUnaryResponse[T any](conn StreamingClientConn, initializer maybeInit
if err := conn.Receive(&msg2); err == nil {
return nil, NewError(CodeUnknown, errors.New("unary stream has multiple messages"))
} else if err != nil && !errors.Is(err, io.EOF) {
return nil, NewError(CodeUnknown, err)
return nil, err
}
return &Response[T]{
Msg: &msg,