Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rendezvous hash URI scorer #268

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-268.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: improvement
improvement:
description: Add rendezvous hash URI scorer to deterministically balance requests
across URIs
links:
- https://github.com/palantir/conjure-go-runtime/pull/268
24 changes: 23 additions & 1 deletion conjure-go-client/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func (c *clientImpl) Delete(ctx context.Context, params ...RequestParam) (*http.
}

func (c *clientImpl) Do(ctx context.Context, params ...RequestParam) (*http.Response, error) {
uris := c.uriScorer.CurrentURIScoringMiddleware().GetURIsInOrderOfIncreasingScore()
headers, err := getHeadersFromRequestParams(params...)
if err != nil {
return nil, err
}
uris := c.uriScorer.CurrentURIScoringMiddleware().GetURIsInOrderOfIncreasingScore(headers)
if len(uris) == 0 {
return nil, werror.WrapWithContextParams(ctx, ErrEmptyURIs, "", werror.SafeParam("serviceName", c.serviceName.CurrentString()))
}
Expand Down Expand Up @@ -111,6 +115,24 @@ func (c *clientImpl) Do(ctx context.Context, params ...RequestParam) (*http.Resp
}
}

func getHeadersFromRequestParams(params ...RequestParam) (http.Header, error) {
b := &requestBuilder{
headers: make(http.Header),
query: make(url.Values),
bodyMiddleware: &bodyMiddleware{},
}

for _, p := range params {
if p == nil {
continue
}
if err := p.apply(b); err != nil {
return nil, err
}
}
return b.headers, nil
}

func (c *clientImpl) doOnce(
ctx context.Context,
baseURI string,
Expand Down
12 changes: 12 additions & 0 deletions conjure-go-client/httpclient/client_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,15 @@ func WithRandomURIScoring() ClientParam {
return nil
})
}

// WithRendezvousHashURIScoring adds middleware that deterministically routes to URIs based on a header.
func WithRendezvousHashURIScoring(hashHeader string) ClientParam {
return clientParamFunc(func(b *clientBuilder) error {
b.URIScorerBuilder = func(uris []string) internal.URIScoringMiddleware {
return internal.NewRendezvousHashURIScoringMiddleware(uris, hashHeader, func() int64 {
return time.Now().UnixNano()
})
}
return nil
})
}
2 changes: 1 addition & 1 deletion conjure-go-client/httpclient/config_refreshable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestRefreshableClientConfig(t *testing.T) {

t.Run("refreshable config without uris fails", func(t *testing.T) {
getClientURIs := func(client Client) []string {
return client.(*clientImpl).uriScorer.CurrentURIScoringMiddleware().GetURIsInOrderOfIncreasingScore()
return client.(*clientImpl).uriScorer.CurrentURIScoringMiddleware().GetURIsInOrderOfIncreasingScore(http.Header{"foo": []string{"foo"}})
}
refreshableClientConfig := RefreshableClientConfigFromServiceConfig(refreshableServicesConfig, serviceName)
client, err := NewClientFromRefreshableConfig(context.Background(), refreshableClientConfig)
Expand Down
4 changes: 2 additions & 2 deletions conjure-go-client/httpclient/internal/balanced_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
)

type URIScoringMiddleware interface {
GetURIsInOrderOfIncreasingScore() []string
GetURIsInOrderOfIncreasingScore(header http.Header) []string
RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error)
}

Expand Down Expand Up @@ -60,7 +60,7 @@ func NewBalancedURIScoringMiddleware(uris []string, nanoClock func() int64) URIS
return &balancedScorer{uriInfos}
}

func (u *balancedScorer) GetURIsInOrderOfIncreasingScore() []string {
func (u *balancedScorer) GetURIsInOrderOfIncreasingScore(header http.Header) []string {
uris := make([]string, 0, len(u.uriInfos))
scores := make(map[string]int32, len(u.uriInfos))
for uri, info := range u.uriInfos {
Expand Down
4 changes: 2 additions & 2 deletions conjure-go-client/httpclient/internal/balanced_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestBalancedScorerRandomizesWithNoneInflight(t *testing.T) {
uris := []string{"uri1", "uri2", "uri3", "uri4", "uri5"}
scorer := NewBalancedURIScoringMiddleware(uris, func() int64 { return 0 })
scoredUris := scorer.GetURIsInOrderOfIncreasingScore()
scoredUris := scorer.GetURIsInOrderOfIncreasingScore(http.Header{})
assert.ElementsMatch(t, scoredUris, uris)
assert.NotEqual(t, scoredUris, uris)
}
Expand Down Expand Up @@ -53,6 +53,6 @@ func TestBalancedScoring(t *testing.T) {
assert.NoError(t, err)
}
}
scoredUris := scorer.GetURIsInOrderOfIncreasingScore()
scoredUris := scorer.GetURIsInOrderOfIncreasingScore(http.Header{})
assert.Equal(t, []string{server200.URL, server429.URL, server503.URL}, scoredUris)
}
16 changes: 10 additions & 6 deletions conjure-go-client/httpclient/internal/random_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ type randomScorer struct {
nanoClock func() int64
}

func (n *randomScorer) GetURIsInOrderOfIncreasingScore() []string {
uris := make([]string, len(n.uris))
copy(uris, n.uris)
rand.New(rand.NewSource(n.nanoClock())).Shuffle(len(uris), func(i, j int) {
uris[i], uris[j] = uris[j], uris[i]
func (n *randomScorer) GetURIsInOrderOfIncreasingScore(header http.Header) []string {
return getURIsInRandomOrder(n.uris, n.nanoClock())
}

func getURIsInRandomOrder(uris []string, seed int64) []string {
randomizedUris := make([]string, len(uris))
copy(randomizedUris, uris)
rand.New(rand.NewSource(seed)).Shuffle(len(randomizedUris), func(i, j int) {
randomizedUris[i], randomizedUris[j] = randomizedUris[j], randomizedUris[i]
})
return uris
return randomizedUris
}

func (n *randomScorer) RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error) {
Expand Down
5 changes: 3 additions & 2 deletions conjure-go-client/httpclient/internal/random_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package internal

import (
"net/http"
"testing"
"time"

Expand All @@ -24,8 +25,8 @@ import (
func TestRandomScorerGetURIsRandomizes(t *testing.T) {
uris := []string{"uri1", "uri2", "uri3", "uri4", "uri5"}
scorer := NewRandomURIScoringMiddleware(uris, func() int64 { return time.Now().UnixNano() })
scoredUris1 := scorer.GetURIsInOrderOfIncreasingScore()
scoredUris2 := scorer.GetURIsInOrderOfIncreasingScore()
scoredUris1 := scorer.GetURIsInOrderOfIncreasingScore(http.Header{})
scoredUris2 := scorer.GetURIsInOrderOfIncreasingScore(http.Header{})
assert.ElementsMatch(t, scoredUris1, scoredUris2)
assert.NotEqual(t, scoredUris1, scoredUris2)
}
80 changes: 80 additions & 0 deletions conjure-go-client/httpclient/internal/rendezvous_hash_scorer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) 2021 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"hash/fnv"
"net/http"
"sort"
)

type rendezvousHashScorer struct {
uris []string
hashHeaderKey string
nanoClock func() int64
}

func (r *rendezvousHashScorer) GetURIsInOrderOfIncreasingScore(header http.Header) []string {
hashHeaderValues, ok := header[r.hashHeaderKey]
if !ok || len(hashHeaderValues) == 0 {
return getURIsInRandomOrder(r.uris, r.nanoClock())
}
fnv.New64a()
uris := make([]string, 0, len(r.uris))
scores := make(map[string]uint32, len(r.uris))
hash := fnv.New32()
for _, uri := range r.uris {
hash.Reset()
if _, err := hash.Write([]byte(uri)); err != nil {
return nil
}
for _, value := range hashHeaderValues {
if _, err := hash.Write([]byte(value)); err != nil {
return nil
}
}
uris = append(uris, uri)
scores[uri] = hash.Sum32()
}
sort.Slice(uris, func(i, j int) bool {
return scores[uris[i]] < scores[uris[j]]
})
return uris
}

func (r *rendezvousHashScorer) RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error) {
return next.RoundTrip(req)
}

// NewRendezvousHashURIScoringMiddleware returns a URI scorer that generates a deterministic ordering of the URIs
// based on the value of a header. The scorer hashes the header value along with the URI and sorts the URIs based on
// the value of the hash.
//
// The intent of this scoring strategy is to provide server-side read and write locality for clients - by providing the
// configured header based on a key from content in the request, clients can expect that requests for the same key are
// generally routed to the same URI. It is important clients do not rely on always reaching the same URIs for
// correctness as requests will be retried with other URIs in the case of failures.
//
// When the header is not present, the scorer randomizes the order of URIs by using a rand.Rand
// seeded by the nanoClock function.
//
// The middleware no-ops on each request.
func NewRendezvousHashURIScoringMiddleware(uris []string, hashHeader string, nanoClock func() int64) URIScoringMiddleware {
return &rendezvousHashScorer{
uris: uris,
hashHeaderKey: hashHeader,
nanoClock: nanoClock,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2021 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const (
hashHeader = "X-Route-Hash"
)

func TestRendezvousHashScorerRandomizesWithoutHeader(t *testing.T) {
uris := []string{"uri1", "uri2", "uri3", "uri4", "uri5"}
scorer := NewRendezvousHashURIScoringMiddleware(uris, hashHeader, func() int64 { return time.Now().UnixNano() })
scoredUris1 := scorer.GetURIsInOrderOfIncreasingScore(http.Header{})
scoredUris2 := scorer.GetURIsInOrderOfIncreasingScore(http.Header{})
assert.ElementsMatch(t, scoredUris1, scoredUris2)
assert.NotEqual(t, scoredUris1, scoredUris2)
}

func TestRendezvousHashScorerSortsUrisDeterministically(t *testing.T) {
uris := []string{"uri1", "uri2", "uri3", "uri4", "uri5"}
scorer := NewRendezvousHashURIScoringMiddleware(uris, hashHeader, func() int64 { return time.Now().UnixNano() })
scoredUris1 := scorer.GetURIsInOrderOfIncreasingScore(http.Header{hashHeader: []string{"foo"}})
scoredUris2 := scorer.GetURIsInOrderOfIncreasingScore(http.Header{hashHeader: []string{"foo"}})
assert.Equal(t, scoredUris1, scoredUris2)
}

func TestRendezvousHashScorerMultipleHashHeaders(t *testing.T) {
uris := []string{"uri1", "uri2", "uri3", "uri4", "uri5"}
scorer := NewRendezvousHashURIScoringMiddleware(uris, hashHeader, func() int64 { return time.Now().UnixNano() })
scoredUris1 := scorer.GetURIsInOrderOfIncreasingScore(http.Header{hashHeader: []string{"hash1", "hash2"}})
scoredUris2 := scorer.GetURIsInOrderOfIncreasingScore(http.Header{hashHeader: []string{"hash1", "hash2", "hash3"}})
assert.ElementsMatch(t, scoredUris1, scoredUris2)
assert.NotEqual(t, scoredUris1, scoredUris2)
}