Skip to content

Commit

Permalink
fix(filter): fix filter logs (#1010)
Browse files Browse the repository at this point in the history
  • Loading branch information
almostinf authored Apr 17, 2024
1 parent 56efc67 commit 62e8ac3
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 12 deletions.
5 changes: 5 additions & 0 deletions filter/cache_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func NewCacheStorage(logger moira.Logger, metrics *metrics.FilterMetrics, reader
if err := storage.buildRetentions(bufio.NewScanner(reader)); err != nil {
return nil, err
}

return storage, nil
}

Expand All @@ -63,6 +64,7 @@ func (storage *Storage) getRetention(m *moira.MatchedMetric) int {
if item, ok := storage.retentionsCache[m.Metric]; ok && item.timestamp+60 > m.Timestamp {
return item.value
}

for _, matcher := range storage.retentions {
if matcher.pattern.MatchString(m.Metric) {
storage.retentionsCache[m.Metric] = &retentionCacheItem{
Expand All @@ -72,6 +74,7 @@ func (storage *Storage) getRetention(m *moira.MatchedMetric) int {
return matcher.retention
}
}

return defaultRetention
}

Expand All @@ -98,6 +101,7 @@ func (storage *Storage) buildRetentions(retentionScanner *bufio.Scanner) error {
storage.logger.Error().
String("pattern", patternString).
Msg("Invalid pattern found")

continue
}

Expand All @@ -112,6 +116,7 @@ func (storage *Storage) buildRetentions(retentionScanner *bufio.Scanner) error {
retention: retention,
})
}

return retentionScanner.Err()
}

Expand Down
15 changes: 11 additions & 4 deletions filter/connection/listening.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,21 @@ type MetricsListener struct {
func NewListener(port string, logger moira.Logger, metrics *metrics.FilterMetrics) (*MetricsListener, error) {
address, err := net.ResolveTCPAddr("tcp", port)
if nil != err {
return nil, fmt.Errorf("failed to resolve tcp address [%s]: %s", port, err.Error())
return nil, fmt.Errorf("failed to resolve tcp address [%s]: %w", port, err)
}

newListener, err := net.ListenTCP("tcp", address)
if err != nil {
return nil, fmt.Errorf("failed to listen on [%s]: %s", port, err.Error())
return nil, fmt.Errorf("failed to listen on [%s]: %w", port, err)
}

listener := MetricsListener{
listener: newListener,
logger: logger,
handler: NewConnectionsHandler(logger),
metrics: metrics,
}

return &listener, nil
}

Expand All @@ -58,27 +61,31 @@ func (listener *MetricsListener) Listen() chan []byte {
}
default:
}

listener.listener.SetDeadline(time.Now().Add(1e9)) //nolint
conn, err := listener.listener.Accept()
if nil != err {
var opErr *net.OpError
if ok := errors.As(err, &opErr); ok && opErr.Timeout() {
continue
}
listener.logger.Info().
listener.logger.Error().
Error(err).
Msg("Failed to accept connection")
continue
}

listener.logger.Info().
String("remote_address", conn.RemoteAddr().String()).
Msg("Someone connected")
Msg("Successfully connected")

listener.handler.HandleConnection(conn, lineChan)
}
})

listener.tomb.Go(func() error { return listener.checkNewLinesChannelLen(lineChan) })
listener.logger.Info().Msg("Moira Filter Listener Started")

return lineChan
}

Expand Down
15 changes: 8 additions & 7 deletions filter/heartbeat/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,23 @@ func (worker *Worker) Start() {
case <-checkTicker.C:
newCount := worker.metrics.TotalMetricsReceived.Count()
if newCount != count {
worker.logger.Debug().
Int64("from", count).
Int64("to", newCount).
Msg("Heartbeat was updated")

if err := worker.database.UpdateMetricsHeartbeat(); err != nil {
worker.logger.Info().
worker.logger.Error().
Error(err).
Msg("Save state failed")
Msg("Update metrics heartbeat failed")
} else {
worker.logger.Debug().
Int64("from", count).
Int64("to", newCount).
Msg("Heartbeat was updated")

count = newCount
}
}
}
}
})

worker.logger.Info().Msg("Moira Filter Heartbeat started")
}

Expand Down
13 changes: 13 additions & 0 deletions filter/metrics_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ func ParseMetric(input []byte) (*ParsedMetric, error) {
if !inputScanner.HasNext() {
return nil, fmt.Errorf("too few space-separated items: '%s'", input)
}

metricBytes = inputScanner.Next()
if !inputScanner.HasNext() {
return nil, fmt.Errorf("too few space-separated items: '%s'", input)
}

valueBytes = inputScanner.Next()
if !inputScanner.HasNext() {
return nil, fmt.Errorf("too few space-separated items: '%s'", input)
}

timestampBytes = inputScanner.Next()
if inputScanner.HasNext() {
return nil, fmt.Errorf("too many space-separated items: '%s'", input)
Expand Down Expand Up @@ -66,9 +69,11 @@ func ParseMetric(input []byte) (*ParsedMetric, error) {
value,
int64(timestamp),
}

if timestamp == -1 {
parsedMetric.Timestamp = time.Now().Unix()
}

return parsedMetric, nil
}

Expand All @@ -78,6 +83,7 @@ func restoreMetricStringByNameAndLabels(name string, labels map[string]string) s
for key := range labels {
keys = append(keys, key)
}

sort.Strings(keys)

builder.WriteString(name)
Expand All @@ -104,10 +110,12 @@ func parseNameAndLabels(metricBytes []byte) (string, map[string]string, error) {
if !metricBytesScanner.HasNext() {
return "", nil, fmt.Errorf("too few colon-separated items: '%s'", metricBytes)
}

nameBytes := metricBytesScanner.Next()
if len(nameBytes) == 0 {
return "", nil, fmt.Errorf("empty metric name: '%s'", metricBytes)
}

name := moira.UnsafeBytesToString(nameBytes)
labels := make(map[string]string)
for metricBytesScanner.HasNext() {
Expand All @@ -118,24 +126,29 @@ func parseNameAndLabels(metricBytes []byte) (string, map[string]string, error) {
if !labelBytesScanner.HasNext() {
return "", nil, fmt.Errorf("too few equal-separated items: '%s'", labelBytes)
}

labelNameBytes = labelBytesScanner.Next()
if !labelBytesScanner.HasNext() {
return "", nil, fmt.Errorf("too few equal-separated items: '%s'", labelBytes)
}

labelValueBytes = labelBytesScanner.Next()
for labelBytesScanner.HasNext() {
var labelString strings.Builder
labelString.WriteString("=")
labelString.Write(labelBytesScanner.Next())
labelValueBytes = append(labelValueBytes, labelString.String()...)
}

if len(labelNameBytes) == 0 {
return "", nil, fmt.Errorf("empty label name: '%s'", labelBytes)
}

labelName := moira.UnsafeBytesToString(labelNameBytes)
labelValue := moira.UnsafeBytesToString(labelValueBytes)
labels[labelName] = labelValue
}

return name, labels, nil
}

Expand Down
3 changes: 3 additions & 0 deletions filter/patterns_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte, maxTTL ti
storage.logger.Info().
Error(err).
Msg("Cannot parse input")

return nil
}

Expand All @@ -82,6 +83,7 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte, maxTTL ti
String(moira.LogFieldNameMetricName, parsedMetric.Name).
String(moira.LogFieldNameMetricTimestamp, fmt.Sprint(parsedMetric.Timestamp)).
Msg("Metric is too old")

return nil
}

Expand All @@ -92,6 +94,7 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte, maxTTL ti
if count%10 == 0 {
storage.metrics.MatchingTimer.UpdateSince(matchingStart)
}

if len(matchedPatterns) > 0 {
storage.metrics.MatchingMetricsReceived.Inc()
return &moira.MatchedMetric{
Expand Down
9 changes: 9 additions & 0 deletions filter/prefix_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (source *PrefixTree) AddWithPayload(pattern string, payloadKey string, payl
Msg("Pattern is ignored because it contains an empty part")
return
}

for i, part := range parts {
found := false
for _, child := range currentNode.Children {
Expand All @@ -50,6 +51,7 @@ func (source *PrefixTree) AddWithPayload(pattern string, payloadKey string, payl
break
}
}

if !found {
newNode := &PatternNode{Part: part}

Expand All @@ -76,9 +78,11 @@ func (source *PrefixTree) AddWithPayload(pattern string, payloadKey string, payl
newNode.InnerParts = []string{part}
}
}

currentNode.Children = append(currentNode.Children, newNode)
currentNode = newNode
}

if i == len(parts)-1 {
currentNode.Terminal = true
if payloadValue != nil {
Expand Down Expand Up @@ -121,6 +125,7 @@ func (source *PrefixTree) MatchWithValue(metric string) map[string]MatchingHandl
if node.Payload == nil {
matched[node.Prefix] = nil
}

for pattern, matchingHandler := range node.Payload {
matched[pattern] = matchingHandler
}
Expand Down Expand Up @@ -158,6 +163,7 @@ func (source *PrefixTree) findNodes(metric string) ([]*PatternNode, int) {
if found == 0 {
return nil, 0
}

return currentLevel, found
}

Expand Down Expand Up @@ -186,6 +192,7 @@ func findPart(part string, currentLevel []*PatternNode) ([]*PatternNode, int) {
}
}
}

return nextLevel, len(nextLevel)
}

Expand All @@ -194,6 +201,7 @@ func split2(s, sep string) (string, string) {
if len(splitResult) < 2 { //nolint
return splitResult[0], ""
}

return splitResult[0], splitResult[1]
}

Expand All @@ -203,5 +211,6 @@ func hasEmptyParts(parts []string) bool {
return true
}
}

return false
}
5 changes: 5 additions & 0 deletions filter/series_by_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func transformWildcardToRegexpInSeriesByTag(input string) (string, bool) {
for i := range slc {
slc[i] = strings.TrimSpace(slc[i])
}

regularExpression = strings.Join(slc, "|")
result = result[:matchedWildcardIndexes[0]] + regularExpression + result[matchedWildcardIndexes[1]:]
isTransformed = true
Expand All @@ -70,6 +71,7 @@ func transformWildcardToRegexpInSeriesByTag(input string) (string, bool) {
if !isTransformed {
return input, false
}

return "^" + result + "$", true
}

Expand Down Expand Up @@ -152,6 +154,7 @@ func CreateMatchingHandlerForPattern(tagSpecs []TagSpec, compatibility *Compatib
return false
}
}

return true
}

Expand Down Expand Up @@ -205,9 +208,11 @@ func createMatchingHandlerForOneTag(spec TagSpec, compatibility *Compatibility)
if spec.Name == "name" {
return matchingHandlerCondition(metric)
}

if value, found := labels[spec.Name]; found {
return matchingHandlerCondition(value)
}

return allowMatchEmpty && matchEmpty
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion filter/series_by_tag_pattern_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewSeriesByTagPatternIndex(
for pattern, tagSpecs := range tagSpecsByPattern {
nameTagValue, matchingHandler, err := CreateMatchingHandlerForPattern(tagSpecs, &compatibility)
if err != nil {
logger.Info().
logger.Error().
Error(err).
String("pattern", pattern).
Msg("Failed to create MatchingHandler for pattern")
Expand Down

0 comments on commit 62e8ac3

Please sign in to comment.