feat(v2): hedged object uploads #3952

Open · wants to merge 5 commits into main
113 changes: 82 additions & 31 deletions pkg/experiment/ingester/segment.go
@@ -17,6 +17,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/backoff"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/thanos-io/objstore"
@@ -31,6 +32,7 @@ import (
"github.com/grafana/pyroscope/pkg/model"
pprofsplit "github.com/grafana/pyroscope/pkg/model/pprof_split"
pprofmodel "github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/util/retry"
"github.com/grafana/pyroscope/pkg/validation"
)

@@ -52,8 +54,9 @@ type segmentsWriter struct {
ctx context.Context
cancel context.CancelFunc

metrics *segmentMetrics
headMetrics *memdb.HeadMetrics
metrics *segmentMetrics
headMetrics *memdb.HeadMetrics
retryLimiter *retry.RateLimiter
}

type shard struct {
@@ -129,6 +132,7 @@ func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetri
shards: make(map[shardKey]*shard),
metastore: metastoreClient,
}
sw.retryLimiter = retry.NewRateLimiter(sw.config.Upload.HedgeRateMax, int(sw.config.Upload.HedgeRateBurst))
sw.ctx, sw.cancel = context.WithCancel(context.Background())
flushWorkers := runtime.GOMAXPROCS(-1)
if config.FlushConcurrency > 0 {
@@ -236,12 +240,8 @@ func (s *segment) flush(ctx context.Context) (err error) {
if err = s.sw.uploadBlock(ctx, blockData, blockMeta, s); err != nil {
return fmt.Errorf("failed to upload block %s: %w", s.ulid.String(), err)
}
if err = s.sw.storeMeta(ctx, blockMeta, s); err != nil {
level.Error(s.logger).Log("msg", "failed to store meta in metastore", "err", err)
if dlqErr := s.sw.storeMetaDLQ(ctx, blockMeta, s); dlqErr != nil {
level.Error(s.logger).Log("msg", "metastore fallback failed", "err", dlqErr)
return fmt.Errorf("failed to store meta %s: %w", s.ulid.String(), dlqErr)
}
if err = s.sw.storeMetadata(ctx, blockMeta, s); err != nil {
return fmt.Errorf("failed to store meta %s: %w", s.ulid.String(), err)
}

return nil
@@ -567,47 +567,98 @@ func (s *segment) headForIngest(k datasetKey) *memdb.Head {
}

func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, meta *metastorev1.BlockMeta, s *segment) error {
t1 := time.Now()
uploadStart := time.Now()
var err error
defer func() {
sw.metrics.blockUploadDuration.WithLabelValues(s.sshard).Observe(time.Since(t1).Seconds())
sw.metrics.segmentUploadDuration.
WithLabelValues(statusLabelValue(err)).
Observe(time.Since(uploadStart).Seconds())
}()
sw.metrics.segmentBlockSizeBytes.WithLabelValues(s.sshard).Observe(float64(len(blockData)))

blockPath := block.ObjectPath(meta)
path := block.ObjectPath(meta)
sw.metrics.segmentSizeBytes.
WithLabelValues(s.sshard).
Observe(float64(len(blockData)))

// To mitigate tail latency issues, we use a hedged upload strategy:
// if the request is not completed within a certain time, we trigger
// a second upload attempt. Upload errors are retried explicitly and
// are included in the call duration.
uploadWithRetry := func(ctx context.Context, hedge bool) (any, error) {
retryConfig := backoff.Config{
MinBackoff: sw.config.Upload.MinBackoff,
MaxBackoff: sw.config.Upload.MaxBackoff,
MaxRetries: sw.config.Upload.MaxRetries,
}
var attemptErr error
if hedge {
// Hedged requests are not retried.
retryConfig.MaxRetries = 1
attemptStart := time.Now()
defer func() {
sw.metrics.segmentHedgedUploadDuration.
WithLabelValues(statusLabelValue(attemptErr)).
Observe(time.Since(attemptStart).Seconds())
}()
}
// Retry on all errors.
retries := backoff.New(ctx, retryConfig)
for retries.Ongoing() {
if attemptErr = sw.bucket.Upload(ctx, path, bytes.NewReader(blockData)); attemptErr == nil {
break
}
retries.Wait()
}
return nil, attemptErr
}

hedgedUpload := retry.Hedged[any]{
Call: uploadWithRetry,
Trigger: time.After(sw.config.Upload.HedgeUploadAfter),
Throttler: sw.retryLimiter,
FailFast: true,
}

if err := sw.bucket.Upload(ctx, blockPath, bytes.NewReader(blockData)); err != nil {
if _, err = hedgedUpload.Do(ctx); err != nil {
return err
}
sw.logger.Log("msg", "uploaded block", "path", blockPath, "upload_duration", time.Since(t1))

level.Debug(sw.logger).Log("msg", "uploaded block", "path", path, "upload_duration", time.Since(uploadStart))
return nil
}
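
For context, the hedging strategy described in the comment above boils down to the following self-contained sketch. It is illustrative only: the function name hedgedDo and the use of golang.org/x/time/rate are assumptions made for the example, while the PR itself relies on the pkg/util/retry.Hedged helper and rate limiter shown in this diff.

```go
package hedging

import (
	"context"
	"time"

	"golang.org/x/time/rate"
)

// hedgedDo runs call and, if it has not finished within hedgeAfter and the
// limiter permits it, starts one additional (hedged) attempt. The first
// result to arrive wins, success or error (fail-fast).
func hedgedDo(
	ctx context.Context,
	hedgeAfter time.Duration,
	limiter *rate.Limiter,
	call func(context.Context) error,
) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancels the losing attempt

	results := make(chan error, 2)
	go func() { results <- call(ctx) }() // primary attempt

	timer := time.NewTimer(hedgeAfter)
	defer timer.Stop()

	hedged := false
	for {
		select {
		case err := <-results:
			return err
		case <-timer.C:
			// The primary attempt is slow: launch a single hedged attempt,
			// but only if the shared rate limiter allows it.
			if !hedged && limiter.Allow() {
				hedged = true
				go func() { results <- call(ctx) }()
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```

In the PR, each attempt additionally retries transient upload errors with a dskit backoff loop (uploadWithRetry above), and the hedged attempt itself is limited to a single try.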

func (sw *segmentsWriter) storeMeta(ctx context.Context, meta *metastorev1.BlockMeta, s *segment) error {
t1 := time.Now()
func (sw *segmentsWriter) storeMetadata(ctx context.Context, meta *metastorev1.BlockMeta, s *segment) error {
start := time.Now()
var err error
defer func() {
sw.metrics.storeMetaDuration.WithLabelValues(s.sshard).Observe(time.Since(t1).Seconds())
s.debuginfo.storeMetaDuration = time.Since(t1)
sw.metrics.storeMetadataDuration.
WithLabelValues(statusLabelValue(err)).
Observe(time.Since(start).Seconds())
s.debuginfo.storeMetaDuration = time.Since(start)
}()
_, err := sw.metastore.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta})
if err != nil {
sw.metrics.storeMetaErrors.WithLabelValues(s.sshard).Inc()
if _, err = sw.metastore.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta}); err == nil {
return nil
}

level.Error(s.logger).Log("msg", "failed to store meta in metastore", "err", err)
defer func() {
sw.metrics.storeMetadataDLQ.WithLabelValues(statusLabelValue(err)).Inc()
}()

if err = s.sw.storeMetadataDLQ(ctx, meta); err == nil {
return nil
}

level.Error(s.logger).Log("msg", "metastore fallback failed", "err", err)
return err
}

func (sw *segmentsWriter) storeMetaDLQ(ctx context.Context, meta *metastorev1.BlockMeta, s *segment) error {
metaBlob, err := meta.MarshalVT()
func (sw *segmentsWriter) storeMetadataDLQ(ctx context.Context, meta *metastorev1.BlockMeta) error {
metadataBytes, err := meta.MarshalVT()
if err != nil {
sw.metrics.storeMetaDLQ.WithLabelValues(s.sshard, "err").Inc()
return err
}
fullPath := block.MetadataDLQObjectPath(meta)
if err = sw.bucket.Upload(ctx, fullPath, bytes.NewReader(metaBlob)); err != nil {
sw.metrics.storeMetaDLQ.WithLabelValues(s.sshard, "err").Inc()
return fmt.Errorf("%w, %w", ErrMetastoreDLQFailed, err)
}
sw.metrics.storeMetaDLQ.WithLabelValues(s.sshard, "OK").Inc()
return nil
return sw.bucket.Upload(ctx, block.MetadataDLQObjectPath(meta), bytes.NewReader(metadataBytes))
}

type workerPool struct {
117 changes: 76 additions & 41 deletions pkg/experiment/ingester/segment_metrics.go
@@ -1,23 +1,25 @@
package ingester

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

type segmentMetrics struct {
segmentIngestBytes *prometheus.HistogramVec
segmentBlockSizeBytes *prometheus.HistogramVec
headSizeBytes *prometheus.HistogramVec
storeMetaDuration *prometheus.HistogramVec
segmentFlushWaitDuration *prometheus.HistogramVec
segmentFlushTimeouts *prometheus.CounterVec
storeMetaErrors *prometheus.CounterVec
storeMetaDLQ *prometheus.CounterVec
blockUploadDuration *prometheus.HistogramVec
flushSegmentDuration *prometheus.HistogramVec
flushHeadsDuration *prometheus.HistogramVec
flushServiceHeadDuration *prometheus.HistogramVec
flushServiceHeadError *prometheus.CounterVec
segmentIngestBytes *prometheus.HistogramVec
segmentSizeBytes *prometheus.HistogramVec
headSizeBytes *prometheus.HistogramVec
segmentFlushWaitDuration *prometheus.HistogramVec
segmentFlushTimeouts *prometheus.CounterVec
storeMetadataDuration *prometheus.HistogramVec
storeMetadataDLQ *prometheus.CounterVec
segmentUploadDuration *prometheus.HistogramVec
segmentHedgedUploadDuration *prometheus.HistogramVec
flushSegmentDuration *prometheus.HistogramVec
flushHeadsDuration *prometheus.HistogramVec
flushServiceHeadDuration *prometheus.HistogramVec
flushServiceHeadError *prometheus.CounterVec
}

var (
@@ -27,43 +29,69 @@
)

func newSegmentMetrics(reg prometheus.Registerer) *segmentMetrics {

// TODO(kolesnikovae):
// - Use native histograms for all metrics
// - Remove unnecessary labels (e.g. shard)
// - Remove/merge/replace metrics
// - Rename to pyroscope_segment_writer_*
// - Add Help.
m := &segmentMetrics{
segmentIngestBytes: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pyroscope",
Subsystem: "segment_writer",
Name: "segment_ingest_bytes",
Buckets: prometheus.ExponentialBucketsRange(10*1024, 15*1024*1024, 20),
},
[]string{"shard", "tenant"}),
segmentBlockSizeBytes: prometheus.NewHistogramVec(
segmentSizeBytes: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pyroscope",
Name: "segment_block_size_bytes",
Subsystem: "segment_writer",
Name: "segment_size_bytes",
Buckets: prometheus.ExponentialBucketsRange(100*1024, 100*1024*1024, 20),
},
[]string{"shard"}),
storeMetaDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
Name: "segment_store_meta_duration_seconds",
Buckets: networkTimingBuckets,
}, []string{"shard"}),
blockUploadDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
Name: "segment_block_upload_duration_seconds",
Buckets: networkTimingBuckets,
}, []string{"shard"}),

storeMetaErrors: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pyroscope",
Name: "segment_store_meta_errors",
}, []string{"shard"}),
storeMetaDLQ: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pyroscope",
Name: "segment_store_meta_dlq",
}, []string{"shard", "status"}),
segmentUploadDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
Subsystem: "segment_writer",
Name: "upload_duration_seconds",
Help: "Duration of segment upload requests.",
Buckets: prometheus.ExponentialBucketsRange(0.001, 10, 30),
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 32,
NativeHistogramMinResetDuration: time.Minute * 15,
}, []string{"status"}),

segmentHedgedUploadDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
Subsystem: "segment_writer",
Name: "hedged_upload_duration_seconds",
Help: "Duration of hedged segment upload requests.",
Buckets: prometheus.ExponentialBucketsRange(0.001, 10, 30),
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 32,
NativeHistogramMinResetDuration: time.Minute * 15,
}, []string{"status"}),

storeMetadataDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
Subsystem: "segment_writer",
Name: "store_metadata_duration_seconds",
Help: "Duration of store metadata requests.",
Buckets: prometheus.ExponentialBucketsRange(0.001, 10, 30),
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 32,
NativeHistogramMinResetDuration: time.Minute * 15,
}, []string{"status"}),

storeMetadataDLQ: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "pyroscope",
Subsystem: "segment_writer",
Name: "store_metadata_dlq",
Help: "Number of store metadata entries that were sent to the DLQ.",
}, []string{"status"}),

segmentFlushWaitDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "pyroscope",
@@ -106,13 +134,13 @@ func newSegmentMetrics(reg prometheus.Registerer) *segmentMetrics {

if reg != nil {
reg.MustRegister(m.segmentIngestBytes)
reg.MustRegister(m.segmentBlockSizeBytes)
reg.MustRegister(m.storeMetaDuration)
reg.MustRegister(m.segmentSizeBytes)
reg.MustRegister(m.storeMetadataDuration)
reg.MustRegister(m.segmentFlushWaitDuration)
reg.MustRegister(m.segmentFlushTimeouts)
reg.MustRegister(m.storeMetaErrors)
reg.MustRegister(m.storeMetaDLQ)
reg.MustRegister(m.blockUploadDuration)
reg.MustRegister(m.storeMetadataDLQ)
reg.MustRegister(m.segmentUploadDuration)
reg.MustRegister(m.segmentHedgedUploadDuration)
reg.MustRegister(m.flushHeadsDuration)
reg.MustRegister(m.flushServiceHeadDuration)
reg.MustRegister(m.flushServiceHeadError)
@@ -121,3 +149,10 @@ func newSegmentMetrics(reg prometheus.Registerer) *segmentMetrics {
}
return m
}

func statusLabelValue(err error) string {
if err == nil {
return "success"
}
return "error"
}
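
The new duration metrics pair classic buckets with native-histogram settings and replace the per-shard error counters with a single status label derived from the returned error. A minimal standalone sketch of that pattern follows; the metric name, namespace, and timedUpload helper are placeholders, not part of this PR.

```go
package example

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// statusLabelValue mirrors the helper added in this PR.
func statusLabelValue(err error) string {
	if err == nil {
		return "success"
	}
	return "error"
}

var uploadDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "example",
	Name:      "upload_duration_seconds",
	Help:      "Duration of upload requests.",
	// Classic buckets, used by scrapers without native-histogram support.
	Buckets: prometheus.ExponentialBucketsRange(0.001, 10, 30),
	// Native-histogram settings, used when the scraper negotiates them.
	NativeHistogramBucketFactor:     1.1,
	NativeHistogramMaxBucketNumber:  32,
	NativeHistogramMinResetDuration: 15 * time.Minute,
}, []string{"status"})

// timedUpload shows the observe-on-defer pattern used throughout the diff:
// the status label is derived from the error returned by the operation.
func timedUpload(doUpload func() error) (err error) {
	start := time.Now()
	defer func() {
		uploadDuration.
			WithLabelValues(statusLabelValue(err)).
			Observe(time.Since(start).Seconds())
	}()
	return doUpload()
}
```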
31 changes: 26 additions & 5 deletions pkg/experiment/ingester/service.go
@@ -35,26 +35,47 @@ const (
RingName = "segment-writer"
RingKey = "segment-writer-ring"

minFlushConcurrency = 8
defaultSegmentDuration = 500 * time.Millisecond
minFlushConcurrency = 8
defaultSegmentDuration = 500 * time.Millisecond
defaultHedgedRequestMaxRate = 2 // 2 hedged requests per second
defaultHedgedRequestBurst = 10 // allow bursts of 10 hedged requests
)
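
These defaults describe a token bucket: hedged requests refill at 2 per second and can burst to 10. The snippet below demonstrates the same semantics using golang.org/x/time/rate as a stand-in for the pkg/util/retry limiter wired up in segment.go; the stand-in is an assumption for illustration only.

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// Token-bucket behaviour matching the defaults above: refill at 2 tokens
// per second, hold at most 10 tokens.
func main() {
	limiter := rate.NewLimiter(rate.Limit(2), 10)

	allowed := 0
	for i := 0; i < 20; i++ {
		if limiter.Allow() {
			allowed++
		}
	}
	// Typically prints 10: the burst is spent immediately, then roughly
	// one more hedged request is permitted every 500ms.
	fmt.Printf("allowed %d of 20 immediate hedge attempts\n", allowed)
}
```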

type Config struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the segment writer."`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
SegmentDuration time.Duration `yaml:"segment_duration,omitempty"`
FlushConcurrency uint `yaml:"flush_concurrency,omitempty"`
SegmentDuration time.Duration `yaml:"segment_duration,omitempty" category:"advanced"`
FlushConcurrency uint `yaml:"flush_concurrency,omitempty" category:"advanced"`
Upload UploadConfig `yaml:"upload,omitempty" category:"advanced"`
}

type UploadConfig struct {
MaxRetries int `yaml:"retry_max_retries,omitempty" category:"advanced"`
MinBackoff time.Duration `yaml:"retry_min_period,omitempty" category:"advanced"`
MaxBackoff time.Duration `yaml:"retry_max_period,omitempty" category:"advanced"`
HedgeUploadAfter time.Duration `yaml:"hedge_upload_after,omitempty" category:"advanced"`
HedgeRateMax float64 `yaml:"hedge_rate_max,omitempty" category:"advanced"`
HedgeRateBurst uint `yaml:"hedge_rate_burst,omitempty" category:"advanced"`
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
const prefix = "segment-writer"
cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f)
cfg.LifecyclerConfig.RegisterFlagsWithPrefix(prefix+".", f, util.Logger)
cfg.Upload.RegisterFlagsWithPrefix(prefix+".upload.", f)
f.DurationVar(&cfg.SegmentDuration, prefix+".segment-duration", defaultSegmentDuration, "Timeout when flushing segments to bucket.")
f.UintVar(&cfg.FlushConcurrency, prefix+".flush-concurrency", 0, "Number of concurrent flushes. Defaults to the number of CPUs, but not less than 8.")
}

func (cfg *UploadConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, prefix+"max-retries", 3, "Number of times to back off and retry before failing.")
f.DurationVar(&cfg.MinBackoff, prefix+"retry-min-period", 50*time.Millisecond, "Minimum delay when backing off.")
f.DurationVar(&cfg.MaxBackoff, prefix+"retry-max-period", defaultSegmentDuration, "Maximum delay when backing off.")
f.DurationVar(&cfg.HedgeUploadAfter, prefix+"hedge-upload-after", defaultSegmentDuration, "Time after which to hedge the upload request.")
f.Float64Var(&cfg.HedgeRateMax, prefix+"hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second.")
f.UintVar(&cfg.HedgeRateBurst, prefix+"hedge-rate-burst", defaultHedgedRequestBurst, "Maximum number of hedged requests in a burst.")
}
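
Assuming the prefix wiring above ("segment-writer" plus ".upload."), the registered flag names come out as segment-writer.upload.max-retries, segment-writer.upload.hedge-upload-after, and so on. Below is a small self-contained sketch of overriding two of them; the flag set and printed output are illustrative only.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// Stand-in registration mirroring the flag names produced above.
	fs := flag.NewFlagSet("segment-writer", flag.ContinueOnError)
	hedgeAfter := fs.Duration("segment-writer.upload.hedge-upload-after", 500*time.Millisecond, "Time after which to hedge the upload request.")
	hedgeBurst := fs.Uint("segment-writer.upload.hedge-rate-burst", 10, "Maximum number of hedged requests in a burst.")

	// Example: hedge earlier and allow a larger burst.
	_ = fs.Parse([]string{
		"-segment-writer.upload.hedge-upload-after=250ms",
		"-segment-writer.upload.hedge-rate-burst=20",
	})
	fmt.Printf("hedge after %s, burst %d\n", *hedgeAfter, *hedgeBurst)
}
```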

func (cfg *Config) Validate() error {
// TODO(kolesnikovae): implement.
if err := cfg.LifecyclerConfig.Validate(); err != nil {