diff --git a/filter/cache_storage.go b/filter/cache_storage.go index 4a418ccb4..077c5839d 100644 --- a/filter/cache_storage.go +++ b/filter/cache_storage.go @@ -44,6 +44,7 @@ func NewCacheStorage(logger moira.Logger, metrics *metrics.FilterMetrics, reader if err := storage.buildRetentions(bufio.NewScanner(reader)); err != nil { return nil, err } + return storage, nil } @@ -63,6 +64,7 @@ func (storage *Storage) getRetention(m *moira.MatchedMetric) int { if item, ok := storage.retentionsCache[m.Metric]; ok && item.timestamp+60 > m.Timestamp { return item.value } + for _, matcher := range storage.retentions { if matcher.pattern.MatchString(m.Metric) { storage.retentionsCache[m.Metric] = &retentionCacheItem{ @@ -72,6 +74,7 @@ func (storage *Storage) getRetention(m *moira.MatchedMetric) int { return matcher.retention } } + return defaultRetention } @@ -98,6 +101,7 @@ func (storage *Storage) buildRetentions(retentionScanner *bufio.Scanner) error { storage.logger.Error(). String("pattern", patternString). Msg("Invalid pattern found") + continue } @@ -112,6 +116,7 @@ func (storage *Storage) buildRetentions(retentionScanner *bufio.Scanner) error { retention: retention, }) } + return retentionScanner.Err() } diff --git a/filter/connection/listening.go b/filter/connection/listening.go index 8f707edae..ae246a6c3 100644 --- a/filter/connection/listening.go +++ b/filter/connection/listening.go @@ -25,18 +25,21 @@ type MetricsListener struct { func NewListener(port string, logger moira.Logger, metrics *metrics.FilterMetrics) (*MetricsListener, error) { address, err := net.ResolveTCPAddr("tcp", port) if nil != err { - return nil, fmt.Errorf("failed to resolve tcp address [%s]: %s", port, err.Error()) + return nil, fmt.Errorf("failed to resolve tcp address [%s]: %w", port, err) } + newListener, err := net.ListenTCP("tcp", address) if err != nil { - return nil, fmt.Errorf("failed to listen on [%s]: %s", port, err.Error()) + return nil, fmt.Errorf("failed to listen on [%s]: %w", port, err) } + listener := MetricsListener{ listener: newListener, logger: logger, handler: NewConnectionsHandler(logger), metrics: metrics, } + return &listener, nil } @@ -58,6 +61,7 @@ func (listener *MetricsListener) Listen() chan []byte { } default: } + listener.listener.SetDeadline(time.Now().Add(1e9)) //nolint conn, err := listener.listener.Accept() if nil != err { @@ -65,20 +69,23 @@ func (listener *MetricsListener) Listen() chan []byte { if ok := errors.As(err, &opErr); ok && opErr.Timeout() { continue } - listener.logger.Info(). + listener.logger.Error(). Error(err). Msg("Failed to accept connection") continue } + listener.logger.Info(). String("remote_address", conn.RemoteAddr().String()). - Msg("Someone connected") + Msg("Successfully connected") listener.handler.HandleConnection(conn, lineChan) } }) + listener.tomb.Go(func() error { return listener.checkNewLinesChannelLen(lineChan) }) listener.logger.Info().Msg("Moira Filter Listener Started") + return lineChan } diff --git a/filter/heartbeat/worker.go b/filter/heartbeat/worker.go index bcf0cd340..b1332c19f 100644 --- a/filter/heartbeat/worker.go +++ b/filter/heartbeat/worker.go @@ -39,22 +39,23 @@ func (worker *Worker) Start() { case <-checkTicker.C: newCount := worker.metrics.TotalMetricsReceived.Count() if newCount != count { - worker.logger.Debug(). - Int64("from", count). - Int64("to", newCount). - Msg("Heartbeat was updated") - if err := worker.database.UpdateMetricsHeartbeat(); err != nil { - worker.logger.Info(). + worker.logger.Error(). Error(err). - Msg("Save state failed") + Msg("Update metrics heartbeat failed") } else { + worker.logger.Debug(). + Int64("from", count). + Int64("to", newCount). + Msg("Heartbeat was updated") + count = newCount } } } } }) + worker.logger.Info().Msg("Moira Filter Heartbeat started") } diff --git a/filter/metrics_parser.go b/filter/metrics_parser.go index 12b028de3..2a0fa5922 100644 --- a/filter/metrics_parser.go +++ b/filter/metrics_parser.go @@ -31,14 +31,17 @@ func ParseMetric(input []byte) (*ParsedMetric, error) { if !inputScanner.HasNext() { return nil, fmt.Errorf("too few space-separated items: '%s'", input) } + metricBytes = inputScanner.Next() if !inputScanner.HasNext() { return nil, fmt.Errorf("too few space-separated items: '%s'", input) } + valueBytes = inputScanner.Next() if !inputScanner.HasNext() { return nil, fmt.Errorf("too few space-separated items: '%s'", input) } + timestampBytes = inputScanner.Next() if inputScanner.HasNext() { return nil, fmt.Errorf("too many space-separated items: '%s'", input) @@ -66,9 +69,11 @@ func ParseMetric(input []byte) (*ParsedMetric, error) { value, int64(timestamp), } + if timestamp == -1 { parsedMetric.Timestamp = time.Now().Unix() } + return parsedMetric, nil } @@ -78,6 +83,7 @@ func restoreMetricStringByNameAndLabels(name string, labels map[string]string) s for key := range labels { keys = append(keys, key) } + sort.Strings(keys) builder.WriteString(name) @@ -104,10 +110,12 @@ func parseNameAndLabels(metricBytes []byte) (string, map[string]string, error) { if !metricBytesScanner.HasNext() { return "", nil, fmt.Errorf("too few colon-separated items: '%s'", metricBytes) } + nameBytes := metricBytesScanner.Next() if len(nameBytes) == 0 { return "", nil, fmt.Errorf("empty metric name: '%s'", metricBytes) } + name := moira.UnsafeBytesToString(nameBytes) labels := make(map[string]string) for metricBytesScanner.HasNext() { @@ -118,10 +126,12 @@ func parseNameAndLabels(metricBytes []byte) (string, map[string]string, error) { if !labelBytesScanner.HasNext() { return "", nil, fmt.Errorf("too few equal-separated items: '%s'", labelBytes) } + labelNameBytes = labelBytesScanner.Next() if !labelBytesScanner.HasNext() { return "", nil, fmt.Errorf("too few equal-separated items: '%s'", labelBytes) } + labelValueBytes = labelBytesScanner.Next() for labelBytesScanner.HasNext() { var labelString strings.Builder @@ -129,13 +139,16 @@ func parseNameAndLabels(metricBytes []byte) (string, map[string]string, error) { labelString.Write(labelBytesScanner.Next()) labelValueBytes = append(labelValueBytes, labelString.String()...) } + if len(labelNameBytes) == 0 { return "", nil, fmt.Errorf("empty label name: '%s'", labelBytes) } + labelName := moira.UnsafeBytesToString(labelNameBytes) labelValue := moira.UnsafeBytesToString(labelValueBytes) labels[labelName] = labelValue } + return name, labels, nil } diff --git a/filter/patterns_storage.go b/filter/patterns_storage.go index a9e819750..10b8437af 100644 --- a/filter/patterns_storage.go +++ b/filter/patterns_storage.go @@ -74,6 +74,7 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte, maxTTL ti storage.logger.Info(). Error(err). Msg("Cannot parse input") + return nil } @@ -82,6 +83,7 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte, maxTTL ti String(moira.LogFieldNameMetricName, parsedMetric.Name). String(moira.LogFieldNameMetricTimestamp, fmt.Sprint(parsedMetric.Timestamp)). Msg("Metric is too old") + return nil } @@ -92,6 +94,7 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte, maxTTL ti if count%10 == 0 { storage.metrics.MatchingTimer.UpdateSince(matchingStart) } + if len(matchedPatterns) > 0 { storage.metrics.MatchingMetricsReceived.Inc() return &moira.MatchedMetric{ diff --git a/filter/prefix_tree.go b/filter/prefix_tree.go index dc5245291..7a4ee7eab 100644 --- a/filter/prefix_tree.go +++ b/filter/prefix_tree.go @@ -41,6 +41,7 @@ func (source *PrefixTree) AddWithPayload(pattern string, payloadKey string, payl Msg("Pattern is ignored because it contains an empty part") return } + for i, part := range parts { found := false for _, child := range currentNode.Children { @@ -50,6 +51,7 @@ func (source *PrefixTree) AddWithPayload(pattern string, payloadKey string, payl break } } + if !found { newNode := &PatternNode{Part: part} @@ -76,9 +78,11 @@ func (source *PrefixTree) AddWithPayload(pattern string, payloadKey string, payl newNode.InnerParts = []string{part} } } + currentNode.Children = append(currentNode.Children, newNode) currentNode = newNode } + if i == len(parts)-1 { currentNode.Terminal = true if payloadValue != nil { @@ -121,6 +125,7 @@ func (source *PrefixTree) MatchWithValue(metric string) map[string]MatchingHandl if node.Payload == nil { matched[node.Prefix] = nil } + for pattern, matchingHandler := range node.Payload { matched[pattern] = matchingHandler } @@ -158,6 +163,7 @@ func (source *PrefixTree) findNodes(metric string) ([]*PatternNode, int) { if found == 0 { return nil, 0 } + return currentLevel, found } @@ -186,6 +192,7 @@ func findPart(part string, currentLevel []*PatternNode) ([]*PatternNode, int) { } } } + return nextLevel, len(nextLevel) } @@ -194,6 +201,7 @@ func split2(s, sep string) (string, string) { if len(splitResult) < 2 { //nolint return splitResult[0], "" } + return splitResult[0], splitResult[1] } @@ -203,5 +211,6 @@ func hasEmptyParts(parts []string) bool { return true } } + return false } diff --git a/filter/series_by_tag.go b/filter/series_by_tag.go index 169d6b907..03bfa1395 100644 --- a/filter/series_by_tag.go +++ b/filter/series_by_tag.go @@ -62,6 +62,7 @@ func transformWildcardToRegexpInSeriesByTag(input string) (string, bool) { for i := range slc { slc[i] = strings.TrimSpace(slc[i]) } + regularExpression = strings.Join(slc, "|") result = result[:matchedWildcardIndexes[0]] + regularExpression + result[matchedWildcardIndexes[1]:] isTransformed = true @@ -70,6 +71,7 @@ func transformWildcardToRegexpInSeriesByTag(input string) (string, bool) { if !isTransformed { return input, false } + return "^" + result + "$", true } @@ -152,6 +154,7 @@ func CreateMatchingHandlerForPattern(tagSpecs []TagSpec, compatibility *Compatib return false } } + return true } @@ -205,9 +208,11 @@ func createMatchingHandlerForOneTag(spec TagSpec, compatibility *Compatibility) if spec.Name == "name" { return matchingHandlerCondition(metric) } + if value, found := labels[spec.Name]; found { return matchingHandlerCondition(value) } + return allowMatchEmpty && matchEmpty }, nil } diff --git a/filter/series_by_tag_pattern_index.go b/filter/series_by_tag_pattern_index.go index 19186a442..7505cc5ef 100644 --- a/filter/series_by_tag_pattern_index.go +++ b/filter/series_by_tag_pattern_index.go @@ -26,7 +26,7 @@ func NewSeriesByTagPatternIndex( for pattern, tagSpecs := range tagSpecsByPattern { nameTagValue, matchingHandler, err := CreateMatchingHandlerForPattern(tagSpecs, &compatibility) if err != nil { - logger.Info(). + logger.Error(). Error(err). String("pattern", pattern). Msg("Failed to create MatchingHandler for pattern")