Merge /db_main into /release #136

Closed. Wanted to merge 28 commits from /db_main into /release.

28 commits:
d10c948  impl aggr label rewrite capability (yulong-db, Feb 13, 2025)
cb92a78  update to include log (yulong-db, Feb 14, 2025)
6e67e34  redo (yulong-db, Feb 14, 2025)
06f1420  fix tests (yulong-db, Feb 14, 2025)
5270be9  add unit tests (yulong-db, Feb 14, 2025)
bf81fcf  tidy mod (yulong-db, Feb 14, 2025)
7666479  better cli arg name (yulong-db, Feb 14, 2025)
7cf83c4  dont' use defaultbucket explicitly (yulong-db, Feb 14, 2025)
5645610  add copyright to fix linter (yulong-db, Feb 14, 2025)
cfd71f6  fix (yulong-db, Feb 15, 2025)
261d290  address comments (yulong-db, Feb 18, 2025)
d83dae6  [querier] Impl aggr label rewrite capability (#127) (yulong-db, Feb 19, 2025)
7179065  [ES-1365579] add visibility into compact progress (jnyi, Feb 19, 2025)
82448c1  Add hot reload for relabel configs in receiver (abhijith-db, Jan 31, 2025)
e6fcd04  Address review comments (abhijith-db, Jan 31, 2025)
d35a00c  receiver: Add hot reload for relabel configs (#124) (abhijith-db, Feb 19, 2025)
51a5c33  [ES-1365579] add visibility into compact progress (#130) (jnyi, Feb 19, 2025)
24ea739  Fix the empty metric name in log lines (hczhu-db, Feb 27, 2025)
cf11be9  Fix the empty metric name in log lines (#132) (hczhu-db, Feb 28, 2025)
39ef925  /debug/release-memory http routing (hczhu-db, Feb 28, 2025)
8c0f8fd  Add `/debug/release-memory` http routing to Thanos (#133) (hczhu-db, Feb 28, 2025)
03b8fd1  [ES-1365579] check if tracing is enabled for obj store lib (jnyi, Feb 20, 2025)
56ab43c  build(deps): bump actions/cache from 4.0.2 to 4.2.1 (#8122) (dependabot[bot], Mar 3, 2025)
2e237b4  [ES-1365579] adding tracing and debug logs for syncMeta operation (#131) (jnyi, Mar 3, 2025)
ef7f386  group replica strategy unit tests (yuchen-db, Mar 5, 2025)
9f3d881  fix unit test (yuchen-db, Mar 5, 2025)
06b15a3  fix unit test (yuchen-db, Mar 6, 2025)
0950d01  Add unit tests for group replica response strategy (#135) (yuchen-db, Mar 6, 2025)

Changes from all commits:
4 changes: 2 additions & 2 deletions .github/workflows/docs.yaml
@@ -25,14 +25,14 @@ jobs:
with:
go-version: 1.23.x

- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
- uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-

- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
- uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
with:
path: .mdoxcache
key: ${{ runner.os }}-mdox-${{ hashFiles('docs/**/*.md', 'examples/**/*.md', 'mixin/**/*.md', '*.md') }}
8 changes: 4 additions & 4 deletions .github/workflows/go.yaml
@@ -57,7 +57,7 @@ jobs:
with:
go-version: 1.23.x

- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
- uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
with:
path: |
~/.cache/go-build
@@ -84,7 +84,7 @@ jobs:
with:
go-version: 1.23.x

- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
- uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
with:
path: |
~/.cache/go-build
@@ -111,7 +111,7 @@ jobs:
with:
go-version: 1.23.x

- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
- uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
with:
path: |
~/.cache/go-build
@@ -160,7 +160,7 @@ jobs:
with:
go-version: 1.23.x

- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
- uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
with:
path: |
~/.cache/go-build
2 changes: 1 addition & 1 deletion .github/workflows/react.yml
@@ -25,7 +25,7 @@ jobs:
with:
node-version: ${{ matrix.node }}

- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
- uses: actions/cache@0c907a75c2c80ebcb7f088228285e798b750cf8f # v4.2.1
with:
path: ~/.npm
key: ${{ runner.os }}-node-${{ hashFiles('**/package-lock.json') }}
45 changes: 33 additions & 12 deletions cmd/thanos/compact.go
@@ -176,6 +176,7 @@ func runCompact(
) (rerr error) {
deleteDelay := time.Duration(conf.deleteDelay)
compactMetrics := newCompactMetrics(reg, deleteDelay)
progressRegistry := compact.NewProgressRegistry(reg, logger)
downsampleMetrics := newDownsampleMetrics(reg)

httpProbe := prober.NewHTTP()
@@ -325,6 +326,7 @@

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)
ctx = objstoretracing.ContextWithTracer(ctx, tracer) // objstore tracing uses a different tracer key in context.

defer func() {
if rerr != nil {
@@ -444,14 +446,16 @@

var cleanMtx sync.Mutex
// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
cleanPartialMarked := func() error {
cleanPartialMarked := func(progress *compact.Progress) error {
cleanMtx.Lock()
defer cleanMtx.Unlock()

defer progress.Idle()
progress.Set(compact.SyncMeta)
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "syncing metas")
}

progress.Set(compact.CleanBlocks)
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "cleaning marked blocks")
@@ -461,19 +465,23 @@
return nil
}

compactMainFn := func() error {
compactMainFn := func(progress *compact.Progress) error {
defer progress.Idle()
// this should happen before any compaction to remove unnecessary process on backlogs beyond retention.
if len(retentionByTenant) != 0 && len(sy.Metas()) == 0 {
level.Info(logger).Log("msg", "sync before tenant retention due to no blocks")
progress.Set(compact.SyncMeta)
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before tenant retention")
}
}

progress.Set(compact.ApplyRetention)
if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired)); err != nil {
return errors.Wrap(err, "retention by tenant failed")
}

if err := compactor.Compact(ctx); err != nil {
if err := compactor.Compact(ctx, progress); err != nil {
return errors.Wrap(err, "whole compaction error")
}

@@ -482,10 +490,11 @@
// We run two passes of this to ensure that the 1h downsampling is generated
// for 5m downsamplings created in the first run.
level.Info(logger).Log("msg", "start first pass of downsampling")
progress.Set(compact.SyncMeta)
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}

progress.Set(compact.DownSampling)
filteredMetas := sy.Metas()
noDownsampleBlocks := noDownsampleMarkerFilter.NoDownsampleMarkedBlocks()
for ul := range noDownsampleBlocks {
@@ -514,6 +523,7 @@
}

level.Info(logger).Log("msg", "start second pass of downsampling")
progress.Set(compact.SyncMeta)
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
@@ -547,27 +557,29 @@
}

// TODO(bwplotka): Find a way to avoid syncing if no op was done.
progress.Set(compact.SyncMeta)
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before retention")
}

progress.Set(compact.ApplyRetention)
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, insBkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
return errors.Wrap(err, "retention failed")
}

return cleanPartialMarked()
return cleanPartialMarked(progress)
}

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, insBkt, "bucket client")

if !conf.wait {
return compactMainFn()
return compactMainFn(progressRegistry.Get(compact.Main))
}

// --wait=true is specified.
return runutil.Repeat(conf.waitInterval, ctx.Done(), func() error {
err := compactMainFn()
err := compactMainFn(progressRegistry.Get(compact.Main))
if err == nil {
compactMetrics.iterations.Inc()
return nil
@@ -633,9 +645,11 @@ func runCompact(
// For /global state make sure to fetch periodically.
return runutil.Repeat(conf.blockViewerSyncBlockInterval, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
progress := progressRegistry.Get(compact.Web)
defer progress.Idle()
iterCtx, iterCancel := context.WithTimeout(ctx, conf.blockViewerSyncBlockTimeout)
defer iterCancel()

progress.Set(compact.SyncMeta)
_, _, err := f.Fetch(iterCtx)
return err
})
@@ -650,7 +664,7 @@ func runCompact(
if conf.cleanupBlocksInterval > 0 {
g.Add(func() error {
return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), func() error {
err := cleanPartialMarked()
err := cleanPartialMarked(progressRegistry.Get(compact.Cleanup))
if err != nil && compact.IsRetryError(err) {
// The RetryError signals that we hit an retriable error (transient error, no connection).
// You should alert on this being triggered too frequently.
@@ -678,7 +692,9 @@
}

return runutil.Repeat(conf.progressCalculateInterval, ctx.Done(), func() error {

progress := progressRegistry.Get(compact.Calculate)
defer progress.Idle()
progress.Set(compact.SyncMeta)
if err := sy.SyncMetas(ctx); err != nil {
// The RetryError signals that we hit an retriable error (transient error, no connection).
// You should alert on this being triggered too frequently.
@@ -693,29 +709,34 @@
}

metas := sy.Metas()
progress.Set(compact.Grouping)
groups, err := grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata for compaction")
}

progress.Set(compact.CalculateProgress)
if err = ps.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate compaction progress")
}

progress.Set(compact.Grouping)
retGroups, err := grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata for retention")
}

progress.Set(compact.CalculateProgress)
if err = rs.ProgressCalculate(ctx, retGroups); err != nil {
return errors.Wrapf(err, "could not calculate retention progress")
}

if !conf.disableDownsampling {
progress.Set(compact.Grouping)
groups, err = grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata into downsample groups")
}
progress.Set(compact.CalculateProgress)
if err := ds.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate downsampling progress")
}
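The compactor wiring above touches the new progress API in only four ways: compact.NewProgressRegistry(reg, logger), registry.Get(loop), progress.Set(stage), and progress.Idle(). Below is a minimal sketch of a registry with that surface, assuming a gauge-based design. The loop and stage names (Main, Web, Cleanup, Calculate, SyncMeta, CleanBlocks, ApplyRetention, DownSampling, Grouping, CalculateProgress) are taken from the call sites in the diff; the metric name and all internals are illustrative guesses, not the code this PR actually adds.

// Hypothetical sketch only: a per-loop stage tracker exposed as a Prometheus
// gauge. Loop and stage names mirror the call sites in this diff; the metric
// name and gauge design are assumptions made for illustration.
package compact

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
)

type Stage string

const (
	StageIdle         Stage = "idle"
	SyncMeta          Stage = "sync_meta"
	CleanBlocks       Stage = "clean_blocks"
	ApplyRetention    Stage = "apply_retention"
	DownSampling      Stage = "downsampling"
	Grouping          Stage = "grouping"
	CalculateProgress Stage = "calculate_progress"
)

type Loop string

const (
	Main      Loop = "main"
	Web       Loop = "web"
	Cleanup   Loop = "cleanup"
	Calculate Loop = "calculate"
)

type ProgressRegistry struct {
	stage  *prometheus.GaugeVec
	logger log.Logger
}

func NewProgressRegistry(reg prometheus.Registerer, logger log.Logger) *ProgressRegistry {
	g := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "thanos_compact_loop_stage", // assumed metric name
		Help: "Current stage of each compactor loop (1 = active).",
	}, []string{"loop", "stage"})
	reg.MustRegister(g)
	return &ProgressRegistry{stage: g, logger: logger}
}

// Get returns a Progress handle for one loop; callers Set stages as they go.
func (r *ProgressRegistry) Get(l Loop) *Progress {
	return &Progress{loop: l, stage: r.stage}
}

type Progress struct {
	loop  Loop
	last  Stage
	stage *prometheus.GaugeVec
}

// Set clears the previous stage for this loop and marks the new one active.
func (p *Progress) Set(s Stage) {
	if p.last != "" {
		p.stage.WithLabelValues(string(p.loop), string(p.last)).Set(0)
	}
	p.stage.WithLabelValues(string(p.loop), string(s)).Set(1)
	p.last = s
}

func (p *Progress) Idle() { p.Set(StageIdle) }

With this shape, the deferred progress.Idle() calls in the diff guarantee the gauge falls back to idle even when a loop iteration returns early with an error.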
5 changes: 5 additions & 0 deletions cmd/thanos/query.go
@@ -245,6 +245,8 @@ func registerQuery(app *extkingpin.App) {
enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool()
tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String()

rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for __rollup__ label for aggregated metrics. If set to x, all queries on aggregated metrics will have a __rollup__=x matcher. Leave empty to disable this behavior. Default is empty.").Default("").String()

var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

@@ -384,6 +386,7 @@ func registerQuery(app *extkingpin.App) {
*enforceTenancy,
*tenantLabel,
*enableGroupReplicaPartialStrategy,
*rewriteAggregationLabelTo,
)
})
}
@@ -468,6 +471,7 @@ func runQuery(
enforceTenancy bool,
tenantLabel string,
groupReplicaPartialResponseStrategy bool,
rewriteAggregationLabelTo string,
) error {
comp := component.Query
if alertQueryURL == "" {
@@ -597,6 +601,7 @@ func runQuery(
opts := query.Options{
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
DeduplicationFunc: queryDeduplicationFunc,
RewriteAggregationLabelTo: rewriteAggregationLabelTo,
}
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
queryableCreator = query.NewQueryableCreatorWithOptions(
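cmd/thanos/query.go only plumbs the new flag into query.Options.RewriteAggregationLabelTo; the flag help text defines the observable behavior: when set to x, queries on aggregated metrics carry a __rollup__=x matcher. Here is a hedged sketch of what that rewrite could look like at the matcher level. Only the flag, the option field, and the __rollup__ label come from the diff; the helper name and the exact rewrite point are assumptions.

// Hypothetical sketch: rewrite the value of an existing __rollup__ matcher so
// queries against aggregated metrics are pinned to the configured rollup.
// The helper itself is illustrative, not this PR's implementation.
package query

import "github.com/prometheus/prometheus/model/labels"

const aggregationLabel = "__rollup__"

func rewriteAggregationLabel(ms []*labels.Matcher, override string) []*labels.Matcher {
	if override == "" {
		return ms // empty flag value disables the behavior, per the flag help text
	}
	out := make([]*labels.Matcher, 0, len(ms))
	for _, m := range ms {
		if m.Name == aggregationLabel {
			// Pin the aggregation label to the override, whatever the query asked for.
			m = labels.MustNewMatcher(labels.MatchEqual, aggregationLabel, override)
		}
		out = append(out, m)
	}
	return out
}

Leaving the flag at its empty default keeps queries untouched, which matches the opt-in wording of the help text.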
73 changes: 44 additions & 29 deletions cmd/thanos/receive.go
@@ -27,12 +27,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
@@ -231,14 +229,11 @@ func runReceive(
return errors.Wrapf(err, "migrate legacy storage in %v to default tenant %v", conf.dataDir, conf.defaultTenantID)
}

relabelContentYaml, err := conf.relabelConfigPath.Content()
relabeller, err := receive.NewRelabeller(conf.relabelConfigPath, reg, logger, conf.relabelConfigReloadTimer)

if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
}
var relabelConfig []*relabel.Config
if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil {
return errors.Wrap(err, "parse relabel configuration")
}

dbs := receive.NewMultiTSDB(
conf.dataDir,
@@ -286,30 +281,47 @@
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
SplitTenantLabelName: conf.splitTenantLabelName,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
MaxBackoff: time.Duration(*conf.maxBackoff),
TSDBStats: dbs,
Limiter: limiter,

Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
Relabeller: relabeller,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
SplitTenantLabelName: conf.splitTenantLabelName,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
MaxBackoff: time.Duration(*conf.maxBackoff),
TSDBStats: dbs,
Limiter: limiter,
AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol),
})

{
if relabeller.CanReload() {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Debug(logger).Log("msg", "relabel config initialized with file watcher.")
if err := relabeller.StartConfigReloader(ctx); err != nil {
level.Error(logger).Log("msg", "initializing relabel config reloading.", "err", err)
return err
}
level.Info(logger).Log("msg", "relabel config reloading initialized.")
<-ctx.Done()
return nil
}, func(error) {
cancel()
})
}
}

grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
@@ -974,8 +986,9 @@ type receiveConfig struct {
ignoreBlockSize bool
allowOutOfOrderUpload bool

reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent
reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent
relabelConfigReloadTimer time.Duration

writeLimitsConfig *extflag.PathOrContent
storeRateLimits store.SeriesSelectLimits
@@ -1073,6 +1086,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.maxBackoff = extkingpin.ModelDuration(cmd.Flag("receive-forward-max-backoff", "Maximum backoff for each forward fan-out request").Default("5s").Hidden())

rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution())
cmd.Flag("receive.relabel-config-reload-timer", "Minimum amount of time to pass for the relabel configuration to be reloaded. Helps to avoid excessive reloads.").
Default("0s").Hidden().DurationVar(&rc.relabelConfigReloadTimer)

rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())

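On the receive side, the old one-shot parse (read relabelConfigPath, yaml.Unmarshal into []*relabel.Config) is replaced by a Relabeller handed to the handler, plus a run-group actor that starts its reloader. The diff pins down the surface: receive.NewRelabeller(path, reg, logger, reloadTimer), CanReload(), and StartConfigReloader(ctx), which must return quickly because the actor then blocks on ctx.Done(). A sketch under those assumptions follows; the polling cadence, locking, and field names are guesses, not the PR's actual implementation.

// Hypothetical sketch of the hot-reloadable relabeller introduced here. The
// constructor and method names match the call sites in this diff; everything
// else is assumed for illustration.
package receive

import (
	"context"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/thanos-io/thanos/pkg/extflag" // PathOrContent, as used by receiveConfig
	"gopkg.in/yaml.v2"
)

type Relabeller struct {
	mtx         sync.RWMutex
	configs     []*relabel.Config
	path        *extflag.PathOrContent
	logger      log.Logger
	reloadTimer time.Duration // minimum interval between reloads; 0 disables reloading
}

// NewRelabeller parses the config once up front so a bad file fails startup,
// matching the error handling right after the call in this diff. reg would be
// used for reload metrics in the real code; this sketch ignores it.
func NewRelabeller(path *extflag.PathOrContent, reg prometheus.Registerer, logger log.Logger, reloadTimer time.Duration) (*Relabeller, error) {
	r := &Relabeller{path: path, logger: logger, reloadTimer: reloadTimer}
	if err := r.reload(); err != nil {
		return nil, err
	}
	return r, nil
}

func (r *Relabeller) CanReload() bool { return r.reloadTimer > 0 }

// StartConfigReloader returns once the background poller is running; the run
// group in this diff blocks on ctx.Done() itself after calling it.
func (r *Relabeller) StartConfigReloader(ctx context.Context) error {
	if !r.CanReload() {
		return nil
	}
	go func() {
		t := time.NewTicker(r.reloadTimer)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				if err := r.reload(); err != nil {
					level.Warn(r.logger).Log("msg", "reloading relabel config failed", "err", err)
				}
			}
		}
	}()
	return nil
}

func (r *Relabeller) reload() error {
	content, err := r.path.Content()
	if err != nil {
		return err
	}
	var cfgs []*relabel.Config
	if err := yaml.Unmarshal(content, &cfgs); err != nil {
		return err
	}
	r.mtx.Lock()
	r.configs = cfgs
	r.mtx.Unlock()
	return nil
}

// Process applies the current config to one label set, so writers always see
// the most recently loaded rules.
func (r *Relabeller) Process(lbls labels.Labels) (labels.Labels, bool) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	return relabel.Process(lbls, r.configs...)
}

One plausible reading of the hidden --receive.relabel-config-reload-timer flag (default 0s) is as the polling or debounce interval above, in which case the default leaves hot reload off and CanReload() keeps the reloader actor from being added; the real code may instead watch the file and use the timer only to rate-limit reloads.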