forked from grafana/tempo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathencodings_test.go
129 lines (101 loc) · 4.33 KB
/
encodings_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package e2e
import (
"context"
"os"
"testing"
"time"
util2 "github.com/grafana/tempo/integration/util"
v2 "github.com/grafana/tempo/tempodb/encoding/v2"
"github.com/grafana/e2e"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
"github.com/grafana/tempo/cmd/tempo/app"
"github.com/grafana/tempo/integration/e2e/backend"
"github.com/grafana/tempo/integration/util"
"github.com/grafana/tempo/pkg/httpclient"
tempoUtil "github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb/encoding"
)
const (
configAllEncodings = "./config-encodings.tmpl.yaml"
)
func TestEncodings(t *testing.T) {
const repeatedSearchCount = 10
for _, enc := range encoding.AllEncodings() {
t.Run(enc.Version(), func(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()
// copy config template to shared directory and expand template variables
tmplConfig := map[string]any{"Version": enc.Version()}
config, err := util2.CopyTemplateToSharedDir(s, configAllEncodings, "config.yaml", tmplConfig)
require.NoError(t, err)
// load final config
var cfg app.Config
buff, err := os.ReadFile(config)
require.NoError(t, err)
err = yaml.UnmarshalStrict(buff, &cfg)
require.NoError(t, err)
// set up the backend
_, err = backend.New(s, cfg)
require.NoError(t, err)
tempo := util2.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))
// Get port for the Jaeger gRPC receiver endpoint
c, err := util2.NewJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, c)
info := tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))
expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)
// test metrics
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)), "tempo_distributor_spans_received_total"))
// test echo
util.AssertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo")
apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "")
// query an in-memory trace
util.QueryAndAssertTrace(t, apiClient, info)
// wait trace_idle_time and ensure trace is created in ingester
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics))
// flush trace to backend
util.CallFlush(t, tempo)
// v2 does not support querying and must be skipped
if enc.Version() != v2.VersionString {
// search for trace in backend multiple times with different attributes to make sure
// we search with different scopes and with attributes from dedicated columns
for i := 0; i < repeatedSearchCount; i++ {
util2.SearchAndAssertTrace(t, apiClient, info)
util2.SearchTraceQLAndAssertTrace(t, apiClient, info)
}
}
// sleep
time.Sleep(10 * time.Second)
// force clear completed block
util.CallFlush(t, tempo)
// test metrics
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))
if enc.Version() != v2.VersionString {
require.NoError(t, tempo.WaitSumMetrics(e2e.Greater(15), "tempo_query_frontend_queries_total"))
}
// query trace - should fetch from backend
util.QueryAndAssertTrace(t, apiClient, info)
// create grpc client used for streaming
grpcClient, err := util2.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200))
require.NoError(t, err)
if enc.Version() == v2.VersionString {
return // v2 does not support querying and must be skipped
}
// search for trace in backend multiple times with different attributes to make sure
// we search with different scopes and with attributes from dedicated columns
now := time.Now()
for i := 0; i < repeatedSearchCount; i++ {
// search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0
util2.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
// find the trace with streaming. using the http server b/c that's what Grafana will do
util2.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
}
})
}
}