Skip to content

Commit

Permalink
feat: expose pprof endpoints from multiinstance diagnostics server
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Feb 7, 2025
1 parent 18d8819 commit d7df774
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 3 deletions.
32 changes: 30 additions & 2 deletions pkg/manager/multiinstance/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"net/http/pprof"
"sync"
"time"

Expand All @@ -23,17 +24,39 @@ const (
// DiagnosticsServer is an HTTP server exposing diagnostics handlers of
// multiple manager instances. Optionally (see WithPprofHandler) it also
// exposes pprof endpoints under /debug/pprof/.
type DiagnosticsServer struct {
	// listenerPort is the TCP port the server listens on.
	listenerPort int
	// handlers maps an instance ID to that instance's diagnostics handler.
	handlers map[manager.ID]http.Handler
	// pprofMux serves pprof endpoints; nil unless WithPprofHandler was used.
	pprofMux *http.ServeMux

	// muxLock guards mux, which is rebuilt (rebuildMux) whenever the set of
	// registered handlers changes.
	muxLock sync.Mutex
	mux *http.ServeMux
}

func NewDiagnosticsServer(listenerPort int) *DiagnosticsServer {
return &DiagnosticsServer{
// DiagnosticsServerOption is a functional option for configuring the DiagnosticsServer.
type DiagnosticsServerOption func(*DiagnosticsServer)

// WithPprofHandler makes the DiagnosticsServer expose pprof endpoints
// (profiling, tracing, etc.) under /debug/pprof/.
func WithPprofHandler() DiagnosticsServerOption {
	return func(s *DiagnosticsServer) {
		s.pprofMux = http.NewServeMux()
		s.pprofMux.HandleFunc("/", pprof.Index)
		// The pprof mux is mounted behind http.StripPrefix (see rebuildMux), so
		// pprof.Index never sees the "/debug/pprof/" prefix it requires to
		// dispatch to named profiles (heap, goroutine, allocs, ...) and would
		// serve the HTML index page instead. Resolve the profile explicitly by
		// its name taken from the path wildcard.
		s.pprofMux.HandleFunc("/{action}", func(w http.ResponseWriter, r *http.Request) {
			pprof.Handler(r.PathValue("action")).ServeHTTP(w, r)
		})
		// More specific patterns below take precedence over "/{action}".
		s.pprofMux.HandleFunc("/cmdline", pprof.Cmdline)
		s.pprofMux.HandleFunc("/profile", pprof.Profile)
		s.pprofMux.HandleFunc("/symbol", pprof.Symbol)
		s.pprofMux.HandleFunc("/trace", pprof.Trace)
	}
}

// NewDiagnosticsServer creates a DiagnosticsServer that will listen on the
// given port, with any provided options applied.
func NewDiagnosticsServer(listenerPort int, opts ...DiagnosticsServerOption) *DiagnosticsServer {
	server := &DiagnosticsServer{
		listenerPort: listenerPort,
		handlers:     make(map[manager.ID]http.Handler),
		mux:          http.NewServeMux(),
	}
	for _, applyOpt := range opts {
		applyOpt(server)
	}
	return server
}

// Start starts the diagnostics server.
Expand Down Expand Up @@ -79,6 +102,11 @@ func (s *DiagnosticsServer) UnregisterInstance(instanceID manager.ID) {
// rebuildMux rebuilds the mux with the current handlers. It should be called with the muxLock held.
func (s *DiagnosticsServer) rebuildMux() {
s.mux = http.NewServeMux()

if s.pprofMux != nil {
s.mux.Handle("/debug/pprof/", http.StripPrefix("/debug/pprof", s.pprofMux))
}

for instanceID, handler := range s.handlers {
// It's possible an instance doesn't have a diagnostics handler. Handle that gracefully.
if handler == nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/manager/multiinstance/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package multiinstance
import (
"context"
"net/http"
"runtime/pprof"
"sync"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -154,7 +155,12 @@ func (m *Manager) runInstance(ctx context.Context, instanceID manager.ID) {
}

m.logger.Info("Starting instance", "instanceID", instanceID)
go in.Run(ctx)

// Wrap with pprof.Do to add instanceID to the pprof labels. That will make it easier to identify which instance
// is responsible for the CPU consumption.
pprof.Do(ctx, pprof.Labels("instanceID", instanceID.String()), func(ctx context.Context) {
go in.Run(ctx)
})

// If diagnostics are enabled, register the instance with the diagnostics exposer.
if m.diagnosticsExposer != nil {
Expand Down
47 changes: 47 additions & 0 deletions test/envtest/multiinstance_manager_diagnostics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
"time"

"github.com/go-logr/logr/testr"
"github.com/google/uuid"
kongv1 "github.com/kong/kubernetes-configuration/api/configuration/v1"

Check failure on line 12 in test/envtest/multiinstance_manager_diagnostics_test.go

View workflow job for this annotation

GitHub Actions / linters / lint

File is not properly formatted (gci)
"github.com/kong/kubernetes-configuration/pkg/clientset"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Check failure on line 18 in test/envtest/multiinstance_manager_diagnostics_test.go

View workflow job for this annotation

GitHub Actions / linters / lint

File is not properly formatted (gci)
"github.com/kong/kubernetes-configuration/pkg/clientset/scheme"

Expand Down Expand Up @@ -72,3 +77,45 @@ func TestMultiInstanceManagerDiagnostics(t *testing.T) {
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}, waitTime, tickTime, "diagnostics should no longer be available after stopping the instance")
}

// TestMultiInstanceManager_Profiling starts the multi-instance manager with a
// pprof-enabled diagnostics server and generates load (consumer creation) for
// a bounded period so instance activity can show up in CPU profiles.
func TestMultiInstanceManager_Profiling(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	envcfg := Setup(t, scheme.Scheme)
	diagPort := helpers.GetFreePort(t)
	t.Logf("Diagnostics port: %d", diagPort)

	t.Log("Starting the diagnostics server and the multi-instance manager")
	diagServer := multiinstance.NewDiagnosticsServer(diagPort, multiinstance.WithPprofHandler())
	go func() {
		// Use assert (not require) here: require calls t.FailNow, which must
		// not be called from a goroutine other than the one running the test.
		assert.ErrorIs(t, diagServer.Start(ctx), http.ErrServerClosed)
	}()
	multimgr := multiinstance.NewManager(testr.New(t), multiinstance.WithDiagnosticsExposer(diagServer))
	go func() {
		assert.NoError(t, multimgr.Run(ctx))
	}()

	m1 := SetupManager(ctx, t, lo.Must(manager.NewID("cp-1")), envcfg, AdminAPIOptFns(), WithDiagnosticsWithoutServer())
	m2 := SetupManager(ctx, t, lo.Must(manager.NewID("cp-2")), envcfg, AdminAPIOptFns(), WithDiagnosticsWithoutServer())

	require.NoError(t, multimgr.ScheduleInstance(m1))
	require.NoError(t, multimgr.ScheduleInstance(m2))

	cl, err := clientset.NewForConfig(envcfg)
	require.NoError(t, err)

	// Bound the load-generation loop so the test terminates on its own. The
	// previous unbounded `for range time.Tick(...)` never returned (running
	// until the suite timeout) and leaked the ticker.
	loadCtx, loadCancel := context.WithTimeout(ctx, 3*time.Second)
	defer loadCancel()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-loadCtx.Done():
			// TODO(czeslavo): verify in this test that CPU profile data contains labels for each instance
			return
		case <-ticker.C:
		}
		_, err := cl.ConfigurationV1().KongConsumers("default").Create(ctx, &kongv1.KongConsumer{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "test-consumer-",
			},
			Username: "test-consumer-" + uuid.NewString(),
		}, metav1.CreateOptions{})
		require.NoError(t, err)
		t.Log("Created a consumer")
	}
}

0 comments on commit d7df774

Please sign in to comment.