Commit b6fea18

Merge branch 'log-6577' of github.com:JoaoBraveCoding/loki into log-6577

JoaoBraveCoding committed Jan 31, 2025
2 parents cb89029 + ae5302b
Showing 10 changed files with 201 additions and 18 deletions.
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3432,6 +3432,12 @@ discover_generic_fields:
# CLI flag: -validation.log-level-fields
[log_level_fields: <list of strings> | default = [level LEVEL Level Severity severity SEVERITY lvl LVL Lvl]]

# Maximum depth to search for log level fields in JSON logs. A value of 0 or
# less means unlimited depth. Default is 2, which searches the first 2 levels
# of the JSON object.
# CLI flag: -validation.log-level-from-json-max-depth
[log_level_from_json_max_depth: <int> | default = 2]

# When true an ingester takes into account only the streams that it owns
# according to the ring while applying the stream limit.
# CLI flag: -ingester.use-owned-stream-count
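For reference, a minimal sketch of how the new option could be set alongside the existing ones in a limits_config block (key names are taken from the entries above; the surrounding structure is an assumption):

    limits_config:
      discover_log_levels: true
      # Only search the top level of each JSON log line for a level field.
      log_level_from_json_max_depth: 1
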
3 changes: 2 additions & 1 deletion operator/internal/manifests/openshift/alertingrule.go
@@ -9,7 +9,8 @@ import (
func AlertingRuleTenantLabels(ar *lokiv1.AlertingRule) {
switch ar.Spec.TenantID {
case tenantApplication:
-		labels := map[string]string{
+		appendAlertingRuleLabels(ar, map[string]string{
opaDefaultLabelMatchers: ar.Namespace,
ocpMonitoringGroupByLabel: ar.Namespace,
}

Check failure on line 15 in operator/internal/manifests/openshift/alertingrule.go (GitHub Actions: Build Broker (1.22), Build Manager (1.22), scorecard (1.22)): missing ',' before newline in argument list
labelMatchers := strings.Split(opaDefaultLabelMatchers, ",")

Check failure on line 16 in operator/internal/manifests/openshift/alertingrule.go (GitHub Actions: Build Broker (1.22), Build Manager (1.22), scorecard (1.22)): missing ',' in argument list; missing ',' before newline in argument list
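Judging by the check annotations above, the merge left the new appendAlertingRuleLabels call unterminated: the map literal is closed with "}" but the call itself is never closed, and the stale labelMatchers line from the old branch survived. A plausible corrected form, assuming appendAlertingRuleLabels(ar, labels) fully replaces the removed labels assignment and the labelMatchers line, would be:

    case tenantApplication:
        appendAlertingRuleLabels(ar, map[string]string{
            opaDefaultLabelMatchers:   ar.Namespace,
            ocpMonitoringGroupByLabel: ar.Namespace,
        }) // the ")" closes the call and resolves the "missing ','" compile errors

The same fix pattern would apply to RecordingRuleTenantLabels in recordingrule.go below.
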
3 changes: 2 additions & 1 deletion operator/internal/manifests/openshift/recordingrule.go
@@ -9,7 +9,8 @@ import (
func RecordingRuleTenantLabels(r *lokiv1.RecordingRule) {
switch r.Spec.TenantID {
case tenantApplication:
-		labels := map[string]string{
+		appendRecordingRuleLabels(r, map[string]string{
opaDefaultLabelMatchers: r.Namespace,
ocpMonitoringGroupByLabel: r.Namespace,
}

Check failure on line 15 in operator/internal/manifests/openshift/recordingrule.go (GitHub Actions: build (1.22)): missing ',' before newline in argument list
labelMatchers := strings.Split(opaDefaultLabelMatchers, ",")

Check failure on line 16 in operator/internal/manifests/openshift/recordingrule.go (GitHub Actions: build (1.22)): missing ',' in argument list; missing ',' before newline in argument list
69 changes: 63 additions & 6 deletions pkg/distributor/field_detection.go
@@ -2,6 +2,7 @@ package distributor

import (
"bytes"
"errors"
"strconv"
"strings"
"unicode"
@@ -34,25 +35,50 @@ var (
critical = []byte("critical")
fatal = []byte("fatal")

-	defaultAllowedLevelFields = []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"}
+	defaultAllowedLevelFields = []string{
+		"level",
+		"LEVEL",
+		"Level",
+		"log.level",
+		"severity",
+		"SEVERITY",
+		"Severity",
+		"SeverityText",
+		"lvl",
+		"LVL",
+		"Lvl",
+	}

errKeyFound = errors.New("key found")
)

func allowedLabelsForLevel(allowedFields []string) []string {
if len(allowedFields) == 0 {
return defaultAllowedLevelFields
}

return allowedFields
}

type FieldDetector struct {
-	validationContext  validationContext
-	allowedLevelLabels []string
+	validationContext        validationContext
+	allowedLevelLabelsMap    map[string]struct{}
+	allowedLevelLabels       []string
+	logLevelFromJSONMaxDepth int
}

func newFieldDetector(validationContext validationContext) *FieldDetector {
allowedLevelLabels := allowedLabelsForLevel(validationContext.logLevelFields)
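	// Precompute a lookup set for the JSON path; the logfmt path still scans the slice.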
allowedLevelLabelsMap := make(map[string]struct{}, len(allowedLevelLabels))
for _, field := range allowedLevelLabels {
allowedLevelLabelsMap[field] = struct{}{}
}

return &FieldDetector{
-		validationContext:  validationContext,
-		allowedLevelLabels: allowedLabelsForLevel(validationContext.logLevelFields),
+		validationContext:        validationContext,
+		allowedLevelLabelsMap:    allowedLevelLabelsMap,
+		allowedLevelLabels:       allowedLevelLabels,
+		logLevelFromJSONMaxDepth: validationContext.logLevelFromJSONMaxDepth,
}
}

@@ -154,7 +180,7 @@ func (l *FieldDetector) extractLogLevelFromLogLine(log string) string {
lineBytes := unsafe.Slice(unsafe.StringData(log), len(log))
var v []byte
if isJSON(log) {
-		v = getValueUsingJSONParser(lineBytes, l.allowedLevelLabels)
+		v = getLevelUsingJSONParser(lineBytes, l.allowedLevelLabelsMap, l.logLevelFromJSONMaxDepth)
} else if isLogFmt(lineBytes) {
v = getValueUsingLogfmtParser(lineBytes, l.allowedLevelLabels)
} else {
@@ -219,6 +245,37 @@ func getValueUsingJSONParser(line []byte, hints []string) []byte {
return res
}

func getLevelUsingJSONParser(line []byte, allowedLevelFields map[string]struct{}, maxDepth int) []byte {
var result []byte
var detectLevel func([]byte, int) error
detectLevel = func(data []byte, depth int) error {
// maxDepth <= 0 means no limit
if maxDepth > 0 && depth >= maxDepth {
return nil
}

return jsonparser.ObjectEach(data, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error {
switch dataType {
case jsonparser.String:
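// unsafe.String reuses the key bytes for the map lookup without allocating a copy.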
if _, ok := allowedLevelFields[unsafe.String(unsafe.SliceData(key), len(key))]; ok {
result = value
// errKeyFound stops jsonparser.ObjectEach early once an allowed level key is found
return errKeyFound
}
case jsonparser.Object:
if err := detectLevel(value, depth+1); err != nil {
return err
}
}

return nil
})
}

_ = detectLevel(line, 0)
return result
}

func isLogFmt(line []byte) bool {
equalIndex := bytes.Index(line, []byte("="))
if len(line) == 0 || equalIndex == -1 {
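To make the depth semantics concrete, a small sketch (assuming fields = map[string]struct{}{"level": {}}; results shown as comments):

    // detectLevel starts at depth 0 for the top-level object, so maxDepth = 2
    // covers the top level plus one level of nesting.
    getLevelUsingJSONParser([]byte(`{"level": "error"}`), fields, 2)              // "error" (depth 0)
    getLevelUsingJSONParser([]byte(`{"a": {"level": "warn"}}`), fields, 2)        // "warn" (depth 1)
    getLevelUsingJSONParser([]byte(`{"a": {"b": {"level": "info"}}}`), fields, 2) // "" (depth 2 is pruned)
    getLevelUsingJSONParser([]byte(`{"a": {"b": {"level": "info"}}}`), fields, 0) // "info" (0 or less means unlimited)
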
115 changes: 110 additions & 5 deletions pkg/distributor/field_detection_test.go
@@ -406,18 +406,28 @@ func Benchmark_extractLogLevelFromLogLine(b *testing.B) {
}

func Benchmark_optParseExtractLogLevelFromLogLineJson(b *testing.B) {
-	logLine := `{"msg": "something" , "level": "error", "id": "1"}`
+	tests := map[string]string{
+		"level field at start":      `{"level": "error", "field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
+		"level field in middle":     `{"field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "level": "error", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
+		"level field at end":        `{"field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9", "level": "error"}`,
+		"no level field":            `{"field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
+		"nested level field":        `{"metadata": {"level": "error"}, "field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
+		"deeply nested level field": `{"a": {"b": {"c": {"level": "error"}}}, "field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
+	}
ld := newFieldDetector(
validationContext{
discoverLogLevels: true,
allowStructuredMetadata: true,
logLevelFields: []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"},
})

-	for i := 0; i < b.N; i++ {
-		level := ld.extractLogLevelFromLogLine(logLine)
-		require.Equal(b, constants.LogLevelError, level)
+	for name, logLine := range tests {
+		b.Run(name, func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				_ = ld.extractLogLevelFromLogLine(logLine)
+			}
+		})
}
}
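The per-case benchmarks can presumably be compared with the standard Go tooling, e.g.:

    go test ./pkg/distributor -run '^$' -bench 'Benchmark_optParseExtractLogLevelFromLogLineJson' -benchmem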

@@ -605,3 +615,98 @@ func Test_DetectGenericFields(t *testing.T) {
})
}
}

func TestGetLevelUsingJsonParser(t *testing.T) {
tests := []struct {
name string
json string
allowedLevelFields map[string]struct{}
maxDepth int
want string
}{
{
name: "simple top level field",
json: `{"level": "error"}`,
allowedLevelFields: map[string]struct{}{"level": {}},
want: "error",
},
{
name: "nested field one level deep",
json: `{"a": {"level": "info"}}`,
allowedLevelFields: map[string]struct{}{"level": {}},
want: "info",
},
{
name: "deeply nested field",
json: `{"a": {"b": {"c": {"level": "warn"}}}}`,
allowedLevelFields: map[string]struct{}{"level": {}},
want: "warn",
},
{
name: "multiple allowed fields picks first",
json: `{"severity": "error", "level": "info"}`,
allowedLevelFields: map[string]struct{}{"level": {}, "severity": {}},
want: "error",
},
{
name: "multiple nested fields picks first",
json: `{"a": {"level": "error"}, "b": {"level": "info"}}`,
allowedLevelFields: map[string]struct{}{"level": {}},
want: "error",
},
{
name: "array values are ignored",
json: `{"arr": [{"level": "debug"}], "level": "info"}`,
allowedLevelFields: map[string]struct{}{"level": {}},
want: "info",
},
{
name: "non-string values are ignored",
json: `{"level": 123, "severity": "warn"}`,
allowedLevelFields: map[string]struct{}{"level": {}, "severity": {}},
want: "warn",
},
{
name: "empty when no match",
json: `{"foo": "bar"}`,
allowedLevelFields: map[string]struct{}{"level": {}},
want: "",
},
{
name: "empty for invalid json",
json: `{"foo": "bar"`,
allowedLevelFields: map[string]struct{}{"level": {}},
want: "",
},
{
name: "custom field names",
json: `{"custom_level": "error", "log_severity": "warn"}`,
allowedLevelFields: map[string]struct{}{"custom_level": {}, "log_severity": {}},
want: "error",
},
// Adding depth-specific test cases
{
name: "depth limited - only top level",
json: `{"a": {"level": "debug"}, "level": "info"}`,
allowedLevelFields: map[string]struct{}{"level": {}},
maxDepth: 1,
want: "info",
},
{
name: "depth limited - no match",
json: `{"a": {"level": "debug"}}`,
allowedLevelFields: map[string]struct{}{"level": {}},
maxDepth: 1,
want: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getLevelUsingJSONParser([]byte(tt.json), tt.allowedLevelFields, tt.maxDepth)
if string(got) != tt.want {
t.Errorf("getLevelUsingJsonParser() = %v, want %v", string(got), tt.want)
}
})
}
}
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
@@ -26,6 +26,7 @@ type Limits interface {
DiscoverGenericFields(userID string) map[string][]string
DiscoverLogLevels(userID string) bool
LogLevelFields(userID string) []string
LogLevelFromJSONMaxDepth(userID string) int

ShardStreams(userID string) shardstreams.Config
IngestionRateStrategy() string
2 changes: 2 additions & 0 deletions pkg/distributor/validator.go
@@ -48,6 +48,7 @@ type validationContext struct {
discoverGenericFields map[string][]string
discoverLogLevels bool
logLevelFields []string
logLevelFromJSONMaxDepth int

allowStructuredMetadata bool
maxStructuredMetadataSize int
@@ -79,6 +80,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
discoverServiceName: v.DiscoverServiceName(userID),
discoverLogLevels: v.DiscoverLogLevels(userID),
logLevelFields: v.LogLevelFields(userID),
logLevelFromJSONMaxDepth: v.LogLevelFromJSONMaxDepth(userID),
discoverGenericFields: v.DiscoverGenericFields(userID),
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
14 changes: 10 additions & 4 deletions pkg/validation/limits.go
@@ -86,10 +86,11 @@ type Limits struct {
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`

// Metadata field extraction
-	DiscoverGenericFields FieldDetectorConfig `yaml:"discover_generic_fields" json:"discover_generic_fields" doc:"description=Experimental: Detect fields from stream labels, structured metadata, or json/logfmt formatted log line and put them into structured metadata of the log entry."`
-	DiscoverServiceName   []string            `yaml:"discover_service_name" json:"discover_service_name"`
-	DiscoverLogLevels     bool                `yaml:"discover_log_levels" json:"discover_log_levels"`
-	LogLevelFields        []string            `yaml:"log_level_fields" json:"log_level_fields"`
+	DiscoverGenericFields    FieldDetectorConfig `yaml:"discover_generic_fields" json:"discover_generic_fields" doc:"description=Experimental: Detect fields from stream labels, structured metadata, or json/logfmt formatted log line and put them into structured metadata of the log entry."`
+	DiscoverServiceName      []string            `yaml:"discover_service_name" json:"discover_service_name"`
+	DiscoverLogLevels        bool                `yaml:"discover_log_levels" json:"discover_log_levels"`
+	LogLevelFields           []string            `yaml:"log_level_fields" json:"log_level_fields"`
+	LogLevelFromJSONMaxDepth int                 `yaml:"log_level_from_json_max_depth" json:"log_level_from_json_max_depth"`

// Ingester enforced limits.
UseOwnedStreamCount bool `yaml:"use_owned_stream_count" json:"use_owned_stream_count"`
@@ -297,6 +298,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", true, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical', 'fatal' (case insensitive).")
l.LogLevelFields = []string{"level", "LEVEL", "Level", "Severity", "severity", "SEVERITY", "lvl", "LVL", "Lvl"}
f.Var((*dskit_flagext.StringSlice)(&l.LogLevelFields), "validation.log-level-fields", "Field name to use for log levels. If not set, log level would be detected based on pre-defined labels as mentioned above.")
f.IntVar(&l.LogLevelFromJSONMaxDepth, "validation.log-level-from-json-max-depth", 2, "Maximum depth to search for log level fields in JSON logs. A value of 0 or less means unlimited depth. Default is 2, which searches the first 2 levels of the JSON object.")

_ = l.RejectOldSamplesMaxAge.Set("7d")
f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.")
@@ -1016,6 +1018,10 @@ func (o *Overrides) LogLevelFields(userID string) []string {
return o.getOverridesForUser(userID).LogLevelFields
}

func (o *Overrides) LogLevelFromJSONMaxDepth(userID string) int {
return o.getOverridesForUser(userID).LogLevelFromJSONMaxDepth
}

// VolumeEnabled returns whether volume endpoints are enabled for a user.
func (o *Overrides) VolumeEnabled(userID string) bool {
return o.getOverridesForUser(userID).VolumeEnabled
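Because the limit is resolved through Overrides, it should also be settable per tenant in the usual runtime overrides file (tenant name and value below are illustrative):

    overrides:
      tenant-a:
        log_level_from_json_max_depth: 4
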
3 changes: 3 additions & 0 deletions production/ksonnet/loki/config.libsonnet
@@ -79,6 +79,9 @@
ruler_enabled: false,

distributor: {
// no_schedule_constraints is false by default, allowing either topologySpreadConstraints or pod antiAffinity to be configured.
// If it is set to true, neither scheduling constraint is applied.
no_schedule_constraints: false,
use_topology_spread: true,
topology_spread_max_skew: 1,
},
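A hypothetical environment override enabling the new escape hatch, in the usual jsonnet config style:

    {
      _config+:: {
        distributor+: {
          no_schedule_constraints: true,
        },
      },
    }
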
3 changes: 2 additions & 1 deletion production/ksonnet/loki/distributor.libsonnet
@@ -35,7 +35,8 @@ local k = import 'ksonnet-util/kausal.libsonnet';
) +
deployment.mixin.spec.strategy.rollingUpdate.withMaxSurge(5) +
deployment.mixin.spec.strategy.rollingUpdate.withMaxUnavailable(1) +
-	if $._config.distributor.use_topology_spread then
+	if $._config.distributor.no_schedule_constraints then {}
+	else if $._config.distributor.use_topology_spread then
deployment.spec.template.spec.withTopologySpreadConstraints(
// Evenly spread distributors among available nodes.
topologySpreadConstraints.labelSelector.withMatchLabels({ name: 'distributor' }) +
