Skip to content

Commit

Permalink
Refactor Flow unit test to make it more readable.
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Jul 15, 2024
1 parent 9eb4221 commit 9dd6c14
Showing 1 changed file with 133 additions and 152 deletions.
285 changes: 133 additions & 152 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package process

import (
"context"
"fmt"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -490,143 +491,59 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
}

func TestMetricsStageRefresh(t *testing.T) {
ch := loki.NewLogsReceiver()
reg := prometheus.NewRegistry()
tester := newTester(t)
defer tester.stop()

stg := `
forwardArgs := `
// This will be filled later
forward_to = []`

numLogsToSend := 3

cfgWithMetric := `
stage.metrics {
metric.counter {
name = "paulin_test"
action = "inc"
match_all = true
}
}
}` + forwardArgs

// This will be filled later
forward_to = []`
var stagesCfg Arguments
err := river.Unmarshal([]byte(stg), &stagesCfg)
require.NoError(t, err)
cfgWithMetric_Metrics := `
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

// Create and run the component, so that it can process and forwards logs.
opts := component.Options{
Logger: util.TestFlowLogger(t),
Registerer: reg,
OnStateChange: func(e component.Exports) {},
}
args := Arguments{
ForwardTo: []loki.LogsReceiver{ch},
Stages: stagesCfg.Stages,
}
t.Run("config with a metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithMetric,
"",
fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend))
})

c, err := New(opts, args)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.Run(ctx)

// Send a log entry to the component's receiver.
ts := time.Now()
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Timestamp: ts,
Line: logline,
},
}

c.receiver.Chan() <- logEntry

wantLabelSet := model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
"foo": "bar",
}

for i := 0; i < 1; i++ {
select {
case logEntry := <-ch.Chan():
require.True(t, ts.Equal(logEntry.Timestamp))
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case <-time.After(5 * time.Second):
require.FailNow(t, "failed waiting for log line")
}
}

expectedMetrics := `
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 1
`

if err := testutil.GatherAndCompare(reg,
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

args1 := Arguments{
ForwardTo: []loki.LogsReceiver{ch},
Stages: stagesCfg.Stages,
}
c.Update(args1)

// The component was "updated" with the same config.
// The component will be "updated" with the same config.
// We expect the metric to stay the same, because the component should be smart enough to
// know that the new config is the same as the old one and it should just keep running as it is.
// If it resets the metric, this could cause issues with some users who have a sidecar "autoreloader"
// which reloads the collector config every X seconds.
// Those users wouldn't expect their metrics to be reset every time the config is reloaded.
if err := testutil.GatherAndCompare(reg,
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
t.Run("config with the same metric", func(t *testing.T) {
tester.updateAndTest(numLogsToSend, cfgWithMetric,
fmt.Sprintf(cfgWithMetric_Metrics, numLogsToSend),
fmt.Sprintf(cfgWithMetric_Metrics, 2*numLogsToSend))
})

// Use a config which has no metrics stage.
// This should cause the metric to disappear.
stg2 := `
// This will be filled later
forward_to = []`

var stagesCfg2 Arguments
err = river.Unmarshal([]byte(stg2), &stagesCfg2)
require.NoError(t, err)

args2 := Arguments{
ForwardTo: []loki.LogsReceiver{ch},
Stages: stagesCfg2.Stages,
}

c.Update(args2)
cfgWithNoStages := forwardArgs

// Make sure there are no metrics - there is no metrics stage in the latest config.
if err := testutil.GatherAndCompare(reg,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

c.receiver.Chan() <- logEntry

for i := 0; i < 1; i++ {
select {
case logEntry := <-ch.Chan():
require.True(t, ts.Equal(logEntry.Timestamp))
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case <-time.After(5 * time.Second):
require.FailNow(t, "failed waiting for log line")
}
}

// Make sure there are still no metrics after sending log entries.
if err := testutil.GatherAndCompare(reg,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
tester.updateAndTest(numLogsToSend, cfgWithNoStages, "", "")

// Use a config which has a metric with a different name,
// as well as a metric with the same name.
// Only the new metric should be visible.
stg3 := `
// as well as a metric with the same name as the one in the previous config.
// We try having a metric with the same name as before so that we can see if there
// is some sort of double registration error for that metric.
cfgWithTwoMetrics := `
stage.metrics {
metric.counter {
name = "paulin_test_3"
Expand All @@ -638,55 +555,119 @@ loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="
action = "inc"
match_all = true
}
}` + forwardArgs

expectedMetrics3 := `
# HELP loki_process_custom_paulin_test_3
# TYPE loki_process_custom_paulin_test_3 counter
loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

tester.updateAndTest(numLogsToSend, cfgWithTwoMetrics,
"",
fmt.Sprintf(expectedMetrics3, numLogsToSend, numLogsToSend))
}

type tester struct {
t *testing.T
component *Component
registry *prometheus.Registry
cancelFunc context.CancelFunc
logReceiver loki.LogsReceiver
logTimestamp time.Time
logEntry loki.Entry
wantLabelSet model.LabelSet
}

// Create the component, so that it can process and forward logs.
func newTester(t *testing.T) *tester {
reg := prometheus.NewRegistry()

opts := component.Options{
Logger: util.TestFlowLogger(t),
Registerer: reg,
OnStateChange: func(e component.Exports) {},
}

// This will be filled later
forward_to = []`
var stagesCfg3 Arguments
err = river.Unmarshal([]byte(stg3), &stagesCfg3)
initialCfg := `forward_to = []`
var args Arguments
err := river.Unmarshal([]byte(initialCfg), &args)
require.NoError(t, err)

logReceiver := loki.NewLogsReceiver()
args.ForwardTo = []loki.LogsReceiver{logReceiver}

c, err := New(opts, args)
require.NoError(t, err)

args3 := Arguments{
ForwardTo: []loki.LogsReceiver{ch},
Stages: stagesCfg3.Stages,
ctx, cancel := context.WithCancel(context.Background())
go c.Run(ctx)

logTimestamp := time.Now()

return &tester{
t: t,
component: c,
registry: reg,
cancelFunc: cancel,
logReceiver: logReceiver,
logTimestamp: logTimestamp,
logEntry: loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Timestamp: logTimestamp,
Line: logline,
},
},
wantLabelSet: model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
"foo": "bar",
},
}
c.Update(args3)
}

// No logs have been sent since the last update.
// Therefore, the metric should not be visible.
if err := testutil.GatherAndCompare(reg,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
func (t *tester) stop() {
t.cancelFunc()
}

func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSendingLogs, expectedMetricsAfterSendingLogs string) {
var args Arguments
err := river.Unmarshal([]byte(cfg), &args)
require.NoError(t.t, err)

args.ForwardTo = []loki.LogsReceiver{t.logReceiver}

t.component.Update(args)

// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil {
t.t.Fatalf("mismatch metrics: %v", err)
}

// Send 3 log lines.
c.receiver.Chan() <- logEntry
c.receiver.Chan() <- logEntry
c.receiver.Chan() <- logEntry
// Send logs.
for i := 0; i < numLogsToSend; i++ {
t.component.receiver.Chan() <- t.logEntry
}

for i := 0; i < 3; i++ {
// Receive logs.
for i := 0; i < numLogsToSend; i++ {
select {
case logEntry := <-ch.Chan():
require.True(t, ts.Equal(logEntry.Timestamp))
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case logEntry := <-t.logReceiver.Chan():
require.True(t.t, t.logTimestamp.Equal(logEntry.Timestamp))
require.Equal(t.t, logline, logEntry.Line)
require.Equal(t.t, t.wantLabelSet, logEntry.Labels)
case <-time.After(5 * time.Second):
require.FailNow(t, "failed waiting for log line")
require.FailNow(t.t, "failed waiting for log line")
}
}

// Expect the metric counter to be "3".
expectedMetrics3 := `
# HELP loki_process_custom_paulin_test_3
# TYPE loki_process_custom_paulin_test_3 counter
loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 3
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 3
`

if err := testutil.GatherAndCompare(reg,
strings.NewReader(expectedMetrics3)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
// Check the component metrics.
if err := testutil.GatherAndCompare(t.registry,
strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil {
t.t.Fatalf("mismatch metrics: %v", err)
}
}

0 comments on commit 9dd6c14

Please sign in to comment.