Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make OTLP receiver listen on all IPs again #5739

Merged
merged 10 commits into from
Jul 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ func AddFlags(flags *flag.FlagSet) {
addGRPCFlags(flags, grpcServerFlagsCfg, ports.PortToHostPort(ports.CollectorGRPC))

flags.Bool(flagCollectorOTLPEnabled, true, "Enables OpenTelemetry OTLP receiver on dedicated HTTP and gRPC ports")
addHTTPFlags(flags, otlpServerFlagsCfg.HTTP, "")
addHTTPFlags(flags, otlpServerFlagsCfg.HTTP, ":4318")
corsOTLPFlags.AddFlags(flags)
addGRPCFlags(flags, otlpServerFlagsCfg.GRPC, "")
addGRPCFlags(flags, otlpServerFlagsCfg.GRPC, ":4317")

flags.String(flagZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)")
flags.Bool(flagZipkinKeepAliveEnabled, true, "KeepAlive configures allow Keep-Alive for Zipkin HTTP server (enabled by default)")
Expand Down
1 change: 0 additions & 1 deletion cmd/collector/app/server/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func TestCollectorReflection(t *testing.T) {

grpctest.ReflectionServiceValidator{
HostPort: params.HostPortActual,
Server: server,
ExpectedServices: []string{
"jaeger.api_v2.CollectorService",
"jaeger.api_v2.SamplingManager",
Expand Down
12 changes: 8 additions & 4 deletions cmd/jaeger/internal/extension/jaegerquery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package jaegerquery
import (
"github.com/asaskevich/govalidator"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"

queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
Expand All @@ -18,10 +19,13 @@ var _ component.ConfigValidator = (*Config)(nil)
type Config struct {
queryApp.QueryOptionsBase `mapstructure:",squash"`

TraceStoragePrimary string `valid:"required" mapstructure:"trace_storage"`
TraceStorageArchive string `valid:"optional" mapstructure:"trace_storage_archive"`
confighttp.ServerConfig `mapstructure:",squash"`
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
TraceStoragePrimary string `valid:"required" mapstructure:"trace_storage"`
TraceStorageArchive string `valid:"optional" mapstructure:"trace_storage_archive"`

HTTP confighttp.ServerConfig `mapstructure:",squash"`
GRPC configgrpc.ServerConfig `mapstructure:",squash"`

Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
}

func (cfg *Config) Validate() error {
Expand Down
10 changes: 9 additions & 1 deletion cmd/jaeger/internal/extension/jaegerquery/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/extension"

"github.com/jaegertracing/jaeger/ports"
Expand All @@ -25,9 +27,15 @@ func NewFactory() extension.Factory {

func createDefaultConfig() component.Config {
return &Config{
ServerConfig: confighttp.ServerConfig{
HTTP: confighttp.ServerConfig{
Endpoint: ports.PortToHostPort(ports.QueryHTTP),
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ports.PortToHostPort(ports.QueryGRPC),
Transport: confignet.TransportTypeTCP,
},
},
}
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/telemetery"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/ports"
)

var (
Expand Down Expand Up @@ -127,9 +126,10 @@ func (s *server) makeQueryOptions() *queryApp.QueryOptions {
return &queryApp.QueryOptions{
QueryOptionsBase: s.config.QueryOptionsBase,

// TODO expose via config
HTTPHostPort: ports.PortToHostPort(ports.QueryHTTP),
GRPCHostPort: ports.PortToHostPort(ports.QueryGRPC),
// TODO utilize OTEL helpers for creating HTTP/GRPC servers
HTTPHostPort: s.config.HTTP.Endpoint,
GRPCHostPort: s.config.GRPC.NetAddr.Endpoint,
// TODO handle TLS
}
}

Expand Down
33 changes: 31 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package jaegerquery
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -17,6 +19,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/internal/grpctest"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -160,14 +163,40 @@ func TestServerStart(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
telemetrySettings := component.TelemetrySettings{
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
ReportStatus: func(*component.StatusEvent) {},
}
tt.config.HTTP.Endpoint = ":0"
tt.config.GRPC.NetAddr.Endpoint = ":0"
server := newServer(tt.config, telemetrySettings)
err := server.Start(context.Background(), host)

if tt.expectedErr == "" {
require.NoError(t, err)
defer server.Shutdown(context.Background())
// We need to wait for servers to become available.
// Otherwise, we could call shutdown before the servers are even started,
// which could cause flaky code coverage by going through error cases.
require.Eventually(t,
func() bool {
resp, err := http.Get(fmt.Sprintf("http://%s/", server.server.HTTPAddr()))
if err != nil {
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
},
10*time.Second,
100*time.Millisecond,
"server not started")
grpctest.ReflectionServiceValidator{
HostPort: server.server.GRPCAddr(),
ExpectedServices: []string{
"jaeger.api_v2.QueryService",
"jaeger.api_v3.QueryService",
"jaeger.api_v2.metrics.MetricsQueryService",
"grpc.health.v1.Health",
},
}.Execute(t)
} else {
require.ErrorContains(t, err, tt.expectedErr)
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/query/app/handler_archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestGetArchivedTrace_NotFound(t *testing.T) {
} {
tc := tc // capture loop var
t.Run(tc.name, func(t *testing.T) {
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
var response structuredResponse
Expand All @@ -66,7 +66,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) {
mockReader := &spanstoremocks.Reader{}
mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
// make main reader return NotFound
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
Expand All @@ -81,7 +81,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) {

// Test failure in parsing trace ID.
func TestArchiveTrace_BadTraceID(t *testing.T) {
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/badtraceid", []string{}, &response)
require.Error(t, err)
Expand All @@ -95,7 +95,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) {
Return(nil, spanstore.ErrTraceNotFound).Once()
mockWriter := &spanstoremocks.Writer{}
// Not actually going to write the trace, so no need to define mockWriter action
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
// make main reader return NotFound
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
Expand All @@ -106,7 +106,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) {
}

func TestArchiveTrace_NoStorage(t *testing.T) {
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response)
require.EqualError(t, err, `500 error from server: {"data":null,"total":0,"limit":0,"offset":0,"errors":[{"code":500,"msg":"archive span storage was not configured"}]}`+"\n")
Expand All @@ -117,7 +117,7 @@ func TestArchiveTrace_Success(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(nil).Times(2)
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
Expand All @@ -130,7 +130,7 @@ func TestArchiveTrace_WriteErrors(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(errors.New("cannot save")).Times(2)
withTestServer(func(ts *testServer) {
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
Expand Down
19 changes: 8 additions & 11 deletions cmd/query/app/handler_deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,7 @@ func TestFilterDependencies(t *testing.T) {
}

func TestGetDependenciesSuccess(t *testing.T) {
ts := initializeTestServer()
defer ts.server.Close()
ts := initializeTestServer(t)
expectedDependencies := []model.DependencyLink{{Parent: "killer", Child: "queen", CallCount: 12}}
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
ts.dependencyReader.On("GetDependencies",
Expand All @@ -332,29 +331,27 @@ func TestGetDependenciesSuccess(t *testing.T) {
}

func TestGetDependenciesCassandraFailure(t *testing.T) {
ts := initializeTestServer()
defer ts.server.Close()
ts := initializeTestServer(t)
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
ts.dependencyReader.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1)
ts.dependencyReader.On("GetDependencies",
mock.Anything, // context
endTs,
defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1)

var response structuredResponse
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response)
require.Error(t, err)
}

func TestGetDependenciesEndTimeParsingFailure(t *testing.T) {
ts := initializeTestServer()
defer ts.server.Close()

ts := initializeTestServer(t)
var response structuredResponse
err := getJSON(ts.server.URL+"/api/dependencies?endTs=shazbot&service=testing", &response)
require.Error(t, err)
}

func TestGetDependenciesLookbackParsingFailure(t *testing.T) {
ts := initializeTestServer()
defer ts.server.Close()

ts := initializeTestServer(t)
var response structuredResponse
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing&lookback=shazbot", &response)
require.Error(t, err)
Expand Down
Loading
Loading