Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kourier tls runtime tests #15732

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions .github/workflows/kind-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,13 @@ jobs:

- name: Test ${{ matrix.test-suite }}
run: |
FEATURE_FLAGS="-enable-alpha -enable-beta"
if [[ "${{ matrix.ingress}}" == "kourier-tls" ]] && [[ "${{ matrix.test-suite }}" == "runtime" ]]; then
# Disabled due to flakiness: https://github.com/knative/serving/issues/15697
FEATURE_FLAGS="$FEATURE_FLAGS -disable-optional-api"
fi
gotestsum --format testname -- \
-race -count=1 -parallel=1 -tags=e2e \
-timeout=30m \
${{ matrix.test-path }} \
-skip-cleanup-on-fail \
-disable-logstream \
$FEATURE_FLAGS \
-enable-alpha -enable-beta \
--ingress-class=${{ matrix.ingress-class || matrix.ingress }}.ingress.networking.knative.dev \
${{ matrix.test-flags }}

Expand Down
18 changes: 11 additions & 7 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (

// Throttler is the interface that Handler calls to Try to proxy the user request.
type Throttler interface {
Try(ctx context.Context, revID types.NamespacedName, fn func(string) error) error
Try(ctx context.Context, revID types.NamespacedName, fn func(string, bool) error) error
}

// activationHandler will wait for an active endpoint for a revision
Expand Down Expand Up @@ -87,14 +87,14 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

revID := RevIDFrom(r.Context())
if err := a.throttler.Try(tryContext, revID, func(dest string) error {
if err := a.throttler.Try(tryContext, revID, func(dest string, isClusterIP bool) error {
trySpan.End()

proxyCtx, proxySpan := r.Context(), (*trace.Span)(nil)
if tracingEnabled {
proxyCtx, proxySpan = trace.StartSpan(r.Context(), "activator_proxy")
}
a.proxyRequest(revID, w, r.WithContext(proxyCtx), dest, tracingEnabled, a.usePassthroughLb)
a.proxyRequest(revID, w, r.WithContext(proxyCtx), dest, tracingEnabled, a.usePassthroughLb, isClusterIP)
proxySpan.End()

return nil
Expand All @@ -114,7 +114,7 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.ResponseWriter,
r *http.Request, target string, tracingEnabled bool, usePassthroughLb bool,
r *http.Request, target string, tracingEnabled bool, usePassthroughLb bool, isClusterIP bool,
) {
netheader.RewriteHostIn(r)
r.Header.Set(netheader.ProxyKey, activator.Name)
Expand All @@ -127,7 +127,11 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp

var proxy *httputil.ReverseProxy
if a.tls {
proxy = pkghttp.NewHeaderPruningReverseProxy(useSecurePort(target), hostOverride, activator.RevisionHeaders, true /* uss HTTPS */)
tlsTargetPort := networking.BackendHTTPSPort
if isClusterIP {
tlsTargetPort = 443
}
proxy = pkghttp.NewHeaderPruningReverseProxy(useSecurePort(target, tlsTargetPort), hostOverride, activator.RevisionHeaders, true /* uss HTTPS */)
} else {
proxy = pkghttp.NewHeaderPruningReverseProxy(target, hostOverride, activator.RevisionHeaders, false /* use HTTPS */)
}
Expand All @@ -148,9 +152,9 @@ func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.Resp
// useSecurePort replaces the default port with HTTPS port (8112).
// TODO: endpointsToDests() should support HTTPS instead of this overwrite but it needs metadata request to be encrypted.
// This code should be removed when https://github.com/knative/serving/issues/12821 was solved.
func useSecurePort(target string) string {
func useSecurePort(target string, port int) string {
target = strings.Split(target, ":")[0]
return target + ":" + strconv.Itoa(networking.BackendHTTPSPort)
return target + ":" + strconv.Itoa(port)
}

func WrapActivatorHandlerWithFullDuplex(h http.Handler, logger *zap.SugaredLogger) http.HandlerFunc {
Expand Down
4 changes: 2 additions & 2 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ type fakeThrottler struct {
err error
}

func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, f func(string) error) error {
func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, f func(string, bool) error) error {
if ft.err != nil {
return ft.err
}
return f("10.10.10.10:1234")
return f("10.10.10.10:1234", false)
}

func TestActivationHandler(t *testing.T) {
Expand Down
15 changes: 8 additions & 7 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,18 @@ func noop() {}

// Returns a dest that at the moment of choosing had an open slot
// for request.
func (rt *revisionThrottler) acquireDest(ctx context.Context) (func(), *podTracker) {
func (rt *revisionThrottler) acquireDest(ctx context.Context) (func(), *podTracker, bool) {
rt.mux.RLock()
defer rt.mux.RUnlock()

if rt.clusterIPTracker != nil {
return noop, rt.clusterIPTracker
return noop, rt.clusterIPTracker, true
}
return rt.lbPolicy(ctx, rt.assignedTrackers)
f, lbTracker := rt.lbPolicy(ctx, rt.assignedTrackers)
return f, lbTracker, false
}

func (rt *revisionThrottler) try(ctx context.Context, function func(string) error) error {
func (rt *revisionThrottler) try(ctx context.Context, function func(dest string, isClusterIP bool) error) error {
var ret error

// Retrying infinitely as long as we receive no dest. Outer semaphore and inner
Expand All @@ -229,7 +230,7 @@ func (rt *revisionThrottler) try(ctx context.Context, function func(string) erro
for reenqueue {
reenqueue = false
if err := rt.breaker.Maybe(ctx, func() {
cb, tracker := rt.acquireDest(ctx)
cb, tracker, isClusterIP := rt.acquireDest(ctx)
if tracker == nil {
// This can happen if individual requests raced each other or if pod
// capacity was decreased after passing the outer semaphore.
Expand All @@ -238,7 +239,7 @@ func (rt *revisionThrottler) try(ctx context.Context, function func(string) erro
}
defer cb()
// We already reserved a guaranteed spot. So just execute the passed functor.
ret = function(tracker.dest)
ret = function(tracker.dest, isClusterIP)
}); err != nil {
return err
}
Expand Down Expand Up @@ -518,7 +519,7 @@ func (t *Throttler) run(updateCh <-chan revisionDestsUpdate) {
}

// Try waits for capacity and then executes function, passing in a l4 dest to send a request
func (t *Throttler) Try(ctx context.Context, revID types.NamespacedName, function func(string) error) error {
func (t *Throttler) Try(ctx context.Context, revID types.NamespacedName, function func(dest string, isClusterIP bool) error) error {
rt, err := t.getOrCreateRevisionThrottler(revID)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,13 @@ func TestThrottlerErrorNoRevision(t *testing.T) {
})

// Make sure it now works.
if err := throttler.Try(ctx, revID, func(string) error { return nil }); err != nil {
if err := throttler.Try(ctx, revID, func(string, bool) error { return nil }); err != nil {
t.Fatalf("Try() = %v, want no error", err)
}

// Make sure errors are propagated correctly.
innerError := errors.New("inner")
if err := throttler.Try(ctx, revID, func(string) error { return innerError }); !errors.Is(err, innerError) {
if err := throttler.Try(ctx, revID, func(string, bool) error { return innerError }); !errors.Is(err, innerError) {
t.Fatalf("Try() = %v, want %v", err, innerError)
}

Expand All @@ -331,7 +331,7 @@ func TestThrottlerErrorNoRevision(t *testing.T) {
// Eventually it should now fail.
var lastError error
wait.PollUntilContextCancel(ctx, 10*time.Millisecond, false, func(context.Context) (bool, error) {
lastError = throttler.Try(ctx, revID, func(string) error { return nil })
lastError = throttler.Try(ctx, revID, func(string, bool) error { return nil })
return lastError != nil, nil
})
if lastError == nil || lastError.Error() != `revision.serving.knative.dev "test-revision" not found` {
Expand Down Expand Up @@ -915,7 +915,7 @@ func (t *Throttler) try(ctx context.Context, requests int, try func(string) erro
for range requests {
go func() {
var result tryResult
if err := t.Try(ctx, revID, func(dest string) error {
if err := t.Try(ctx, revID, func(dest string, _ bool) error {
result = tryResult{dest: dest}
return try(dest)
}); err != nil {
Expand Down
Loading