Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump loki.write code to latest client manager changes #4483

Merged
merged 16 commits into from
Jul 27, 2023
50 changes: 20 additions & 30 deletions component/common/loki/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (

var Reasons = []string{ReasonGeneric, ReasonRateLimited, ReasonStreamLimited, ReasonLineTooLong}

var UserAgent = fmt.Sprintf("promtail/%s", build.Version)
var UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
thepalbi marked this conversation as resolved.
Show resolved Hide resolved

type Metrics struct {
encodedBytes *prometheus.CounterVec
Expand All @@ -68,49 +68,40 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
var m Metrics

m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
Name: "loki_write_encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
}, []string{HostLabel})
m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
Name: "loki_write_sent_bytes_total",
Help: "Number of bytes sent.",
}, []string{HostLabel})
m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
Name: "loki_write_dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Help: "Number of log entries sent to the ingester.",
Name: "loki_write_sent_entries_total",
Help: "Number of log entries sent to the ingester.",
}, []string{HostLabel})
m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
Name: "loki_write_dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.mutatedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "mutated_entries_total",
Help: "The total number of log entries that have been mutated.",
Name: "loki_write_mutated_entries_total",
Help: "The total number of log entries that have been mutated.",
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.mutatedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "mutated_bytes_total",
Help: "The total number of bytes that have been mutated.",
Name: "loki_write_mutated_bytes_total",
Help: "The total number of bytes that have been mutated.",
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Help: "Duration of send requests.",
Name: "loki_write_request_duration_seconds",
Help: "Duration of send requests.",
}, []string{"status_code", HostLabel})
m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "batch_retries_total",
Help: "Number of times batches has had to be retried.",
Name: "loki_write_batch_retries_total",
Help: "Number of times batches has had to be retried.",
}, []string{HostLabel, TenantLabel})

m.countersWithHost = []*prometheus.CounterVec{
Expand Down Expand Up @@ -192,7 +183,6 @@ func New(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeT
}

func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (*client, error) {

if cfg.URL.URL == nil {
return nil, errors.New("client needs target URL")
}
Expand Down Expand Up @@ -222,7 +212,7 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLin
return nil, err
}

c.client, err = config.NewClientFromConfig(cfg.Client, "promtail", config.WithHTTP2Disabled())
c.client, err = config.NewClientFromConfig(cfg.Client, "GrafanaAgent", config.WithHTTP2Disabled())
if err != nil {
return nil, err
}
Expand Down
508 changes: 254 additions & 254 deletions component/common/loki/client/client_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion component/common/loki/client/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (l *logger) run() {
fmt.Fprint(l.Writer, "\t")
fmt.Fprint(l.Writer, e.Line)
fmt.Fprint(l.Writer, "\n")
l.Flush()
_ = l.Flush()
}
}
func (l *logger) StopNow() { l.Stop() }
Expand Down
3 changes: 1 addition & 2 deletions component/common/loki/wal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package wal

import (
"fmt"
"github.com/grafana/agent/component/common/loki"

"github.com/prometheus/common/model"

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/util"
walUtils "github.com/grafana/loki/pkg/util/wal"
Expand Down Expand Up @@ -33,7 +33,6 @@ func ReadWAL(dir string) ([]loki.Entry, error) {
// first read series
for _, series := range walRec.Series {
if _, ok := seenSeries[uint64(series.Ref)]; !ok {

seenSeries[uint64(series.Ref)] = util.MapToModelLabelSet(series.Labels.Map())
}
}
Expand Down
4 changes: 1 addition & 3 deletions component/common/loki/wal/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ func (w *Watcher) watch(segmentNum int) error {
w.metrics.segmentRead.WithLabelValues(w.id, "timer").Inc()
if debug {
level.Debug(w.logger).Log("msg", "Segment read triggered by backup timer", "segment", segmentNum)

}
case <-w.readNotify:
w.metrics.segmentRead.WithLabelValues(w.id, "notification").Inc()
Expand Down Expand Up @@ -283,10 +282,9 @@ func (w *Watcher) Stop() {
}

// firstAndLast finds the first and last segment number for a WAL directory.
func (w *Watcher) firstAndLast() (int, int, error) {
func (w *Watcher) firstAndLast() (int, int, error) { //nolint:unparam
refs, err := readSegmentNumbers(w.walDir)
if err != nil {

return -1, -1, err
}

Expand Down
2 changes: 1 addition & 1 deletion component/common/loki/wal/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package wal

import (
"fmt"
"github.com/grafana/agent/component/common/loki"
"os"
"testing"
"time"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
Expand Down
3 changes: 1 addition & 2 deletions component/common/loki/wal/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package wal

import (
"fmt"
"github.com/grafana/agent/component/common/loki"
"os"
"path/filepath"
"sort"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
Expand Down Expand Up @@ -134,7 +134,6 @@ func (wrt *Writer) start(maxSegmentAge time.Duration) {
if err := wrt.cleanSegments(maxSegmentAge); err != nil {
level.Error(wrt.log).Log("msg", "Error cleaning old segments", "err", err)
}
break
case <-wrt.closeCleaner:
trigger.Stop()
return
Expand Down
4 changes: 2 additions & 2 deletions component/loki/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package write
import (
"context"
"fmt"
"github.com/grafana/agent/component/common/loki/limit"
"github.com/grafana/agent/component/common/loki/wal"
"sync"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/common/loki/client"
"github.com/grafana/agent/component/common/loki/limit"
"github.com/grafana/agent/component/common/loki/wal"
"github.com/grafana/agent/pkg/build"
)

Expand Down