Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring the interceptor #105

Merged
merged 41 commits into from
Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f054c63
Refactoring the interceptor
arschles Mar 17, 2021
5c36256
adding test for holding the connection
arschles Mar 17, 2021
f12503c
removing dead code
arschles Mar 17, 2021
b0beb9c
removing dead code
arschles Mar 17, 2021
0ea3aca
using the getter/watcher logic in the proxy handler
arschles Mar 18, 2021
30d0aa0
progress on fast getter/watcher implementation:
arschles Mar 18, 2021
ae259d2
Merge branch 'main' into interceptor-hold
arschles Mar 19, 2021
06e95ef
Merge branch 'main' into interceptor-hold
arschles Mar 22, 2021
7a13f68
adding deployment cache
arschles Mar 22, 2021
d7081e0
adding a backoff for the network connection
arschles Mar 22, 2021
db44937
adding test targets to the magefile
arschles Mar 23, 2021
49daba3
Fixing and adding more tests to the proxy
arschles Mar 23, 2021
060e492
Adding test to ensure proxy handler holds until >0 replicas on target…
arschles Mar 23, 2021
e71c432
refactoring in progress and more tests
arschles Mar 23, 2021
c0233da
todo test
arschles Mar 23, 2021
19d1348
fixing hanging issue
arschles Mar 23, 2021
89f6962
adding test for slow origin, and a TODO in the proxy tests
arschles Mar 23, 2021
876f08f
TODO tests
arschles Mar 23, 2021
0feb8ea
more work on connection retry and backoff
arschles Mar 24, 2021
fbe7770
logging nested errors
arschles Mar 25, 2021
fe987dd
better test checks and explanatory comments
arschles Mar 25, 2021
89d7376
moving sumExp utility function to top
arschles Mar 25, 2021
151c4c2
Merge branch 'main' into interceptor-hold
arschles Mar 25, 2021
db4fac4
adding tests for DialContextWithRetry
arschles Mar 25, 2021
8d8613b
refactoring backoff etc...
arschles Mar 25, 2021
2e8ca28
refactoring the dialer
arschles Mar 25, 2021
17c8f8f
refactoring if/else multiple return statements
arschles Mar 25, 2021
4795b8c
removing addition from num steps
arschles Mar 25, 2021
1aa77b5
removing dead code and updating knative attribution comment
arschles Mar 25, 2021
9f1c953
removing net error type check
arschles Mar 25, 2021
9ca9f79
adding one more test
arschles Mar 25, 2021
6053ab3
implementing TODO tests and cleaning up unused and logging code
arschles Mar 25, 2021
95dd529
adding wait func timeout
arschles Mar 25, 2021
093a16b
adding waitFunc timeout logic
arschles Mar 25, 2021
267e58f
starting tests for the deployment cache
arschles Mar 25, 2021
bafbd8b
adding deployment cache watch test
arschles Mar 25, 2021
979ba9d
adding timing checks to the deployment cache test
arschles Mar 25, 2021
6160d5d
Fixing the last broken test
arschles Mar 29, 2021
112d21e
Merge branch 'main' into interceptor-hold
arschles Mar 31, 2021
9c13238
Tidying up interceptor config and making operator set more config vars
arschles Mar 31, 2021
60779f3
fixing tests and compile errs
arschles Apr 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ go 1.16
require (
github.com/go-logr/logr v0.3.0
github.com/golang/protobuf v1.4.3
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/labstack/echo/v4 v4.2.1
github.com/magefile/mage v1.11.0
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/onsi/ginkgo v1.15.2
github.com/onsi/gomega v1.11.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.25.0
k8s.io/api v0.20.2
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand Down Expand Up @@ -498,6 +500,8 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
27 changes: 16 additions & 11 deletions interceptor/admin_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package main
import (
"encoding/json"
"errors"
"testing"

"github.com/stretchr/testify/require"
)

func (i *InterceptorSuite) TestQueueSizeHandlerSuccess() {
func TestQueueSizeHandlerSuccess(t *testing.T) {
r := require.New(t)
reader := &fakeQueueCountReader{
current: 123,
err: nil,
Expand All @@ -14,20 +18,21 @@ func (i *InterceptorSuite) TestQueueSizeHandlerSuccess() {
handler := newQueueSizeHandler(reader)
_, echoCtx, rec := newTestCtx("GET", "/queue")
err := handler(echoCtx)
i.NoError(err)
i.Equal(200, rec.Code, "response code")
r.NoError(err)
r.Equal(200, rec.Code, "response code")
respMap := map[string]int{}
decodeErr := json.NewDecoder(rec.Body).Decode(&respMap)
i.NoError(decodeErr)
i.Equalf(1, len(respMap), "response JSON length was not 1")
r.NoError(decodeErr)
r.Equalf(1, len(respMap), "response JSON length was not 1")
sizeVal, ok := respMap["current_size"]
i.Truef(ok, "'current_size' entry not available in return JSON")
i.Equalf(reader.current, sizeVal, "returned JSON queue size was wrong")
r.Truef(ok, "'current_size' entry not available in return JSON")
r.Equalf(reader.current, sizeVal, "returned JSON queue size was wrong")
reader.err = errors.New("test error")
i.Error(handler(echoCtx))
r.Error(handler(echoCtx))
}

func (i *InterceptorSuite) TestQueueSizeHandlerFail() {
func TestQueueSizeHandlerFail(t *testing.T) {
r := require.New(t)
reader := &fakeQueueCountReader{
current: 0,
err: errors.New("test error"),
Expand All @@ -36,6 +41,6 @@ func (i *InterceptorSuite) TestQueueSizeHandlerFail() {
handler := newQueueSizeHandler(reader)
_, echoCtx, rec := newTestCtx("GET", "/queue")
err := handler(echoCtx)
i.Error(err)
i.Equal(500, rec.Code, "response code")
r.Error(err)
r.Equal(500, rec.Code, "response code")
}
38 changes: 38 additions & 0 deletions interceptor/config/origin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package config

import (
"fmt"
"net/url"

"github.com/kelseyhightower/envconfig"
)

// Origin is the configuration for where and how the proxy forwards
// requests to a backing Kubernetes service
type Origin struct {
// AppServiceName is the name of the service that fronts the user's app
AppServiceName string `envconfig:"KEDA_HTTP_APP_SERVICE_NAME" required:"true"`
// AppServiecPort the port that that the proxy should forward to
AppServicePort string `envconfig:"KEDA_HTTP_APP_SERVICE_PORT" required:"true"`
// TargetDeploymentName is the name of the backing deployment that the interceptor
// should forward to
TargetDeploymentName string `envconfig:"KEDA_HTTP_TARGET_DEPLOYMENT_NAME" required:"true"`
// Namespace is the namespace that this interceptor is running in
Namespace string `envconfig:"KEDA_HTTP_NAMESPACE" required:"true"`
}

// ServiceURL formats the app service name and port into a URL
func (o *Origin) ServiceURL() (*url.URL, error) {
urlStr := fmt.Sprintf("http://%s:%s", o.AppServiceName, o.AppServicePort)
u, err := url.Parse(urlStr)
if err != nil {
return nil, err
}
return u, nil
}

func MustParseOrigin() *Origin {
ret := new(Origin)
envconfig.MustProcess("", ret)
return ret
}
24 changes: 24 additions & 0 deletions interceptor/config/serving.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package config

import (
"github.com/kelseyhightower/envconfig"
)

// Serving is configuration for how the interceptor serves the proxy
// and admin server
type Serving struct {
// ProxyPort is the port that the public proxy should run on
ProxyPort int `envconfig:"KEDA_HTTP_PROXY_PORT" required:"true"`
// AdminPort is the port that the internal admin server should run on.
// This is the server that the external scaler will issue metrics
// requests to
AdminPort int `envconfig:"KEDA_HTTP_ADMIN_PORT" required:"true"`
}

// Parse parses standard configs using envconfig and returns a pointer to the
// newly created config. Returns nil and a non-nil error if parsing failed
func MustParseServing() *Serving {
ret := new(Serving)
envconfig.MustProcess("", ret)
return ret
}
46 changes: 46 additions & 0 deletions interceptor/config/timeouts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package config

import (
"time"

"github.com/kelseyhightower/envconfig"
"k8s.io/apimachinery/pkg/util/wait"
)

// Timeouts is the configuration for connection and HTTP timeouts
type Timeouts struct {
// Connect is the connection timeout
Connect time.Duration `envconfig:"KEDA_HTTP_CONNECT_TIMEOUT" default:"500ms"`
// KeepAlive is the interval between keepalive probes
KeepAlive time.Duration `envconfig:"KEDA_HTTP_KEEP_ALIVE" default:"1s"`
// ResponseHeaderTimeout is how long to wait between when the HTTP request
// is sent to the backing app and when response headers need to arrive
ResponseHeader time.Duration `envconfig:"KEDA_RESPONSE_HEADER_TIMEOUT" default:"500ms"`
// DeploymentReplicas is how long to wait for the backing deployment
// to have 1 or more replicas before connecting and sending the HTTP request.
DeploymentReplicas time.Duration `envconfig:"KEDA_CONDITION_WAIT_TIMEOUT" default:"1500ms"`
}

// Backoff returns a wait.Backoff based on the timeouts in t
func (t *Timeouts) Backoff(factor, jitter float64, steps int) wait.Backoff {
return wait.Backoff{
Duration: t.Connect,
Factor: factor,
Jitter: jitter,
Steps: steps,
}
}

// DefaultBackoff calls t.Backoff with reasonable defaults and returns
// the result
func (t *Timeouts) DefaultBackoff() wait.Backoff {
return t.Backoff(2, 0.5, 5)
}

// Parse parses standard configs using envconfig and returns a pointer to the
// newly created config. Returns nil and a non-nil error if parsing failed
func MustParseTimeouts() *Timeouts {
ret := new(Timeouts)
envconfig.MustProcess("", ret)
return ret
}
58 changes: 58 additions & 0 deletions interceptor/forward_wait_func.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package main

import (
"fmt"
"log"
"time"

"github.com/kedacore/http-add-on/pkg/k8s"
appsv1 "k8s.io/api/apps/v1"
)

type forwardWaitFunc func() error

func newDeployReplicasForwardWaitFunc(
deployCache k8s.DeploymentCache,
deployName string,
totalWait time.Duration,
) forwardWaitFunc {
return func() error {
deployment, err := deployCache.Get(deployName)
if err != nil {
// if we didn't get the initial deployment state, bail out
return fmt.Errorf("Error getting state for deployment %s (%s)", deployName, err)
}
// if there is 1 or more replica, we're done waiting
if moreThanPtr(deployment.Spec.Replicas, 0) {
return nil
}

watcher := deployCache.Watch(deployName)
if err != nil {
return fmt.Errorf("Error getting the stream of deployment changes")
}
defer watcher.Stop()
eventCh := watcher.ResultChan()
timer := time.NewTimer(totalWait)
defer timer.Stop()
for {
select {
case event := <-eventCh:
deployment := event.Object.(*appsv1.Deployment)
if err != nil {
log.Printf(
"Error getting deployment %s after change was triggered (%s)",
deployName,
err,
)
}
if moreThanPtr(deployment.Spec.Replicas, 0) {
return nil
}
case <-timer.C:
// otherwise, if we hit the end of the timeout, fail
return fmt.Errorf("Timeout expired waiting for deployment %s to reach > 0 replicas", deployName)
}
}
}
}
106 changes: 106 additions & 0 deletions interceptor/forward_wait_func_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"context"
"testing"
"time"

"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/watch"
)

// Test to make sure the wait function returns a nil error if there is immediately
// one replica on the target deployment
func TestForwardWaitFuncOneReplica(t *testing.T) {
r := require.New(t)
const ns = "testNS"
const deployName = "TestForwardingHandlerDeploy"
cache := k8s.NewMemoryDeploymentCache(map[string]*appsv1.Deployment{
deployName: k8s.NewDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
),
})
waitFunc := newDeployReplicasForwardWaitFunc(
cache,
deployName,
1*time.Second,
)

ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer done()
group, ctx := errgroup.WithContext(ctx)
group.Go(waitFunc)
r.NoError(group.Wait())
}

// Test to make sure the wait function returns an error if there are no replicas, and that doesn't change
// within a timeout
func TestForwardWaitFuncNoReplicas(t *testing.T) {
r := require.New(t)
const ns = "testNS"
const deployName = "TestForwardingHandlerHoldsDeployment"
deployment := k8s.NewDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
)
deployment.Spec.Replicas = k8s.Int32P(0)
cache := k8s.NewMemoryDeploymentCache(map[string]*appsv1.Deployment{
deployName: deployment,
})

const timeout = 200 * time.Millisecond
waitFunc := newDeployReplicasForwardWaitFunc(
cache,
deployName,
1*time.Second,
)

err := waitFunc()
r.Error(err)
}

func TestWaitFuncWaitsUntilReplicas(t *testing.T) {
r := require.New(t)
totalWaitDur := 500 * time.Millisecond

const ns = "testNS"
const deployName = "TestForwardingHandlerHoldsDeployment"
deployment := k8s.NewDeployment(
ns,
deployName,
"myimage",
[]int32{123},
nil,
map[string]string{},
)
deployment.Spec.Replicas = k8s.Int32P(0)
cache := k8s.NewMemoryDeploymentCache(map[string]*appsv1.Deployment{
deployName: deployment,
})
waitFunc := newDeployReplicasForwardWaitFunc(cache, deployName, totalWaitDur)
// this channel will be closed immediately after the replicas were increased
replicasIncreasedCh := make(chan struct{})
go func() {
time.Sleep(totalWaitDur / 2)
cache.RWM.RLock()
defer cache.RWM.RUnlock()
watcher := cache.Watchers[deployName]
modifiedDeployment := deployment.DeepCopy()
modifiedDeployment.Spec.Replicas = k8s.Int32P(1)
watcher.Action(watch.Modified, modifiedDeployment)
close(replicasIncreasedCh)
}()
r.NoError(waitFunc())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,10 @@ package main

import (
"net/http/httptest"
"testing"

echo "github.com/labstack/echo/v4"
"github.com/stretchr/testify/suite"
)

type InterceptorSuite struct {
suite.Suite
}

func TestInterceptor(t *testing.T) {
suite.Run(t, new(InterceptorSuite))
}

func newTestCtx(method, path string) (*echo.Echo, echo.Context, *httptest.ResponseRecorder) {
req := httptest.NewRequest(method, path, nil)
rec := httptest.NewRecorder()
Expand Down
Loading