Skip to content

Commit

Permalink
Fix kourier tls runtime tests (#15732)
Browse files Browse the repository at this point in the history
  • Loading branch information
skonto authored Feb 3, 2025
1 parent ff8a4ab commit 9c35416
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 26 deletions.
7 changes: 1 addition & 6 deletions .github/workflows/kind-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,13 @@ jobs:
- name: Test ${{ matrix.test-suite }}
run: |
FEATURE_FLAGS="-enable-alpha -enable-beta"
if [[ "${{ matrix.ingress}}" == "kourier-tls" ]] && [[ "${{ matrix.test-suite }}" == "runtime" ]]; then
# Disabled due to flakiness: https://github.com/knative/serving/issues/15697
FEATURE_FLAGS="$FEATURE_FLAGS -disable-optional-api"
fi
gotestsum --format testname -- \
-race -count=1 -parallel=1 -tags=e2e \
-timeout=30m \
${{ matrix.test-path }} \
-skip-cleanup-on-fail \
-disable-logstream \
$FEATURE_FLAGS \
-enable-alpha -enable-beta \
--ingress-class=${{ matrix.ingress-class || matrix.ingress }}.ingress.networking.knative.dev \
${{ matrix.test-flags }}
Expand Down
18 changes: 11 additions & 7 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (

// Throttler is the interface that Handler calls to Try to proxy the user request.
type Throttler interface {
Try(ctx context.Context, revID types.NamespacedName, fn func(string) error) error
Try(ctx context.Context, revID types.NamespacedName, fn func(string, bool) error) error
}

// activationHandler will wait for an active endpoint for a revision
Expand Down Expand Up @@ -87,14 +87,14 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

revID := RevIDFrom(r.Context())
if err := a.throttler.Try(tryContext, revID, func(dest string) error {
if err := a.throttler.Try(tryContext, revID, func(dest string, isClusterIP bool) error {
trySpan.End()

proxyCtx, proxySpan := r.Context(), (*trace.Span)(nil)
if tracingEnabled {
proxyCtx, proxySpan = trace.StartSpan(r.Context(), "activator_proxy")
}
a.proxyRequest(revID, w, r.WithContext(proxyCtx), dest, tracingEnabled, a.usePassthroughLb)
a.proxyRequest(revID, w, r.WithContext(proxyCtx), dest, tracingEnabled, a.usePassthroughLb, isClusterIP)
proxySpan.End()

return nil
Expand All @@ -114,7 +114,7 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.ResponseWriter,
r *http.Request, target string, tracingEnabled bool, usePassthroughLb bool,
r *http.Request, target string, tracingEnabled bool, usePassthroughLb bool, isClusterIP bool,
) {
netheader.RewriteHostIn(r)
r.Header.Set(netheader.ProxyKey, activator.Name)
Expand All @@ -127,7 +127,11 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp

var proxy *httputil.ReverseProxy
if a.tls {
proxy = pkghttp.NewHeaderPruningReverseProxy(useSecurePort(target), hostOverride, activator.RevisionHeaders, true /* uss HTTPS */)
tlsTargetPort := networking.BackendHTTPSPort
if isClusterIP {
tlsTargetPort = 443
}
proxy = pkghttp.NewHeaderPruningReverseProxy(useSecurePort(target, tlsTargetPort), hostOverride, activator.RevisionHeaders, true /* uss HTTPS */)
} else {
proxy = pkghttp.NewHeaderPruningReverseProxy(target, hostOverride, activator.RevisionHeaders, false /* use HTTPS */)
}
Expand All @@ -148,9 +152,9 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp
// useSecurePort replaces the default port with HTTPS port (8112).
// TODO: endpointsToDests() should support HTTPS instead of this overwrite but it needs metadata request to be encrypted.
// This code should be removed when https://github.com/knative/serving/issues/12821 was solved.
func useSecurePort(target string) string {
func useSecurePort(target string, port int) string {
target = strings.Split(target, ":")[0]
return target + ":" + strconv.Itoa(networking.BackendHTTPSPort)
return target + ":" + strconv.Itoa(port)
}

func WrapActivatorHandlerWithFullDuplex(h http.Handler, logger *zap.SugaredLogger) http.HandlerFunc {
Expand Down
4 changes: 2 additions & 2 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ type fakeThrottler struct {
err error
}

func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, f func(string) error) error {
func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, f func(string, bool) error) error {
if ft.err != nil {
return ft.err
}
return f("10.10.10.10:1234")
return f("10.10.10.10:1234", false)
}

func TestActivationHandler(t *testing.T) {
Expand Down
15 changes: 8 additions & 7 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,18 @@ func noop() {}

// Returns a dest that at the moment of choosing had an open slot
// for request.
func (rt *revisionThrottler) acquireDest(ctx context.Context) (func(), *podTracker) {
func (rt *revisionThrottler) acquireDest(ctx context.Context) (func(), *podTracker, bool) {
rt.mux.RLock()
defer rt.mux.RUnlock()

if rt.clusterIPTracker != nil {
return noop, rt.clusterIPTracker
return noop, rt.clusterIPTracker, true
}
return rt.lbPolicy(ctx, rt.assignedTrackers)
f, lbTracker := rt.lbPolicy(ctx, rt.assignedTrackers)
return f, lbTracker, false
}

func (rt *revisionThrottler) try(ctx context.Context, function func(string) error) error {
func (rt *revisionThrottler) try(ctx context.Context, function func(dest string, isClusterIP bool) error) error {
var ret error

// Retrying infinitely as long as we receive no dest. Outer semaphore and inner
Expand All @@ -229,7 +230,7 @@ func (rt *revisionThrottler) try(ctx context.Context, function func(string) erro
for reenqueue {
reenqueue = false
if err := rt.breaker.Maybe(ctx, func() {
cb, tracker := rt.acquireDest(ctx)
cb, tracker, isClusterIP := rt.acquireDest(ctx)
if tracker == nil {
// This can happen if individual requests raced each other or if pod
// capacity was decreased after passing the outer semaphore.
Expand All @@ -238,7 +239,7 @@ func (rt *revisionThrottler) try(ctx context.Context, function func(string) erro
}
defer cb()
// We already reserved a guaranteed spot. So just execute the passed functor.
ret = function(tracker.dest)
ret = function(tracker.dest, isClusterIP)
}); err != nil {
return err
}
Expand Down Expand Up @@ -518,7 +519,7 @@ func (t *Throttler) run(updateCh <-chan revisionDestsUpdate) {
}

// Try waits for capacity and then executes function, passing in a l4 dest to send a request
func (t *Throttler) Try(ctx context.Context, revID types.NamespacedName, function func(string) error) error {
func (t *Throttler) Try(ctx context.Context, revID types.NamespacedName, function func(dest string, isClusterIP bool) error) error {
rt, err := t.getOrCreateRevisionThrottler(revID)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,13 @@ func TestThrottlerErrorNoRevision(t *testing.T) {
})

// Make sure it now works.
if err := throttler.Try(ctx, revID, func(string) error { return nil }); err != nil {
if err := throttler.Try(ctx, revID, func(string, bool) error { return nil }); err != nil {
t.Fatalf("Try() = %v, want no error", err)
}

// Make sure errors are propagated correctly.
innerError := errors.New("inner")
if err := throttler.Try(ctx, revID, func(string) error { return innerError }); !errors.Is(err, innerError) {
if err := throttler.Try(ctx, revID, func(string, bool) error { return innerError }); !errors.Is(err, innerError) {
t.Fatalf("Try() = %v, want %v", err, innerError)
}

Expand All @@ -331,7 +331,7 @@ func TestThrottlerErrorNoRevision(t *testing.T) {
// Eventually it should now fail.
var lastError error
wait.PollUntilContextCancel(ctx, 10*time.Millisecond, false, func(context.Context) (bool, error) {
lastError = throttler.Try(ctx, revID, func(string) error { return nil })
lastError = throttler.Try(ctx, revID, func(string, bool) error { return nil })
return lastError != nil, nil
})
if lastError == nil || lastError.Error() != `revision.serving.knative.dev "test-revision" not found` {
Expand Down Expand Up @@ -915,7 +915,7 @@ func (t *Throttler) try(ctx context.Context, requests int, try func(string) erro
for range requests {
go func() {
var result tryResult
if err := t.Try(ctx, revID, func(dest string) error {
if err := t.Try(ctx, revID, func(dest string, _ bool) error {
result = tryResult{dest: dest}
return try(dest)
}); err != nil {
Expand Down

0 comments on commit 9c35416

Please sign in to comment.