Skip to content
This repository has been archived by the owner on Feb 15, 2023. It is now read-only.

[NETOBSERV-44] Consumming goflow2 as a library. #6

Merged
merged 7 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
FROM golang:alpine as builder
FROM registry.access.redhat.com/ubi8/go-toolset:1.15.14-10 as builder
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting / realizing that if we stick to the redhat-supported tool chain we have to use this older version of go. Generics will wait :)

ARG VERSION=""

RUN apk --update --no-cache add git build-base gcc
WORKDIR /opt/app-root/src

COPY go.mod ./
COPY go.sum ./

RUN go mod download
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will surely change in the future if we want to include vendors in sources as Mario did for the loki client. But we can do that as a follow-up.


COPY . .

COPY . /build
WORKDIR /build

RUN go build -ldflags "-X main.version=${VERSION}" -o kube-enricher cmd/kube-enricher/main.go

FROM netsampler/goflow2:latest
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4-210

COPY --from=builder /build/kube-enricher /
COPY --from=builder /opt/app-root/src/kube-enricher ./

ENTRYPOINT ["./goflow2"]
ENTRYPOINT ["./kube-enricher"]
59 changes: 43 additions & 16 deletions cmd/kube-enricher/main.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,42 @@
package main

import (
"context"
"flag"
"fmt"
"net/url"
"os"
"strconv"
"strings"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"github.com/netobserv/goflow2-kube-enricher/pkg/format"
jsonFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/json"
nfFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/netflow"
pbFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/pb"
"github.com/netobserv/goflow2-kube-enricher/pkg/reader"

"github.com/sirupsen/logrus"
"k8s.io/client-go/rest"
)

const jsonFlagName = "json"
const pbFlagName = "pb"
const netflowScheme = "netflow"

var (
version = "unknown"
app = "kube-enricher"
fieldsMapping = flag.String("mapping", "SrcAddr=Src,DstAddr=Dst", "Mapping of fields containing IPs to prefixes for new fields")
kubeConfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
logLevel = flag.String("loglevel", "info", "Log level")
versionFlag = flag.Bool("v", false, "Print version")
log = logrus.WithField("module", app)
appVersion = fmt.Sprintf("%s %s", app, version)
sourceFormat = flag.String("sourceformat", "json", "format of the input string")
version = "unknown"
app = "kube-enricher"
listenAddress = flag.String("listen", "", "listen address, if empty, will listen to stdin")
fieldsMapping = flag.String("mapping", "SrcAddr=Src,DstAddr=Dst", "Mapping of fields containing IPs to prefixes for new fields")
kubeConfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
logLevel = flag.String("loglevel", "info", "Log level")
versionFlag = flag.Bool("v", false, "Print version")
log = logrus.WithField("module", app)
appVersion = fmt.Sprintf("%s %s", app, version)
stdinSourceFormat = flag.String("stdinsourceformat", "json", "format of the input string")
)

func main() {
Expand All @@ -53,13 +62,31 @@ func main() {
}

var in format.Format
switch *sourceFormat {
case "json":
in = jsonFormat.NewScanner(os.Stdin)
case "pb":
in = pbFormat.NewScanner(os.Stdin)
default:
log.Fatal("Unknown source format: ", sourceFormat)
if *listenAddress == "" {
switch *stdinSourceFormat {
case jsonFlagName:
in = jsonFormat.NewScanner(os.Stdin)
case pbFlagName:
in = pbFormat.NewScanner(os.Stdin)
default:
log.Fatal("Unknown source format: ", stdinSourceFormat)
}
} else {
listenAddrUrl, err := url.Parse(*listenAddress)
if err != nil {
log.Fatal(err)
}
if listenAddrUrl.Scheme == netflowScheme {
hostname := listenAddrUrl.Hostname()
port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64)
if err != nil {
log.Fatal("Failed reading listening port: ", err)
}
ctx := context.Background()
in = nfFormat.StartDriver(ctx, hostname, int(port))
} else {
log.Fatal("Unknown listening protocol")
}
}

clientset, err := kubernetes.NewForConfig(loadKubeConfig())
Expand Down
2 changes: 1 addition & 1 deletion examples/goflow-kube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
- /bin/sh
- -c
# - /goflow2 -loglevel "info" | /kube-enricher -loglevel "info" -sourceformat "json"
- /goflow2 -transport.file.sep= -format=pb -format.protobuf.fixedlen=true -loglevel "info" | /kube-enricher -loglevel "info" -sourceformat "pb"
- /kube-enricher -loglevel "info" -stdinsourceformat "pb" -listen "netflow://:2055"
image: quay.io/netobserv/goflow2-kube:dev
imagePullPolicy: IfNotPresent
name: goflow
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/netobserv/goflow2-kube-enricher
go 1.12

require (
github.com/golang/protobuf v1.4.3
github.com/mitchellh/mapstructure v1.1.2
github.com/netsampler/goflow2 v1.0.4
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
google.golang.org/protobuf v1.25.0
k8s.io/api v0.20.1
k8s.io/apimachinery v0.20.1
k8s.io/client-go v0.20.1
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down Expand Up @@ -244,6 +246,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-reuseport v0.0.2 h1:XSG94b1FJfGA01BUrT82imejHQyTxO4jEWqheyCXYvU=
github.com/libp2p/go-reuseport v0.0.2/go.mod h1:SPD+5RwGC7rcnzngoYC86GjPzjSywuQyMVAheVBD9nQ=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
Expand All @@ -254,6 +257,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
Expand Down Expand Up @@ -313,6 +317,7 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -323,23 +328,27 @@ github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.10.0 h1:/o0BDeWzLWXNZ+4q5gXltUvaMpJqckTa+jTNoB+z4cg=
github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.18.0 h1:WCVKW7aL6LEe1uryfI9dnEc2ZqNB1Fn0ok930v0iL1Y=
github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
Expand Down
48 changes: 48 additions & 0 deletions pkg/format/netflow/netflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package netflow

import (
"context"
"log"

goflow2Format "github.com/netsampler/goflow2/format"
_ "github.com/netsampler/goflow2/format/protobuf"
"github.com/netsampler/goflow2/utils"
"github.com/sirupsen/logrus"
)

const channelSize = 5

type Driver struct {
in chan map[string]interface{}
}

// Start a new go routine to handle netflow connections
func StartDriver(ctx context.Context, hostname string, port int) *Driver {
gf := Driver{}
gf.in = make(chan map[string]interface{}, channelSize)

go func() {
transporter := NewWrapper(gf.in)

formatter, err := goflow2Format.FindFormat(ctx, "pb")
if err != nil {
log.Fatal(err)
}

sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Logger: logrus.StandardLogger(),
}
err = sNF.FlowRoutine(1, hostname, port, false)
log.Fatal(err)

}()

return &gf
}

func (gf *Driver) Next() (map[string]interface{}, error) {
msg := <-gf.in
return msg, nil
}
30 changes: 30 additions & 0 deletions pkg/format/netflow/wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package netflow

import (
pbFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/pb"
goflowpb "github.com/netsampler/goflow2/pb"
"google.golang.org/protobuf/proto"
)

// This is en implementation of the goflow2 transport interface
type TransportWrapper struct {
c chan map[string]interface{}
}

func NewWrapper(c chan map[string]interface{}) *TransportWrapper {
tw := TransportWrapper{c: c}
return &tw
}

func (w *TransportWrapper) Send(key, data []byte) error {
message := goflowpb.FlowMessage{}
err := proto.Unmarshal(data, &message)
if err != nil {
return err
}
renderedMsg, err := pbFormat.RenderMessage(&message)
if err == nil {
w.c <- renderedMsg
}
return err
}
44 changes: 44 additions & 0 deletions pkg/format/netflow/wrapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package netflow

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestWrapperSingleMessage(t *testing.T) {
assert := assert.New(t)
c := make(chan map[string]interface{}, 5)
wrapper := NewWrapper(c)
data := []byte{
0x08, 0x04, 0x10, 0xa6, 0x87, 0x91, 0x8b, 0x06, 0x20, 0x02, 0x32, 0x04,
0x0a, 0xf4, 0x02, 0x03, 0x3a, 0x04, 0x0a, 0xf4, 0x02, 0x02, 0x48, 0xc0,
0xa2, 0x01, 0x50, 0x90, 0x03, 0x5a, 0x04, 0x64, 0x40, 0x00, 0x03, 0x90,
0x01, 0x07, 0xa0, 0x01, 0x06, 0xa8, 0x01, 0xf5, 0x3f, 0xb0, 0x01, 0xb6,
0xda, 0x02, 0xd8, 0x01, 0x83, 0x84, 0xd0, 0xd7, 0x80, 0xcb, 0x02, 0xe0,
0x01, 0x8a, 0xf2, 0xac, 0xec, 0xae, 0xd9, 0x30, 0xf0, 0x01, 0x80, 0x10,
0x0a, 0x08, 0x04, 0x10, 0xe7, 0x87, 0x91, 0x8b, 0x06, 0x20,
}
err := wrapper.Send(nil, data)
assert.Nil(err)
message := <-c
assert.Contains(message, "SrcMac")
assert.Equal(message["SrcMac"], "0a:58:0a:f4:02:03")
assert.Contains(message, "SrcAddr")
assert.Equal(message["SrcAddr"], "10.244.2.3")
assert.Contains(message, "DstMac")
assert.Equal(message["DstMac"], "c2:ca:ed:8b:39:0a")
assert.Contains(message, "DstAddr")
assert.Equal(message["DstAddr"], "10.244.2.2")
}

func TestWrapperError(t *testing.T) {
assert := assert.New(t)
c := make(chan map[string]interface{}, 5)
wrapper := NewWrapper(c)
data := []byte{
0xff, 0xab, 0xcd, 0xef,
}
err := wrapper.Send(nil, data)
assert.Error(err)
}
11 changes: 6 additions & 5 deletions pkg/format/pb/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"io"
"net"

"github.com/golang/protobuf/proto"
ms "github.com/mitchellh/mapstructure"
goflowFormat "github.com/netsampler/goflow2/format/common"
goflowpb "github.com/netsampler/goflow2/pb"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
)

type PbFormat struct {
Expand All @@ -31,8 +32,8 @@ func (pbFormat *PbFormat) Next() (map[string]interface{}, error) {
if err != nil && err != io.EOF {
return nil, err
}
len, lenSize := proto.DecodeVarint(lenBuf)
if len == 0 {
len, lenSize := protowire.ConsumeVarint(lenBuf)
if lenSize < 0 {
return nil, errors.New("protobuf: Could not parse message length")
}

Expand All @@ -53,10 +54,10 @@ func (pbFormat *PbFormat) Next() (map[string]interface{}, error) {
if err != nil {
return nil, err
}
return renderMessage(message)
return RenderMessage(&message)
}

func renderMessage(message goflowpb.FlowMessage) (map[string]interface{}, error) {
func RenderMessage(message *goflowpb.FlowMessage) (map[string]interface{}, error) {
outputMap := make(map[string]interface{})
err := ms.Decode(message, &outputMap)
if err != nil {
Expand Down