diff --git a/.chloggen/tigran_stefexporter.yaml b/.chloggen/tigran_stefexporter.yaml
new file mode 100644
index 0000000000000..70b287bc26533
--- /dev/null
+++ b/.chloggen/tigran_stefexporter.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: stefexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add basic STEF exporter implementation
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [37759]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/exporter/stefexporter/README.md b/exporter/stefexporter/README.md
index fbbd24cd03bb9..ca706671b540e 100644
--- a/exporter/stefexporter/README.md
+++ b/exporter/stefexporter/README.md
@@ -13,3 +13,49 @@
 Export data via gRPC using
 [Otel/STEF format](https://github.com/splunk/stef/tree/main/go/otel)
 format.
+
+## Getting Started
+
+The following settings are required:
+
+- `endpoint` (no default): host:port to which the exporter is going to send STEF metric data,
+  using the STEF/gRPC protocol. The valid syntax is described
+  [here](https://github.com/grpc/grpc/blob/master/doc/naming.md).
+  If a scheme of `https` is used then client transport security is enabled and overrides the `insecure` setting.
+- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
+  for the full set of available options.
+
+Example:
+
+```yaml
+exporters:
+  stef:
+    endpoint: otelcol2:4317
+    tls:
+      cert_file: file.cert
+      key_file: file.key
+  stef/2:
+    endpoint: otelcol2:4317
+    tls:
+      insecure: true
+```
+
+By default, no compression is enabled. The only supported compression method is zstd.
+To enable it, configure as follows:
+
+```yaml
+exporters:
+  stef:
+    ...
+    compression: zstd
+```
+
+## Advanced Configuration
+
+Several helper files are leveraged to provide additional capabilities automatically:
+
+- [gRPC settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configgrpc/README.md)
+- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
+- [Queuing, timeout and retry settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).
+  Note that the `timeout` setting controls how long the exporter waits for an ACK of data sent
+  over the STEF/gRPC stream. A combined example is shown below.
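+
+For reference, here is a combined sketch of the queue, retry and timeout settings
+(the nested field names and values below follow the standard `exporterhelper` and
+`configretry` conventions and are illustrative assumptions, not settings specific
+to this exporter):
+
+```yaml
+exporters:
+  stef:
+    endpoint: otelcol2:4317
+    timeout: 15s
+    sending_queue:
+      enabled: true
+      num_consumers: 10
+      queue_size: 1000
+    retry_on_failure:
+      enabled: true
+      initial_interval: 5s
+      max_interval: 30s
+      max_elapsed_time: 5m
+```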
diff --git a/exporter/stefexporter/config.go b/exporter/stefexporter/config.go
index 76d273150f430..6f002cf7b8c2b 100644
--- a/exporter/stefexporter/config.go
+++ b/exporter/stefexporter/config.go
@@ -13,11 +13,16 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/configgrpc"
+	"go.opentelemetry.io/collector/config/configretry"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
 )
 
 // Config defines configuration for logging exporter.
 type Config struct {
-	configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
+	exporterhelper.TimeoutConfig `mapstructure:",squash"`
+	exporterhelper.QueueConfig   `mapstructure:"sending_queue"`
+	RetryConfig                  configretry.BackOffConfig `mapstructure:"retry_on_failure"`
+	configgrpc.ClientConfig      `mapstructure:",squash"`
 }
 
 var _ component.Config = (*Config)(nil)
@@ -38,6 +43,13 @@ func (c *Config) Validate() error {
 		return fmt.Errorf(`invalid port "%s"`, port)
 	}
 
+	switch c.Compression {
+	case "":
+	case "zstd":
+	default:
+		return fmt.Errorf("unsupported compression method %q", c.Compression)
+	}
+
 	return nil
 }
diff --git a/exporter/stefexporter/exporter.go b/exporter/stefexporter/exporter.go
index 8c293ae066710..a59d5a5b06190 100644
--- a/exporter/stefexporter/exporter.go
+++ b/exporter/stefexporter/exporter.go
@@ -5,26 +5,257 @@ package stefexporter // import "github.com/open-telemetry/opentelemetry-collecto
 
 import (
 	"context"
+	"fmt"
+	"sync"
 
+	stefgrpc "github.com/splunk/stef/go/grpc"
+	"github.com/splunk/stef/go/grpc/stef_proto"
+	"github.com/splunk/stef/go/otel/oteltef"
+	stefpdatametrics "github.com/splunk/stef/go/pdata/metrics"
+	stefpkg "github.com/splunk/stef/go/pkg"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
+	"google.golang.org/grpc"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stefexporter/internal"
 )
 
-type stefExporter struct{}
+// stefExporter implements sending metrics over a STEF/gRPC stream.
+//
+// The exporter uses a single stream and accepts concurrent exportMetrics calls,
+// sequencing the metric data onto the stream as needed.
+//
+// The exporter blocks each exportMetrics call until an acknowledgement is
+// received from the destination.
+//
+// The exporter relies on a preceding retry helper to retry sending data that is
+// not acknowledged or otherwise fails to be sent. The exporter does not retry
+// sending the data itself.
+type stefExporter struct {
+	set         component.TelemetrySettings
+	cfg         *Config
+	compression stefpkg.Compression
+
+	// connMutex is taken when connecting, disconnecting or checking connection status.
+	connMutex   sync.Mutex
+	isConnected bool
+	connID      uint64
+	grpcConn    *grpc.ClientConn
+	client      *stefgrpc.Client
+
+	// The STEF writer we write metrics to and which in turn sends them over gRPC.
+	stefWriter      *oteltef.MetricsWriter
+	stefWriterMutex sync.Mutex // protects stefWriter
+
+	// lastAckID is the maximum ack ID received so far.
+	lastAckID uint64
+	// Cond to protect and signal lastAckID.
+	ackCond *internal.CancellableCond
+}
+
+type loggerWrapper struct {
+	logger *zap.Logger
+}
+
+func (w *loggerWrapper) Debugf(_ context.Context, format string, v ...any) {
+	w.logger.Debug(fmt.Sprintf(format, v...))
+}
+
+func (w *loggerWrapper) Errorf(_ context.Context, format string, v ...any) {
+	w.logger.Error(fmt.Sprintf(format, v...))
+}
+
+func newStefExporter(set component.TelemetrySettings, cfg *Config) *stefExporter {
+	exp := &stefExporter{
+		set:     set,
+		cfg:     cfg,
+		ackCond: internal.NewCancellableCond(),
+	}
 
-func newStefExporter(_ *zap.Logger, _ *Config) *stefExporter {
-	return &stefExporter{}
+	exp.compression = stefpkg.CompressionNone
+	if cfg.Compression == "zstd" {
+		exp.compression = stefpkg.CompressionZstd
+	}
+	return exp
 }
 
-func (s *stefExporter) Start(_ context.Context, _ component.Host) error {
+func (s *stefExporter) Start(ctx context.Context, host component.Host) error {
+	// Prepare the gRPC connection.
+	var err error
+	s.grpcConn, err = s.cfg.ClientConfig.ToClientConn(ctx, host, s.set)
+	if err != nil {
+		return err
+	}
+
+	// No need to block Start(); we begin the connection attempt in a goroutine.
+	go func() {
+		if err := s.ensureConnected(ctx); err != nil {
+			s.set.Logger.Error("Error connecting to destination", zap.Error(err))
+			// This is not a fatal error. exportMetrics() will try to connect again as needed.
+		}
+	}()
+	return nil
+}
+
+func (s *stefExporter) Shutdown(ctx context.Context) error {
+	s.disconnect(ctx)
+	if s.grpcConn != nil {
+		if err := s.grpcConn.Close(); err != nil {
+			s.set.Logger.Error("Failed to close grpc connection", zap.Error(err))
+		}
+		s.grpcConn = nil
+	}
+	return nil
+}
+
+func (s *stefExporter) ensureConnected(ctx context.Context) error {
+	s.connMutex.Lock()
+	defer s.connMutex.Unlock()
+
+	if s.isConnected {
+		return nil
+	}
+
+	s.set.Logger.Debug("Connecting to destination", zap.String("endpoint", s.cfg.Endpoint))
+
+	s.ackCond.Cond.L.Lock()
+	// Reset lastAckID. New STEF stream ack IDs will start from 1.
+	s.lastAckID = 0
+	// Increment connection ID, to make sure we don't confuse the new and
+	// previous (stale) connections.
+	s.connID++
+	connID := s.connID
+	s.ackCond.Cond.L.Unlock()
+
+	// Prepare to open a STEF/gRPC stream to the server.
+	grpcClient := stef_proto.NewSTEFDestinationClient(s.grpcConn)
+
+	// Let the server know about our schema.
+	schema, err := oteltef.MetricsWireSchema()
+	if err != nil {
+		return err
+	}
+
+	settings := stefgrpc.ClientSettings{
+		Logger:       &loggerWrapper{s.set.Logger},
+		GrpcClient:   grpcClient,
+		ClientSchema: &schema,
+		Callbacks: stefgrpc.ClientCallbacks{
+			OnAck: func(ackId uint64) error { return s.onGrpcAck(connID, ackId) },
+		},
+	}
+	s.client = stefgrpc.NewClient(settings)
+
+	grpcWriter, opts, err := s.client.Connect(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to connect to destination: %w", err)
+	}
+
+	opts.Compression = s.compression
+
+	// Create the STEF record writer over gRPC.
+	s.stefWriter, err = oteltef.NewMetricsWriter(grpcWriter, opts)
+	if err != nil {
+		return err
+	}
+
+	s.isConnected = true
+	s.set.Logger.Debug("Connected to destination", zap.String("endpoint", s.cfg.Endpoint))
+
 	return nil
 }
 
-func (s *stefExporter) Shutdown(_ context.Context) error {
+func (s *stefExporter) disconnect(ctx context.Context) {
+	s.connMutex.Lock()
+	defer s.connMutex.Unlock()
+
+	if !s.isConnected {
+		return
+	}
+
+	if err := s.client.Disconnect(ctx); err != nil {
+		s.set.Logger.Error("Failed to disconnect", zap.Error(err))
+	}
+
+	s.set.Logger.Debug("Disconnected.")
+	s.isConnected = false
+}
+
+func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) error {
+	if err := s.ensureConnected(ctx); err != nil {
+		return err
+	}
+
+	// stefWriter is not safe for concurrent writing, protect it.
+	s.stefWriterMutex.Lock()
+	defer s.stefWriterMutex.Unlock()
+
+	converter := stefpdatametrics.OtlpToTEFUnsorted{}
+	err := converter.WriteMetrics(md, s.stefWriter)
+	if err != nil {
+		// An error writing to the STEF stream typically indicates either:
+		// 1) A problem with the connection. We need to reconnect.
+		// 2) An encoding failure, possibly due to an encoder bug. In this case
+		//    we need to reconnect too, to make sure the encoders start from
+		//    the initial state, which is our best chance to succeed next time.
+		//
+		// Either way we need to reconnect. Disconnect here and the next
+		// exportMetrics() call will connect again.
+		s.disconnect(ctx)
+
+		// TODO: check if err is because STEF encoding failed. If so we must not
+		// try to re-encode the same data. Return consumererror.NewPermanent(err)
+		// to the caller. This requires changes in the STEF Go library.
+
+		// Return an error to retry sending these metrics again next time.
+		return err
+	}
+
+	// According to the STEF gRPC spec, the destination's ack IDs match written
+	// record numbers. When the data we have just written is received by the
+	// destination, it will send us back an ack ID that numerically matches the
+	// last written record number.
+	expectedAckID := s.stefWriter.RecordCount()
+
+	// stefWriter normally buffers written records in memory. Flush() ensures buffered
+	// data is sent to the network. This is necessary so that the server receives it and
+	// sends an acknowledgement back.
+	if err = s.stefWriter.Flush(); err != nil {
+		// Failure to write to the gRPC stream normally means something is
+		// wrong with the connection. We need to reconnect. Disconnect here
+		// and the next exportMetrics() call will connect again.
+		s.disconnect(ctx)
+
+		// Return an error to retry sending these metrics again next time.
+		return err
+	}
+
+	// Wait for acknowledgement.
+	err = s.ackCond.Wait(ctx, func() bool { return s.lastAckID >= expectedAckID })
+	if err != nil {
+		return fmt.Errorf("error waiting for ack ID %d: %w", expectedAckID, err)
+	}
+
 	return nil
 }
 
-func (s *stefExporter) pushMetrics(_ context.Context, _ pmetric.Metrics) error {
+func (s *stefExporter) onGrpcAck(connID uint64, ackID uint64) error {
+	s.ackCond.Cond.L.Lock()
+	defer s.ackCond.Cond.L.Unlock()
+
+	if s.connID != connID {
+		// This is an ack from a previous (stale) connection. This can happen
+		// due to a race if the ack from the old stream arrives after we decided
+		// to reconnect but the old stream is still functioning. We just need
+		// to ignore this ack, it is no longer relevant.
+		return nil
+	}
+
+	// The IDs are expected to always monotonically increase. Check it anyway in case
+	// the server misbehaves and violates the expectation.
+	if s.lastAckID < ackID {
+		s.lastAckID = ackID
+		s.ackCond.Cond.Broadcast()
+	}
 	return nil
 }
diff --git a/exporter/stefexporter/exporter_test.go b/exporter/stefexporter/exporter_test.go
index 536671c0a9ba9..4264593e42e5a 100644
--- a/exporter/stefexporter/exporter_test.go
+++ b/exporter/stefexporter/exporter_test.go
@@ -2,3 +2,333 @@
 // SPDX-License-Identifier: Apache-2.0
 
 package stefexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stefexporter"
+
+import (
+	"context"
+	"errors"
+	"net"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	stefgrpc "github.com/splunk/stef/go/grpc"
+	"github.com/splunk/stef/go/grpc/stef_proto"
+	"github.com/splunk/stef/go/otel/oteltef"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/config/configgrpc"
+	"go.opentelemetry.io/collector/config/configtls"
+	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exportertest"
+	"go.opentelemetry.io/collector/pdata/testdata"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
+)
+
+func newGrpcServer(listener net.Listener) (*grpc.Server, int) {
+	serverPort := listener.Addr().(*net.TCPAddr).Port
+	var opts []grpc.ServerOption
+	grpcServer := grpc.NewServer(opts...)
+	return grpcServer, serverPort
+}
+
+type mockMetricDestServer struct {
+	stef_proto.UnimplementedSTEFDestinationServer
+	logger          *zap.Logger
+	grpcServer      *grpc.Server
+	recordsReceived atomic.Int64
+	acksSent        atomic.Int64
+	endpoint        string
+	failAckCount    atomic.Int64
+}
+
+func newMockMetricDestServer(t *testing.T, logger *zap.Logger) *mockMetricDestServer {
+	m := &mockMetricDestServer{logger: logger}
+	tcpAddr := testutil.GetAvailableLocalAddress(t)
+	m.endpoint = tcpAddr
+	return m
+}
+
+func (m *mockMetricDestServer) start() {
+	listener, err := net.Listen("tcp", m.endpoint)
+	if err != nil {
+		m.logger.Fatal("Failed to listen on the endpoint for the gRPC server", zap.Error(err))
+	}
+
+	grpcServer, serverPort := newGrpcServer(listener)
+	m.logger.Info("Listening for connections", zap.Int("port", serverPort))
+
+	m.grpcServer = grpcServer
+
+	schema, err := oteltef.MetricsWireSchema()
+	if err != nil {
+		m.logger.Fatal("Failed to load schema", zap.Error(err))
+	}
+
+	settings := stefgrpc.ServerSettings{
+		Logger:       nil,
+		ServerSchema: &schema,
+		MaxDictBytes: 0,
+		OnStream:     m.onStream,
+	}
+	mockServer := stefgrpc.NewStreamServer(settings)
+	stef_proto.RegisterSTEFDestinationServer(grpcServer, mockServer)
+	go func() {
+		err := grpcServer.Serve(listener)
+		if err != nil && !errors.Is(err, grpc.ErrServerStopped) {
+			m.logger.Fatal("Failed to start STEF server", zap.Error(err))
+		}
+	}()
+}
+
+func (m *mockMetricDestServer) stop() {
+	m.grpcServer.Stop()
+}
+
+func (m *mockMetricDestServer) onStream(grpcReader stefgrpc.GrpcReader, ackFunc func(sequenceId uint64) error) error {
+	m.logger.Info("Incoming STEF/gRPC connection.")
+
+	reader, err := oteltef.NewMetricsReader(grpcReader)
+	if err != nil {
+		m.logger.Error("Error creating metrics reader from connection", zap.Error(err))
+		return err
+	}
+
+	for {
+		_, err = reader.Read()
+		if err != nil {
+			m.logger.Error("Error reading from connection", zap.Error(err))
+			return err
+		}
+		m.recordsReceived.Add(1)
+
+		if m.failAckCount.Add(-1) >= 0 {
+			// This connection must fail to ack.
+			continue
+		}
+
+		if err = ackFunc(reader.RecordCount()); err != nil {
+			return err
+		}
+		m.acksSent.Add(1)
+	}
+}
+
+func runTest(
+	t *testing.T,
+	cfg *Config,
+	f func(cfg *Config, mockSrv *mockMetricDestServer, exp exporter.Metrics),
+) {
+	logCfg := zap.NewDevelopmentConfig()
+	logCfg.DisableStacktrace = true
+	logger, _ := logCfg.Build()
+
+	mockSrv := newMockMetricDestServer(t, logger)
+
+	mockSrv.start()
+	defer mockSrv.stop()
+
+	// Start an exporter and point it to the server.
+	factory := NewFactory()
+	if cfg == nil {
+		cfg = factory.CreateDefaultConfig().(*Config)
+	}
+	cfg.ClientConfig = configgrpc.ClientConfig{
+		Endpoint: mockSrv.endpoint,
+		// Use insecure mode for tests so that we don't bother with certificates.
+		TLSSetting: configtls.ClientConfig{Insecure: true},
+	}
+
+	// Make retries quick. We will be testing failure modes and don't want the test to take too long.
+	cfg.RetryConfig.InitialInterval = 10 * time.Millisecond
+
+	set := exportertest.NewNopSettings()
+	set.TelemetrySettings.Logger = logger
+
+	exp, err := factory.CreateMetrics(context.Background(), set, cfg)
+	require.NoError(t, err)
+	require.NotNil(t, exp)
+
+	defer func() {
+		assert.NoError(t, exp.Shutdown(context.Background()))
+	}()
+
+	host := componenttest.NewNopHost()
+	require.NoError(t, exp.Start(context.Background(), host))
+
+	f(cfg, mockSrv, exp)
+}
+
+func TestExport(t *testing.T) {
+	runTest(
+		t,
+		nil,
+		func(cfg *Config, mockSrv *mockMetricDestServer, exp exporter.Metrics) {
+			// Send some metrics. Make sure the count of batches exceeds the number of consumers
+			// so that we hit the case where the exporter begins to forcibly flush encoded data.
+			pointCount := int64(0)
+			for i := 0; i < 2*cfg.QueueConfig.NumConsumers; i++ {
+				md := testdata.GenerateMetrics(1)
+				pointCount += int64(md.DataPointCount())
+				err := exp.ConsumeMetrics(context.Background(), md)
+				require.NoError(t, err)
+			}
+
+			// Wait for the data to be received.
+			assert.Eventually(
+				t, func() bool { return mockSrv.recordsReceived.Load() == pointCount },
+				5*time.Second, 5*time.Millisecond,
+			)
+		},
+	)
+}
+
+func TestReconnect(t *testing.T) {
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+
+	// Shorten the max ack waiting time so that an attempt to send on a failed
+	// connection times out quickly and sending is retried until the broken
+	// connection is detected and reconnection happens.
+	cfg.TimeoutConfig.Timeout = 300 * time.Millisecond
+
+	runTest(
+		t,
+		cfg,
+		func(_ *Config, mockSrv *mockMetricDestServer, exp exporter.Metrics) {
+			mockSrv.logger.Debug("======== 1")
+
+			md := testdata.GenerateMetrics(1)
+			pointCount := int64(md.DataPointCount())
+			err := exp.ConsumeMetrics(context.Background(), md)
+			require.NoError(t, err)
+
+			// Wait for the data to be received.
+			assert.Eventually(
+				t, func() bool { return mockSrv.recordsReceived.Load() == pointCount },
+				5*time.Second, 5*time.Millisecond,
+			)
+
+			mockSrv.logger.Debug("First set of data received.")
+
+			// Disconnect from the server side to verify that the exporter will reconnect.
+			mockSrv.logger.Debug("Restarting mock STEF server.")
+			mockSrv.stop()
+			mockSrv.start()
+
+			mockSrv.logger.Debug("======== 2")
+
+			// Send more data.
+			md = testdata.GenerateMetrics(1)
+			pointCount += int64(md.DataPointCount())
+			err = exp.ConsumeMetrics(context.Background(), md)
+			require.NoError(t, err)
+
+			// Wait for the data to be received.
+			assert.Eventually(
+				t, func() bool { return mockSrv.recordsReceived.Load() == pointCount },
+				5*time.Second, 5*time.Millisecond,
+			)
+		},
+	)
+}
+
+func TestAckTimeout(t *testing.T) {
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+
+	// Shorten the max ack waiting time so that tests run fast.
+	// Increase this if the second Eventually() below fails sporadically.
+	cfg.TimeoutConfig.Timeout = 300 * time.Millisecond
+
+	runTest(
+		t,
+		cfg,
+		func(_ *Config, mockSrv *mockMetricDestServer, exp exporter.Metrics) {
+			md := testdata.GenerateMetrics(1)
+			pointCount := int64(md.DataPointCount())
+
+			// Fail to ack the first pointCount records. We want acks to succeed on the second try only.
+			mockSrv.failAckCount.Store(pointCount)
+
+			err := exp.ConsumeMetrics(context.Background(), md)
+			require.NoError(t, err)
+
+			// Wait for the data to be received.
+			assert.Eventually(
+				t, func() bool { return mockSrv.recordsReceived.Load() >= pointCount },
+				5*time.Second, 5*time.Millisecond,
+			)
+
+			mockSrv.logger.Debug("First set of data received. Should remain unacknowledged.")
+
+			// Because the server did not send an ack, the exporter is going to time out,
+			// reconnect and send the data again. The same data will be delivered again,
+			// so the recordsReceived counter will be twice the point count.
+			assert.Eventually(
+				t, func() bool { return mockSrv.recordsReceived.Load() == 2*pointCount },
+				5*time.Second, 5*time.Millisecond,
+			)
+
+			mockSrv.logger.Debug("Second set of data received after reconnection. Should be acknowledged.")
+			// Verify that acks were sent.
+			assert.EqualValues(t, pointCount, mockSrv.acksSent.Load())
+		},
+	)
+}
+
+func TestStartServerAfterClient(t *testing.T) {
+	logCfg := zap.NewDevelopmentConfig()
+	logCfg.DisableStacktrace = true
+	logger, _ := logCfg.Build()
+
+	mockSrv := newMockMetricDestServer(t, logger)
+
+	// Create an exporter and point it to the (not yet started) server.
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.ClientConfig = configgrpc.ClientConfig{
+		Endpoint: mockSrv.endpoint,
+		// Use insecure mode for tests so that we don't bother with certificates.
+		TLSSetting: configtls.ClientConfig{Insecure: true},
+	}
+
+	set := exportertest.NewNopSettings()
+	set.TelemetrySettings.Logger = logger
+
+	exp := newStefExporter(set.TelemetrySettings, cfg)
+	require.NotNil(t, exp)
+
+	defer func() {
+		assert.NoError(t, exp.Shutdown(context.Background()))
+	}()
+
+	host := componenttest.NewNopHost()
+	require.NoError(t, exp.Start(context.Background(), host))
+
+	// Try sending while the server is down.
+	md := testdata.GenerateMetrics(1)
+	pointCount := int64(md.DataPointCount())
+	err := exp.exportMetrics(context.Background(), md)
+
+	// Sending must fail.
+	require.Error(t, err)
+
+	// Now start the server.
+	mockSrv.start()
+	defer mockSrv.stop()
+
+	// Try sending until it succeeds. The gRPC connection may not succeed immediately.
+	assert.Eventually(
+		t, func() bool {
+			err = exp.exportMetrics(context.Background(), md)
+			return err == nil
+		}, 5*time.Second, 200*time.Millisecond,
+	)
+
+	// Ensure the data is received.
+	assert.EqualValues(t, pointCount, mockSrv.recordsReceived.Load())
+}
diff --git a/exporter/stefexporter/factory.go b/exporter/stefexporter/factory.go
index c92174c94ccb2..70c87d4cdb32e 100644
--- a/exporter/stefexporter/factory.go
+++ b/exporter/stefexporter/factory.go
@@ -5,8 +5,10 @@ package stefexporter // import "github.com/open-telemetry/opentelemetry-collecto
 
 import (
 	"context"
+	"time"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/config/configretry"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
@@ -27,20 +29,26 @@ func NewFactory() exporter.Factory {
 }
 
 func createDefaultConfig() component.Config {
-	return &Config{}
+	return &Config{
+		TimeoutConfig: exporterhelper.TimeoutConfig{Timeout: 15 * time.Second},
+		QueueConfig:   exporterhelper.NewDefaultQueueConfig(),
+		RetryConfig:   configretry.NewDefaultBackOffConfig(),
+	}
 }
 
 func createMetricsExporter(ctx context.Context, set exporter.Settings, config component.Config) (
 	exporter.Metrics, error,
 ) {
 	cfg := config.(*Config)
-	stefexporter := newStefExporter(set.TelemetrySettings.Logger, cfg)
+	stefexporter := newStefExporter(set.TelemetrySettings, cfg)
 	return exporterhelper.NewMetrics(
 		ctx, set, config,
-		stefexporter.pushMetrics,
+		stefexporter.exportMetrics,
 		exporterhelper.WithStart(stefexporter.Start),
 		exporterhelper.WithShutdown(stefexporter.Shutdown),
 		exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
-		exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
+		exporterhelper.WithTimeout(cfg.TimeoutConfig),
+		exporterhelper.WithQueue(cfg.QueueConfig),
+		exporterhelper.WithRetry(cfg.RetryConfig),
 	)
 }
diff --git a/exporter/stefexporter/go.mod b/exporter/stefexporter/go.mod
index f4c0b564951ca..93f3a9b156bf4 100644
--- a/exporter/stefexporter/go.mod
+++ b/exporter/stefexporter/go.mod
@@ -1,19 +1,30 @@
 module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stefexporter
 
-go 1.22.0
+go 1.22.7
+
+toolchain go1.23.2
 
 require (
+	github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.118.0
+	github.com/splunk/stef/go/grpc v0.0.2
+	github.com/splunk/stef/go/otel v0.0.2
+	github.com/splunk/stef/go/pdata v0.0.2
+	github.com/splunk/stef/go/pkg v0.0.2
 	github.com/stretchr/testify v1.10.0
 	go.opentelemetry.io/collector/component v0.119.0
 	go.opentelemetry.io/collector/component/componenttest v0.119.0
 	go.opentelemetry.io/collector/config/configgrpc v0.119.0
+	go.opentelemetry.io/collector/config/configretry v1.25.0
+	go.opentelemetry.io/collector/config/configtls v1.25.0
 	go.opentelemetry.io/collector/confmap v1.25.0
 	go.opentelemetry.io/collector/consumer v1.25.0
 	go.opentelemetry.io/collector/exporter v0.119.0
 	go.opentelemetry.io/collector/exporter/exportertest v0.119.0
 	go.opentelemetry.io/collector/pdata v1.25.0
+	go.opentelemetry.io/collector/pdata/testdata v0.119.0
 	go.uber.org/goleak v1.3.0
 	go.uber.org/zap v1.27.0
+	google.golang.org/grpc v1.70.0
 )
 
 require (
@@ -44,9 +55,7 @@ require (
 	go.opentelemetry.io/collector/config/configcompression v1.25.0 // indirect
 	go.opentelemetry.io/collector/config/confignet v1.25.0 // indirect
 	go.opentelemetry.io/collector/config/configopaque v1.25.0 // indirect
-	go.opentelemetry.io/collector/config/configretry v1.25.0 // indirect
 	go.opentelemetry.io/collector/config/configtelemetry v0.119.0 // indirect
-	go.opentelemetry.io/collector/config/configtls v1.25.0 // indirect
 	go.opentelemetry.io/collector/consumer/consumererror v0.119.0 // indirect
 	go.opentelemetry.io/collector/consumer/consumertest v0.119.0 // indirect
 	go.opentelemetry.io/collector/consumer/xconsumer v0.119.0 // indirect
@@ -71,7 +80,9 @@
 	golang.org/x/sys v0.29.0 // indirect
 	golang.org/x/text v0.21.0 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
-	google.golang.org/grpc v1.70.0 // indirect
 	google.golang.org/protobuf v1.36.4 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
+	modernc.org/b/v2 v2.1.0 // indirect
 )
+
+replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common
diff --git a/exporter/stefexporter/go.sum b/exporter/stefexporter/go.sum
index e438149944acb..5797fd3487a5e 100644
--- a/exporter/stefexporter/go.sum
+++ b/exporter/stefexporter/go.sum
@@ -54,8 +54,18 @@ github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mL
 github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
 github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
+github.com/splunk/stef/go/grpc v0.0.2 h1:dJxtRd5QujDDxhPMjLOukOPjlvhP9fJpLF5PezgaMKo=
+github.com/splunk/stef/go/grpc v0.0.2/go.mod h1:GLZft2JkTHClZBkXIbxpcCLCgDB2epvVCEvfYVDmIOI=
+github.com/splunk/stef/go/otel v0.0.2 h1:3Amn3EOa6lQUxjUCbZgmg/CNAUNe++XoQ9StgW32ITE=
+github.com/splunk/stef/go/otel v0.0.2/go.mod h1:Fo971Tq4nAOOCilchQyMK4ZsdnW4QG+360FtlFufGXw=
+github.com/splunk/stef/go/pdata v0.0.2 h1:MXeuAmHrxLqdCrcm3walwi6+Lu9N9Mx5V/rsq6CeT9s=
+github.com/splunk/stef/go/pdata v0.0.2/go.mod h1:j0dgdl/Mc3kC1Xqg5/+ca3m10OPW4plvYv7qOFdXKYE=
+github.com/splunk/stef/go/pkg v0.0.2 h1:UIOrx6MnYBURkM+5yX9iXN5sSTJbKGkxdo2FDYUj2vs=
+github.com/splunk/stef/go/pkg v0.0.2/go.mod h1:eDMc/KOCPUv5ClCiF6Jcw8sDueYouDujMKhQoDbDtPw=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
@@ -188,3 +198,9 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+modernc.org/b/v2 v2.1.0 h1:kMD/G43EYnsFJI/0qK1F1X659XlSs41bp01MUDidHC0=
+modernc.org/b/v2 v2.1.0/go.mod h1:fQhHWDXrchyUSLjQYCslV/4uw04PW1LeiZ25D4SNmeo=
+modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
+modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
+modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY=
+modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
diff --git a/exporter/stefexporter/internal/cancellablewait.go b/exporter/stefexporter/internal/cancellablewait.go
new file mode 100644
index 0000000000000..0b638f1af600f
--- /dev/null
+++ b/exporter/stefexporter/internal/cancellablewait.go
@@ -0,0 +1,58 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stefexporter/internal"
+
+import (
+	"context"
+	"sync"
+)
+
+type CancellableCond struct {
+	Cond sync.Cond
+}
+
+func NewCancellableCond() *CancellableCond {
+	c := &CancellableCond{}
+	c.Cond = sync.Cond{L: &sync.Mutex{}}
+	return c
+}
+
+// Wait waits for untilCondition() to become true or for ctx to be cancelled.
+// untilCondition() is always called while the Cond.L lock associated with this
+// CancellableCond is locked.
+// The implementation is borrowed from https://pkg.go.dev/context#AfterFunc
+// Returns nil if the condition is satisfied, or an error if ctx is cancelled.
+func (c *CancellableCond) Wait(ctx context.Context, untilCondition func() bool) error {
+	c.Cond.L.Lock()
+	defer c.Cond.L.Unlock()
+
+	stopByCtx := context.AfterFunc(
+		ctx, func() {
+			// We need to acquire Cond.L here to be sure that the Broadcast
+			// below won't occur before the call to Wait, which would result
+			// in a missed signal (and deadlock).
+			c.Cond.L.Lock()
+			defer c.Cond.L.Unlock()
+
+			// If multiple goroutines are waiting on Cond simultaneously,
+			// we have no way to wake up only the one whose ctx was cancelled,
+			// so we Broadcast to all of the goroutines, which wakes them all up.
+			//
+			// If there are N concurrent calls to Wait, each of the goroutines
+			// will spuriously wake up O(N) other goroutines that aren't ready yet,
+			// so this will cause the overall CPU cost to be O(N²).
+			c.Cond.Broadcast()
+		},
+	)
+	defer stopByCtx()
+
+	for !untilCondition() {
+		c.Cond.Wait()
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+	}
+	return nil
+}
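+
+// Example usage (an illustrative sketch, not part of the exporter code path;
+// the "done" flag and the one-second timeout below are hypothetical):
+//
+//	c := NewCancellableCond()
+//	done := false
+//
+//	go func() {
+//		c.Cond.L.Lock()
+//		done = true // Publish the state change while holding Cond.L...
+//		c.Cond.L.Unlock()
+//		c.Cond.Broadcast() // ...then wake up any waiters.
+//	}()
+//
+//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+//	defer cancel()
+//	// Returns nil once done is observed to be true, or ctx.Err() on timeout.
+//	err := c.Wait(ctx, func() bool { return done })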