Skip to content

Commit

Permalink
Introduce policy to received_bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanGuedes committed Feb 3, 2025
1 parent ba0a6f7 commit 5324437
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 61 deletions.
3 changes: 3 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package distributor
import (
"time"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
"github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -39,6 +41,7 @@ type Limits interface {
BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
EnforcedLabels(userID string) []string
PolicyFor(userID string, lbs labels.Labels) string

IngestionPartitionsTenantShardSize(userID string) int
}
23 changes: 15 additions & 8 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ const (

func newPushStats() *Stats {
return &Stats{
LogLinesBytes: map[time.Duration]int64{},
StructuredMetadataBytes: map[time.Duration]int64{},
LogLinesBytes: map[string]map[time.Duration]int64{},
StructuredMetadataBytes: map[string]map[time.Duration]int64{},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{},
}
}
Expand All @@ -50,7 +50,7 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe
return nil, nil, err
}

req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger)
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger, limits.PolicyFor)
return req, stats, nil
}

Expand Down Expand Up @@ -101,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger) *logproto.PushRequest {
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger, policyForResolver func(userID string, lbs labels.Labels) string) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand Down Expand Up @@ -196,8 +196,12 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten

resourceAttributesAsStructuredMetadataSize := loki_util.StructuredMetadataSize(resourceAttributesAsStructuredMetadata)
retentionPeriodForUser := tenantsRetention.RetentionPeriodFor(userID, lbs)
policy := policyForResolver(userID, lbs)

stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize)
if _, ok := stats.StructuredMetadataBytes[policy]; !ok {
stats.StructuredMetadataBytes[policy] = make(map[time.Duration]int64)
}
stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize)
totalBytesReceived += int64(resourceAttributesAsStructuredMetadataSize)

stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], resourceAttributesAsStructuredMetadata...)
Expand Down Expand Up @@ -250,7 +254,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
}

scopeAttributesAsStructuredMetadataSize := loki_util.StructuredMetadataSize(scopeAttributesAsStructuredMetadata)
stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize)
stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize)
totalBytesReceived += int64(scopeAttributesAsStructuredMetadataSize)

stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], scopeAttributesAsStructuredMetadata...)
Expand All @@ -274,8 +278,11 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
pushRequestsByStream[labelsStr] = stream

metadataSize := int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize)
stats.StructuredMetadataBytes[retentionPeriodForUser] += metadataSize
stats.LogLinesBytes[retentionPeriodForUser] += int64(len(entry.Line))
stats.StructuredMetadataBytes[policy][retentionPeriodForUser] += metadataSize
if _, ok := stats.LogLinesBytes[policy]; !ok {
stats.LogLinesBytes[policy] = make(map[time.Duration]int64)
}
stats.LogLinesBytes[policy][retentionPeriodForUser] += int64(len(entry.Line))
totalBytesReceived += metadataSize
totalBytesReceived += int64(len(entry.Line))

Expand Down
87 changes: 59 additions & 28 deletions pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
expectedStats: Stats{
NumLines: 1,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 9,
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 9,
},
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 0,
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 0,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: nil,
Expand Down Expand Up @@ -132,11 +136,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
expectedStats: Stats{
NumLines: 1,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 9,
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 9,
},
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 0,
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 0,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: nil,
Expand Down Expand Up @@ -171,11 +179,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
expectedStats: Stats{
NumLines: 1,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 9,
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 9,
},
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 0,
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 0,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: nil,
Expand Down Expand Up @@ -249,11 +261,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
expectedStats: Stats{
NumLines: 2,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 26,
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 26,
},
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 37,
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 37,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: []push.LabelAdapter{
Expand Down Expand Up @@ -340,11 +356,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
expectedStats: Stats{
NumLines: 2,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 26,
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 26,
},
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 97,
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 97,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: []push.LabelAdapter{
Expand Down Expand Up @@ -491,11 +511,15 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
expectedStats: Stats{
NumLines: 2,
LogLinesBytes: map[time.Duration]int64{
time.Hour: 26,
LogLinesBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 26,
},
},
StructuredMetadataBytes: map[time.Duration]int64{
time.Hour: 113,
StructuredMetadataBytes: PolicyWithRetentionWithBytes{
"": {
time.Hour: 113,
},
},
ResourceAndSourceMetadataLabels: map[time.Duration]push.LabelsAdapter{
time.Hour: []push.LabelAdapter{
Expand Down Expand Up @@ -524,16 +548,23 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
stats,
false,
log.NewNopLogger(),
func(userID string, lbs labels.Labels) string {

Check warning on line 551 in pkg/loghttp/push/otlp_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'userID' seems to be unused, consider removing or renaming it as _ (revive)
return ""
},
)
require.Equal(t, tc.expectedPushRequest, *pushReq)
require.Equal(t, tc.expectedStats, *stats)

totalBytes := 0.0
for _, b := range stats.LogLinesBytes {
totalBytes += float64(b)
for _, policyMapping := range stats.LogLinesBytes {
for _, b := range policyMapping {
totalBytes += float64(b)
}
}
for _, b := range stats.StructuredMetadataBytes {
totalBytes += float64(b)
for _, policyMapping := range stats.StructuredMetadataBytes {
for _, b := range policyMapping {
totalBytes += float64(b)
}
}
require.Equal(t, totalBytes, tracker.Total(), "Total tracked bytes must equal total bytes of the stats.")
})
Expand Down
61 changes: 40 additions & 21 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ var (
Namespace: constants.Loki,
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant. Includes structured metadata bytes.",
}, []string{"tenant", "retention_hours", "aggregated_metric"})
}, []string{"tenant", "retention_hours", "aggregated_metric", "policy"})

structuredMetadataBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_structured_metadata_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant for entries' structured metadata",
}, []string{"tenant", "retention_hours", "aggregated_metric"})
}, []string{"tenant", "retention_hours", "aggregated_metric", "policy"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant", "aggregated_metric"})
}, []string{"tenant", "aggregated_metric", "policy"})

bytesReceivedStats = analytics.NewCounter("distributor_bytes_received")
structuredMetadataBytesReceivedStats = analytics.NewCounter("distributor_structured_metadata_bytes_received")
Expand All @@ -73,6 +73,7 @@ type TenantsRetention interface {
type Limits interface {
OTLPConfig(userID string) OTLPConfig
DiscoverServiceName(userID string) []string
PolicyFor(userID string, lbs labels.Labels) string
}

type EmptyLimits struct{}
Expand All @@ -85,17 +86,23 @@ func (EmptyLimits) DiscoverServiceName(string) []string {
return nil
}

func (EmptyLimits) PolicyFor(userID string, lbs labels.Labels) string {

Check warning on line 89 in pkg/loghttp/push/push.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'userID' seems to be unused, consider removing or renaming it as _ (revive)
return ""
}

type (
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
RequestParserWrapper func(inner RequestParser) RequestParser
ErrorWriter func(w http.ResponseWriter, error string, code int, logger log.Logger)
)

type PolicyWithRetentionWithBytes map[string]map[time.Duration]int64

type Stats struct {
Errs []error
NumLines int64
LogLinesBytes map[time.Duration]int64
StructuredMetadataBytes map[time.Duration]int64
LogLinesBytes PolicyWithRetentionWithBytes
StructuredMetadataBytes PolicyWithRetentionWithBytes
ResourceAndSourceMetadataLabels map[time.Duration]push.LabelsAdapter
StreamLabelsSize int64
MostRecentEntryTimestamp time.Time
Expand All @@ -122,28 +129,32 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete

isAggregatedMetric := fmt.Sprintf("%t", pushStats.IsAggregatedMetric)

for retentionPeriod, size := range pushStats.LogLinesBytes {
retentionHours := RetentionPeriodToString(retentionPeriod)
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size))
bytesReceivedStats.Inc(size)
entriesSize += size
for policyName, retentionToSizeMapping := range pushStats.LogLinesBytes {
for retentionPeriod, size := range retentionToSizeMapping {
retentionHours := RetentionPeriodToString(retentionPeriod)
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size))
bytesReceivedStats.Inc(size)
entriesSize += size
}
}

for retentionPeriod, size := range pushStats.StructuredMetadataBytes {
retentionHours := RetentionPeriodToString(retentionPeriod)
for policyName, retentionToSizeMapping := range pushStats.StructuredMetadataBytes {
for retentionPeriod, size := range retentionToSizeMapping {
retentionHours := RetentionPeriodToString(retentionPeriod)

structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size))
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size))
bytesReceivedStats.Inc(size)
structuredMetadataBytesReceivedStats.Inc(size)
structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size))
bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric, policyName).Add(float64(size))
bytesReceivedStats.Inc(size)
structuredMetadataBytesReceivedStats.Inc(size)

entriesSize += size
structuredMetadataSize += size
entriesSize += size
structuredMetadataSize += size
}
}

// incrementing tenant metrics if we have a tenant.
if pushStats.NumLines != 0 && userID != "" {
linesIngested.WithLabelValues(userID, isAggregatedMetric).Add(float64(pushStats.NumLines))
linesIngested.WithLabelValues(userID, isAggregatedMetric, "").Add(float64(pushStats.NumLines))
}
linesReceivedStats.Inc(pushStats.NumLines)

Expand Down Expand Up @@ -282,12 +293,20 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)
}
totalBytesReceived := int64(0)
policy := limits.PolicyFor(userID, lbs)

if _, ok := pushStats.LogLinesBytes[policy]; !ok {
pushStats.LogLinesBytes[policy] = make(map[time.Duration]int64)
}
if _, ok := pushStats.StructuredMetadataBytes[policy]; !ok {
pushStats.StructuredMetadataBytes[policy] = make(map[time.Duration]int64)
}

for _, e := range s.Entries {
pushStats.NumLines++
entryLabelsSize := int64(util.StructuredMetadataSize(e.StructuredMetadata))
pushStats.LogLinesBytes[retentionPeriod] += int64(len(e.Line))
pushStats.StructuredMetadataBytes[retentionPeriod] += entryLabelsSize
pushStats.LogLinesBytes[policy][retentionPeriod] += int64(len(e.Line))
pushStats.StructuredMetadataBytes[policy][retentionPeriod] += entryLabelsSize
totalBytesReceived += int64(len(e.Line))
totalBytesReceived += entryLabelsSize

Expand Down
Loading

0 comments on commit 5324437

Please sign in to comment.