diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 9631b2578674..c0cecd928f7a 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -4,6 +4,7 @@ package process import ( "context" + "fmt" "os" "strings" "testing" @@ -490,143 +491,59 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { } func TestMetricsStageRefresh(t *testing.T) { - ch := loki.NewLogsReceiver() - reg := prometheus.NewRegistry() + tester := newTester(t) + defer tester.stop() - stg := ` + forwardArgs := ` + // This will be filled later + forward_to = []` + + numLogsToSend := 3 + + cfgWithMetric := ` stage.metrics { metric.counter { name = "paulin_test" action = "inc" match_all = true } - } + }` + forwardArgs - // This will be filled later - forward_to = []` - var stagesCfg Arguments - err := river.Unmarshal([]byte(stg), &stagesCfg) - require.NoError(t, err) + cfgWithMetric_Metrics := ` + # HELP loki_process_custom_paulin_test + # TYPE loki_process_custom_paulin_test counter + loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` - // Create and run the component, so that it can process and forwards logs. - opts := component.Options{ - Logger: util.TestFlowLogger(t), - Registerer: reg, - OnStateChange: func(e component.Exports) {}, - } - args := Arguments{ - ForwardTo: []loki.LogsReceiver{ch}, - Stages: stagesCfg.Stages, - } + t.Run("config with a metric", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithMetric, + "", + fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend)) + }) - c, err := New(opts, args) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go c.Run(ctx) - - // Send a log entry to the component's receiver. - ts := time.Now() - logEntry := loki.Entry{ - Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, - Entry: logproto.Entry{ - Timestamp: ts, - Line: logline, - }, - } - - c.receiver.Chan() <- logEntry - - wantLabelSet := model.LabelSet{ - "filename": "/var/log/pods/agent/agent/1.log", - "foo": "bar", - } - - for i := 0; i < 1; i++ { - select { - case logEntry := <-ch.Chan(): - require.True(t, ts.Equal(logEntry.Timestamp)) - require.Equal(t, logline, logEntry.Line) - require.Equal(t, wantLabelSet, logEntry.Labels) - case <-time.After(5 * time.Second): - require.FailNow(t, "failed waiting for log line") - } - } - - expectedMetrics := ` -# HELP loki_process_custom_paulin_test -# TYPE loki_process_custom_paulin_test counter -loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 1 -` - - if err := testutil.GatherAndCompare(reg, - strings.NewReader(expectedMetrics)); err != nil { - t.Fatalf("mismatch metrics: %v", err) - } - - args1 := Arguments{ - ForwardTo: []loki.LogsReceiver{ch}, - Stages: stagesCfg.Stages, - } - c.Update(args1) - - // The component was "updated" with the same config. + // The component will be "updated" with the same config. // We expect the metric to stay the same, because the component should be smart enough to // know that the new config is the same as the old one and it should just keep running as it is. // If it resets the metric, this could cause issues with some users who have a sidecar "autoreloader" // which reloads the collector config every X seconds. // Those users wouldn't expect their metrics to be reset every time the config is reloaded. - if err := testutil.GatherAndCompare(reg, - strings.NewReader(expectedMetrics)); err != nil { - t.Fatalf("mismatch metrics: %v", err) - } + t.Run("config with the same metric", func(t *testing.T) { + tester.updateAndTest(numLogsToSend, cfgWithMetric, + fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend), + fmt.Sprintf(cfgWithMetric_Metrics, 2*numLogsToSend)) + }) // Use a config which has no metrics stage. // This should cause the metric to disappear. - stg2 := ` - // This will be filled later - forward_to = []` - - var stagesCfg2 Arguments - err = river.Unmarshal([]byte(stg2), &stagesCfg2) - require.NoError(t, err) - - args2 := Arguments{ - ForwardTo: []loki.LogsReceiver{ch}, - Stages: stagesCfg2.Stages, - } - - c.Update(args2) + cfgWithNoStages := forwardArgs - // Make sure there are no metrics - there is no metrics stage in the latest config. - if err := testutil.GatherAndCompare(reg, - strings.NewReader("")); err != nil { - t.Fatalf("mismatch metrics: %v", err) - } - - c.receiver.Chan() <- logEntry - - for i := 0; i < 1; i++ { - select { - case logEntry := <-ch.Chan(): - require.True(t, ts.Equal(logEntry.Timestamp)) - require.Equal(t, logline, logEntry.Line) - require.Equal(t, wantLabelSet, logEntry.Labels) - case <-time.After(5 * time.Second): - require.FailNow(t, "failed waiting for log line") - } - } - - // Make sure there are still no metrics after sending log entries. - if err := testutil.GatherAndCompare(reg, - strings.NewReader("")); err != nil { - t.Fatalf("mismatch metrics: %v", err) - } + tester.updateAndTest(numLogsToSend, cfgWithNoStages, "", "") // Use a config which has a metric with a different name, - // as well as a metric with the same name. - // Only the new metric should be visible. - stg3 := ` + // as well as a metric with the same name as the one in the previous config. + // We try having a metric with the same name as before so that we can see if there + // is some sort of double registration error for that metric. + cfgWithTwoMetrics := ` stage.metrics { metric.counter { name = "paulin_test_3" @@ -638,55 +555,119 @@ loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo=" action = "inc" match_all = true } + }` + forwardArgs + + expectedMetrics3 := ` + # HELP loki_process_custom_paulin_test_3 + # TYPE loki_process_custom_paulin_test_3 counter + loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + # HELP loki_process_custom_paulin_test + # TYPE loki_process_custom_paulin_test counter + loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + tester.updateAndTest(numLogsToSend, cfgWithTwoMetrics, + "", + fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend)) +} + +type tester struct { + t *testing.T + component *Component + registry *prometheus.Registry + cancelFunc context.CancelFunc + logReceiver loki.LogsReceiver + logTimestamp time.Time + logEntry loki.Entry + wantLabelSet model.LabelSet +} + +// Create the component, so that it can process and forward logs. +func newTester(t *testing.T) *tester { + reg := prometheus.NewRegistry() + + opts := component.Options{ + Logger: util.TestFlowLogger(t), + Registerer: reg, + OnStateChange: func(e component.Exports) {}, } - // This will be filled later - forward_to = []` - var stagesCfg3 Arguments - err = river.Unmarshal([]byte(stg3), &stagesCfg3) + initialCfg := `forward_to = []` + var args Arguments + err := river.Unmarshal([]byte(initialCfg), &args) + require.NoError(t, err) + + logReceiver := loki.NewLogsReceiver() + args.ForwardTo = []loki.LogsReceiver{logReceiver} + + c, err := New(opts, args) require.NoError(t, err) - args3 := Arguments{ - ForwardTo: []loki.LogsReceiver{ch}, - Stages: stagesCfg3.Stages, + ctx, cancel := context.WithCancel(context.Background()) + go c.Run(ctx) + + logTimestamp := time.Now() + + return &tester{ + t: t, + component: c, + registry: reg, + cancelFunc: cancel, + logReceiver: logReceiver, + logTimestamp: logTimestamp, + logEntry: loki.Entry{ + Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, + Entry: logproto.Entry{ + Timestamp: logTimestamp, + Line: logline, + }, + }, + wantLabelSet: model.LabelSet{ + "filename": "/var/log/pods/agent/agent/1.log", + "foo": "bar", + }, } - c.Update(args3) +} - // No logs have been sent since the last update. - // Therefore, the metric should not be visible. - if err := testutil.GatherAndCompare(reg, - strings.NewReader("")); err != nil { - t.Fatalf("mismatch metrics: %v", err) +func (t *tester) stop() { + t.cancelFunc() +} + +func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSendingLogs, expectedMetricsAfterSendingLogs string) { + var args Arguments + err := river.Unmarshal([]byte(cfg), &args) + require.NoError(t.t, err) + + args.ForwardTo = []loki.LogsReceiver{t.logReceiver} + + t.component.Update(args) + + // Check the component metrics. + if err := testutil.GatherAndCompare(t.registry, + strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil { + t.t.Fatalf("mismatch metrics: %v", err) } - // Send 3 log lines. - c.receiver.Chan() <- logEntry - c.receiver.Chan() <- logEntry - c.receiver.Chan() <- logEntry + // Send logs. + for i := 0; i < numLogsToSend; i++ { + t.component.receiver.Chan() <- t.logEntry + } - for i := 0; i < 3; i++ { + // Receive logs. + for i := 0; i < numLogsToSend; i++ { select { - case logEntry := <-ch.Chan(): - require.True(t, ts.Equal(logEntry.Timestamp)) - require.Equal(t, logline, logEntry.Line) - require.Equal(t, wantLabelSet, logEntry.Labels) + case logEntry := <-t.logReceiver.Chan(): + require.True(t.t, t.logTimestamp.Equal(logEntry.Timestamp)) + require.Equal(t.t, logline, logEntry.Line) + require.Equal(t.t, t.wantLabelSet, logEntry.Labels) case <-time.After(5 * time.Second): - require.FailNow(t, "failed waiting for log line") + require.FailNow(t.t, "failed waiting for log line") } } - // Expect the metric counter to be "3". - expectedMetrics3 := ` -# HELP loki_process_custom_paulin_test_3 -# TYPE loki_process_custom_paulin_test_3 counter -loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 3 -# HELP loki_process_custom_paulin_test -# TYPE loki_process_custom_paulin_test counter -loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 3 -` - - if err := testutil.GatherAndCompare(reg, - strings.NewReader(expectedMetrics3)); err != nil { - t.Fatalf("mismatch metrics: %v", err) + // Check the component metrics. + if err := testutil.GatherAndCompare(t.registry, + strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil { + t.t.Fatalf("mismatch metrics: %v", err) } }