From 244b4be8a3d349d8be9537a82a4b43078493b96f Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 20 Oct 2023 21:34:10 -0400 Subject: [PATCH] exp: Support primary and archive storage (#4873) ## Which problem is this PR solving? - Part of #4843 - Prototype defining primary and archive storage used by query service ## Description of the changes - Introduce `trace_storage_archive` argument to extension/jaegerquery - This is temporary, need to design a better shape for the config - Implement initialization of archive storage. Tested manually. - Changed memstore to implement archive storage (maybe not a good idea in retrospect) - Change how no-config condition is detected as the previous changes worked for all-in-one but broke with-config mode - Need to start adding some basic integration tests - There is a better way than using default configs for all in one - override `--config` flag with `yaml: ....` string an provide configuration as a file (embed the file into the binary) - Some refactoring of querysvc/app to be able to reuse querysvc.QueryOptions in v2 config ## How was this change tested? - Run all-in-one - Run with config - Run with custom UI config to enable Archive button (it should really be dynamically determined by backend capability) Signed-off-by: Yuri Shkuro --- cmd/jaeger-v2/config-ui.json | 3 + cmd/jaeger-v2/config.yaml | 4 ++ cmd/jaeger-v2/internal/command.go | 12 +--- .../internal/extension/jaegerquery/config.go | 6 +- .../internal/extension/jaegerquery/factory.go | 2 +- .../internal/extension/jaegerquery/server.go | 35 +++++++++-- cmd/query/app/flags.go | 49 +++++++++------ cmd/query/app/flags_test.go | 4 +- cmd/query/app/server_test.go | 62 +++++++++++++------ cmd/query/app/static_handler.go | 5 +- cmd/query/app/static_handler_test.go | 24 +++++-- cmd/query/app/token_propagation_test.go | 8 ++- plugin/storage/memory/factory.go | 10 +++ 13 files changed, 161 insertions(+), 63 deletions(-) create mode 100644 cmd/jaeger-v2/config-ui.json diff --git a/cmd/jaeger-v2/config-ui.json b/cmd/jaeger-v2/config-ui.json new file mode 100644 index 00000000000..47a7484699c --- /dev/null +++ b/cmd/jaeger-v2/config-ui.json @@ -0,0 +1,3 @@ +{ + "archiveEnabled": true +} diff --git a/cmd/jaeger-v2/config.yaml b/cmd/jaeger-v2/config.yaml index 4eca2df735b..a5239ec2428 100644 --- a/cmd/jaeger-v2/config.yaml +++ b/cmd/jaeger-v2/config.yaml @@ -15,11 +15,15 @@ extensions: jaeger_query: trace_storage: memstore + trace_storage_archive: memstore_archive + ui_config: ./cmd/jaeger-v2/config-ui.json jaeger_storage: memory: memstore: max_traces: 100000 + memstore_archive: + max_traces: 100000 receivers: diff --git a/cmd/jaeger-v2/internal/command.go b/cmd/jaeger-v2/internal/command.go index cc825a0186c..b8302b55535 100644 --- a/cmd/jaeger-v2/internal/command.go +++ b/cmd/jaeger-v2/internal/command.go @@ -5,7 +5,6 @@ package internal import ( "log" - "strings" "github.com/spf13/cobra" "go.opentelemetry.io/collector/component" @@ -45,14 +44,9 @@ func Command() *cobra.Command { // back to the official RunE. otelRunE := cmd.RunE cmd.RunE = func(cmd *cobra.Command, args []string) error { - configProvided := false - for _, arg := range args { - if strings.HasPrefix(arg, "--config") { - configProvided = true - break - } - } - if configProvided { + // a bit of a hack to check if '--config' flag was present + configFlag := cmd.Flag("config").Value.String() + if configFlag != "" && configFlag != "[]" { return otelRunE(cmd, args) } log.Print("No '--config' flags detected, using default All-in-One configuration with memory storage.") diff --git a/cmd/jaeger-v2/internal/extension/jaegerquery/config.go b/cmd/jaeger-v2/internal/extension/jaegerquery/config.go index 8751251ac17..cb089f24f80 100644 --- a/cmd/jaeger-v2/internal/extension/jaegerquery/config.go +++ b/cmd/jaeger-v2/internal/extension/jaegerquery/config.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" + queryApp "github.com/jaegertracing/jaeger/cmd/query/app" "github.com/jaegertracing/jaeger/pkg/tenancy" ) @@ -15,7 +16,10 @@ var _ component.ConfigValidator = (*Config)(nil) // Config represents the configuration for jaeger-query, type Config struct { - TraceStorage string `valid:"required" mapstructure:"trace_storage"` + queryApp.QueryOptionsBase `mapstructure:",squash"` + + TraceStoragePrimary string `valid:"required" mapstructure:"trace_storage"` + TraceStorageArchive string `valid:"optional" mapstructure:"trace_storage_archive"` confighttp.HTTPServerSettings `mapstructure:",squash"` Tenancy tenancy.Options `mapstructure:"multi_tenancy"` } diff --git a/cmd/jaeger-v2/internal/extension/jaegerquery/factory.go b/cmd/jaeger-v2/internal/extension/jaegerquery/factory.go index f752516b1d5..9efd3d02a79 100644 --- a/cmd/jaeger-v2/internal/extension/jaegerquery/factory.go +++ b/cmd/jaeger-v2/internal/extension/jaegerquery/factory.go @@ -26,7 +26,7 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - TraceStorage: jaegerstorage.DefaultMemoryStore, + TraceStoragePrimary: jaegerstorage.DefaultMemoryStore, HTTPServerSettings: confighttp.HTTPServerSettings{ Endpoint: ports.PortToHostPort(ports.QueryHTTP), }, diff --git a/cmd/jaeger-v2/internal/extension/jaegerquery/server.go b/cmd/jaeger-v2/internal/extension/jaegerquery/server.go index 0bf386d6e97..82c1eaf71b3 100644 --- a/cmd/jaeger-v2/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger-v2/internal/extension/jaegerquery/server.go @@ -36,9 +36,9 @@ func newServer(config *Config, otel component.TelemetrySettings) *server { } func (s *server) Start(ctx context.Context, host component.Host) error { - f, err := jaegerstorage.GetStorageFactory(s.config.TraceStorage, host) + f, err := jaegerstorage.GetStorageFactory(s.config.TraceStoragePrimary, host) if err != nil { - return fmt.Errorf("cannot find storage factory: %w", err) + return fmt.Errorf("cannot find primary storage %s: %w", s.config.TraceStoragePrimary, err) } spanReader, err := f.CreateSpanReader() @@ -53,7 +53,11 @@ func (s *server) Start(ctx context.Context, host component.Host) error { return fmt.Errorf("cannot create dependencies reader: %w", err) } - qs := querysvc.NewQueryService(spanReader, depReader, querysvc.QueryServiceOptions{}) + var opts querysvc.QueryServiceOptions + if err := s.addArchiveStorage(&opts, host); err != nil { + return err + } + qs := querysvc.NewQueryService(spanReader, depReader, opts) metricsQueryService, _ := disabled.NewMetricsReader() tm := tenancy.NewManager(&s.config.Tenancy) @@ -63,7 +67,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error { s.logger, qs, metricsQueryService, - makeQueryOptions(), + s.makeQueryOptions(), tm, jtracer.NoOp(), ) @@ -78,9 +82,28 @@ func (s *server) Start(ctx context.Context, host component.Host) error { return nil } -func makeQueryOptions() *queryApp.QueryOptions { +func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host component.Host) error { + if s.config.TraceStorageArchive == "" { + s.logger.Info("Archive storage not configured") + return nil + } + + f, err := jaegerstorage.GetStorageFactory(s.config.TraceStorageArchive, host) + if err != nil { + return fmt.Errorf("cannot find archive storage factory: %w", err) + } + + if !opts.InitArchiveStorage(f, s.logger) { + s.logger.Info("Archive storage not initialized") + } + return nil +} + +func (s *server) makeQueryOptions() *queryApp.QueryOptions { return &queryApp.QueryOptions{ - // TODO + QueryOptionsBase: s.config.QueryOptionsBase, + + // TODO expose via config HTTPHostPort: ports.PortToHostPort(ports.QueryHTTP), GRPCHostPort: ports.PortToHostPort(ports.QueryGRPC), } diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index 2f273f49c7b..f4511784625 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -58,26 +58,25 @@ var tlsHTTPFlagsConfig = tlscfg.ServerFlagsConfig{ Prefix: "query.http", } -// QueryOptions holds configuration for query service -type QueryOptions struct { - // HTTPHostPort is the host:port address that the query service listens in on for http requests - HTTPHostPort string - // GRPCHostPort is the host:port address that the query service listens in on for gRPC requests - GRPCHostPort string - // BasePath is the prefix for all UI and API HTTP routes +// QueryOptionsStaticAssets contains configuration for handling static assets +type QueryOptionsStaticAssets struct { + // Path is the path for the static assets for the UI (https://github.com/uber/jaeger-ui) + Path string `valid:"optional" mapstructure:"path"` + // LogAccess tells static handler to log access to static assets, useful in debugging + LogAccess bool `valid:"optional" mapstructure:"log_access"` +} + +// QueryOptionsBase holds configuration for query service shared with jaeger-v2 +type QueryOptionsBase struct { + // BasePath is the base path for all HTTP routes BasePath string - // StaticAssets is the path for the static assets for the UI (https://github.com/uber/jaeger-ui) - StaticAssets string - // LogStaticAssetsAccess tells static handler to log access to static assets, useful in debugging - LogStaticAssetsAccess bool + + StaticAssets QueryOptionsStaticAssets `valid:"optional" mapstructure:"static_assets"` + // UIConfig is the path to a configuration file for the UI - UIConfig string + UIConfig string `valid:"optional" mapstructure:"ui_config"` // BearerTokenPropagation activate/deactivate bearer token propagation to storage BearerTokenPropagation bool - // TLSGRPC configures secure transport (Consumer to Query service GRPC API) - TLSGRPC tlscfg.Options - // TLSHTTP configures secure transport (Consumer to Query service HTTP API) - TLSHTTP tlscfg.Options // AdditionalHeaders AdditionalHeaders http.Header // MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span @@ -88,6 +87,20 @@ type QueryOptions struct { EnableTracing bool } +// QueryOptions holds configuration for query service +type QueryOptions struct { + QueryOptionsBase + + // HTTPHostPort is the host:port address that the query service listens in on for http requests + HTTPHostPort string + // GRPCHostPort is the host:port address that the query service listens in on for gRPC requests + GRPCHostPort string + // TLSGRPC configures secure transport (Consumer to Query service GRPC API) + TLSGRPC tlscfg.Options + // TLSHTTP configures secure transport (Consumer to Query service HTTP API) + TLSHTTP tlscfg.Options +} + // AddFlags adds flags for QueryOptions func AddFlags(flagSet *flag.FlagSet) { flagSet.Var(&config.StringSlice{}, queryAdditionalHeaders, `Additional HTTP response headers. Can be specified multiple times. Format: "Key: Value"`) @@ -119,8 +132,8 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q return qOpts, fmt.Errorf("failed to process HTTP TLS options: %w", err) } qOpts.BasePath = v.GetString(queryBasePath) - qOpts.StaticAssets = v.GetString(queryStaticFiles) - qOpts.LogStaticAssetsAccess = v.GetBool(queryLogStaticAssetsAccess) + qOpts.StaticAssets.Path = v.GetString(queryStaticFiles) + qOpts.StaticAssets.LogAccess = v.GetBool(queryLogStaticAssetsAccess) qOpts.UIConfig = v.GetString(queryUIConfig) qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation) diff --git a/cmd/query/app/flags_test.go b/cmd/query/app/flags_test.go index 77cfed3fcec..969713e5a41 100644 --- a/cmd/query/app/flags_test.go +++ b/cmd/query/app/flags_test.go @@ -46,8 +46,8 @@ func TestQueryBuilderFlags(t *testing.T) { }) qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop()) require.NoError(t, err) - assert.Equal(t, "/dev/null", qOpts.StaticAssets) - assert.True(t, qOpts.LogStaticAssetsAccess) + assert.Equal(t, "/dev/null", qOpts.StaticAssets.Path) + assert.True(t, qOpts.StaticAssets.LogAccess) assert.Equal(t, "some.json", qOpts.UIConfig) assert.Equal(t, "/jaeger", qOpts.BasePath) assert.Equal(t, "127.0.0.1:8080", qOpts.HTTPHostPort) diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 70efd2104ae..c20c97d566c 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -323,11 +323,13 @@ func TestServerHTTPTLS(t *testing.T) { } serverOptions := &QueryOptions{ - GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""), - HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""), - TLSHTTP: test.TLS, - TLSGRPC: TLSGRPC, - BearerTokenPropagation: true, + GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""), + HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""), + TLSHTTP: test.TLS, + TLSGRPC: TLSGRPC, + QueryOptionsBase: QueryOptionsBase{ + BearerTokenPropagation: true, + }, } flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zap.NewNop() @@ -483,11 +485,13 @@ func TestServerGRPCTLS(t *testing.T) { TLSHTTP = enabledTLSCfg } serverOptions := &QueryOptions{ - GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""), - HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""), - TLSHTTP: TLSHTTP, - TLSGRPC: test.TLS, - BearerTokenPropagation: true, + GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""), + HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""), + TLSHTTP: TLSHTTP, + TLSGRPC: test.TLS, + QueryOptionsBase: QueryOptionsBase{ + BearerTokenPropagation: true, + }, } flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zap.NewNop() @@ -553,13 +557,25 @@ func TestServerGRPCTLS(t *testing.T) { func TestServerBadHostPort(t *testing.T) { _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, - &QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true}, + &QueryOptions{ + HTTPHostPort: "8080", + GRPCHostPort: "127.0.0.1:8081", + QueryOptionsBase: QueryOptionsBase{ + BearerTokenPropagation: true, + }, + }, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) assert.NotNil(t, err) _, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, - &QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true}, + &QueryOptions{ + HTTPHostPort: "127.0.0.1:8081", + GRPCHostPort: "9123", + QueryOptionsBase: QueryOptionsBase{ + BearerTokenPropagation: true, + }, + }, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) @@ -587,9 +603,11 @@ func TestServerInUseHostPort(t *testing.T) { &querysvc.QueryService{}, nil, &QueryOptions{ - HTTPHostPort: tc.httpHostPort, - GRPCHostPort: tc.grpcHostPort, - BearerTokenPropagation: true, + HTTPHostPort: tc.httpHostPort, + GRPCHostPort: tc.grpcHostPort, + QueryOptionsBase: QueryOptionsBase{ + BearerTokenPropagation: true, + }, }, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp(), @@ -620,7 +638,13 @@ func TestServerSinglePort(t *testing.T) { querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) server, err := NewServer(flagsSvc.Logger, querySvc, nil, - &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true}, + &QueryOptions{ + GRPCHostPort: hostPort, + HTTPHostPort: hostPort, + QueryOptionsBase: QueryOptionsBase{ + BearerTokenPropagation: true, + }, + }, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) assert.Nil(t, err) @@ -747,8 +771,10 @@ func TestServerHTTPTenancy(t *testing.T) { serverOptions := &QueryOptions{ HTTPHostPort: ":8080", GRPCHostPort: ":8080", - Tenancy: tenancy.Options{ - Enabled: true, + QueryOptionsBase: QueryOptionsBase{ + Tenancy: tenancy.Options{ + Enabled: true, + }, }, } tenancyMgr := tenancy.NewManager(&serverOptions.Tenancy) diff --git a/cmd/query/app/static_handler.go b/cmd/query/app/static_handler.go index 9e01c8ff3f4..5e9ab8f9f44 100644 --- a/cmd/query/app/static_handler.go +++ b/cmd/query/app/static_handler.go @@ -45,11 +45,11 @@ var ( // RegisterStaticHandler adds handler for static assets to the router. func RegisterStaticHandler(r *mux.Router, logger *zap.Logger, qOpts *QueryOptions) { - staticHandler, err := NewStaticAssetsHandler(qOpts.StaticAssets, StaticAssetsHandlerOptions{ + staticHandler, err := NewStaticAssetsHandler(qOpts.StaticAssets.Path, StaticAssetsHandlerOptions{ BasePath: qOpts.BasePath, UIConfigPath: qOpts.UIConfig, Logger: logger, - LogAccess: qOpts.LogStaticAssetsAccess, + LogAccess: qOpts.StaticAssets.LogAccess, }) if err != nil { logger.Panic("Could not create static assets handler", zap.Error(err)) @@ -100,6 +100,7 @@ func NewStaticAssetsHandler(staticAssetsRoot string, options StaticAssetsHandler assetsFS: assetsFS, } + options.Logger.Info("Using UI configuration", zap.String("path", options.UIConfigPath)) watcher, err := fswatcher.New([]string{options.UIConfigPath}, h.reloadUIConfig, h.options.Logger) if err != nil { return nil, err diff --git a/cmd/query/app/static_handler_test.go b/cmd/query/app/static_handler_test.go index 6fd4180d0b2..34351c17f78 100644 --- a/cmd/query/app/static_handler_test.go +++ b/cmd/query/app/static_handler_test.go @@ -48,7 +48,17 @@ func TestNotExistingUiConfig(t *testing.T) { func TestRegisterStaticHandlerPanic(t *testing.T) { logger, buf := testutils.NewLogger() assert.Panics(t, func() { - RegisterStaticHandler(mux.NewRouter(), logger, &QueryOptions{StaticAssets: "/foo/bar"}) + RegisterStaticHandler( + mux.NewRouter(), + logger, + &QueryOptions{ + QueryOptionsBase: QueryOptionsBase{ + StaticAssets: QueryOptionsStaticAssets{ + Path: "/foo/bar", + }, + }, + }, + ) }) assert.Contains(t, buf.String(), "Could not create static assets handler") assert.Contains(t, buf.String(), "no such file or directory") @@ -99,10 +109,14 @@ func TestRegisterStaticHandler(t *testing.T) { r = r.PathPrefix(testCase.basePath).Subrouter() } RegisterStaticHandler(r, logger, &QueryOptions{ - StaticAssets: "fixture", - BasePath: testCase.basePath, - UIConfig: testCase.UIConfigPath, - LogStaticAssetsAccess: testCase.logAccess, + QueryOptionsBase: QueryOptionsBase{ + StaticAssets: QueryOptionsStaticAssets{ + Path: "fixture", + LogAccess: testCase.logAccess, + }, + BasePath: testCase.basePath, + UIConfig: testCase.UIConfigPath, + }, }) server := httptest.NewServer(r) diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 5b7a12042bc..cd9377b7a34 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -91,7 +91,13 @@ func runQueryService(t *testing.T, esURL string) *Server { querySvc := querysvc.NewQueryService(spanReader, nil, querysvc.QueryServiceOptions{}) server, err := NewServer(flagsSvc.Logger, querySvc, nil, - &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0", BearerTokenPropagation: true}, + &QueryOptions{ + GRPCHostPort: ":0", + HTTPHostPort: ":0", + QueryOptionsBase: QueryOptionsBase{ + BearerTokenPropagation: true, + }, + }, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp(), ) diff --git a/plugin/storage/memory/factory.go b/plugin/storage/memory/factory.go index c99b706d561..b6c71cbfe16 100644 --- a/plugin/storage/memory/factory.go +++ b/plugin/storage/memory/factory.go @@ -92,6 +92,16 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { return f.store, nil } +// CreateArchiveSpanReader implements storage.ArchiveFactory +func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { + return f.store, nil +} + +// CreateArchiveSpanWriter implements storage.ArchiveFactory +func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { + return f.store, nil +} + // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return f.store, nil