Skip to content

Commit

Permalink
feat(filter): Move batch forced save timeout to config (#1014)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tetrergeru authored Apr 24, 2024
1 parent 62e8ac3 commit e885d45
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
15 changes: 9 additions & 6 deletions cmd/filter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type filterConfig struct {
DropMetricsTTL string `yaml:"drop_metrics_ttl"`
// Flags for compatibility with different graphite behaviours
Compatibility compatibility `yaml:"graphite_compatibility"`
// Time after which the batch of metrics is forced to be saved, default is 1s
BatchForcedSaveTimeout string `yaml:"batch_forced_save_timeout"`
}

func getDefault() config {
Expand All @@ -45,12 +47,13 @@ func getDefault() config {
LogPrettyFormat: false,
},
Filter: filterConfig{
Listen: ":2003",
RetentionConfig: "/etc/moira/storage-schemas.conf",
CacheCapacity: 10, //nolint
MaxParallelMatches: 0,
PatternsUpdatePeriod: "1s",
DropMetricsTTL: "1h",
Listen: ":2003",
RetentionConfig: "/etc/moira/storage-schemas.conf",
CacheCapacity: 10, //nolint
MaxParallelMatches: 0,
PatternsUpdatePeriod: "1s",
DropMetricsTTL: "1h",
BatchForcedSaveTimeout: "1s",
Compatibility: compatibility{
AllowRegexLooseStartMatch: false,
AllowRegexMatchEmpty: true,
Expand Down
3 changes: 2 additions & 1 deletion cmd/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func main() {

// Start metrics matcher
cacheCapacity := config.Filter.CacheCapacity
metricsMatcher := matchedmetrics.NewMetricsMatcher(filterMetrics, logger, database, cacheStorage, cacheCapacity)
batchForcedSaveTimeout := to.Duration(config.Filter.BatchForcedSaveTimeout)
metricsMatcher := matchedmetrics.NewMetricsMatcher(filterMetrics, logger, database, cacheStorage, cacheCapacity, batchForcedSaveTimeout)
metricsMatcher.Start(metricsChan)
defer metricsMatcher.Wait() // First stop listener
defer stopListener(listener) // Then waiting for metrics matcher handle all received events
Expand Down
33 changes: 18 additions & 15 deletions filter/matched_metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (

// MetricsMatcher make buffer of metrics and save it.
type MetricsMatcher struct {
logger moira.Logger
metrics *metrics.FilterMetrics
database moira.Database
cacheStorage *filter.Storage
cacheCapacity int
waitGroup *sync.WaitGroup
closeRequest chan struct{}
logger moira.Logger
metrics *metrics.FilterMetrics
database moira.Database
cacheStorage *filter.Storage
cacheCapacity int
waitGroup *sync.WaitGroup
closeRequest chan struct{}
batchForcedSaveTimeout time.Duration
}

// NewMetricsMatcher creates new MetricsMatcher.
Expand All @@ -28,15 +29,17 @@ func NewMetricsMatcher(
database moira.Database,
cacheStorage *filter.Storage,
cacheCapacity int,
batchForcedSaveTimeout time.Duration,
) *MetricsMatcher {
return &MetricsMatcher{
metrics: metrics,
logger: logger,
database: database,
cacheStorage: cacheStorage,
cacheCapacity: cacheCapacity,
waitGroup: &sync.WaitGroup{},
closeRequest: make(chan struct{}),
metrics: metrics,
logger: logger,
database: database,
cacheStorage: cacheStorage,
cacheCapacity: cacheCapacity,
waitGroup: &sync.WaitGroup{},
closeRequest: make(chan struct{}),
batchForcedSaveTimeout: batchForcedSaveTimeout,
}
}

Expand All @@ -63,7 +66,7 @@ func (matcher *MetricsMatcher) receiveBatch(metrics <-chan *moira.MatchedMetric)

go func() {
defer close(batchedMetrics)
batchTimer := time.NewTimer(time.Second)
batchTimer := time.NewTimer(matcher.batchForcedSaveTimeout)
defer batchTimer.Stop()
for {
batch := make(map[string]*moira.MatchedMetric, matcher.cacheCapacity)
Expand Down

0 comments on commit e885d45

Please sign in to comment.