Skip to content

Commit

Permalink
Refactoring the interceptor (#105)
Browse files Browse the repository at this point in the history
* Refactoring the interceptor

- Adding more testing to the proxy
- Refactoring the proxy to use standard net/http (gives it more control over the response lifecycle)
- Using the standard library test framework, because stretchr/testify/suite was not adding much (stretchr/testify/require is still in use though)

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding test for holding the connection

Signed-off-by: Aaron Schlesinger <[email protected]>

* removing dead code

Signed-off-by: Aaron Schlesinger <[email protected]>

* removing dead code

Signed-off-by: Aaron Schlesinger <[email protected]>

* using the getter/watcher logic in the proxy handler

Signed-off-by: Aaron Schlesinger <[email protected]>

* progress on fast getter/watcher implementation:

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding deployment cache

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding a backoff for the network connection

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding test targets to the magefile

Signed-off-by: Aaron Schlesinger <[email protected]>

* Fixing and adding more tests to the proxy

Signed-off-by: Aaron Schlesinger <[email protected]>

* Adding test to ensure proxy handler holds until >0 replicas on target deployment

Signed-off-by: Aaron Schlesinger <[email protected]>

* refactoring in progress and more tests

Signed-off-by: Aaron Schlesinger <[email protected]>

* todo test

Signed-off-by: Aaron Schlesinger <[email protected]>

* fixing hanging issue

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding test for slow origin, and a TODO in the proxy tests

Signed-off-by: Aaron Schlesinger <[email protected]>

* TODO tests

Signed-off-by: Aaron Schlesinger <[email protected]>

* more work on connection retry and backoff

Signed-off-by: Aaron Schlesinger <[email protected]>

* logging nested errors

Signed-off-by: Aaron Schlesinger <[email protected]>

* better test checks and explanatory comments

Signed-off-by: Aaron Schlesinger <[email protected]>

* moving sumExp utility function to top

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding tests for DialContextWithRetry

Signed-off-by: Aaron Schlesinger <[email protected]>

* refactoring backoff etc...

Signed-off-by: Aaron Schlesinger <[email protected]>

* refactoring the dialer

Signed-off-by: Aaron Schlesinger <[email protected]>

* refactoring if/else multiple return statements

Signed-off-by: Aaron Schlesinger <[email protected]>

* removing addition from num steps

Signed-off-by: Aaron Schlesinger <[email protected]>

* removing dead code and updating knative attribution comment

Signed-off-by: Aaron Schlesinger <[email protected]>

* removing net error type check

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding one more test

Signed-off-by: Aaron Schlesinger <[email protected]>

* implementing TODO tests and cleaning up unused and logging code

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding wait func timeout

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding waitFunc timeout logic

Signed-off-by: Aaron Schlesinger <[email protected]>

* starting tests for the deployment cache

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding deployment cache watch test

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding timing checks to the deployment cache test

Signed-off-by: Aaron Schlesinger <[email protected]>

* Fixing the last broken test

Turns out, don't run a handler in a goroutine because (net/http/httptest).ResponseRecorder
is not concurrency-safe. Thanks to @asw101 and @khaosdoctor for help on this!

Signed-off-by: Aaron Schlesinger <[email protected]>

* Tidying up interceptor config and making operator set more config vars

Signed-off-by: Aaron Schlesinger <[email protected]>

* fixing tests and compile errs

Signed-off-by: Aaron Schlesinger <[email protected]>
  • Loading branch information
arschles authored Apr 12, 2021
1 parent 728437a commit 6b8d4ab
Show file tree
Hide file tree
Showing 28 changed files with 1,457 additions and 132 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ go 1.16
require (
github.com/go-logr/logr v0.3.0
github.com/golang/protobuf v1.4.3
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/labstack/echo/v4 v4.2.1
github.com/magefile/mage v1.11.0
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/onsi/ginkgo v1.15.2
github.com/onsi/gomega v1.11.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.25.0
k8s.io/api v0.20.2
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand Down Expand Up @@ -498,6 +500,8 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
27 changes: 16 additions & 11 deletions interceptor/admin_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package main
import (
"encoding/json"
"errors"
"testing"

"github.com/stretchr/testify/require"
)

func (i *InterceptorSuite) TestQueueSizeHandlerSuccess() {
func TestQueueSizeHandlerSuccess(t *testing.T) {
r := require.New(t)
reader := &fakeQueueCountReader{
current: 123,
err: nil,
Expand All @@ -14,20 +18,21 @@ func (i *InterceptorSuite) TestQueueSizeHandlerSuccess() {
handler := newQueueSizeHandler(reader)
_, echoCtx, rec := newTestCtx("GET", "/queue")
err := handler(echoCtx)
i.NoError(err)
i.Equal(200, rec.Code, "response code")
r.NoError(err)
r.Equal(200, rec.Code, "response code")
respMap := map[string]int{}
decodeErr := json.NewDecoder(rec.Body).Decode(&respMap)
i.NoError(decodeErr)
i.Equalf(1, len(respMap), "response JSON length was not 1")
r.NoError(decodeErr)
r.Equalf(1, len(respMap), "response JSON length was not 1")
sizeVal, ok := respMap["current_size"]
i.Truef(ok, "'current_size' entry not available in return JSON")
i.Equalf(reader.current, sizeVal, "returned JSON queue size was wrong")
r.Truef(ok, "'current_size' entry not available in return JSON")
r.Equalf(reader.current, sizeVal, "returned JSON queue size was wrong")
reader.err = errors.New("test error")
i.Error(handler(echoCtx))
r.Error(handler(echoCtx))
}

func (i *InterceptorSuite) TestQueueSizeHandlerFail() {
func TestQueueSizeHandlerFail(t *testing.T) {
r := require.New(t)
reader := &fakeQueueCountReader{
current: 0,
err: errors.New("test error"),
Expand All @@ -36,6 +41,6 @@ func (i *InterceptorSuite) TestQueueSizeHandlerFail() {
handler := newQueueSizeHandler(reader)
_, echoCtx, rec := newTestCtx("GET", "/queue")
err := handler(echoCtx)
i.Error(err)
i.Equal(500, rec.Code, "response code")
r.Error(err)
r.Equal(500, rec.Code, "response code")
}
38 changes: 38 additions & 0 deletions interceptor/config/origin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package config

import (
"fmt"
"net/url"

"github.com/kelseyhightower/envconfig"
)

// Origin is the configuration for where and how the proxy forwards
// requests to a backing Kubernetes service
type Origin struct {
// AppServiceName is the name of the service that fronts the user's app
AppServiceName string `envconfig:"KEDA_HTTP_APP_SERVICE_NAME" required:"true"`
// AppServiecPort the port that that the proxy should forward to
AppServicePort string `envconfig:"KEDA_HTTP_APP_SERVICE_PORT" required:"true"`
// TargetDeploymentName is the name of the backing deployment that the interceptor
// should forward to
TargetDeploymentName string `envconfig:"KEDA_HTTP_TARGET_DEPLOYMENT_NAME" required:"true"`
// Namespace is the namespace that this interceptor is running in
Namespace string `envconfig:"KEDA_HTTP_NAMESPACE" required:"true"`
}

// ServiceURL formats the app service name and port into a URL
func (o *Origin) ServiceURL() (*url.URL, error) {
urlStr := fmt.Sprintf("http://%s:%s", o.AppServiceName, o.AppServicePort)
u, err := url.Parse(urlStr)
if err != nil {
return nil, err
}
return u, nil
}

func MustParseOrigin() *Origin {
ret := new(Origin)
envconfig.MustProcess("", ret)
return ret
}
24 changes: 24 additions & 0 deletions interceptor/config/serving.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package config

import (
"github.com/kelseyhightower/envconfig"
)

// Serving is configuration for how the interceptor serves the proxy
// and admin server
type Serving struct {
// ProxyPort is the port that the public proxy should run on
ProxyPort int `envconfig:"KEDA_HTTP_PROXY_PORT" required:"true"`
// AdminPort is the port that the internal admin server should run on.
// This is the server that the external scaler will issue metrics
// requests to
AdminPort int `envconfig:"KEDA_HTTP_ADMIN_PORT" required:"true"`
}

// Parse parses standard configs using envconfig and returns a pointer to the
// newly created config. Returns nil and a non-nil error if parsing failed
func MustParseServing() *Serving {
ret := new(Serving)
envconfig.MustProcess("", ret)
return ret
}
46 changes: 46 additions & 0 deletions interceptor/config/timeouts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package config

import (
"time"

"github.com/kelseyhightower/envconfig"
"k8s.io/apimachinery/pkg/util/wait"
)

// Timeouts is the configuration for connection and HTTP timeouts
type Timeouts struct {
// Connect is the connection timeout
Connect time.Duration `envconfig:"KEDA_HTTP_CONNECT_TIMEOUT" default:"500ms"`
// KeepAlive is the interval between keepalive probes
KeepAlive time.Duration `envconfig:"KEDA_HTTP_KEEP_ALIVE" default:"1s"`
// ResponseHeaderTimeout is how long to wait between when the HTTP request
// is sent to the backing app and when response headers need to arrive
ResponseHeader time.Duration `envconfig:"KEDA_RESPONSE_HEADER_TIMEOUT" default:"500ms"`
// DeploymentReplicas is how long to wait for the backing deployment
// to have 1 or more replicas before connecting and sending the HTTP request.
DeploymentReplicas time.Duration `envconfig:"KEDA_CONDITION_WAIT_TIMEOUT" default:"1500ms"`
}

// Backoff returns a wait.Backoff based on the timeouts in t
func (t *Timeouts) Backoff(factor, jitter float64, steps int) wait.Backoff {
return wait.Backoff{
Duration: t.Connect,
Factor: factor,
Jitter: jitter,
Steps: steps,
}
}

// DefaultBackoff calls t.Backoff with reasonable defaults and returns
// the result
func (t *Timeouts) DefaultBackoff() wait.Backoff {
return t.Backoff(2, 0.5, 5)
}

// Parse parses standard configs using envconfig and returns a pointer to the
// newly created config. Returns nil and a non-nil error if parsing failed
func MustParseTimeouts() *Timeouts {
ret := new(Timeouts)
envconfig.MustProcess("", ret)
return ret
}
58 changes: 58 additions & 0 deletions interceptor/forward_wait_func.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package main

import (
"fmt"
"log"
"time"

"github.com/kedacore/http-add-on/pkg/k8s"
appsv1 "k8s.io/api/apps/v1"
)

type forwardWaitFunc func() error

func newDeployReplicasForwardWaitFunc(
deployCache k8s.DeploymentCache,
deployName string,
totalWait time.Duration,
) forwardWaitFunc {
return func() error {
deployment, err := deployCache.Get(deployName)
if err != nil {
// if we didn't get the initial deployment state, bail out
return fmt.Errorf("Error getting state for deployment %s (%s)", deployName, err)
}
// if there is 1 or more replica, we're done waiting
if moreThanPtr(deployment.Spec.Replicas, 0) {
return nil
}

watcher := deployCache.Watch(deployName)
if err != nil {
return fmt.Errorf("Error getting the stream of deployment changes")
}
defer watcher.Stop()
eventCh := watcher.ResultChan()
timer := time.NewTimer(totalWait)
defer timer.Stop()
for {
select {
case event := <-eventCh:
deployment := event.Object.(*appsv1.Deployment)
if err != nil {
log.Printf(
"Error getting deployment %s after change was triggered (%s)",
deployName,
err,
)
}
if moreThanPtr(deployment.Spec.Replicas, 0) {
return nil
}
case <-timer.C:
// otherwise, if we hit the end of the timeout, fail
return fmt.Errorf("Timeout expired waiting for deployment %s to reach > 0 replicas", deployName)
}
}
}
}
106 changes: 106 additions & 0 deletions interceptor/forward_wait_func_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"context"
"testing"
"time"

"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/watch"
)

// Test to make sure the wait function returns a nil error if there is immediately
// one replica on the target deployment
func TestForwardWaitFuncOneReplica(t *testing.T) {
r := require.New(t)
const ns = "testNS"
const deployName = "TestForwardingHandlerDeploy"
cache := k8s.NewMemoryDeploymentCache(map[string]*appsv1.Deployment{
deployName: k8s.NewDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
),
})
waitFunc := newDeployReplicasForwardWaitFunc(
cache,
deployName,
1*time.Second,
)

ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer done()
group, ctx := errgroup.WithContext(ctx)
group.Go(waitFunc)
r.NoError(group.Wait())
}

// Test to make sure the wait function returns an error if there are no replicas, and that doesn't change
// within a timeout
func TestForwardWaitFuncNoReplicas(t *testing.T) {
r := require.New(t)
const ns = "testNS"
const deployName = "TestForwardingHandlerHoldsDeployment"
deployment := k8s.NewDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
)
deployment.Spec.Replicas = k8s.Int32P(0)
cache := k8s.NewMemoryDeploymentCache(map[string]*appsv1.Deployment{
deployName: deployment,
})

const timeout = 200 * time.Millisecond
waitFunc := newDeployReplicasForwardWaitFunc(
cache,
deployName,
1*time.Second,
)

err := waitFunc()
r.Error(err)
}

func TestWaitFuncWaitsUntilReplicas(t *testing.T) {
r := require.New(t)
totalWaitDur := 500 * time.Millisecond

const ns = "testNS"
const deployName = "TestForwardingHandlerHoldsDeployment"
deployment := k8s.NewDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
)
deployment.Spec.Replicas = k8s.Int32P(0)
cache := k8s.NewMemoryDeploymentCache(map[string]*appsv1.Deployment{
deployName: deployment,
})
waitFunc := newDeployReplicasForwardWaitFunc(cache, deployName, totalWaitDur)
// this channel will be closed immediately after the replicas were increased
replicasIncreasedCh := make(chan struct{})
go func() {
time.Sleep(totalWaitDur / 2)
cache.RWM.RLock()
defer cache.RWM.RUnlock()
watcher := cache.Watchers[deployName]
modifiedDeployment := deployment.DeepCopy()
modifiedDeployment.Spec.Replicas = k8s.Int32P(1)
watcher.Action(watch.Modified, modifiedDeployment)
close(replicasIncreasedCh)
}()
r.NoError(waitFunc())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,10 @@ package main

import (
"net/http/httptest"
"testing"

echo "github.com/labstack/echo/v4"
"github.com/stretchr/testify/suite"
)

type InterceptorSuite struct {
suite.Suite
}

func TestInterceptor(t *testing.T) {
suite.Run(t, new(InterceptorSuite))
}

func newTestCtx(method, path string) (*echo.Echo, echo.Context, *httptest.ResponseRecorder) {
req := httptest.NewRequest(method, path, nil)
rec := httptest.NewRecorder()
Expand Down
Loading

0 comments on commit 6b8d4ab

Please sign in to comment.