Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include response chunking info in nonce #3

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@ fmt: $(GOBIN)/goimports

# Can be used to change the number of tests run, defaults to 1 to prevent caching
TESTCOUNT = 1
# Can be used to change the verobosity of tests: make test TESTVERBOSE=-v
TESTVERBOSE =
# Can be used to generate coverage reports for a specific package
COVERPKG = $(PACKAGE)
# Can be used to add flags to the go test invocation: make test TESTFLAGS=-v
TESTFLAGS =
# Can be used to change which package gets tested, defaults to all packages.
TESTPKG = ./...

test: $(COVERAGE)
$(COVERAGE):
@mkdir -p $(@D)
go test -race -coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/... -count=$(TESTCOUNT) $(TESTVERBOSE) $(TESTPKG)
test:
go test -race -count=$(TESTCOUNT) $(TESTFLAGS) $(TESTPKG)

# Can be used to generate coverage reports for a specific package
COVERPKG = $(PACKAGE)

.PHONY: $(COVERAGE)

coverage: $(COVERAGE)
$(COVERAGE):
@mkdir -p $(@D)
$(MAKE) test TESTFLAGS='-coverprofile=$(COVERAGE) -coverpkg=$(COVERPKG)/...'
go tool cover -html=$(COVERAGE)

profile_cache:
Expand Down
30 changes: 30 additions & 0 deletions ads/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ protocol (ADS), such as convenient type aliases, constants and core definitions.
package ads

import (
"encoding/binary"
"encoding/hex"
"errors"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -255,3 +258,30 @@ func LookupStreamTypeByRPCMethod(rpcMethod string) (StreamType, bool) {
return UnknownStreamType, false
}
}

var (
errInvalidNonceEncoding = errors.New("nonce isn't in hex encoding")
errInvalidNonceLength = errors.New("decoded nonce did not have expected length")
)

// ParseRemainingChunksFromNonce checks whether the Diderot server implementation chunked the delta
// responses because not all resources could fit in the same response without going over the default
// max gRPC message size of 4MB. A nonce from Diderot always starts with the 64-bit nanosecond
// timestamp of when the response was generated on the server. Then the number of remaining chunks as
// a 32-bit integer. The sequence of integers is binary encoded with [binary.BigEndian] then hex
// encoded. If the given nonce does not match the expected format, this function simply returns 0
// along with an error describing why it does not match. If the error isn't nil, it means the nonce
// was not created by a Diderot server implementation, and therefore does not contain the expected
// information.
func ParseRemainingChunksFromNonce(nonce string) (remainingChunks int, err error) {
decoded, err := hex.DecodeString(nonce)
if err != nil {
return 0, errInvalidNonceEncoding
}

if len(decoded) != 12 {
return 0, errInvalidNonceLength
}

return int(binary.BigEndian.Uint32(decoded[8:12])), nil
}
27 changes: 27 additions & 0 deletions ads/ads_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ads_test

import (
"log"

"github.com/linkedin/diderot/ads"
)

func ExampleParseRemainingChunksFromNonce() {
// Acquire a delta ADS client
var client ads.DeltaClient

var responses []*ads.DeltaDiscoveryResponse
for {
res, err := client.Recv()
if err != nil {
log.Panicf("Error receiving delta response: %v", err)
}
responses = append(responses, res)

if remaining, _ := ads.ParseRemainingChunksFromNonce(res.Nonce); remaining == 0 {
break
}
}

log.Printf("All responses received: %+v", responses)
}
4 changes: 2 additions & 2 deletions internal/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func newSotWHandler(

res := &ads.SotWDiscoveryResponse{
TypeUrl: typeUrl,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}
for _, e := range entries {
res.Resources = append(res.Resources, e.Resource.Resource)
Expand All @@ -349,7 +349,7 @@ func newSotWHandler(

res := &ads.SotWDiscoveryResponse{
TypeUrl: typeUrl,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}
for _, r := range allResources {
res.Resources = append(res.Resources, r.Resource.Resource)
Expand Down
6 changes: 5 additions & 1 deletion internal/server/handlers_delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De
"typeURL", ds.typeURL,
"updates", len(ds.queuedUpdates),
)
for i, c := range chunks {
c.Nonce = utils.NewNonce(len(chunks) - i - 1)
}
} else {
chunks[0].Nonce = utils.NewNonce(0)
}

return chunks
Expand All @@ -182,7 +187,6 @@ func (ds *deltaSender) chunk(resourceUpdates map[string]entry) (chunks []*ads.De
func (ds *deltaSender) newChunk() *ads.DeltaDiscoveryResponse {
return &ads.DeltaDiscoveryResponse{
TypeUrl: ds.typeURL,
Nonce: utils.NewNonce(),
}
}

Expand Down
29 changes: 21 additions & 8 deletions internal/server/handlers_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"slices"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestInitialChunkSize(t *testing.T) {
typeURL := utils.GetTypeURL[*wrapperspb.StringValue]()
require.Equal(t, proto.Size(&ads.DeltaDiscoveryResponse{
TypeUrl: typeURL,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}), initialChunkSize(typeURL))
}

Expand All @@ -121,11 +122,23 @@ func TestDeltaHandlerChunking(t *testing.T) {
minChunkSize: initialChunkSize(typeURL),
}

sentResponses := ds.chunk(map[string]entry{
getSentResponses := func(resources map[string]entry, expectedChunks int) []*ads.DeltaDiscoveryResponse {
responses := ds.chunk(resources)
require.Len(t, responses, expectedChunks)
expectedRemainingChunks := 0
for _, res := range slices.Backward(responses) {
remaining, err := ads.ParseRemainingChunksFromNonce(res.Nonce)
require.NoError(t, err)
require.Equal(t, expectedRemainingChunks, remaining)
expectedRemainingChunks++
}
return responses
}

sentResponses := getSentResponses(map[string]entry{
foo.Name: {Resource: foo},
bar.Name: {Resource: bar},
})

}, 2)
require.Equal(t, len(sentResponses[0].Resources), 1)
require.Equal(t, len(sentResponses[1].Resources), 1)
response0 := sentResponses[0].Resources[0]
Expand All @@ -142,10 +155,10 @@ func TestDeltaHandlerChunking(t *testing.T) {
// Delete resources whose names are the same size as the resources to trip the chunker with the same conditions
name1 := strings.Repeat("1", resourceSize)
name2 := strings.Repeat("2", resourceSize)
sentResponses = ds.chunk(map[string]entry{
sentResponses = getSentResponses(map[string]entry{
name1: {Resource: nil},
name2: {Resource: nil},
})
}, 2)
require.Equal(t, len(sentResponses[0].RemovedResources), 1)
require.Equal(t, len(sentResponses[1].RemovedResources), 1)
require.ElementsMatch(t,
Expand All @@ -156,12 +169,12 @@ func TestDeltaHandlerChunking(t *testing.T) {
small1, small2, small3 := "a", "b", "c"
wayTooBig := strings.Repeat("3", 10*resourceSize)

sentResponses = ds.chunk(map[string]entry{
sentResponses = getSentResponses(map[string]entry{
small1: {Resource: nil},
small2: {Resource: nil},
small3: {Resource: nil},
wayTooBig: {Resource: nil},
})
}, 1)
require.Equal(t, len(sentResponses[0].RemovedResources), 3)
require.ElementsMatch(t, []string{small1, small2, small3}, sentResponses[0].RemovedResources)
require.Equal(t, int64(1), statsHandler.DeltaResourcesOverMaxSize.Load())
Expand Down
48 changes: 29 additions & 19 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package utils

import (
"encoding/base64"
"encoding/binary"
"encoding/hex"
"slices"
"strconv"
"strings"
"time"

Expand All @@ -12,24 +13,33 @@ import (
"google.golang.org/protobuf/proto"
)

// NonceLength is the length of the string returned by NewNonce. NewNonce encodes the current UNIX
// time in nanos in hex encoding, so the nonce will be 16 characters if the current UNIX nano time is
// greater than 2^60-1. This is because it takes 16 hex characters to encode 64 bits, but only 15 to
// encode 60 bits (the output of strconv.FormatInt is not padded by 0s). 2^60-1 nanos from epoch time
// (January 1st 1970) is 2006-07-14 23:58:24.606, which as of this writing is over 17 years ago. This
// is why it's guaranteed that NonceLength will be 16 characters (before that date, encoding the
// nanos only required 15 characters). For the curious, the UNIX nano timestamp will overflow int64
// some time in 2262, making this constant valid for the next few centuries.
const NonceLength = 16

// NewNonce creates a new unique nonce based on the current UNIX time in nanos. It always returns a
// string of length NonceLength.
func NewNonce() string {
// The second parameter to FormatInt is the base, e.g. 2 will return binary, 8 will return octal
// encoding, etc. 16 means FormatInt returns the integer in hex encoding, e.g. 30 => "1e" or
// 1704239351400 => "18ccc94c668".
const hexBase = 16
return strconv.FormatInt(time.Now().UnixNano(), hexBase)
const (
// NonceLength is the length of the string returned by NewNonce. NewNonce encodes the current UNIX
// time in nanos and the remaining chunks, encoded as 64-bit and 32-bit integers respectively, then
// hex encoded. This means a nonce will always be 8 + 4 bytes, multiplied by 2 by the hex encoding.
NonceLength = (8 + 4) * 2
)

// NewNonce creates a new unique nonce based on the current UNIX time in nanos, always returning a
// string of [NonceLength].
func NewNonce(remainingChunks int) string {
Comment on lines +23 to +25

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a smart yet big change to the nonce. To make sure it's backward compatible, I checked Rest.li xds client is not using the nonce in any way currently. May want to let Xin and Yan know to make sure Envoy is not using it too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No one is, it's not supposed to be used for anything other than ACKs/NACKs, it's not meant to be interpreted more than that. Envoy doesn't use it at all for sure.

return newNonce(time.Now(), remainingChunks)
}

func newNonce(now time.Time, remainingChunks int) string {
// preallocating these buffers with constants (instead of doing `out = make([]byte, len(buf) * 2)`)
// means the compiler will allocate them on the stack, instead of heap. This significantly reduces
// the amount of garbage created by this function, as the only heap allocation will be the final
// string(out), rather than all of these buffers.
buf := make([]byte, NonceLength/2)
out := make([]byte, NonceLength)

binary.BigEndian.PutUint64(buf[:8], uint64(now.UnixNano()))
binary.BigEndian.PutUint32(buf[8:], uint32(remainingChunks))

hex.Encode(out, buf)

return string(out)
}

func GetTypeURL[T proto.Message]() string {
Expand Down
25 changes: 23 additions & 2 deletions internal/utils/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package utils

import (
"fmt"
"testing"
"time"

"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/linkedin/diderot/ads"
Expand Down Expand Up @@ -46,6 +48,25 @@ func TestProtoMap(t *testing.T) {
})
}

func TestNonceLength(t *testing.T) {
require.Len(t, NewNonce(), NonceLength)
func TestNewNonce(t *testing.T) {
now := time.Now()
t.Run("remainingChunks", func(t *testing.T) {
for _, expected := range []int{0, 42} {
nonce := newNonce(now, expected)
require.Equal(t, fmt.Sprintf("%x%08x", now.UnixNano(), expected), nonce)
actualRemainingChunks, err := ads.ParseRemainingChunksFromNonce(nonce)
require.NoError(t, err)
require.Equal(t, expected, actualRemainingChunks)
}
})
t.Run("badNonce", func(t *testing.T) {
remaining, err := ads.ParseRemainingChunksFromNonce("foo")
require.Error(t, err)
require.Zero(t, remaining)
})
t.Run("oldNonce", func(t *testing.T) {
remaining, err := ads.ParseRemainingChunksFromNonce(fmt.Sprintf("%x", now.UnixNano()))
require.Error(t, err)
require.Zero(t, remaining)
})
}
4 changes: 2 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (s *ADSServer) StreamAggregatedResources(stream ads.SotWStream) (err error)
return &ads.SotWDiscoveryResponse{
Resources: nil,
TypeUrl: req.TypeUrl,
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
}
},
setControlPlane: func(res *ads.SotWDiscoveryResponse, controlPlane *corev3.ControlPlane) {
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s *ADSServer) DeltaAggregatedResources(stream ads.DeltaStream) (err error)
return &ads.DeltaDiscoveryResponse{
TypeUrl: req.GetTypeUrl(),
RemovedResources: req.GetResourceNamesSubscribe(),
Nonce: utils.NewNonce(),
Nonce: utils.NewNonce(0),
ControlPlane: s.controlPlane,
}
},
Expand Down