Skip to content

Commit

Permalink
Merge pull request #6 from ygsa/fllow_arstercz_influx
Browse files Browse the repository at this point in the history
Switch MongoDB libraries. Update test data.
  • Loading branch information
arstercz authored Apr 28, 2024
2 parents 2075a74 + 4b6377f commit a54b64f
Show file tree
Hide file tree
Showing 11 changed files with 618 additions and 291 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.1
github.com/aws/aws-sdk-go-v2/service/ec2 v1.1.0
github.com/aws/smithy-go v1.0.0
github.com/beevik/ntp v0.3.0 // indirect
github.com/beevik/ntp v0.3.0
github.com/benbjohnson/clock v1.0.3
github.com/bitly/go-hostpool v0.1.0 // indirect
github.com/bmatcuk/doublestar/v3 v3.0.0
Expand Down Expand Up @@ -132,13 +132,14 @@ require (
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 // indirect
go.mongodb.org/mongo-driver v1.5.3
go.starlark.net v0.0.0-20210312235212-74c10e2c17dc
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa
golang.org/x/text v0.3.4
golang.org/x/text v0.3.5
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4
google.golang.org/api v0.20.0
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
Expand All @@ -147,7 +148,6 @@ require (
gopkg.in/fatih/pool.v2 v2.0.0 // indirect
gopkg.in/gorethink/gorethink.v3 v3.0.5
gopkg.in/ldap.v3 v3.1.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/olivere/elastic.v5 v5.0.70
gopkg.in/yaml.v2 v2.3.0
gotest.tools v2.2.0+incompatible
Expand Down
80 changes: 79 additions & 1 deletion go.sum

Large diffs are not rendered by default.

179 changes: 84 additions & 95 deletions plugins/inputs/mongodb/mongodb.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package mongodb

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/url"
"strings"
"sync"
Expand All @@ -13,20 +13,24 @@ import (
"github.com/influxdata/telegraf"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"gopkg.in/mgo.v2"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)

type MongoDB struct {
Servers []string
Ssl Ssl
mongos map[string]*Server
GatherClusterStatus bool
GatherPerdbStats bool
GatherColStats bool
GatherTopStat bool
ColStatsDbs []string
tlsint.ClientConfig

Log telegraf.Logger
Log telegraf.Logger `toml:"-"`

clients []*Server
}

type Ssl struct {
Expand All @@ -53,6 +57,10 @@ var sampleConfig = `
## When true, collect per collection stats
# gather_col_stats = false
## When true, collect usage statistics for each collection
## (insert, update, queries, remove, getmore, commands etc...).
# gather_top_stat = false
## List of db where collections stats are collected
## If empty, all db are concerned
# col_stats_dbs = ["local"]
Expand All @@ -73,126 +81,107 @@ func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}

var localhost = &url.URL{Host: "mongodb://127.0.0.1:27017"}
func (m *MongoDB) Init() error {
var tlsConfig *tls.Config
if m.Ssl.Enabled {
// Deprecated TLS config
tlsConfig = &tls.Config{
InsecureSkipVerify: m.ClientConfig.InsecureSkipVerify,
}
if len(m.Ssl.CaCerts) == 0 {
return fmt.Errorf("you must explicitly set insecure_skip_verify to skip cerificate validation")
}

roots := x509.NewCertPool()
for _, caCert := range m.Ssl.CaCerts {
if ok := roots.AppendCertsFromPEM([]byte(caCert)); !ok {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
} else {
var err error
tlsConfig, err = m.ClientConfig.TLSConfig()
if err != nil {
return err
}
}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
if len(m.Servers) == 0 {
m.gatherServer(m.getMongoServer(localhost), acc)
return nil
m.Servers = []string{"mongodb://127.0.0.1:27017"}
}

var wg sync.WaitGroup
for i, serv := range m.Servers {
if !strings.HasPrefix(serv, "mongodb://") {
for _, connURL := range m.Servers {
if !strings.HasPrefix(connURL, "mongodb://") && !strings.HasPrefix(connURL, "mongodb+srv://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
serv = "mongodb://" + serv
m.Log.Warnf("Using %q as connection URL; please update your configuration to use an URL", serv)
m.Servers[i] = serv
connURL = "mongodb://" + connURL
m.Log.Warnf("Using %q as connection URL; please update your configuration to use an URL", connURL)
}

u, err := url.Parse(serv)
u, err := url.Parse(connURL)
if err != nil {
m.Log.Errorf("Unable to parse address %q: %s", serv, err.Error())
continue
return fmt.Errorf("unable to parse connection URL: %q", err)
}
if u.Host == "" {
m.Log.Errorf("Unable to parse address %q", serv)
continue
}

wg.Add(1)
go func(srv *Server) {
defer wg.Done()
err := m.gatherServer(srv, acc)
if err != nil {
m.Log.Errorf("Error in plugin: %v", err)
}
}(m.getMongoServer(u))
}

wg.Wait()
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() //nolint:revive

func (m *MongoDB) getMongoServer(url *url.URL) *Server {
if _, ok := m.mongos[url.Host]; !ok {
m.mongos[url.Host] = &Server{
Log: m.Log,
URL: url,
opts := options.Client().ApplyURI(connURL)
if tlsConfig != nil {
opts.TLSConfig = tlsConfig
}
}
return m.mongos[url.Host]
}

func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
if server.Session == nil {
var dialAddrs []string
if server.URL.User != nil {
dialAddrs = []string{server.URL.String()}
} else {
dialAddrs = []string{server.URL.Host}
if opts.ReadPreference == nil {
opts.ReadPreference = readpref.Nearest()
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])

client, err := mongo.Connect(ctx, opts)
if err != nil {
return fmt.Errorf("unable to parse URL %q: %s", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = 5 * time.Second

var tlsConfig *tls.Config

if m.Ssl.Enabled {
// Deprecated TLS config
tlsConfig = &tls.Config{}
if len(m.Ssl.CaCerts) > 0 {
roots := x509.NewCertPool()
for _, caCert := range m.Ssl.CaCerts {
ok := roots.AppendCertsFromPEM([]byte(caCert))
if !ok {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
} else {
tlsConfig.InsecureSkipVerify = true
}
} else {
tlsConfig, err = m.ClientConfig.TLSConfig()
if err != nil {
return err
}
return fmt.Errorf("unable to connect to MongoDB: %q", err)
}

// If configured to use TLS, add a dial function
if tlsConfig != nil {
dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) {
conn, err := tls.Dial("tcp", addr.String(), tlsConfig)
if err != nil {
fmt.Printf("error in Dial, %s\n", err.Error())
}
return conn, err
}
err = client.Ping(ctx, opts.ReadPreference)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %s", err)
}

sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB(%s): %s", strings.Join(dialInfo.Addrs, ","), err.Error())
server := &Server{
client: client,
hostname: u.Host,
Log: m.Log,
}
server.Session = sess
m.clients = append(m.clients, server)
}
return server.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.ColStatsDbs)

return nil
}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, client := range m.clients {
wg.Add(1)
go func(srv *Server) {
defer wg.Done()
err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs)
if err != nil {
m.Log.Errorf("failed to gather data: %q", err)
}
}(client)
}

wg.Wait()
return nil
}

func init() {
inputs.Add("mongodb", func() telegraf.Input {
return &MongoDB{
mongos: make(map[string]*Server),
GatherClusterStatus: true,
GatherPerdbStats: false,
GatherColStats: false,
GatherTopStat: false,
ColStatsDbs: []string{"local"},
}
})
Expand Down
Loading

0 comments on commit a54b64f

Please sign in to comment.