Skip to content

Commit

Permalink
Add the policy blocking.
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanGuedes committed Feb 11, 2025
1 parent 698456c commit 9f9622d
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 22 deletions.
40 changes: 40 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

if ok, until := d.ShouldBlockPolicy(lbs, tenantID); ok {
err := fmt.Errorf(validation.BlockedIngestionPolicyErrorMsg, tenantID, until.Format(time.RFC3339), d.validator.Limits.BlockIngestionPolicyStatusCode(tenantID, policy))
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.BlockedIngestionPolicy, tenantID, retentionHours, policy).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.BlockedIngestionPolicy, tenantID, retentionHours, policy).Add(float64(discardedBytes))
continue
}

n := 0
pushSize := 0
prevTs := stream.Entries[0].Timestamp
Expand Down Expand Up @@ -1293,3 +1303,33 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
func (d *Distributor) HealthyInstancesCount() int {
	// Read the current value of the healthyInstancesCount counter and hand it
	// back as a plain int.
	healthy := d.healthyInstancesCount.Load()
	return int(healthy)
}

// ShouldBlockPolicy reports whether ingestion for the stream identified by lbs
// is currently blocked by a per-policy block configured for tenantID.
//
// The stream's policy is resolved through the tenant's policy stream mapping.
// When that policy has a non-zero block-until time that is still in the
// future, it returns true together with that time. In every other case (no
// mapping, no matching policy, no block configured, or the block time has
// already been reached) it returns false and the zero time.Time.
func (d *Distributor) ShouldBlockPolicy(lbs labels.Labels, tenantID string) (bool, time.Time) {
	var notBlocked time.Time
	limits := d.validator.Limits

	// Without any policy mapping for the tenant there is nothing to block on.
	streamMapping := limits.PoliciesStreamMapping(tenantID)
	if streamMapping == nil {
		return false, notBlocked
	}

	// Streams that do not resolve to a policy are never blocked here.
	name := streamMapping.PolicyFor(lbs)
	if name == "" {
		return false, notBlocked
	}

	// A zero deadline means no block is configured; a deadline that is not
	// strictly after the current time means the block is no longer active.
	deadline := limits.BlockIngestionPolicyUntil(tenantID, name)
	if deadline.IsZero() || !time.Now().Before(deadline) {
		return false, notBlocked
	}

	return true, deadline
}
114 changes: 112 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import (

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/grafana/loki/pkg/push"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
dskit_flagext "github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
Expand All @@ -38,6 +37,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -1677,6 +1677,116 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) {
}
}

// TestDistributor_PushIngestionBlockedByPolicy pushes a single stream through a
// distributor whose limits map that stream's selector to a policy, and checks
// that the push fails only while a block for that same policy is still active.
func TestDistributor_PushIngestionBlockedByPolicy(t *testing.T) {
	const (
		policyName = "test-policy"
		selector   = `{foo="bar"}`
	)

	now := time.Now()
	oneHourAgo := now.Add(-1 * time.Hour)
	inOneHour := now.Add(1 * time.Hour)

	type testCase struct {
		name             string
		blockUntil       map[string]time.Time
		blockStatusCode  map[string]int
		policy           string
		labels           string
		expectError      bool
		expectedErrorMsg string
	}

	cases := []testCase{
		{
			name:        "not blocked - no policy block configured",
			policy:      policyName,
			labels:      selector,
			expectError: false,
		},
		{
			name:            "not blocked - policy block expired",
			blockUntil:      map[string]time.Time{policyName: oneHourAgo},
			blockStatusCode: map[string]int{policyName: 429},
			policy:          policyName,
			labels:          selector,
			expectError:     false,
		},
		{
			name:             "blocked - policy block active",
			blockUntil:       map[string]time.Time{policyName: inOneHour},
			blockStatusCode:  map[string]int{policyName: 429},
			policy:           policyName,
			labels:           selector,
			expectError:      true,
			expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", inOneHour.Format(time.RFC3339), 429),
		},
		{
			name:            "not blocked - different policy",
			blockUntil:      map[string]time.Time{"blocked-policy": inOneHour},
			blockStatusCode: map[string]int{"blocked-policy": 429},
			policy:          policyName,
			labels:          selector,
			expectError:     false,
		},
		{
			name:             "blocked - custom status code",
			blockUntil:       map[string]time.Time{policyName: inOneHour},
			blockStatusCode:  map[string]int{policyName: 456},
			policy:           policyName,
			labels:           selector,
			expectError:      true,
			expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", inOneHour.Format(time.RFC3339), 456),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			limits := &validation.Limits{}
			flagext.DefaultValues(limits)

			// Map the pushed stream's selector to the policy under test.
			limits.PolicyStreamMapping = validation.PolicyStreamMapping{
				tc.policy: []*validation.PriorityStream{
					{Selector: tc.labels, Priority: 1},
				},
			}

			// Per-policy block deadlines, converted to the flag-friendly time type.
			if tc.blockUntil != nil {
				untilByPolicy := make(map[string]dskit_flagext.Time, len(tc.blockUntil))
				for name, deadline := range tc.blockUntil {
					untilByPolicy[name] = dskit_flagext.Time(deadline)
				}
				limits.BlockIngestionPolicyUntil = untilByPolicy
			}

			// Per-policy status codes reported when a block is hit.
			if tc.blockStatusCode != nil {
				limits.BlockIngestionPolicyStatusCode = tc.blockStatusCode
			}

			distributors, _ := prepare(t, 1, 5, limits, nil)
			request := makeWriteRequestWithLabels(1, 1024, []string{tc.labels}, false, false, false)
			response, err := distributors[0].Push(ctx, request)

			if !tc.expectError {
				require.NoError(t, err)
				require.Equal(t, success, response)
				return
			}

			require.Error(t, err)
			require.Contains(t, err.Error(), tc.expectedErrorMsg)
		})
	}
}

func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation.Limits, factory func(addr string) (ring_client.PoolClient, error)) ([]*Distributor, []mockIngester) {
t.Helper()

Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Limits interface {

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
BlockIngestionPolicyUntil(userID string, policy string) time.Time
BlockIngestionPolicyStatusCode(userID string, policy string) int
EnforcedLabels(userID string) []string

IngestionPartitionsTenantShardSize(userID string) int
Expand Down
36 changes: 32 additions & 4 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,12 @@ type Limits struct {
OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"`
GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"`

BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`
BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until"`
BlockIngestionPolicyStatusCode map[string]int `yaml:"block_ingestion_policy_status_code" json:"block_ingestion_policy_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`

IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`

Expand Down Expand Up @@ -1120,6 +1122,32 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int {
return o.getOverridesForUser(userID).BlockIngestionStatusCode
}

// BlockIngestionPolicyUntil returns the time until which ingestion is blocked
// for the given policy of the given tenant. The zero time.Time means that no
// block is configured for that policy.
func (o *Overrides) BlockIngestionPolicyUntil(userID string, policy string) time.Time {
	tenantLimits := o.getOverridesForUser(userID)
	if tenantLimits == nil {
		return time.Time{}
	}

	// Indexing a nil map is safe and simply reports found == false, so an
	// unset map and a missing policy entry are handled by the same branch.
	until, found := tenantLimits.BlockIngestionPolicyUntil[policy]
	if !found {
		return time.Time{}
	}

	return time.Time(until)
}

// BlockIngestionPolicyStatusCode returns the status code to report when
// ingestion is blocked for the given policy of the given tenant. It falls back
// to defaultBlockedIngestionStatusCode when the tenant has no limits or no
// status code entry for that policy.
func (o *Overrides) BlockIngestionPolicyStatusCode(userID string, policy string) int {
	code := defaultBlockedIngestionStatusCode

	if tenantLimits := o.getOverridesForUser(userID); tenantLimits != nil {
		// Only an explicit entry for this policy overrides the default.
		if configured, found := tenantLimits.BlockIngestionPolicyStatusCode[policy]; found {
			code = configured
		}
	}

	return code
}

// EnforcedLabels returns the enforced_labels setting from the limits that
// apply to userID.
func (o *Overrides) EnforcedLabels(userID string) []string {
	tenantLimits := o.getOverridesForUser(userID)
	return tenantLimits.EnforcedLabels
}
Expand Down
44 changes: 28 additions & 16 deletions pkg/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

dskit_flagext "github.com/grafana/dskit/flagext"

"github.com/grafana/loki/v3/pkg/compactor/deletionmode"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -226,9 +228,11 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
Expand All @@ -247,9 +251,11 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
Expand All @@ -271,10 +277,12 @@ retention_stream:
},

// Rest from new defaults
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
Expand All @@ -296,9 +304,11 @@ reject_old_samples: true
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
Expand All @@ -321,9 +331,11 @@ query_timeout: 5m
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
} {
Expand Down
2 changes: 2 additions & 0 deletions pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ const (
StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it."
BlockedIngestion = "blocked_ingestion"
BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
BlockedIngestionPolicy = "blocked_ingestion_policy"
BlockedIngestionPolicyErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
MissingEnforcedLabels = "missing_enforced_labels"
MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s"
)
Expand Down

0 comments on commit 9f9622d

Please sign in to comment.