diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 62098dac6d96f..38befee3b8f68 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -3,6 +3,8 @@ package distributor import ( "time" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/compactor/retention" "github.com/grafana/loki/v3/pkg/distributor/shardstreams" "github.com/grafana/loki/v3/pkg/loghttp/push" @@ -39,6 +41,7 @@ type Limits interface { BlockIngestionUntil(userID string) time.Time BlockIngestionStatusCode(userID string) int EnforcedLabels(userID string) []string + PolicyFor(userID string, lbs labels.Labels) string IngestionPartitionsTenantShardSize(userID string) int } diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index dbb4ec8349e63..a77209575ba34 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -37,8 +37,8 @@ const ( func newPushStats() *Stats { return &Stats{ - LogLinesBytes: map[time.Duration]int64{}, - StructuredMetadataBytes: map[time.Duration]int64{}, + LogLinesBytes: map[string]map[time.Duration]int64{}, + StructuredMetadataBytes: map[string]map[time.Duration]int64{}, ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{}, } } @@ -50,7 +50,7 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe return nil, nil, err } - req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger) + req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger, limits.PolicyFor) return req, stats, nil } @@ -101,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) { return req.Logs(), nil } -func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger) *logproto.PushRequest { +func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger, policyForResolver func(userID string, lbs labels.Labels) string) *logproto.PushRequest { if ld.LogRecordCount() == 0 { return &logproto.PushRequest{} } @@ -196,8 +196,12 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten resourceAttributesAsStructuredMetadataSize := loki_util.StructuredMetadataSize(resourceAttributesAsStructuredMetadata) retentionPeriodForUser := tenantsRetention.RetentionPeriodFor(userID, lbs) + policy := policyForResolver(userID, lbs) - stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize) + if _, ok := stats.StructuredMetadataBytes[policy]; !ok { + stats.StructuredMetadataBytes[policy] = make(map[time.Duration]int64) + } + stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize) totalBytesReceived += int64(resourceAttributesAsStructuredMetadataSize) stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], resourceAttributesAsStructuredMetadata...) @@ -250,7 +254,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten } scopeAttributesAsStructuredMetadataSize := loki_util.StructuredMetadataSize(scopeAttributesAsStructuredMetadata) - stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize) + stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize) totalBytesReceived += int64(scopeAttributesAsStructuredMetadataSize) stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], scopeAttributesAsStructuredMetadata...) @@ -274,8 +278,11 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten pushRequestsByStream[labelsStr] = stream metadataSize := int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize) - stats.StructuredMetadataBytes[retentionPeriodForUser] += metadataSize - stats.LogLinesBytes[retentionPeriodForUser] += int64(len(entry.Line)) + stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += metadataSize + if _, ok := stats.LogLinesBytes[policy]; !ok { + stats.LogLinesBytes[policy] = make(map[time.Duration]int64) + } + stats.LogLinesBytes[policy][retentionPeriodForUser] += int64(len(entry.Line)) totalBytesReceived += metadataSize totalBytesReceived += int64(len(entry.Line)) diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index cbcf560bbcb10..4212845022bfe 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -93,11 +93,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedStats: Stats{ NumLines: 1, - LogLinesBytes: map[time.Duration]int64{ - time.Hour: 9, + LogLinesBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 9, + }, }, - StructuredMetadataBytes: map[time.Duration]int64{ - time.Hour: 0, + StructuredMetadataBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 0, + }, }, ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{ time.Hour: nil, @@ -132,11 +136,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedStats: Stats{ NumLines: 1, - LogLinesBytes: map[time.Duration]int64{ - time.Hour: 9, + LogLinesBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 9, + }, }, - StructuredMetadataBytes: map[time.Duration]int64{ - time.Hour: 0, + StructuredMetadataBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 0, + }, }, ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{ time.Hour: nil, @@ -171,11 +179,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedStats: Stats{ NumLines: 1, - LogLinesBytes: map[time.Duration]int64{ - time.Hour: 9, + LogLinesBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 9, + }, }, - StructuredMetadataBytes: map[time.Duration]int64{ - time.Hour: 0, + StructuredMetadataBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 0, + }, }, ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{ time.Hour: nil, @@ -249,11 +261,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedStats: Stats{ NumLines: 2, - LogLinesBytes: map[time.Duration]int64{ - time.Hour: 26, + LogLinesBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 26, + }, }, - StructuredMetadataBytes: map[time.Duration]int64{ - time.Hour: 37, + StructuredMetadataBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 37, + }, }, ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{ time.Hour: []push.LabelAdapter{ @@ -340,11 +356,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedStats: Stats{ NumLines: 2, - LogLinesBytes: map[time.Duration]int64{ - time.Hour: 26, + LogLinesBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 26, + }, }, - StructuredMetadataBytes: map[time.Duration]int64{ - time.Hour: 97, + StructuredMetadataBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 97, + }, }, ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{ time.Hour: []push.LabelAdapter{ @@ -491,11 +511,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, expectedStats: Stats{ NumLines: 2, - LogLinesBytes: map[time.Duration]int64{ - time.Hour: 26, + LogLinesBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 26, + }, }, - StructuredMetadataBytes: map[time.Duration]int64{ - time.Hour: 113, + StructuredMetadataBytes: PolicyWithRetentionWithBytes{ + "": { + time.Hour: 113, + }, }, ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{ time.Hour: []push.LabelAdapter{ @@ -524,16 +548,23 @@ func TestOTLPToLokiPushRequest(t *testing.T) { stats, false, log.NewNopLogger(), + func(userID string, lbs labels.Labels) string { + return "" + }, ) require.Equal(t, tc.expectedPushRequest, *pushReq) require.Equal(t, tc.expectedStats, *stats) totalBytes := 0.0 - for _, b := range stats.LogLinesBytes { - totalBytes += float64(b) + for _, policyMapping := range stats.LogLinesBytes { + for _, b := range policyMapping { + totalBytes += float64(b) + } } - for _, b := range stats.StructuredMetadataBytes { - totalBytes += float64(b) + for _, policyMapping := range stats.StructuredMetadataBytes { + for _, b := range policyMapping { + totalBytes += float64(b) + } } require.Equal(t, totalBytes, tracker.Total(), "Total tracked bytes must equal total bytes of the stats.") }) diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 759e21f293ede..218db25f8c915 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -41,18 +41,18 @@ var ( Namespace: constants.Loki, Name: "distributor_bytes_received_total", Help: "The total number of uncompressed bytes received per tenant. Includes structured metadata bytes.", - }, []string{"tenant", "retention_hours", "aggregated_metric"}) + }, []string{"tenant", "retention_hours", "aggregated_metric", "policy"}) structuredMetadataBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_structured_metadata_bytes_received_total", Help: "The total number of uncompressed bytes received per tenant for entries' structured metadata", - }, []string{"tenant", "retention_hours", "aggregated_metric"}) + }, []string{"tenant", "retention_hours", "aggregated_metric", "policy"}) linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_lines_received_total", Help: "The total number of lines received per tenant", - }, []string{"tenant", "aggregated_metric"}) + }, []string{"tenant", "aggregated_metric", "policy"}) bytesReceivedStats = analytics.NewCounter("distributor_bytes_received") structuredMetadataBytesReceivedStats = analytics.NewCounter("distributor_structured_metadata_bytes_received") @@ -73,6 +73,7 @@ type TenantsRetention interface { type Limits interface { OTLPConfig(userID string) OTLPConfig DiscoverServiceName(userID string) []string + PolicyFor(userID string, lbs labels.Labels) string } type EmptyLimits struct{} @@ -85,17 +86,23 @@ func (EmptyLimits) DiscoverServiceName(string) []string { return nil } +func (EmptyLimits) PolicyFor(userID string, lbs labels.Labels) string { + return "" +} + type ( RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) RequestParserWrapper func(inner RequestParser) RequestParser ErrorWriter func(w http.ResponseWriter, error string, code int, logger log.Logger) ) +type PolicyWithRetentionWithBytes map[string]map[time.Duration]int64 + type Stats struct { Errs []error NumLines int64 - LogLinesBytes map[time.Duration]int64 - StructuredMetadataBytes map[time.Duration]int64 + LogLinesBytes PolicyWithRetentionWithBytes + StructuredMetadataBytes PolicyWithRetentionWithBytes ResourceAndSourceMetadataLabels map[time.Duration]push.LabelsAdapter StreamLabelsSize int64 MostRecentEntryTimestamp time.Time @@ -122,28 +129,32 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete isAggregatedMetric := fmt.Sprintf("%t", pushStats.IsAggregatedMetric) - for retentionPeriod, size := range pushStats.LogLinesBytes { - retentionHours := RetentionPeriodToString(retentionPeriod) - bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size)) - bytesReceivedStats.Inc(size) - entriesSize += size + for policyName, retentionToSizeMapping := range pushStats.LogLinesBytes { + for retentionPeriod, size := range retentionToSizeMapping { + retentionHours := RetentionPeriodToString(retentionPeriod) + bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size)) + bytesReceivedStats.Inc(size) + entriesSize += size + } } - for retentionPeriod, size := range pushStats.StructuredMetadataBytes { - retentionHours := RetentionPeriodToString(retentionPeriod) + for policyName, retentionToSizeMapping := range pushStats.StructuredMetadataBytes { + for retentionPeriod, size := range retentionToSizeMapping { + retentionHours := RetentionPeriodToString(retentionPeriod) - structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size)) - bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size)) - bytesReceivedStats.Inc(size) - structuredMetadataBytesReceivedStats.Inc(size) + structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size)) + bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size)) + bytesReceivedStats.Inc(size) + structuredMetadataBytesReceivedStats.Inc(size) - entriesSize += size - structuredMetadataSize += size + entriesSize += size + structuredMetadataSize += size + } } // incrementing tenant metrics if we have a tenant. if pushStats.NumLines != 0 && userID != "" { - linesIngested.WithLabelValues(userID, isAggregatedMetric).Add(float64(pushStats.NumLines)) + linesIngested.WithLabelValues(userID, isAggregatedMetric, "").Add(float64(pushStats.NumLines)) } linesReceivedStats.Inc(pushStats.NumLines) @@ -282,12 +293,20 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs) } totalBytesReceived := int64(0) + policy := limits.PolicyFor(userID, lbs) + + if _, ok := pushStats.LogLinesBytes[policy]; !ok { + pushStats.LogLinesBytes[policy] = make(map[time.Duration]int64) + } + if _, ok := pushStats.StructuredMetadataBytes[policy]; !ok { + pushStats.StructuredMetadataBytes[policy] = make(map[time.Duration]int64) + } for _, e := range s.Entries { pushStats.NumLines++ entryLabelsSize := int64(util.StructuredMetadataSize(e.StructuredMetadata)) - pushStats.LogLinesBytes[retentionPeriod] += int64(len(e.Line)) - pushStats.StructuredMetadataBytes[retentionPeriod] += entryLabelsSize + pushStats.LogLinesBytes[policy][retentionPeriod] += int64(len(e.Line)) + pushStats.StructuredMetadataBytes[policy][retentionPeriod] += entryLabelsSize totalBytesReceived += int64(len(e.Line)) totalBytesReceived += entryLabelsSize diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 54618eb3480cc..95c397f6891f8 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -287,10 +287,11 @@ func TestParseRequest(t *testing.T) { require.Equal(t, test.expectedBytes, bytesReceived) require.Equalf(t, tracker.Total(), float64(bytesReceived), "tracked usage bytes must equal bytes received metric") require.Equal(t, test.expectedLines, linesReceived) + policy := "" require.Equal( t, float64(test.expectedStructuredMetadataBytes), - testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric))), + testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric), policy)), ) require.Equal( t, @@ -300,6 +301,7 @@ func TestParseRequest(t *testing.T) { "fake", "", fmt.Sprintf("%t", test.aggregatedMetric), + policy, ), ), ) @@ -310,6 +312,7 @@ func TestParseRequest(t *testing.T) { linesIngested.WithLabelValues( "fake", fmt.Sprintf("%t", test.aggregatedMetric), + policy, ), ), ) @@ -321,9 +324,10 @@ func TestParseRequest(t *testing.T) { require.Equal(t, 0, structuredMetadataBytesReceived) require.Equal(t, 0, bytesReceived) require.Equal(t, 0, linesReceived) - require.Equal(t, float64(0), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric)))) - require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric)))) - require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake", fmt.Sprintf("%t", test.aggregatedMetric)))) + policy := "" + require.Equal(t, float64(0), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric), policy))) + require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric), policy))) + require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake", fmt.Sprintf("%t", test.aggregatedMetric), policy))) } }) } @@ -438,6 +442,10 @@ func (f *fakeLimits) OTLPConfig(_ string) OTLPConfig { return DefaultOTLPConfig(defaultGlobalOTLPConfig) } +func (f *fakeLimits) PolicyFor(_ string, _ labels.Labels) string { + return "" +} + func (f *fakeLimits) DiscoverServiceName(_ string) []string { if !f.enabled { return nil diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 51e74c587ca76..ed94c645253b7 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -1118,6 +1118,10 @@ func (o *Overrides) EnforcedLabels(userID string) []string { return o.getOverridesForUser(userID).EnforcedLabels } +func (o *Overrides) PolicyFor(userID string, lbs labels.Labels) string { + return o.getOverridesForUser(userID).PolicyStreamMapping.PolicyFor(lbs) +} + func (o *Overrides) PoliciesStreamMapping(userID string) PolicyStreamMapping { return o.getOverridesForUser(userID).PolicyStreamMapping }