From 14b0d8e156c9fbea0dfd74330672d9004e4d4832 Mon Sep 17 00:00:00 2001 From: PapaCharlie Date: Fri, 27 Sep 2024 14:58:12 -0700 Subject: [PATCH] Include response chunking info in nonce If a given delta response contains too many resources, the server will break it up into multiple responses. However, this means the client does not know whether it received all the resources for its subscription. This is especially relevant for wildcard subscriptions, for which the client does not know the resources ahead of time and therefore cannot wait for them explicitly. By returning additional metadata in the nonce (there is no field for this in the delta discovery response, though I'm hoping that will change https://github.com/cncf/xds/issues/99), the client can know if the server chunked the response, and react accordingly. --- Makefile | 20 ++++++----- ads/ads.go | 23 ++++++++++++ ads/ads_example_test.go | 27 +++++++++++++++ internal/server/handlers.go | 4 +-- internal/server/handlers_delta.go | 8 +++-- internal/server/handlers_delta_test.go | 27 ++++++++++----- internal/utils/utils.go | 48 ++++++++++++++++---------- internal/utils/utils_test.go | 20 +++++++++-- server.go | 4 +-- 9 files changed, 138 insertions(+), 43 deletions(-) create mode 100644 ads/ads_example_test.go diff --git a/Makefile b/Makefile index 1d283a0..d663468 100644 --- a/Makefile +++ b/Makefile @@ -28,19 +28,23 @@ fmt: $(GOBIN)/goimports # Can be used to change the number of tests run, defaults to 1 to prevent caching TESTCOUNT = 1 -# Can be used to change the verobosity of tests: make test TESTVERBOSE=-v -TESTVERBOSE = -# Can be used to generate coverage reports for a specific package -COVERPKG = $(PACKAGE) +# Can be used to add flags to the go test invocation: make test TESTFLAGS=-v +TESTFLAGS = # Can be used to change which package gets tested, defaults to all packages. TESTPKG = ./... -test: $(COVERAGE) -$(COVERAGE): - @mkdir -p $(@D) - go test -race -coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/... 
-count=$(TESTCOUNT) $(TESTVERBOSE) $(TESTPKG) +test: + go test -race -count=$(TESTCOUNT) $(TESTFLAGS) $(TESTPKG) + +# Can be used to generate coverage reports for a specific package +COVERPKG = $(PACKAGE) + +.PHONY: $(COVERAGE) coverage: $(COVERAGE) +$(COVERAGE): + @mkdir -p $(@D) + $(MAKE) test TESTFLAGS='-coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/...' go tool cover -html=$(COVERAGE) profile_cache: diff --git a/ads/ads.go b/ads/ads.go index 97a9511..c7f3cfc 100644 --- a/ads/ads.go +++ b/ads/ads.go @@ -5,6 +5,8 @@ protocol (ADS), such as convenient type aliases, constants and core definitions. package ads import ( + "encoding/binary" + "encoding/hex" "log/slog" "sync" "time" @@ -255,3 +257,24 @@ func LookupStreamTypeByRPCMethod(rpcMethod string) (StreamType, bool) { return UnknownStreamType, false } } + +// ParseRemainingChunksFromNonce checks whether the Diderot server implementation chunked the delta +// responses because not all resources could fit in the same response without going over the default +// max gRPC message size of 4MB. A nonce from Diderot always starts with the 64-bit nanosecond +// timestamp of when the response was generated on the server, followed by the number of remaining chunks as +// a 32-bit integer. The sequence of integers is binary encoded with [binary.BigEndian] then hex +// encoded. If the given nonce does not match the expected format, this function simply returns 0, as +// it means the nonce was not created by the Diderot server implementation, and therefore does not +// contain the expected information. 
+func ParseRemainingChunksFromNonce(nonce string) (remainingChunks int) { + decoded, err := hex.DecodeString(nonce) + if err != nil { + return 0 + } + + if len(decoded) != 12 { + return 0 + } + + return int(binary.BigEndian.Uint32(decoded[8:12])) +} diff --git a/ads/ads_example_test.go b/ads/ads_example_test.go new file mode 100644 index 0000000..65db167 --- /dev/null +++ b/ads/ads_example_test.go @@ -0,0 +1,27 @@ +package ads_test + +import ( + "log" + + "github.com/linkedin/diderot/ads" +) + +func ExampleParseRemainingChunksFromNonce() { + // Acquire a delta ADS client + var client ads.DeltaClient + + var responses []*ads.DeltaDiscoveryResponse + for { + res, err := client.Recv() + if err != nil { + log.Panicf("Error receiving delta response: %v", err) + } + responses = append(responses, res) + + if ads.ParseRemainingChunksFromNonce(res.Nonce) == 0 { + break + } + } + + log.Printf("All responses received: %+v", responses) +} diff --git a/internal/server/handlers.go b/internal/server/handlers.go index 1b4aa34..a9eb5e2 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -324,7 +324,7 @@ func newSotWHandler( res := &ads.SotWDiscoveryResponse{ TypeUrl: typeUrl, - Nonce: utils.NewNonce(), + Nonce: utils.NewNonce(0), } for _, e := range entries { res.Resources = append(res.Resources, e.Resource.Resource) @@ -349,7 +349,7 @@ func newSotWHandler( res := &ads.SotWDiscoveryResponse{ TypeUrl: typeUrl, - Nonce: utils.NewNonce(), + Nonce: utils.NewNonce(0), } for _, r := range allResources { res.Resources = append(res.Resources, r.Resource.Resource) diff --git a/internal/server/handlers_delta.go b/internal/server/handlers_delta.go index 21d708e..1c92726 100644 --- a/internal/server/handlers_delta.go +++ b/internal/server/handlers_delta.go @@ -92,7 +92,7 @@ type deltaSender struct { queuedUpdates []queuedResourceUpdate // The minimum size an encoded chunk will serialize to, in bytes. 
Used to check whether a given // update can _ever_ be sent, and as the initial size of a chunk. Note that this value only depends - // on utils.NonceLength and the length of typeURL. + // on utils.NonceLength and the length of typeURL. minChunkSize int } @@ -174,6 +174,11 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De "typeURL", ds.typeURL, "updates", len(ds.queuedUpdates), ) + for i, c := range chunks { + c.Nonce = utils.NewNonce(len(chunks) - i - 1) + } + } else { + chunks[0].Nonce = utils.NewNonce(0) } return chunks @@ -182,7 +187,6 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De func (ds *deltaSender) newChunk() *ads.DeltaDiscoveryResponse { return &ads.DeltaDiscoveryResponse{ TypeUrl: ds.typeURL, - Nonce: utils.NewNonce(), } } diff --git a/internal/server/handlers_delta_test.go b/internal/server/handlers_delta_test.go index c19e482..c421cd4 100644 --- a/internal/server/handlers_delta_test.go +++ b/internal/server/handlers_delta_test.go @@ -2,6 +2,7 @@ package internal import ( "context" + "slices" "strings" "sync/atomic" "testing" @@ -102,7 +103,7 @@ func TestInitialChunkSize(t *testing.T) { typeURL := utils.GetTypeURL[*wrapperspb.StringValue]() require.Equal(t, proto.Size(&ads.DeltaDiscoveryResponse{ TypeUrl: typeURL, - Nonce: utils.NewNonce(), + Nonce: utils.NewNonce(0), }), initialChunkSize(typeURL)) } @@ -121,11 +122,21 @@ func TestDeltaHandlerChunking(t *testing.T) { minChunkSize: initialChunkSize(typeURL), } - sentResponses := ds.chunk(map[string]entry{ + getSentResponses := func(resources map[string]entry, expectedChunks int) []*ads.DeltaDiscoveryResponse { + responses := ds.chunk(resources) + require.Len(t, responses, expectedChunks) + expectedRemainingChunks := 0 + for _, res := range slices.Backward(responses) { + require.Equal(t, expectedRemainingChunks, ads.ParseRemainingChunksFromNonce(res.Nonce)) + expectedRemainingChunks++ + } + return responses + } + + sentResponses := 
getSentResponses(map[string]entry{ foo.Name: {Resource: foo}, bar.Name: {Resource: bar}, - }) - + }, 2) require.Equal(t, len(sentResponses[0].Resources), 1) require.Equal(t, len(sentResponses[1].Resources), 1) response0 := sentResponses[0].Resources[0] @@ -142,10 +153,10 @@ func TestDeltaHandlerChunking(t *testing.T) { // Delete resources whose names are the same size as the resources to trip the chunker with the same conditions name1 := strings.Repeat("1", resourceSize) name2 := strings.Repeat("2", resourceSize) - sentResponses = ds.chunk(map[string]entry{ + sentResponses = getSentResponses(map[string]entry{ name1: {Resource: nil}, name2: {Resource: nil}, - }) + }, 2) require.Equal(t, len(sentResponses[0].RemovedResources), 1) require.Equal(t, len(sentResponses[1].RemovedResources), 1) require.ElementsMatch(t, @@ -156,12 +167,12 @@ func TestDeltaHandlerChunking(t *testing.T) { small1, small2, small3 := "a", "b", "c" wayTooBig := strings.Repeat("3", 10*resourceSize) - sentResponses = ds.chunk(map[string]entry{ + sentResponses = getSentResponses(map[string]entry{ small1: {Resource: nil}, small2: {Resource: nil}, small3: {Resource: nil}, wayTooBig: {Resource: nil}, - }) + }, 1) require.Equal(t, len(sentResponses[0].RemovedResources), 3) require.ElementsMatch(t, []string{small1, small2, small3}, sentResponses[0].RemovedResources) require.Equal(t, int64(1), statsHandler.DeltaResourcesOverMaxSize.Load()) diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 42d3029..f9107ec 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -2,8 +2,9 @@ package utils import ( "encoding/base64" + "encoding/binary" + "encoding/hex" "slices" - "strconv" "strings" "time" @@ -12,24 +13,33 @@ import ( "google.golang.org/protobuf/proto" ) -// NonceLength is the length of the string returned by NewNonce. 
NewNonce encodes the current UNIX -// time in nanos in hex encoding, so the nonce will be 16 characters if the current UNIX nano time is -// greater than 2^60-1. This is because it takes 16 hex characters to encode 64 bits, but only 15 to -// encode 60 bits (the output of strconv.FormatInt is not padded by 0s). 2^60-1 nanos from epoch time -// (January 1st 1970) is 2006-07-14 23:58:24.606, which as of this writing is over 17 years ago. This -// is why it's guaranteed that NonceLength will be 16 characters (before that date, encoding the -// nanos only required 15 characters). For the curious, the UNIX nano timestamp will overflow int64 -// some time in 2262, making this constant valid for the next few centuries. -const NonceLength = 16 - -// NewNonce creates a new unique nonce based on the current UNIX time in nanos. It always returns a -// string of length NonceLength. -func NewNonce() string { - // The second parameter to FormatInt is the base, e.g. 2 will return binary, 8 will return octal - // encoding, etc. 16 means FormatInt returns the integer in hex encoding, e.g. 30 => "1e" or - // 1704239351400 => "18ccc94c668". - const hexBase = 16 - return strconv.FormatInt(time.Now().UnixNano(), hexBase) +const ( + // NonceLength is the length of the string returned by NewNonce. NewNonce encodes the current UNIX + // time in nanos and the remaining chunks, encoded as 64-bit and 32-bit integers respectively, then + // hex encoded. This means a nonce will always be 8 + 4 bytes, multiplied by 2 by the hex encoding. + NonceLength = (8 + 4) * 2 +) + +// NewNonce creates a new unique nonce based on the current UNIX time in nanos, always returning a +// string of length [NonceLength]. 
+func NewNonce(remainingChunks int) string { + return newNonce(time.Now(), remainingChunks) +} + +func newNonce(now time.Time, remainingChunks int) string { + // preallocating these buffers with constants (instead of doing `out = make([]byte, len(buf) * 2)`) + // means the compiler will allocate them on the stack, instead of heap. This significantly reduces + // the amount of garbage created by this function, as the only heap allocation will be the final + // string(out), rather than all of these buffers. + buf := make([]byte, NonceLength/2) + out := make([]byte, NonceLength) + + binary.BigEndian.PutUint64(buf[:8], uint64(now.UnixNano())) + binary.BigEndian.PutUint32(buf[8:], uint32(remainingChunks)) + + hex.Encode(out, buf) + + return string(out) } func GetTypeURL[T proto.Message]() string { diff --git a/internal/utils/utils_test.go b/internal/utils/utils_test.go index d959668..9af6ea1 100644 --- a/internal/utils/utils_test.go +++ b/internal/utils/utils_test.go @@ -1,7 +1,9 @@ package utils import ( + "fmt" "testing" + "time" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/linkedin/diderot/ads" @@ -46,6 +48,20 @@ func TestProtoMap(t *testing.T) { }) } -func TestNonceLength(t *testing.T) { - require.Len(t, NewNonce(), NonceLength) +func TestNewNonce(t *testing.T) { + now := time.Now() + t.Run("remainingChunks", func(t *testing.T) { + for _, expected := range []int{0, 42} { + nonce := newNonce(now, expected) + require.Equal(t, fmt.Sprintf("%x%08x", now.UnixNano(), expected), nonce) + actualRemainingChunks := ads.ParseRemainingChunksFromNonce(nonce) + require.Equal(t, expected, actualRemainingChunks) + } + }) + t.Run("badNonce", func(t *testing.T) { + require.Zero(t, ads.ParseRemainingChunksFromNonce("foo")) + }) + t.Run("oldNonce", func(t *testing.T) { + require.Zero(t, ads.ParseRemainingChunksFromNonce(fmt.Sprintf("%x", now.UnixNano()))) + }) } diff --git a/server.go b/server.go index b8980a7..92dec8d 100644 --- a/server.go +++ b/server.go @@ 
-210,7 +210,7 @@ func (s *ADSServer) StreamAggregatedResources(stream ads.SotWStream) (err error) return &ads.SotWDiscoveryResponse{ Resources: nil, TypeUrl: req.TypeUrl, - Nonce: utils.NewNonce(), + Nonce: utils.NewNonce(0), } }, setControlPlane: func(res *ads.SotWDiscoveryResponse, controlPlane *corev3.ControlPlane) { @@ -251,7 +251,7 @@ func (s *ADSServer) DeltaAggregatedResources(stream ads.DeltaStream) (err error) return &ads.DeltaDiscoveryResponse{ TypeUrl: req.GetTypeUrl(), RemovedResources: req.GetResourceNamesSubscribe(), - Nonce: utils.NewNonce(), + Nonce: utils.NewNonce(0), ControlPlane: s.controlPlane, } },