Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filter,cli): add remove future metrics in cli and discarding metrics written into the future #1012

Merged
merged 11 commits into from
May 7, 2024
14 changes: 8 additions & 6 deletions cmd/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ type config struct {
}

type cleanupConfig struct {
Whitelist []string `yaml:"whitelist"`
Delete bool `yaml:"delete"`
AddAnonymousToWhitelist bool `json:"add_anonymous_to_whitelist"`
CleanupMetricsDuration string `yaml:"cleanup_metrics_duration"`
Whitelist []string `yaml:"whitelist"`
Delete bool `yaml:"delete"`
AddAnonymousToWhitelist bool `json:"add_anonymous_to_whitelist"`
CleanupMetricsDuration string `yaml:"cleanup_metrics_duration"`
CleanupFutureMetricsDuration string `yaml:"cleanup_future_metrics_duration"`
}

func getDefault() config {
Expand All @@ -30,8 +31,9 @@ func getDefault() config {
DialTimeout: "500ms",
},
Cleanup: cleanupConfig{
Whitelist: []string{},
CleanupMetricsDuration: "-168h",
Whitelist: []string{},
CleanupMetricsDuration: "-168h",
CleanupFutureMetricsDuration: "60m",
},
}
}
39 changes: 28 additions & 11 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ var plotting = flag.Bool("plotting", false, "enable images in all notifications"
var removeSubscriptions = flag.String("remove-subscriptions", "", "Remove given subscriptions separated by semicolons.")

var (
cleanupUsers = flag.Bool("cleanup-users", false, "Disable/delete contacts and subscriptions of missing users")
cleanupLastChecks = flag.Bool("cleanup-last-checks", false, "Delete abandoned triggers last checks.")
cleanupTags = flag.Bool("cleanup-tags", false, "Delete abandoned tags.")
cleanupMetrics = flag.Bool("cleanup-metrics", false, "Delete outdated metrics.")
cleanupRetentions = flag.Bool("cleanup-retentions", false, "Delete abandoned retentions.")
userDel = flag.String("user-del", "", "Delete all contacts and subscriptions for a user")
fromUser = flag.String("from-user", "", "Transfer subscriptions and contacts from user.")
toUser = flag.String("to-user", "", "Transfer subscriptions and contacts to user.")
cleanupUsers = flag.Bool("cleanup-users", false, "Disable/delete contacts and subscriptions of missing users")
cleanupLastChecks = flag.Bool("cleanup-last-checks", false, "Delete abandoned triggers last checks.")
cleanupTags = flag.Bool("cleanup-tags", false, "Delete abandoned tags.")
cleanupMetrics = flag.Bool("cleanup-metrics", false, "Delete outdated metrics.")
cleanupFutureMetrics = flag.Bool("cleanup-future-metrics", false, "Delete metrics from the database written to the future.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне кажется, сейчас неконсистентно с хелпами по другим ключам

Suggested change
cleanupFutureMetrics = flag.Bool("cleanup-future-metrics", false, "Delete metrics from the database written to the future.")
cleanupFutureMetrics = flag.Bool("cleanup-future-metrics", false, "Delete metrics with future timestamps.")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Поправил

cleanupRetentions = flag.Bool("cleanup-retentions", false, "Delete abandoned retentions.")
userDel = flag.String("user-del", "", "Delete all contacts and subscriptions for a user")
fromUser = flag.String("from-user", "", "Transfer subscriptions and contacts from user.")
toUser = flag.String("to-user", "", "Transfer subscriptions and contacts to user.")
)

var (
Expand Down Expand Up @@ -230,12 +231,28 @@ func main() { //nolint
log := logger.String(moira.LogFieldNameContext, "cleanup-metrics")

log.Info().Msg("Cleanup of outdated metrics started")
err := handleCleanUpOutdatedMetrics(confCleanup, database)
if err != nil {

if err := handleCleanUpOutdatedMetrics(confCleanup, database); err != nil {
log.Error().
Error(err).
Msg("Failed to cleanup outdated metrics")
}

log.Info().Msg("Cleanup of outdated metrics finished")
}

if *cleanupFutureMetrics {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тоже спрошу, не хотим ли мы все это сразу в отчистку метрик добавить, чтобы не плодить ключики?

Copy link
Member Author

@almostinf almostinf Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне кажется, это не нужно, тк операция удаления метрик в будущее довольно тяжелая, тк требует прохода по всем метрикам, в идеале 1 раз удалить метрики, уже записанные, а все новые, пишущиеcя в будущее, и так будут откидываться

log := logger.String(moira.LogFieldNameContext, "cleanup-future-metrics")

log.Info().Msg("Cleanup of future metrics started")

if err := handleCleanUpFutureMetrics(confCleanup, database); err != nil {
log.Error().
Error(err).
Msg("Failed to cleanup future metrics")
}

log.Info().Msg("Cleanup of future metrics finished")
}

if *cleanupLastChecks {
Expand Down Expand Up @@ -417,7 +434,7 @@ func openFile(filePath string, mode int) (*os.File, error) {
if filePath == "" {
return nil, fmt.Errorf("file is not specified")
}
file, err := os.OpenFile(filePath, mode, 0666) //nolint:gofumpt,gomnd
file, err := os.OpenFile(filePath, mode, 0o666) //nolint:gofumpt,gomnd
kissken marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("cannot open file: %w", err)
}
Expand Down
14 changes: 13 additions & 1 deletion cmd/cli/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,23 @@ func handleCleanUpOutdatedMetrics(config cleanupConfig, database moira.Database)
return err
}

err = database.CleanUpOutdatedMetrics(duration)
if err = database.CleanUpOutdatedMetrics(duration); err != nil {
return err
}

return nil
}

func handleCleanUpFutureMetrics(config cleanupConfig, database moira.Database) error {
duration, err := time.ParseDuration(config.CleanupFutureMetricsDuration)
if err != nil {
return err
}

if err = database.CleanUpFutureMetrics(duration); err != nil {
return err
}

return nil
}

Expand Down
40 changes: 36 additions & 4 deletions cmd/cli/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/moira-alert/moira/database/redis"
mocks "github.com/moira-alert/moira/mock/moira-alert"

"github.com/golang/mock/gomock"
Expand All @@ -16,9 +17,40 @@ func TestCleanUpOutdatedMetrics(t *testing.T) {
defer mockCtrl.Finish()
db := mocks.NewMockDatabase(mockCtrl)

Convey("Test cleanup", t, func() {
db.EXPECT().CleanUpOutdatedMetrics(-168 * time.Hour).Return(nil)
err := handleCleanUpOutdatedMetrics(conf.Cleanup, db)
So(err, ShouldBeNil)
Convey("Test cleanup outdated metrics", t, func() {
Convey("With valid duration", func() {
db.EXPECT().CleanUpOutdatedMetrics(-168 * time.Hour).Return(nil)
err := handleCleanUpOutdatedMetrics(conf.Cleanup, db)
So(err, ShouldBeNil)
})

Convey("With invalid duration", func() {
conf.Cleanup.CleanupMetricsDuration = "168h"
db.EXPECT().CleanUpOutdatedMetrics(168 * time.Hour).Return(redis.ErrCleanUpDurationGreaterThanZero)
err := handleCleanUpOutdatedMetrics(conf.Cleanup, db)
So(err, ShouldEqual, redis.ErrCleanUpDurationGreaterThanZero)
})
})
}

func TestCleanUpFutureMetrics(t *testing.T) {
conf := getDefault()
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
db := mocks.NewMockDatabase(mockCtrl)

Convey("Test cleanup future metrics", t, func() {
Convey("With valid duration", func() {
db.EXPECT().CleanUpFutureMetrics(60 * time.Minute).Return(nil)
err := handleCleanUpFutureMetrics(conf.Cleanup, db)
So(err, ShouldBeNil)
})

Convey("With invalid duration", func() {
conf.Cleanup.CleanupFutureMetricsDuration = "-60m"
db.EXPECT().CleanUpFutureMetrics(-60 * time.Minute).Return(redis.ErrCleanUpDurationLessThanZero)
err := handleCleanUpFutureMetrics(conf.Cleanup, db)
So(err, ShouldEqual, redis.ErrCleanUpDurationLessThanZero)
})
})
}
52 changes: 34 additions & 18 deletions database/redis/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"gopkg.in/tomb.v2"
)

var (
ErrCleanUpDurationLessThanZero = errors.New("clean up duration value must be greater than zero, otherwise the current metrics may be deleted")
ErrCleanUpDurationGreaterThanZero = errors.New("clean up duration value must be less than zero, otherwise all metrics will be removed")
)

// GetPatterns gets updated patterns array.
func (connector *DbConnector) GetPatterns() ([]string, error) {
c := *connector.client
Expand Down Expand Up @@ -307,14 +312,15 @@ func (connector *DbConnector) RemoveMetricRetention(metric string) error {
}

// RemoveMetricValues remove metric timestamps values from 0 to given time.
func (connector *DbConnector) RemoveMetricValues(metric string, toTime int64) (int64, error) {
func (connector *DbConnector) RemoveMetricValues(metric string, from, to string) (int64, error) {
if !connector.needRemoveMetrics(metric) {
return 0, nil
}

c := *connector.client
result, err := c.ZRemRangeByScore(connector.context, metricDataKey(metric), "-inf", strconv.FormatInt(toTime, 10)).Result()
result, err := c.ZRemRangeByScore(connector.context, metricDataKey(metric), from, to).Result()
Tetrergeru marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return 0, fmt.Errorf("failed to remove metrics from -inf to %v, error: %w", toTime, err)
return 0, fmt.Errorf("failed to remove metrics from %s to %s, error: %w", from, to, err)
}

return result, nil
Expand Down Expand Up @@ -344,23 +350,25 @@ func (connector *DbConnector) needRemoveMetrics(metric string) bool {
return err == nil
}

func cleanUpOutdatedMetricsOnRedisNode(connector *DbConnector, client redis.UniversalClient, duration time.Duration) error {
func cleanUpMetricsOnRedisNode(connector *DbConnector, client redis.UniversalClient, from, to string) error {
metricsIterator := client.ScanType(connector.context, 0, metricDataKey("*"), 0, "zset").Iterator()
var count int64

for metricsIterator.Next(connector.context) {
key := metricsIterator.Val()
metric := strings.TrimPrefix(key, metricDataKey(""))
deletedCount, err := flushMetric(connector, metric, duration)

deletedCount, err := connector.RemoveMetricValues(metric, from, to)
if err != nil {
return err
}

count += deletedCount
}

connector.logger.Info().
Int64("count deleted metrics", count).
Msg("Cleaned up usefully metrics for trigger")
Msg("Cleaned up metrics")

return nil
}
Expand Down Expand Up @@ -388,11 +396,29 @@ func cleanUpAbandonedRetentionsOnRedisNode(connector *DbConnector, client redis.

func (connector *DbConnector) CleanUpOutdatedMetrics(duration time.Duration) error {
if duration >= 0 {
return errors.New("clean up duration value must be less than zero, otherwise all metrics will be removed")
return ErrCleanUpDurationGreaterThanZero
}

from := "-inf"
toTs := time.Now().UTC().Add(duration).Unix()
to := strconv.FormatInt(toTs, 10)

return connector.callFunc(func(connector *DbConnector, client redis.UniversalClient) error {
return cleanUpOutdatedMetricsOnRedisNode(connector, client, duration)
return cleanUpMetricsOnRedisNode(connector, client, from, to)
})
}

func (connector *DbConnector) CleanUpFutureMetrics(duration time.Duration) error {
if duration <= 0 {
return ErrCleanUpDurationLessThanZero
}

fromTs := connector.clock.Now().Add(duration).Unix()
from := strconv.FormatInt(fromTs, 10)
to := "+inf"

return connector.callFunc(func(connector *DbConnector, client redis.UniversalClient) error {
return cleanUpMetricsOnRedisNode(connector, client, from, to)
})
}

Expand Down Expand Up @@ -475,16 +501,6 @@ func (connector *DbConnector) RemoveAllMetrics() error {
return connector.callFunc(removeAllMetricsOnRedisNode)
}

func flushMetric(database moira.Database, metric string, duration time.Duration) (int64, error) {
lastTs := time.Now().UTC()
toTs := lastTs.Add(duration).Unix()
deletedCount, err := database.RemoveMetricValues(metric, toTs)
if err != nil {
return deletedCount, err
}
return deletedCount, nil
}

var patternsListKey = "moira-pattern-list"

var metricEventsChannels = []string{
Expand Down
Loading
Loading