From 3a1524d7de3bcd81c8fc62fda45af152271cc71a Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:59:10 +0000 Subject: [PATCH] Also fix in connector and exporter --- .../component/otelcol/connector/connector.go | 23 ++++++++++--------- .../component/otelcol/exporter/exporter.go | 23 ++++++++++--------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index a0185ae0f..bb9b3a151 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -7,15 +7,6 @@ import ( "errors" "os" - "github.com/grafana/alloy/internal/build" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/component/otelcol" - otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" - "github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer" - "github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector" - "github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer" - "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" - "github.com/grafana/alloy/internal/util/zapadapter" "github.com/prometheus/client_golang/prometheus" otelcomponent "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" @@ -26,6 +17,16 @@ import ( otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/sdk/metric" + + "github.com/grafana/alloy/internal/build" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer" + "github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector" + "github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer" + "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" + "github.com/grafana/alloy/internal/util/zapadapter" ) const ( @@ -94,7 +95,7 @@ var ( func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Connector, error) { ctx, cancel := context.WithCancel(context.Background()) - consumer := lazyconsumer.New(ctx) + consumer := lazyconsumer.NewPaused(ctx) // Create a lazy collector where metrics from the upstream component will be // forwarded. @@ -116,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, } if err := p.Update(args); err != nil { diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 90f182696..22a2b56d3 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -7,15 +7,6 @@ import ( "errors" "os" - "github.com/grafana/alloy/internal/build" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/component/otelcol" - otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" - "github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector" - "github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer" - "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" - "github.com/grafana/alloy/internal/component/otelcol/internal/views" - "github.com/grafana/alloy/internal/util/zapadapter" "github.com/prometheus/client_golang/prometheus" otelcomponent "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" @@ -26,6 +17,16 @@ import ( otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/sdk/metric" + + "github.com/grafana/alloy/internal/build" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector" + "github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer" + "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" + "github.com/grafana/alloy/internal/component/otelcol/internal/views" + "github.com/grafana/alloy/internal/util/zapadapter" ) // Arguments is an extension of component.Arguments which contains necessary @@ -108,7 +109,7 @@ var ( func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) { ctx, cancel := context.WithCancel(context.Background()) - consumer := lazyconsumer.New(ctx) + consumer := lazyconsumer.NewPaused(ctx) // Create a lazy collector where metrics from the upstream component will be // forwarded. @@ -130,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, supportedSignals: supportedSignals,