From 2e32ce686fc65ce490827679050747b62fe45f54 Mon Sep 17 00:00:00 2001 From: matt durham Date: Tue, 8 Oct 2024 15:12:54 -0400 Subject: [PATCH] Fix race conditions in unit tests. --- .../prometheus/remote/queue/e2e_stats_test.go | 14 +++ .../prometheus/remote/queue/e2e_test.go | 118 +++++++++++++----- 2 files changed, 103 insertions(+), 29 deletions(-) diff --git a/internal/component/prometheus/remote/queue/e2e_stats_test.go b/internal/component/prometheus/remote/queue/e2e_stats_test.go index 38cb36dd11..81cd26c75d 100644 --- a/internal/component/prometheus/remote/queue/e2e_stats_test.go +++ b/internal/component/prometheus/remote/queue/e2e_stats_test.go @@ -47,6 +47,8 @@ const alloyMetadataFailed = "alloy_queue_metadata_network_failed" const alloyMetadataRetried429 = "alloy_queue_metadata_network_retried_429" const alloyMetadataRetried = "alloy_queue_metadata_network_retried" +const alloyNetworkTimestamp = "alloy_queue_series_network_timestamp_seconds" + // TestMetadata is the large end to end testing for the queue based wal, specifically for metadata. func TestMetadata(t *testing.T) { // Check assumes you are checking for any value that is not 0. @@ -187,6 +189,10 @@ func TestMetrics(t *testing.T) { name: inTimestamp, valueFunc: isReasonableTimeStamp, }, + { + name: alloyNetworkTimestamp, + valueFunc: greaterThenZero, + }, }, }, { @@ -308,6 +314,10 @@ func TestMetrics(t *testing.T) { name: inTimestamp, valueFunc: isReasonableTimeStamp, }, + { + name: alloyNetworkTimestamp, + valueFunc: greaterThenZero, + }, }, }, { @@ -429,6 +439,10 @@ func TestMetrics(t *testing.T) { name: inTimestamp, valueFunc: isReasonableTimeStamp, }, + { + name: alloyNetworkTimestamp, + valueFunc: greaterThenZero, + }, }, }, { diff --git a/internal/component/prometheus/remote/queue/e2e_test.go b/internal/component/prometheus/remote/queue/e2e_test.go index 38dc96efb0..9704ff7f6d 100644 --- a/internal/component/prometheus/remote/queue/e2e_test.go +++ b/internal/component/prometheus/remote/queue/e2e_test.go @@ -32,8 +32,8 @@ func TestE2E(t *testing.T) { type e2eTest struct { name string maker func(index int, app storage.Appender) (float64, labels.Labels) - tester func(samples []prompb.TimeSeries) - testMeta func(samples []prompb.MetricMetadata) + tester func(samples *safeSlice[prompb.TimeSeries]) + testMeta func(samples *safeSlice[prompb.MetricMetadata]) } tests := []e2eTest{ { @@ -44,9 +44,10 @@ func TestE2E(t *testing.T) { require.NoError(t, errApp) return v, lbls }, - tester: func(samples []prompb.TimeSeries) { + tester: func(samples *safeSlice[prompb.TimeSeries]) { t.Helper() - for _, s := range samples { + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) require.True(t, len(s.Samples) == 1) require.True(t, s.Samples[0].Timestamp > 0) require.True(t, s.Samples[0].Value > 0) @@ -64,8 +65,9 @@ func TestE2E(t *testing.T) { require.NoError(t, errApp) return 0, lbls }, - testMeta: func(samples []prompb.MetricMetadata) { - for _, s := range samples { + testMeta: func(samples *safeSlice[prompb.MetricMetadata]) { + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) require.True(t, s.GetUnit() == "seconds") require.True(t, s.Help == "metadata help") require.True(t, s.Unit == "seconds") @@ -83,9 +85,10 @@ func TestE2E(t *testing.T) { require.NoError(t, errApp) return h.Sum, lbls }, - tester: func(samples []prompb.TimeSeries) { + tester: func(samples *safeSlice[prompb.TimeSeries]) { t.Helper() - for _, s := range samples { + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) require.True(t, len(s.Samples) == 1) require.True(t, s.Samples[0].Timestamp > 0) require.True(t, s.Samples[0].Value == 0) @@ -102,9 +105,10 @@ func TestE2E(t *testing.T) { require.NoError(t, errApp) return h.Sum, lbls }, - tester: func(samples []prompb.TimeSeries) { + tester: func(samples *safeSlice[prompb.TimeSeries]) { t.Helper() - for _, s := range samples { + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) require.True(t, len(s.Samples) == 1) require.True(t, s.Samples[0].Timestamp > 0) require.True(t, s.Samples[0].Value == 0) @@ -122,23 +126,23 @@ func TestE2E(t *testing.T) { } const ( - iterations = 100 + iterations = 10 items = 10_000 ) -func runTest(t *testing.T, add func(index int, appendable storage.Appender) (float64, labels.Labels), test func(samples []prompb.TimeSeries), metaTest func(meta []prompb.MetricMetadata)) { +func runTest(t *testing.T, add func(index int, appendable storage.Appender) (float64, labels.Labels), test func(samples *safeSlice[prompb.TimeSeries]), metaTest func(meta *safeSlice[prompb.MetricMetadata])) { l := util.TestAlloyLogger(t) done := make(chan struct{}) var series atomic.Int32 var meta atomic.Int32 - samples := make([]prompb.TimeSeries, 0) - metaSamples := make([]prompb.MetricMetadata, 0) + samples := newSafeSlice[prompb.TimeSeries]() + metaSamples := newSafeSlice[prompb.MetricMetadata]() srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { newSamples, newMetadata := handlePost(t, w, r) series.Add(int32(len(newSamples))) meta.Add(int32(len(newMetadata))) - samples = append(samples, newSamples...) - metaSamples = append(metaSamples, newMetadata...) + samples.AddSlice(newSamples) + metaSamples.AddSlice(newMetadata) if series.Load() == iterations*items { done <- struct{}{} } @@ -158,40 +162,43 @@ func runTest(t *testing.T, add func(index int, appendable storage.Appender) (flo // Wait for export to spin up. exp := <-expCh - index := 0 - results := make(map[float64]labels.Labels) - mut := sync.Mutex{} + index := atomic.NewInt64(0) + results := &safeMap{ + results: make(map[float64]labels.Labels), + } for i := 0; i < iterations; i++ { go func() { app := exp.Receiver.Appender(ctx) for j := 0; j < items; j++ { - index++ - v, lbl := add(index, app) - mut.Lock() - results[v] = lbl - mut.Unlock() + val := index.Add(1) + v, lbl := add(int(val), app) + results.Add(v, lbl) } require.NoError(t, app.Commit()) }() } // This is a weird use case to handle eventually. - tm := time.NewTimer(15 * time.Second) + // With race turned on this can take a long time. + tm := time.NewTimer(20 * time.Second) select { case <-done: case <-tm.C: + require.Truef(t, false, "failed to collect signals in the appropriate time") } cancel() - for _, s := range samples { + + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) if len(s.Histograms) == 1 { - lbls, ok := results[s.Histograms[0].Sum] + lbls, ok := results.Get(s.Histograms[0].Sum) require.True(t, ok) for i, sLbl := range s.Labels { require.True(t, lbls[i].Name == sLbl.Name) require.True(t, lbls[i].Value == sLbl.Value) } } else { - lbls, ok := results[s.Samples[0].Value] + lbls, ok := results.Get(s.Samples[0].Value) require.True(t, ok) for i, sLbl := range s.Labels { require.True(t, lbls[i].Name == sLbl.Name) @@ -204,7 +211,9 @@ func runTest(t *testing.T, add func(index int, appendable storage.Appender) (flo } else { metaTest(metaSamples) } - require.Truef(t, types.OutStandingTimeSeriesBinary.Load() == 0, "there are %d time series not collected", types.OutStandingTimeSeriesBinary.Load()) + require.Eventuallyf(t, func() bool { + return types.OutStandingTimeSeriesBinary.Load() == 0 + }, 2*time.Second, 100*time.Millisecond, "there are %d time series not collected", types.OutStandingTimeSeriesBinary.Load()) } func handlePost(t *testing.T, _ http.ResponseWriter, r *http.Request) ([]prompb.TimeSeries, []prompb.MetricMetadata) { @@ -362,3 +371,54 @@ func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports, }}, }) } + +func newSafeSlice[T any]() *safeSlice[T] { + return &safeSlice[T]{slice: make([]T, 0)} +} + +type safeSlice[T any] struct { + slice []T + mut sync.Mutex +} + +func (s *safeSlice[T]) Add(v T) { + s.mut.Lock() + defer s.mut.Unlock() + s.slice = append(s.slice, v) +} + +func (s *safeSlice[T]) AddSlice(v []T) { + s.mut.Lock() + defer s.mut.Unlock() + s.slice = append(s.slice, v...) +} + +func (s *safeSlice[T]) Len() int { + s.mut.Lock() + defer s.mut.Unlock() + return len(s.slice) +} + +func (s *safeSlice[T]) Get(i int) T { + s.mut.Lock() + defer s.mut.Unlock() + return s.slice[i] +} + +type safeMap struct { + mut sync.Mutex + results map[float64]labels.Labels +} + +func (s *safeMap) Add(v float64, ls labels.Labels) { + s.mut.Lock() + defer s.mut.Unlock() + s.results[v] = ls +} + +func (s *safeMap) Get(v float64) (labels.Labels, bool) { + s.mut.Lock() + defer s.mut.Unlock() + res, ok := s.results[v] + return res, ok +}