Skip to content

Commit

Permalink
Fix map init
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanGuedes committed Feb 5, 2025
1 parent 5fe4325 commit edf28f0
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 15 deletions.
4 changes: 3 additions & 1 deletion pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func newPushStats() *Stats {
return &Stats{
LogLinesBytes: map[string]map[time.Duration]int64{},
StructuredMetadataBytes: map[string]map[time.Duration]int64{},
PolicyNumLines: map[string]int64{},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{},
}
}
Expand Down Expand Up @@ -201,6 +202,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
if _, ok := stats.StructuredMetadataBytes[policy]; !ok {
stats.StructuredMetadataBytes[policy] = make(map[time.Duration]int64)
}

stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize)
totalBytesReceived += int64(resourceAttributesAsStructuredMetadataSize)

Expand Down Expand Up @@ -286,7 +288,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
totalBytesReceived += metadataSize
totalBytesReceived += int64(len(entry.Line))

stats.NumLines++
stats.PolicyNumLines[policy]++
if entry.Timestamp.After(stats.MostRecentEntryTimestamp) {
stats.MostRecentEntryTimestamp = entry.Timestamp
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 1,
PolicyNumLines: map[string]int64{
"": 1,
},
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 9,
Expand Down Expand Up @@ -135,7 +137,9 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 1,
PolicyNumLines: map[string]int64{
"": 1,
},
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 9,
Expand Down Expand Up @@ -178,7 +182,9 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 1,
PolicyNumLines: map[string]int64{
"": 1,
},
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 9,
Expand Down Expand Up @@ -260,7 +266,9 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 2,
PolicyNumLines: map[string]int64{
"": 2,
},
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 26,
Expand Down Expand Up @@ -355,7 +363,9 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 2,
PolicyNumLines: map[string]int64{
"": 2,
},
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 26,
Expand Down Expand Up @@ -510,7 +520,9 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
expectedStats: Stats{
NumLines: 2,
PolicyNumLines: map[string]int64{
"": 2,
},
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 26,
Expand Down
16 changes: 10 additions & 6 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type PolicyWithRetentionWithBytes map[string]map[time.Duration]int64

type Stats struct {
Errs []error
NumLines int64
PolicyNumLines map[string]int64
LogLinesBytes PolicyWithRetentionWithBytes
StructuredMetadataBytes PolicyWithRetentionWithBytes
ResourceAndSourceMetadataLabels map[time.Duration]push.LabelsAdapter
Expand Down Expand Up @@ -156,11 +156,15 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
}
}

totalNumLines := int64(0)
// incrementing tenant metrics if we have a tenant.
if pushStats.NumLines != 0 && userID != "" {
linesIngested.WithLabelValues(userID, isAggregatedMetric, "").Add(float64(pushStats.NumLines))
for policy, numLines := range pushStats.PolicyNumLines {
if numLines != 0 && userID != "" {
linesIngested.WithLabelValues(userID, isAggregatedMetric, policy).Add(float64(numLines))
}
totalNumLines += numLines
}
linesReceivedStats.Inc(pushStats.NumLines)
linesReceivedStats.Inc(totalNumLines)

logValues := []interface{}{
"msg", "push request parsed",
Expand All @@ -169,7 +173,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
"contentEncoding", pushStats.ContentEncoding,
"bodySize", humanize.Bytes(uint64(pushStats.BodySize)),
"streams", len(req.Streams),
"entries", pushStats.NumLines,
"entries", totalNumLines,
"streamLabelsSize", humanize.Bytes(uint64(pushStats.StreamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"structuredMetadataSize", humanize.Bytes(uint64(structuredMetadataSize)),
Expand Down Expand Up @@ -307,7 +311,7 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
}

for _, e := range s.Entries {
pushStats.NumLines++
pushStats.PolicyNumLines[policy]++
entryLabelsSize := int64(util.StructuredMetadataSize(e.StructuredMetadata))
pushStats.LogLinesBytes[policy][retentionPeriod] += int64(len(e.Line))
pushStats.StructuredMetadataBytes[policy][retentionPeriod] += entryLabelsSize
Expand Down
6 changes: 4 additions & 2 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,18 @@ func TestParseRequest(t *testing.T) {
request.Header.Add("Content-Encoding", test.contentEncoding)
}

fakeLimits := &fakeLimits{enabled: test.enableServiceDiscovery}

tracker := NewMockTracker()
data, err := ParseRequest(
util_log.Logger,
"fake",
request,
nil,
&fakeLimits{enabled: test.enableServiceDiscovery},
fakeLimits,
ParseLokiRequest,
tracker,
nil,
fakeLimits.PolicyFor,
false,
)

Expand Down

0 comments on commit edf28f0

Please sign in to comment.