Include response chunking info in nonce
If a given delta response contains too many resources, the server will break it
up into multiple responses. However, this means the client does not know whether
it received all the resources for its subscription. This is especially relevant
for wildcard subscriptions, for which the client does not know the resources
ahead of time and therefore cannot wait for them explicitly. By returning
additional metadata in the nonce (there is no field for this in the delta
discovery response, though I'm hoping that will change
cncf/xds#99), the client can know if the server
chunked the response, and react accordingly.
PapaCharlie committed Sep 27, 2024
1 parent d1254dc commit 14b0d8e
Showing 9 changed files with 138 additions and 43 deletions.
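On the client side, the behavior described above boils down to draining responses until the nonce reports zero remaining chunks. The sketch below illustrates that loop; it mirrors the ExampleParseRemainingChunksFromNonce example added in ads/ads_example_test.go as part of this commit, with the client variable standing in for however the application acquires its delta ADS client, and only minimal error handling.

package main

import (
	"log"

	"github.com/linkedin/diderot/ads"
)

func main() {
	// Placeholder: acquire a delta ADS client however the application normally does.
	var client ads.DeltaClient

	var responses []*ads.DeltaDiscoveryResponse
	for {
		res, err := client.Recv()
		if err != nil {
			log.Panicf("Error receiving delta response: %v", err)
		}
		responses = append(responses, res)

		// Zero means no chunks remain (or the nonce was not produced by Diderot),
		// so the update is complete; any other value means more responses are coming.
		if ads.ParseRemainingChunksFromNonce(res.Nonce) == 0 {
			break
		}
	}

	log.Printf("All responses received: %+v", responses)
}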
20 changes: 12 additions & 8 deletions Makefile
@@ -28,19 +28,23 @@ fmt: $(GOBIN)/goimports

# Can be used to change the number of tests run, defaults to 1 to prevent caching
TESTCOUNT = 1
# Can be used to change the verbosity of tests: make test TESTVERBOSE=-v
TESTVERBOSE =
# Can be used to generate coverage reports for a specific package
COVERPKG = $(PACKAGE)
# Can be used to add flags to the go test invocation: make test TESTFLAGS=-v
TESTFLAGS =
# Can be used to change which package gets tested, defaults to all packages.
TESTPKG = ./...

test: $(COVERAGE)
$(COVERAGE):
@mkdir -p $(@D)
go test -race -coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/... -count=$(TESTCOUNT) $(TESTVERBOSE) $(TESTPKG)
test:
go test -race -count=$(TESTCOUNT) $(TESTFLAGS) $(TESTPKG)

# Can be used to generate coverage reports for a specific package
COVERPKG = $(PACKAGE)

.PHONY: $(COVERAGE)

coverage: $(COVERAGE)
$(COVERAGE):
@mkdir -p $(@D)
$(MAKE) test TESTFLAGS='-coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/...'
go tool cover -html=$(COVERAGE)

profile_cache:
23 changes: 23 additions & 0 deletions ads/ads.go
@@ -5,6 +5,8 @@ protocol (ADS), such as convenient type aliases, constants and core definitions.
package ads

import (
"encoding/binary"
"encoding/hex"
"log/slog"
"sync"
"time"
@@ -255,3 +257,24 @@ func LookupStreamTypeByRPCMethod(rpcMethod string) (StreamType, bool) {
return UnknownStreamType, false
}
}

// ParseRemainingChunksFromNonce checks whether the Diderot server implementation chunked the delta
// responses because not all resources could fit in the same response without going over the default
// max gRPC message size of 4MB. A nonce from Diderot always starts with the 64-bit nanosecond
// timestamp of when the response was generated on the server, followed by the number of remaining
// chunks as a 32-bit integer. Both integers are binary encoded with [binary.BigEndian], and the
// result is then hex encoded. If the given nonce does not match this format, this function simply
// returns 0, as it means the nonce was not created by the Diderot server implementation and
// therefore does not contain the expected information.
func ParseRemainingChunksFromNonce(nonce string) (remainingChunks int) {
decoded, err := hex.DecodeString(nonce)
if err != nil {
return 0
}

if len(decoded) != 12 {
return 0
}

return int(binary.BigEndian.Uint32(decoded[8:12]))
}
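For reference, the nonce layout documented above can be decoded by hand: 8 bytes of big-endian UnixNano timestamp followed by 4 bytes of big-endian remaining-chunk count, hex encoded into 24 characters. The sketch below is only an illustration of that format; the library exposes just ParseRemainingChunksFromNonce, and both the timestamp extraction and the sample nonce value are assumptions made for demonstration.

package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"time"
)

func main() {
	// Hypothetical nonce: 16 hex characters of timestamp + 8 hex characters of remaining chunks.
	const nonce = "17f8a1b2c3d4e5f600000002"

	decoded, err := hex.DecodeString(nonce)
	if err != nil || len(decoded) != 12 {
		fmt.Println("not a Diderot nonce")
		return
	}

	generatedAt := time.Unix(0, int64(binary.BigEndian.Uint64(decoded[:8])))
	remaining := binary.BigEndian.Uint32(decoded[8:12])
	fmt.Printf("generated at %s, %d chunks remaining\n", generatedAt, remaining)
}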
27 changes: 27 additions & 0 deletions ads/ads_example_test.go
@@ -0,0 +1,27 @@
package ads_test

import (
"log"

"github.com/linkedin/diderot/ads"
)

func ExampleParseRemainingChunksFromNonce() {
// Acquire a delta ADS client
var client ads.DeltaClient

var responses []*ads.DeltaDiscoveryResponse
for {
res, err := client.Recv()
if err != nil {
log.Panicf("Error receiving delta response: %v", err)
}
responses = append(responses, res)

if ads.ParseRemainingChunksFromNonce(res.Nonce) == 0 {
break
}
}

log.Printf("All responses received: %+v", responses)
}
4 changes: 2 additions & 2 deletions internal/server/handlers.go
@@ -324,7 +324,7 @@ func newSotWHandler(

res := &ads.SotWDiscoveryResponse{
TypeUrl: typeUrl,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}
for _, e := range entries {
res.Resources = append(res.Resources, e.Resource.Resource)
@@ -349,7 +349,7 @@ func newSotWHandler(

res := &ads.SotWDiscoveryResponse{
TypeUrl: typeUrl,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}
for _, r := range allResources {
res.Resources = append(res.Resources, r.Resource.Resource)
8 changes: 6 additions & 2 deletions internal/server/handlers_delta.go
@@ -92,7 +92,7 @@ type deltaSender struct {
queuedUpdates []queuedResourceUpdate
// The minimum size an encoded chunk will serialize to, in bytes. Used to check whether a given
// update can _ever_ be sent, and as the initial size of a chunk. Note that this value only depends
// on utils.NonceLength and the length of typeURL.
// on utils.MaxNonceLength and the length of typeURL.
minChunkSize int
}

@@ -174,6 +174,11 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De
"typeURL", ds.typeURL,
"updates", len(ds.queuedUpdates),
)
for i, c := range chunks {
c.Nonce = utils.NewNonce(len(chunks) - i - 1)
}
} else {
chunks[0].Nonce = utils.NewNonce(0)
}

return chunks
@@ -182,7 +187,6 @@ func (ds *deltaSender) newChunk() *ads.DeltaDiscoveryResponse {
func (ds *deltaSender) newChunk() *ads.DeltaDiscoveryResponse {
return &ads.DeltaDiscoveryResponse{
TypeUrl: ds.typeURL,
Nonce: utils.NewNonce(),
}
}

27 changes: 19 additions & 8 deletions internal/server/handlers_delta_test.go
@@ -2,6 +2,7 @@ package internal

import (
"context"
"slices"
"strings"
"sync/atomic"
"testing"
@@ -102,7 +103,7 @@ func TestInitialChunkSize(t *testing.T) {
typeURL := utils.GetTypeURL[*wrapperspb.StringValue]()
require.Equal(t, proto.Size(&ads.DeltaDiscoveryResponse{
TypeUrl: typeURL,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}), initialChunkSize(typeURL))
}

@@ -121,11 +122,21 @@ func TestDeltaHandlerChunking(t *testing.T) {
minChunkSize: initialChunkSize(typeURL),
}

sentResponses := ds.chunk(map[string]entry{
getSentResponses := func(resources map[string]entry, expectedChunks int) []*ads.DeltaDiscoveryResponse {
responses := ds.chunk(resources)
require.Len(t, responses, expectedChunks)
expectedRemainingChunks := 0
for _, res := range slices.Backward(responses) {
require.Equal(t, expectedRemainingChunks, ads.ParseRemainingChunksFromNonce(res.Nonce))
expectedRemainingChunks++
}
return responses
}

sentResponses := getSentResponses(map[string]entry{
foo.Name: {Resource: foo},
bar.Name: {Resource: bar},
})

}, 2)
require.Equal(t, len(sentResponses[0].Resources), 1)
require.Equal(t, len(sentResponses[1].Resources), 1)
response0 := sentResponses[0].Resources[0]
@@ -142,10 +153,10 @@ func TestDeltaHandlerChunking(t *testing.T) {
// Delete resources whose names are the same size as the resources to trip the chunker with the same conditions
name1 := strings.Repeat("1", resourceSize)
name2 := strings.Repeat("2", resourceSize)
sentResponses = ds.chunk(map[string]entry{
sentResponses = getSentResponses(map[string]entry{
name1: {Resource: nil},
name2: {Resource: nil},
})
}, 2)
require.Equal(t, len(sentResponses[0].RemovedResources), 1)
require.Equal(t, len(sentResponses[1].RemovedResources), 1)
require.ElementsMatch(t,
@@ -156,12 +167,12 @@ func TestDeltaHandlerChunking(t *testing.T) {
small1, small2, small3 := "a", "b", "c"
wayTooBig := strings.Repeat("3", 10*resourceSize)

sentResponses = ds.chunk(map[string]entry{
sentResponses = getSentResponses(map[string]entry{
small1: {Resource: nil},
small2: {Resource: nil},
small3: {Resource: nil},
wayTooBig: {Resource: nil},
})
}, 1)
require.Equal(t, len(sentResponses[0].RemovedResources), 3)
require.ElementsMatch(t, []string{small1, small2, small3}, sentResponses[0].RemovedResources)
require.Equal(t, int64(1), statsHandler.DeltaResourcesOverMaxSize.Load())
48 changes: 29 additions & 19 deletions internal/utils/utils.go
@@ -2,8 +2,9 @@ package utils

import (
"encoding/base64"
"encoding/binary"
"encoding/hex"
"slices"
"strconv"
"strings"
"time"

@@ -12,24 +13,33 @@ import (
"google.golang.org/protobuf/proto"
)

// NonceLength is the length of the string returned by NewNonce. NewNonce encodes the current UNIX
// time in nanos in hex encoding, so the nonce will be 16 characters if the current UNIX nano time is
// greater than 2^60-1. This is because it takes 16 hex characters to encode 64 bits, but only 15 to
// encode 60 bits (the output of strconv.FormatInt is not padded by 0s). 2^60-1 nanos from epoch time
// (January 1st 1970) is 2006-07-14 23:58:24.606, which as of this writing is over 17 years ago. This
// is why it's guaranteed that NonceLength will be 16 characters (before that date, encoding the
// nanos only required 15 characters). For the curious, the UNIX nano timestamp will overflow int64
// some time in 2262, making this constant valid for the next few centuries.
const NonceLength = 16

// NewNonce creates a new unique nonce based on the current UNIX time in nanos. It always returns a
// string of length NonceLength.
func NewNonce() string {
// The second parameter to FormatInt is the base, e.g. 2 will return binary, 8 will return octal
// encoding, etc. 16 means FormatInt returns the integer in hex encoding, e.g. 30 => "1e" or
// 1704239351400 => "18ccc94c668".
const hexBase = 16
return strconv.FormatInt(time.Now().UnixNano(), hexBase)
const (
// NonceLength is the length of the string returned by NewNonce. NewNonce encodes the current UNIX
// time in nanos and the remaining chunks, encoded as 64-bit and 32-bit integers respectively, then
// hex encoded. This means a nonce will always be 8 + 4 bytes, multiplied by 2 by the hex encoding.
NonceLength = (8 + 4) * 2
)

// NewNonce creates a new unique nonce based on the current UNIX time in nanos, always returning a
// string of length [NonceLength].
func NewNonce(remainingChunks int) string {
return newNonce(time.Now(), remainingChunks)
}

func newNonce(now time.Time, remainingChunks int) string {
// preallocating these buffers with constants (instead of doing `out = make([]byte, len(buf) * 2)`)
// means the compiler will allocate them on the stack, instead of heap. This significantly reduces
// the amount of garbage created by this function, as the only heap allocation will be the final
// string(out), rather than all of these buffers.
buf := make([]byte, NonceLength/2)
out := make([]byte, NonceLength)

binary.BigEndian.PutUint64(buf[:8], uint64(now.UnixNano()))
binary.BigEndian.PutUint32(buf[8:], uint32(remainingChunks))

hex.Encode(out, buf)

return string(out)
}

func GetTypeURL[T proto.Message]() string {
20 changes: 18 additions & 2 deletions internal/utils/utils_test.go
@@ -1,7 +1,9 @@
package utils

import (
"fmt"
"testing"
"time"

"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/linkedin/diderot/ads"
@@ -46,6 +48,20 @@ func TestProtoMap(t *testing.T) {
})
}

func TestNonceLength(t *testing.T) {
require.Len(t, NewNonce(), NonceLength)
func TestNewNonce(t *testing.T) {
now := time.Now()
t.Run("remainingChunks", func(t *testing.T) {
for _, expected := range []int{0, 42} {
nonce := newNonce(now, expected)
require.Equal(t, fmt.Sprintf("%x%08x", now.UnixNano(), expected), nonce)
actualRemainingChunks := ads.ParseRemainingChunksFromNonce(nonce)
require.Equal(t, expected, actualRemainingChunks)
}
})
t.Run("badNonce", func(t *testing.T) {
require.Zero(t, ads.ParseRemainingChunksFromNonce("foo"))
})
t.Run("oldNonce", func(t *testing.T) {
require.Zero(t, ads.ParseRemainingChunksFromNonce(fmt.Sprintf("%x", now.UnixNano())))
})
}
4 changes: 2 additions & 2 deletions server.go
@@ -210,7 +210,7 @@ func (s *ADSServer) StreamAggregatedResources(stream ads.SotWStream) (err error)
return &ads.SotWDiscoveryResponse{
Resources: nil,
TypeUrl: req.TypeUrl,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}
},
setControlPlane: func(res *ads.SotWDiscoveryResponse, controlPlane *corev3.ControlPlane) {
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s *ADSServer) DeltaAggregatedResources(stream ads.DeltaStream) (err error)
return &ads.DeltaDiscoveryResponse{
TypeUrl: req.GetTypeUrl(),
RemovedResources: req.GetResourceNamesSubscribe(),
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
ControlPlane: s.controlPlane,
}
},
Expand Down
