Skip to content

Commit

Permalink
Add promtail converters support for GCPLog (#4607)
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr authored Jul 27, 2023
1 parent 1281f4f commit 7999af5
Show file tree
Hide file tree
Showing 14 changed files with 344 additions and 97 deletions.
12 changes: 5 additions & 7 deletions component/loki/source/gcplog/gcplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
flow_relabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/gcplog/gcptypes"
gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget"
"github.com/grafana/agent/pkg/util"
)
Expand All @@ -31,13 +32,10 @@ func init() {
// Arguments holds values which are used to configure the loki.source.gcplog
// component.
type Arguments struct {
// TODO(@tpaschalis) Having these types defined in an internal package
// means that an external caller cannot build this component's Arguments
// by hand for now.
PullTarget *gt.PullConfig `river:"pull,block,optional"`
PushTarget *gt.PushConfig `river:"push,block,optional"`
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
PullTarget *gcptypes.PullConfig `river:"pull,block,optional"`
PushTarget *gcptypes.PushConfig `river:"push,block,optional"`
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"`
}

// SetToDefault implements river.Defaulter.
Expand Down
16 changes: 8 additions & 8 deletions component/loki/source/gcplog/gcplog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import (
"testing"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
fnet "github.com/grafana/agent/component/common/net"
flow_relabel "github.com/grafana/agent/component/common/relabel"
gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget"

"github.com/grafana/agent/pkg/util"
"github.com/grafana/regexp"
"github.com/phayes/freeport"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
fnet "github.com/grafana/agent/component/common/net"
flow_relabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/gcplog/gcptypes"
"github.com/grafana/agent/pkg/util"
)

// TODO (@tpaschalis) We can't test this easily as there's no way to inject
Expand All @@ -38,7 +38,7 @@ func TestPush(t *testing.T) {

port, err := freeport.GetFreePort()
require.NoError(t, err)
args.PushTarget = &gt.PushConfig{
args.PushTarget = &gcptypes.PushConfig{
Server: &fnet.ServerConfig{
HTTP: &fnet.HTTPConfig{
ListenAddress: "localhost",
Expand Down
41 changes: 41 additions & 0 deletions component/loki/source/gcplog/gcptypes/gcptypes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package gcptypes

import (
"fmt"
"time"

fnet "github.com/grafana/agent/component/common/net"
)

// PullConfig configures a GCPLog target with the 'pull' strategy.
type PullConfig struct {
ProjectID string `river:"project_id,attr"`
Subscription string `river:"subscription,attr"`
Labels map[string]string `river:"labels,attr,optional"`
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
UseFullLine bool `river:"use_full_line,attr,optional"`
}

// PushConfig configures a GCPLog target with the 'push' strategy.
type PushConfig struct {
Server *fnet.ServerConfig `river:",squash"`
PushTimeout time.Duration `river:"push_timeout,attr,optional"`
Labels map[string]string `river:"labels,attr,optional"`
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
UseFullLine bool `river:"use_full_line,attr,optional"`
}

// SetToDefault implements river.Defaulter.
func (p *PushConfig) SetToDefault() {
*p = PushConfig{
Server: fnet.DefaultServerConfig(),
}
}

// Validate implements river.Validator.
func (p *PushConfig) Validate() error {
if p.PushTimeout < 0 {
return fmt.Errorf("push_timeout must be greater than zero")
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"google.golang.org/api/option"

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/loki/source/gcplog/gcptypes"
)

// PullTarget represents a target that scrapes logs from a GCP project id and
Expand All @@ -28,7 +29,7 @@ type PullTarget struct {
metrics *Metrics
logger log.Logger
handler loki.EntryHandler
config *PullConfig
config *gcptypes.PullConfig
relabelConfig []*relabel.Config
jobName string

Expand Down Expand Up @@ -57,7 +58,7 @@ type pubsubSubscription interface {
}

// NewPullTarget returns the new instance of PullTarget.
func NewPullTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, jobName string, config *PullConfig, relabel []*relabel.Config, clientOptions ...option.ClientOption) (*PullTarget, error) {
func NewPullTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, jobName string, config *gcptypes.PullConfig, relabel []*relabel.Config, clientOptions ...option.ClientOption) (*PullTarget, error) {
ctx, cancel := context.WithCancel(context.Background())
ps, err := pubsub.NewClient(ctx, config.ProjectID, clientOptions...)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"testing"
"time"

"github.com/grafana/agent/component/common/loki/client/fake"

"cloud.google.com/go/pubsub"
"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"gotest.tools/assert"

"github.com/grafana/agent/component/common/loki/client/fake"
"github.com/grafana/agent/component/loki/source/gcplog/gcptypes"
)

func TestPullTarget_RunStop(t *testing.T) {
Expand Down Expand Up @@ -203,7 +204,7 @@ const (
`
)

var testConfig = &PullConfig{
var testConfig = &gcptypes.PullConfig{
ProjectID: project,
Subscription: subscription,
Labels: map[string]string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/grafana/agent/component/common/loki"
fnet "github.com/grafana/agent/component/common/net"
"github.com/grafana/agent/component/loki/source/gcplog/gcptypes"
)

// PushTarget defines a server for receiving messages from a GCP PubSub push
Expand All @@ -29,15 +30,15 @@ type PushTarget struct {
logger log.Logger
jobName string
metrics *Metrics
config *PushConfig
config *gcptypes.PushConfig
entries chan<- loki.Entry
handler loki.EntryHandler
relabelConfigs []*relabel.Config
server *fnet.TargetServer
}

// NewPushTarget constructs a PushTarget.
func NewPushTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, jobName string, config *PushConfig, relabel []*relabel.Config, reg prometheus.Registerer) (*PushTarget, error) {
func NewPushTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, jobName string, config *gcptypes.PushConfig, relabel []*relabel.Config, reg prometheus.Registerer) (*PushTarget, error) {
wrappedLogger := log.With(logger, "component", "gcp_push")
srv, err := fnet.NewTargetServer(wrappedLogger, jobName+"_push_target", reg, config.Server)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import (
"testing"
"time"

"github.com/grafana/agent/component/common/loki/client/fake"

"github.com/grafana/agent/component/common/loki"
fnet "github.com/grafana/agent/component/common/net"

"github.com/go-kit/log"
"github.com/phayes/freeport"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/common/loki/client/fake"
fnet "github.com/grafana/agent/component/common/net"
"github.com/grafana/agent/component/loki/source/gcplog/gcptypes"
)

const localhost = "127.0.0.1"
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestPushTarget(t *testing.T) {
for k, v := range tc.args.Labels {
lbls[string(k)] = string(v)
}
config := &PushConfig{
config := &gcptypes.PushConfig{
Labels: lbls,
UseIncomingTimestamp: false,
Server: &fnet.ServerConfig{
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestPushTarget_UseIncomingTimestamp(t *testing.T) {
port, err := freeport.GetFreePort()
require.NoError(t, err)
require.NoError(t, err, "error generating server config or finding open port")
config := &PushConfig{
config := &gcptypes.PushConfig{
Labels: nil,
UseIncomingTimestamp: true,
Server: &fnet.ServerConfig{
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestPushTarget_UseTenantIDHeaderIfPresent(t *testing.T) {

port, err := freeport.GetFreePort()
require.NoError(t, err)
config := &PushConfig{
config := &gcptypes.PushConfig{
Labels: nil,
UseIncomingTimestamp: true,
Server: &fnet.ServerConfig{
Expand Down Expand Up @@ -342,7 +342,7 @@ func TestPushTarget_ErroneousPayloadsAreRejected(t *testing.T) {

port, err := freeport.GetFreePort()
require.NoError(t, err)
config := &PushConfig{
config := &gcptypes.PushConfig{
Labels: nil,
Server: &fnet.ServerConfig{
HTTP: &fnet.HTTPConfig{
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestPushTarget_UsePushTimeout(t *testing.T) {

port, err := freeport.GetFreePort()
require.NoError(t, err)
config := &PushConfig{
config := &gcptypes.PushConfig{
Labels: nil,
UseIncomingTimestamp: true,
PushTimeout: time.Second,
Expand Down
40 changes: 0 additions & 40 deletions component/loki/source/gcplog/internal/gcplogtarget/types.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,7 @@
package gcplogtarget

import (
"fmt"
"time"

fnet "github.com/grafana/agent/component/common/net"
)

// Target is a common interface implemented by both GCPLog targets.
type Target interface {
Details() map[string]string
Stop() error
}

// PullConfig configures a GCPLog target with the 'pull' strategy.
type PullConfig struct {
ProjectID string `river:"project_id,attr"`
Subscription string `river:"subscription,attr"`
Labels map[string]string `river:"labels,attr,optional"`
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
UseFullLine bool `river:"use_full_line,attr,optional"`
}

// PushConfig configures a GCPLog target with the 'push' strategy.
type PushConfig struct {
Server *fnet.ServerConfig `river:",squash"`
PushTimeout time.Duration `river:"push_timeout,attr,optional"`
Labels map[string]string `river:"labels,attr,optional"`
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
UseFullLine bool `river:"use_full_line,attr,optional"`
}

// SetToDefault implements river.Defaulter.
func (p *PushConfig) SetToDefault() {
*p = PushConfig{
Server: fnet.DefaultServerConfig(),
}
}

// Validate implements river.Validator.
func (p *PushConfig) Validate() error {
if p.PushTimeout < 0 {
return fmt.Errorf("push_timeout must be greater than zero")
}
return nil
}
29 changes: 28 additions & 1 deletion converter/internal/common/weaveworks_server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package common

import (
"github.com/grafana/agent/converter/diag"
"github.com/weaveworks/common/server"

fnet "github.com/grafana/agent/component/common/net"
"github.com/grafana/agent/converter/diag"
)

func DefaultWeaveWorksServerCfg() server.Config {
Expand All @@ -13,6 +15,31 @@ func DefaultWeaveWorksServerCfg() server.Config {
return cfg
}

func WeaveWorksServerToFlowServer(config server.Config) *fnet.ServerConfig {
return &fnet.ServerConfig{
HTTP: &fnet.HTTPConfig{
ListenAddress: config.HTTPListenAddress,
ListenPort: config.HTTPListenPort,
ConnLimit: config.HTTPConnLimit,
ServerReadTimeout: config.HTTPServerReadTimeout,
ServerWriteTimeout: config.HTTPServerWriteTimeout,
ServerIdleTimeout: config.HTTPServerIdleTimeout,
},
GRPC: &fnet.GRPCConfig{
ListenAddress: config.GRPCListenAddress,
ListenPort: config.GRPCListenPort,
ConnLimit: config.GRPCConnLimit,
MaxConnectionAge: config.GRPCServerMaxConnectionAge,
MaxConnectionAgeGrace: config.GRPCServerMaxConnectionAgeGrace,
MaxConnectionIdle: config.GRPCServerMaxConnectionIdle,
ServerMaxRecvMsg: config.GPRCServerMaxRecvMsgSize,
ServerMaxSendMsg: config.GRPCServerMaxSendMsgSize,
ServerMaxConcurrentStreams: config.GPRCServerMaxConcurrentStreams,
},
GracefulShutdownTimeout: config.ServerGracefulShutdownTimeout,
}
}

func ValidateWeaveWorksServerCfg(cfg server.Config) diag.Diagnostics {
var (
diags diag.Diagnostics
Expand Down
Loading

0 comments on commit 7999af5

Please sign in to comment.