Skip to content
This repository has been archived by the owner on Feb 15, 2023. It is now read-only.

Commit

Permalink
[NETOBSERV-44] Consumming goflow2 as a library.
Browse files Browse the repository at this point in the history
Using goflow2 as a library to directly receive netflow event with the kube-enricher
  • Loading branch information
OlivierCazade committed Oct 8, 2021
1 parent 4921807 commit bc26bad
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 19 deletions.
53 changes: 37 additions & 16 deletions cmd/kube-enricher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package main
import (
"flag"
"fmt"
"net/url"
"os"
"strconv"
"strings"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"github.com/netobserv/goflow2-kube-enricher/pkg/format"
jsonFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/json"
nfFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/netflow"
pbFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/pb"
"github.com/netobserv/goflow2-kube-enricher/pkg/reader"

Expand All @@ -19,15 +22,16 @@ import (
)

var (
version = "unknown"
app = "kube-enricher"
fieldsMapping = flag.String("mapping", "SrcAddr=Src,DstAddr=Dst", "Mapping of fields containing IPs to prefixes for new fields")
kubeConfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
logLevel = flag.String("loglevel", "info", "Log level")
versionFlag = flag.Bool("v", false, "Print version")
log = logrus.WithField("module", app)
appVersion = fmt.Sprintf("%s %s", app, version)
sourceFormat = flag.String("sourceformat", "json", "format of the input string")
version = "unknown"
app = "kube-enricher"
listenAddress = flag.String("listen", "", "listen address, if empty, will listen to stdin")
fieldsMapping = flag.String("mapping", "SrcAddr=Src,DstAddr=Dst", "Mapping of fields containing IPs to prefixes for new fields")
kubeConfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
logLevel = flag.String("loglevel", "info", "Log level")
versionFlag = flag.Bool("v", false, "Print version")
log = logrus.WithField("module", app)
appVersion = fmt.Sprintf("%s %s", app, version)
stdinSourceFormat = flag.String("stdinsourceformat", "json", "format of the input string")
)

func main() {
Expand All @@ -53,13 +57,30 @@ func main() {
}

var in format.Format
switch *sourceFormat {
case "json":
in = jsonFormat.NewScanner(os.Stdin)
case "pb":
in = pbFormat.NewScanner(os.Stdin)
default:
log.Fatal("Unknown source format: ", sourceFormat)
if *listenAddresses == "" {
switch *stdinSourceFormat {
case "json":
in = jsonFormat.NewScanner(os.Stdin)
case "pb":
in = pbFormat.NewScanner(os.Stdin)
default:
log.Fatal("Unknown source format: ", stdinSourceFormat)
}
} else {
listenAddrUrl, err := url.Parse(*listenAddress)
if err != nil {
log.Fatal(err)
}
if listenAddrUrl.Scheme == "netflow" {
hostname := listenAddrUrl.Hostname()
port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64)
if err != nil {
log.Fatal("Failed reading listening port: ", err)
}
in = nfFormat.NewDriver(hostname, int(port))
} else {
log.Fatal("Unknown listening protocol")
}
}

clientset, err := kubernetes.NewForConfig(loadKubeConfig())
Expand Down
2 changes: 1 addition & 1 deletion examples/goflow-kube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
- /bin/sh
- -c
# - /goflow2 -loglevel "info" | /kube-enricher -loglevel "info" -sourceformat "json"
- /goflow2 -transport.file.sep= -format=pb -format.protobuf.fixedlen=true -loglevel "info" | /kube-enricher -loglevel "info" -sourceformat "pb"
- /kube-enricher -loglevel "info" -stdinsourceformat "pb" -listen "netflow://:2055"
image: quay.io/netobserv/goflow2-kube:dev
imagePullPolicy: IfNotPresent
name: goflow
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down Expand Up @@ -244,6 +246,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-reuseport v0.0.2 h1:XSG94b1FJfGA01BUrT82imejHQyTxO4jEWqheyCXYvU=
github.com/libp2p/go-reuseport v0.0.2/go.mod h1:SPD+5RwGC7rcnzngoYC86GjPzjSywuQyMVAheVBD9nQ=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
Expand All @@ -254,6 +257,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
Expand Down Expand Up @@ -313,6 +317,7 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -323,23 +328,27 @@ github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.10.0 h1:/o0BDeWzLWXNZ+4q5gXltUvaMpJqckTa+jTNoB+z4cg=
github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.18.0 h1:WCVKW7aL6LEe1uryfI9dnEc2ZqNB1Fn0ok930v0iL1Y=
github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
Expand Down
47 changes: 47 additions & 0 deletions pkg/format/netflow/netflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package netflow

import (
"context"
"log"

goflow2Format "github.com/netsampler/goflow2/format"
_ "github.com/netsampler/goflow2/format/protobuf"
"github.com/netsampler/goflow2/utils"
"github.com/sirupsen/logrus"
)

type GoflowDriver struct {
in chan map[string]interface{}
}

func NewDriver(hostname string, port int) *GoflowDriver {
gf := GoflowDriver{}
gf.in = make(chan map[string]interface{})

go func(in chan map[string]interface{}) {
ctx := context.Background()

transporter := NewWrapper(in)

formatter, err := goflow2Format.FindFormat(ctx, "pb")
if err != nil {
log.Fatal(err)
}

sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Logger: logrus.StandardLogger(),
}
err = sNF.FlowRoutine(1, hostname, port, false)
log.Fatal(err)

}(gf.in)

return &gf
}

func (gf *GoflowDriver) Next() (map[string]interface{}, error) {
msg := <-gf.in
return msg, nil
}
30 changes: 30 additions & 0 deletions pkg/format/netflow/wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package netflow

import (
"github.com/golang/protobuf/proto"
pbFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/pb"
goflowpb "github.com/netsampler/goflow2/pb"
)

// This is en implementation of the goflow2 transport interface
type TransportWrapper struct {
c chan map[string]interface{}
}

func NewWrapper(c chan map[string]interface{}) *TransportWrapper {
tw := TransportWrapper{c: c}
return &tw
}

func (w *TransportWrapper) Send(key, data []byte) error {
message := goflowpb.FlowMessage{}
err := proto.Unmarshal(data, &message)
if err != nil {
return err
}
renderedMsg, err := pbFormat.RenderMessage(message)
if err == nil {
w.c <- renderedMsg
}
return err
}
4 changes: 2 additions & 2 deletions pkg/format/pb/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func (pbFormat *PbFormat) Next() (map[string]interface{}, error) {
if err != nil {
return nil, err
}
return renderMessage(message)
return RenderMessage(message)
}

func renderMessage(message goflowpb.FlowMessage) (map[string]interface{}, error) {
func RenderMessage(message goflowpb.FlowMessage) (map[string]interface{}, error) {
outputMap := make(map[string]interface{})
err := ms.Decode(message, &outputMap)
if err != nil {
Expand Down

0 comments on commit bc26bad

Please sign in to comment.