Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp: Support primary and archive storage #4873

Merged
merged 1 commit into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/jaeger-v2/config-ui.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"archiveEnabled": true
}
4 changes: 4 additions & 0 deletions cmd/jaeger-v2/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ extensions:

jaeger_query:
trace_storage: memstore
trace_storage_archive: memstore_archive
ui_config: ./cmd/jaeger-v2/config-ui.json

jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000


receivers:
Expand Down
12 changes: 3 additions & 9 deletions cmd/jaeger-v2/internal/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package internal

import (
"log"
"strings"

"github.com/spf13/cobra"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -45,14 +44,9 @@ func Command() *cobra.Command {
// back to the official RunE.
otelRunE := cmd.RunE
cmd.RunE = func(cmd *cobra.Command, args []string) error {
configProvided := false
for _, arg := range args {
if strings.HasPrefix(arg, "--config") {
configProvided = true
break
}
}
if configProvided {
// a bit of a hack to check if '--config' flag was present
configFlag := cmd.Flag("config").Value.String()
if configFlag != "" && configFlag != "[]" {
return otelRunE(cmd, args)
}
log.Print("No '--config' flags detected, using default All-in-One configuration with memory storage.")
Expand Down
6 changes: 5 additions & 1 deletion cmd/jaeger-v2/internal/extension/jaegerquery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"

queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)

var _ component.ConfigValidator = (*Config)(nil)

// Config represents the configuration for jaeger-query,
type Config struct {
TraceStorage string `valid:"required" mapstructure:"trace_storage"`
queryApp.QueryOptionsBase `mapstructure:",squash"`

TraceStoragePrimary string `valid:"required" mapstructure:"trace_storage"`
TraceStorageArchive string `valid:"optional" mapstructure:"trace_storage_archive"`
confighttp.HTTPServerSettings `mapstructure:",squash"`
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger-v2/internal/extension/jaegerquery/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewFactory() extension.Factory {

func createDefaultConfig() component.Config {
return &Config{
TraceStorage: jaegerstorage.DefaultMemoryStore,
TraceStoragePrimary: jaegerstorage.DefaultMemoryStore,
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: ports.PortToHostPort(ports.QueryHTTP),
},
Expand Down
35 changes: 29 additions & 6 deletions cmd/jaeger-v2/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func newServer(config *Config, otel component.TelemetrySettings) *server {
}

func (s *server) Start(ctx context.Context, host component.Host) error {
f, err := jaegerstorage.GetStorageFactory(s.config.TraceStorage, host)
f, err := jaegerstorage.GetStorageFactory(s.config.TraceStoragePrimary, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
return fmt.Errorf("cannot find primary storage %s: %w", s.config.TraceStoragePrimary, err)
}

spanReader, err := f.CreateSpanReader()
Expand All @@ -53,7 +53,11 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return fmt.Errorf("cannot create dependencies reader: %w", err)
}

qs := querysvc.NewQueryService(spanReader, depReader, querysvc.QueryServiceOptions{})
var opts querysvc.QueryServiceOptions
if err := s.addArchiveStorage(&opts, host); err != nil {
return err
}
qs := querysvc.NewQueryService(spanReader, depReader, opts)
metricsQueryService, _ := disabled.NewMetricsReader()
tm := tenancy.NewManager(&s.config.Tenancy)

Expand All @@ -63,7 +67,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
s.logger,
qs,
metricsQueryService,
makeQueryOptions(),
s.makeQueryOptions(),
tm,
jtracer.NoOp(),
)
Expand All @@ -78,9 +82,28 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return nil
}

func makeQueryOptions() *queryApp.QueryOptions {
func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host component.Host) error {
if s.config.TraceStorageArchive == "" {
s.logger.Info("Archive storage not configured")
return nil
}

f, err := jaegerstorage.GetStorageFactory(s.config.TraceStorageArchive, host)
if err != nil {
return fmt.Errorf("cannot find archive storage factory: %w", err)
}

if !opts.InitArchiveStorage(f, s.logger) {
s.logger.Info("Archive storage not initialized")
}
return nil
}

func (s *server) makeQueryOptions() *queryApp.QueryOptions {
return &queryApp.QueryOptions{
// TODO
QueryOptionsBase: s.config.QueryOptionsBase,

// TODO expose via config
HTTPHostPort: ports.PortToHostPort(ports.QueryHTTP),
GRPCHostPort: ports.PortToHostPort(ports.QueryGRPC),
}
Expand Down
49 changes: 31 additions & 18 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,25 @@ var tlsHTTPFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "query.http",
}

// QueryOptions holds configuration for query service
type QueryOptions struct {
// HTTPHostPort is the host:port address that the query service listens in on for http requests
HTTPHostPort string
// GRPCHostPort is the host:port address that the query service listens in on for gRPC requests
GRPCHostPort string
// BasePath is the prefix for all UI and API HTTP routes
// QueryOptionsStaticAssets contains configuration for handling static assets
type QueryOptionsStaticAssets struct {
// Path is the path for the static assets for the UI (https://github.com/uber/jaeger-ui)
Path string `valid:"optional" mapstructure:"path"`
// LogAccess tells static handler to log access to static assets, useful in debugging
LogAccess bool `valid:"optional" mapstructure:"log_access"`
}

// QueryOptionsBase holds configuration for query service shared with jaeger-v2
type QueryOptionsBase struct {
// BasePath is the base path for all HTTP routes
BasePath string
// StaticAssets is the path for the static assets for the UI (https://github.com/uber/jaeger-ui)
StaticAssets string
// LogStaticAssetsAccess tells static handler to log access to static assets, useful in debugging
LogStaticAssetsAccess bool

StaticAssets QueryOptionsStaticAssets `valid:"optional" mapstructure:"static_assets"`

// UIConfig is the path to a configuration file for the UI
UIConfig string
UIConfig string `valid:"optional" mapstructure:"ui_config"`
// BearerTokenPropagation activate/deactivate bearer token propagation to storage
BearerTokenPropagation bool
// TLSGRPC configures secure transport (Consumer to Query service GRPC API)
TLSGRPC tlscfg.Options
// TLSHTTP configures secure transport (Consumer to Query service HTTP API)
TLSHTTP tlscfg.Options
// AdditionalHeaders
AdditionalHeaders http.Header
// MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span
Expand All @@ -88,6 +87,20 @@ type QueryOptions struct {
EnableTracing bool
}

// QueryOptions holds configuration for query service
type QueryOptions struct {
QueryOptionsBase

// HTTPHostPort is the host:port address that the query service listens in on for http requests
HTTPHostPort string
// GRPCHostPort is the host:port address that the query service listens in on for gRPC requests
GRPCHostPort string
// TLSGRPC configures secure transport (Consumer to Query service GRPC API)
TLSGRPC tlscfg.Options
// TLSHTTP configures secure transport (Consumer to Query service HTTP API)
TLSHTTP tlscfg.Options
}

// AddFlags adds flags for QueryOptions
func AddFlags(flagSet *flag.FlagSet) {
flagSet.Var(&config.StringSlice{}, queryAdditionalHeaders, `Additional HTTP response headers. Can be specified multiple times. Format: "Key: Value"`)
Expand Down Expand Up @@ -119,8 +132,8 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q
return qOpts, fmt.Errorf("failed to process HTTP TLS options: %w", err)
}
qOpts.BasePath = v.GetString(queryBasePath)
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.LogStaticAssetsAccess = v.GetBool(queryLogStaticAssetsAccess)
qOpts.StaticAssets.Path = v.GetString(queryStaticFiles)
qOpts.StaticAssets.LogAccess = v.GetBool(queryLogStaticAssetsAccess)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)

Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func TestQueryBuilderFlags(t *testing.T) {
})
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.Equal(t, "/dev/null", qOpts.StaticAssets)
assert.True(t, qOpts.LogStaticAssetsAccess)
assert.Equal(t, "/dev/null", qOpts.StaticAssets.Path)
assert.True(t, qOpts.StaticAssets.LogAccess)
assert.Equal(t, "some.json", qOpts.UIConfig)
assert.Equal(t, "/jaeger", qOpts.BasePath)
assert.Equal(t, "127.0.0.1:8080", qOpts.HTTPHostPort)
Expand Down
62 changes: 44 additions & 18 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,13 @@ func TestServerHTTPTLS(t *testing.T) {
}

serverOptions := &QueryOptions{
GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""),
HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""),
TLSHTTP: test.TLS,
TLSGRPC: TLSGRPC,
BearerTokenPropagation: true,
GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""),
HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""),
TLSHTTP: test.TLS,
TLSGRPC: TLSGRPC,
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
}
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()
Expand Down Expand Up @@ -483,11 +485,13 @@ func TestServerGRPCTLS(t *testing.T) {
TLSHTTP = enabledTLSCfg
}
serverOptions := &QueryOptions{
GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""),
HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""),
TLSHTTP: TLSHTTP,
TLSGRPC: test.TLS,
BearerTokenPropagation: true,
GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""),
HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""),
TLSHTTP: TLSHTTP,
TLSGRPC: test.TLS,
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
}
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()
Expand Down Expand Up @@ -553,13 +557,25 @@ func TestServerGRPCTLS(t *testing.T) {

func TestServerBadHostPort(t *testing.T) {
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true},
&QueryOptions{
HTTPHostPort: "8080",
GRPCHostPort: "127.0.0.1:8081",
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())

assert.NotNil(t, err)
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true},
&QueryOptions{
HTTPHostPort: "127.0.0.1:8081",
GRPCHostPort: "9123",
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())

Expand Down Expand Up @@ -587,9 +603,11 @@ func TestServerInUseHostPort(t *testing.T) {
&querysvc.QueryService{},
nil,
&QueryOptions{
HTTPHostPort: tc.httpHostPort,
GRPCHostPort: tc.grpcHostPort,
BearerTokenPropagation: true,
HTTPHostPort: tc.httpHostPort,
GRPCHostPort: tc.grpcHostPort,
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp(),
Expand Down Expand Up @@ -620,7 +638,13 @@ func TestServerSinglePort(t *testing.T) {

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, querySvc, nil,
&QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true},
&QueryOptions{
GRPCHostPort: hostPort,
HTTPHostPort: hostPort,
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())
assert.Nil(t, err)
Expand Down Expand Up @@ -747,8 +771,10 @@ func TestServerHTTPTenancy(t *testing.T) {
serverOptions := &QueryOptions{
HTTPHostPort: ":8080",
GRPCHostPort: ":8080",
Tenancy: tenancy.Options{
Enabled: true,
QueryOptionsBase: QueryOptionsBase{
Tenancy: tenancy.Options{
Enabled: true,
},
},
}
tenancyMgr := tenancy.NewManager(&serverOptions.Tenancy)
Expand Down
5 changes: 3 additions & 2 deletions cmd/query/app/static_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ var (

// RegisterStaticHandler adds handler for static assets to the router.
func RegisterStaticHandler(r *mux.Router, logger *zap.Logger, qOpts *QueryOptions) {
staticHandler, err := NewStaticAssetsHandler(qOpts.StaticAssets, StaticAssetsHandlerOptions{
staticHandler, err := NewStaticAssetsHandler(qOpts.StaticAssets.Path, StaticAssetsHandlerOptions{
BasePath: qOpts.BasePath,
UIConfigPath: qOpts.UIConfig,
Logger: logger,
LogAccess: qOpts.LogStaticAssetsAccess,
LogAccess: qOpts.StaticAssets.LogAccess,
})
if err != nil {
logger.Panic("Could not create static assets handler", zap.Error(err))
Expand Down Expand Up @@ -100,6 +100,7 @@ func NewStaticAssetsHandler(staticAssetsRoot string, options StaticAssetsHandler
assetsFS: assetsFS,
}

options.Logger.Info("Using UI configuration", zap.String("path", options.UIConfigPath))
watcher, err := fswatcher.New([]string{options.UIConfigPath}, h.reloadUIConfig, h.options.Logger)
if err != nil {
return nil, err
Expand Down
24 changes: 19 additions & 5 deletions cmd/query/app/static_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,17 @@ func TestNotExistingUiConfig(t *testing.T) {
func TestRegisterStaticHandlerPanic(t *testing.T) {
logger, buf := testutils.NewLogger()
assert.Panics(t, func() {
RegisterStaticHandler(mux.NewRouter(), logger, &QueryOptions{StaticAssets: "/foo/bar"})
RegisterStaticHandler(
mux.NewRouter(),
logger,
&QueryOptions{
QueryOptionsBase: QueryOptionsBase{
StaticAssets: QueryOptionsStaticAssets{
Path: "/foo/bar",
},
},
},
)
})
assert.Contains(t, buf.String(), "Could not create static assets handler")
assert.Contains(t, buf.String(), "no such file or directory")
Expand Down Expand Up @@ -99,10 +109,14 @@ func TestRegisterStaticHandler(t *testing.T) {
r = r.PathPrefix(testCase.basePath).Subrouter()
}
RegisterStaticHandler(r, logger, &QueryOptions{
StaticAssets: "fixture",
BasePath: testCase.basePath,
UIConfig: testCase.UIConfigPath,
LogStaticAssetsAccess: testCase.logAccess,
QueryOptionsBase: QueryOptionsBase{
StaticAssets: QueryOptionsStaticAssets{
Path: "fixture",
LogAccess: testCase.logAccess,
},
BasePath: testCase.basePath,
UIConfig: testCase.UIConfigPath,
},
})

server := httptest.NewServer(r)
Expand Down
Loading
Loading