From 4163a9049b2f6382ba657378cf25e0a1b1ce625b Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Thu, 7 Oct 2021 16:34:05 +0200 Subject: [PATCH 1/7] [NETOBSERV-44] Consumming goflow2 as a library. Using goflow2 as a library to directly receive netflow event with the kube-enricher --- cmd/kube-enricher/main.go | 53 ++++++++++++++++++++++++----------- examples/goflow-kube.yaml | 2 +- go.sum | 9 ++++++ pkg/format/netflow/netflow.go | 47 +++++++++++++++++++++++++++++++ pkg/format/netflow/wrapper.go | 30 ++++++++++++++++++++ pkg/format/pb/pb.go | 4 +-- 6 files changed, 126 insertions(+), 19 deletions(-) create mode 100644 pkg/format/netflow/netflow.go create mode 100644 pkg/format/netflow/wrapper.go diff --git a/cmd/kube-enricher/main.go b/cmd/kube-enricher/main.go index 34550826..b7c43e60 100644 --- a/cmd/kube-enricher/main.go +++ b/cmd/kube-enricher/main.go @@ -3,7 +3,9 @@ package main import ( "flag" "fmt" + "net/url" "os" + "strconv" "strings" "k8s.io/client-go/kubernetes" @@ -11,6 +13,7 @@ import ( "github.com/netobserv/goflow2-kube-enricher/pkg/format" jsonFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/json" + nfFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/netflow" pbFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/pb" "github.com/netobserv/goflow2-kube-enricher/pkg/reader" @@ -19,15 +22,16 @@ import ( ) var ( - version = "unknown" - app = "kube-enricher" - fieldsMapping = flag.String("mapping", "SrcAddr=Src,DstAddr=Dst", "Mapping of fields containing IPs to prefixes for new fields") - kubeConfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") - logLevel = flag.String("loglevel", "info", "Log level") - versionFlag = flag.Bool("v", false, "Print version") - log = logrus.WithField("module", app) - appVersion = fmt.Sprintf("%s %s", app, version) - sourceFormat = flag.String("sourceformat", "json", "format of the input string") + version = "unknown" + app = "kube-enricher" + listenAddress = flag.String("listen", "", "listen address, if empty, will listen to stdin") + fieldsMapping = flag.String("mapping", "SrcAddr=Src,DstAddr=Dst", "Mapping of fields containing IPs to prefixes for new fields") + kubeConfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") + logLevel = flag.String("loglevel", "info", "Log level") + versionFlag = flag.Bool("v", false, "Print version") + log = logrus.WithField("module", app) + appVersion = fmt.Sprintf("%s %s", app, version) + stdinSourceFormat = flag.String("stdinsourceformat", "json", "format of the input string") ) func main() { @@ -53,13 +57,30 @@ func main() { } var in format.Format - switch *sourceFormat { - case "json": - in = jsonFormat.NewScanner(os.Stdin) - case "pb": - in = pbFormat.NewScanner(os.Stdin) - default: - log.Fatal("Unknown source format: ", sourceFormat) + if *listenAddress == "" { + switch *stdinSourceFormat { + case "json": + in = jsonFormat.NewScanner(os.Stdin) + case "pb": + in = pbFormat.NewScanner(os.Stdin) + default: + log.Fatal("Unknown source format: ", stdinSourceFormat) + } + } else { + listenAddrUrl, err := url.Parse(*listenAddress) + if err != nil { + log.Fatal(err) + } + if listenAddrUrl.Scheme == "netflow" { + hostname := listenAddrUrl.Hostname() + port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64) + if err != nil { + log.Fatal("Failed reading listening port: ", err) + } + in = nfFormat.NewDriver(hostname, int(port)) + } else { + log.Fatal("Unknown listening protocol") + } } clientset, err := kubernetes.NewForConfig(loadKubeConfig()) diff --git a/examples/goflow-kube.yaml b/examples/goflow-kube.yaml index 42db3a35..ab538cc7 100644 --- a/examples/goflow-kube.yaml +++ b/examples/goflow-kube.yaml @@ -20,7 +20,7 @@ spec: - /bin/sh - -c # - /goflow2 -loglevel "info" | /kube-enricher -loglevel "info" -sourceformat "json" - - /goflow2 -transport.file.sep= -format=pb -format.protobuf.fixedlen=true -loglevel "info" | /kube-enricher -loglevel "info" -sourceformat "pb" + - /kube-enricher -loglevel "info" -stdinsourceformat "pb" -listen "netflow://:2055" image: quay.io/netobserv/goflow2-kube:dev imagePullPolicy: IfNotPresent name: goflow diff --git a/go.sum b/go.sum index 83e5d7be..30a59374 100644 --- a/go.sum +++ b/go.sum @@ -57,11 +57,13 @@ github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -244,6 +246,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/libp2p/go-reuseport v0.0.2 h1:XSG94b1FJfGA01BUrT82imejHQyTxO4jEWqheyCXYvU= github.com/libp2p/go-reuseport v0.0.2/go.mod h1:SPD+5RwGC7rcnzngoYC86GjPzjSywuQyMVAheVBD9nQ= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= @@ -254,6 +257,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= @@ -313,6 +317,7 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -323,23 +328,27 @@ github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= +github.com/prometheus/client_golang v1.10.0 h1:/o0BDeWzLWXNZ+4q5gXltUvaMpJqckTa+jTNoB+z4cg= github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= +github.com/prometheus/common v0.18.0 h1:WCVKW7aL6LEe1uryfI9dnEc2ZqNB1Fn0ok930v0iL1Y= github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= +github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= diff --git a/pkg/format/netflow/netflow.go b/pkg/format/netflow/netflow.go new file mode 100644 index 00000000..500bc956 --- /dev/null +++ b/pkg/format/netflow/netflow.go @@ -0,0 +1,47 @@ +package netflow + +import ( + "context" + "log" + + goflow2Format "github.com/netsampler/goflow2/format" + _ "github.com/netsampler/goflow2/format/protobuf" + "github.com/netsampler/goflow2/utils" + "github.com/sirupsen/logrus" +) + +type GoflowDriver struct { + in chan map[string]interface{} +} + +func NewDriver(hostname string, port int) *GoflowDriver { + gf := GoflowDriver{} + gf.in = make(chan map[string]interface{}) + + go func(in chan map[string]interface{}) { + ctx := context.Background() + + transporter := NewWrapper(in) + + formatter, err := goflow2Format.FindFormat(ctx, "pb") + if err != nil { + log.Fatal(err) + } + + sNF := &utils.StateNetFlow{ + Format: formatter, + Transport: transporter, + Logger: logrus.StandardLogger(), + } + err = sNF.FlowRoutine(1, hostname, port, false) + log.Fatal(err) + + }(gf.in) + + return &gf +} + +func (gf *GoflowDriver) Next() (map[string]interface{}, error) { + msg := <-gf.in + return msg, nil +} diff --git a/pkg/format/netflow/wrapper.go b/pkg/format/netflow/wrapper.go new file mode 100644 index 00000000..5390f270 --- /dev/null +++ b/pkg/format/netflow/wrapper.go @@ -0,0 +1,30 @@ +package netflow + +import ( + "github.com/golang/protobuf/proto" + pbFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/pb" + goflowpb "github.com/netsampler/goflow2/pb" +) + +// This is en implementation of the goflow2 transport interface +type TransportWrapper struct { + c chan map[string]interface{} +} + +func NewWrapper(c chan map[string]interface{}) *TransportWrapper { + tw := TransportWrapper{c: c} + return &tw +} + +func (w *TransportWrapper) Send(key, data []byte) error { + message := goflowpb.FlowMessage{} + err := proto.Unmarshal(data, &message) + if err != nil { + return err + } + renderedMsg, err := pbFormat.RenderMessage(message) + if err == nil { + w.c <- renderedMsg + } + return err +} diff --git a/pkg/format/pb/pb.go b/pkg/format/pb/pb.go index b8a5fe2f..2bb8f87d 100644 --- a/pkg/format/pb/pb.go +++ b/pkg/format/pb/pb.go @@ -53,10 +53,10 @@ func (pbFormat *PbFormat) Next() (map[string]interface{}, error) { if err != nil { return nil, err } - return renderMessage(message) + return RenderMessage(message) } -func renderMessage(message goflowpb.FlowMessage) (map[string]interface{}, error) { +func RenderMessage(message goflowpb.FlowMessage) (map[string]interface{}, error) { outputMap := make(map[string]interface{}) err := ms.Decode(message, &outputMap) if err != nil { From 137fbcff1641154c414e9d56e249f2779604367b Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Mon, 11 Oct 2021 16:56:35 +0200 Subject: [PATCH 2/7] Added transport wrapper unit tests --- pkg/format/netflow/wrapper_test.go | 44 ++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 pkg/format/netflow/wrapper_test.go diff --git a/pkg/format/netflow/wrapper_test.go b/pkg/format/netflow/wrapper_test.go new file mode 100644 index 00000000..a3e9724a --- /dev/null +++ b/pkg/format/netflow/wrapper_test.go @@ -0,0 +1,44 @@ +package netflow + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWrapperSingleMessage(t *testing.T) { + assert := assert.New(t) + c := make(chan map[string]interface{}, 5) + wrapper := NewWrapper(c) + data := []byte{ + 0x08, 0x04, 0x10, 0xa6, 0x87, 0x91, 0x8b, 0x06, 0x20, 0x02, 0x32, 0x04, + 0x0a, 0xf4, 0x02, 0x03, 0x3a, 0x04, 0x0a, 0xf4, 0x02, 0x02, 0x48, 0xc0, + 0xa2, 0x01, 0x50, 0x90, 0x03, 0x5a, 0x04, 0x64, 0x40, 0x00, 0x03, 0x90, + 0x01, 0x07, 0xa0, 0x01, 0x06, 0xa8, 0x01, 0xf5, 0x3f, 0xb0, 0x01, 0xb6, + 0xda, 0x02, 0xd8, 0x01, 0x83, 0x84, 0xd0, 0xd7, 0x80, 0xcb, 0x02, 0xe0, + 0x01, 0x8a, 0xf2, 0xac, 0xec, 0xae, 0xd9, 0x30, 0xf0, 0x01, 0x80, 0x10, + 0x0a, 0x08, 0x04, 0x10, 0xe7, 0x87, 0x91, 0x8b, 0x06, 0x20, + } + err := wrapper.Send(nil, data) + assert.Nil(err) + message := <-c + assert.Contains(message, "SrcMac") + assert.Equal(message["SrcMac"], "0a:58:0a:f4:02:03") + assert.Contains(message, "SrcAddr") + assert.Equal(message["SrcAddr"], "10.244.2.3") + assert.Contains(message, "DstMac") + assert.Equal(message["DstMac"], "c2:ca:ed:8b:39:0a") + assert.Contains(message, "DstAddr") + assert.Equal(message["DstAddr"], "10.244.2.2") +} + +func TestWrapperError(t *testing.T) { + assert := assert.New(t) + c := make(chan map[string]interface{}, 5) + wrapper := NewWrapper(c) + data := []byte{ + 0xff, 0xab, 0xcd, 0xef, + } + err := wrapper.Send(nil, data) + assert.Error(err) +} From cb1d18fa0426dec74317cd455615549191e76ec3 Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Mon, 11 Oct 2021 17:31:55 +0200 Subject: [PATCH 3/7] Dockerfile update after consumming goflow2 as a library --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index ecfe1f9e..f3039c5e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,7 @@ WORKDIR /build RUN go build -ldflags "-X main.version=${VERSION}" -o kube-enricher cmd/kube-enricher/main.go -FROM netsampler/goflow2:latest +FROM alpine:latest COPY --from=builder /build/kube-enricher / From cf1866b0fa02b5987bf945723cba50ed24b8e8e1 Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Tue, 12 Oct 2021 12:08:35 +0200 Subject: [PATCH 4/7] Fixing linter warning due to Message flow containing a lock --- pkg/format/netflow/wrapper.go | 2 +- pkg/format/pb/pb.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/format/netflow/wrapper.go b/pkg/format/netflow/wrapper.go index 5390f270..6df6ed82 100644 --- a/pkg/format/netflow/wrapper.go +++ b/pkg/format/netflow/wrapper.go @@ -22,7 +22,7 @@ func (w *TransportWrapper) Send(key, data []byte) error { if err != nil { return err } - renderedMsg, err := pbFormat.RenderMessage(message) + renderedMsg, err := pbFormat.RenderMessage(&message) if err == nil { w.c <- renderedMsg } diff --git a/pkg/format/pb/pb.go b/pkg/format/pb/pb.go index 2bb8f87d..678545f3 100644 --- a/pkg/format/pb/pb.go +++ b/pkg/format/pb/pb.go @@ -53,10 +53,10 @@ func (pbFormat *PbFormat) Next() (map[string]interface{}, error) { if err != nil { return nil, err } - return RenderMessage(message) + return RenderMessage(&message) } -func RenderMessage(message goflowpb.FlowMessage) (map[string]interface{}, error) { +func RenderMessage(message *goflowpb.FlowMessage) (map[string]interface{}, error) { outputMap := make(map[string]interface{}) err := ms.Decode(message, &outputMap) if err != nil { From b46e471b38b9589c6e1cb3d04231dfe65def20db Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Wed, 13 Oct 2021 11:45:48 +0200 Subject: [PATCH 5/7] Updated protobuf package --- go.mod | 2 +- pkg/format/netflow/wrapper.go | 2 +- pkg/format/pb/pb.go | 7 ++++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index ec13afe0..5833272a 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,11 @@ module github.com/netobserv/goflow2-kube-enricher go 1.12 require ( - github.com/golang/protobuf v1.4.3 github.com/mitchellh/mapstructure v1.1.2 github.com/netsampler/goflow2 v1.0.4 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 + google.golang.org/protobuf v1.25.0 k8s.io/api v0.20.1 k8s.io/apimachinery v0.20.1 k8s.io/client-go v0.20.1 diff --git a/pkg/format/netflow/wrapper.go b/pkg/format/netflow/wrapper.go index 6df6ed82..9f967eee 100644 --- a/pkg/format/netflow/wrapper.go +++ b/pkg/format/netflow/wrapper.go @@ -1,9 +1,9 @@ package netflow import ( - "github.com/golang/protobuf/proto" pbFormat "github.com/netobserv/goflow2-kube-enricher/pkg/format/pb" goflowpb "github.com/netsampler/goflow2/pb" + "google.golang.org/protobuf/proto" ) // This is en implementation of the goflow2 transport interface diff --git a/pkg/format/pb/pb.go b/pkg/format/pb/pb.go index 678545f3..3e6bf2da 100644 --- a/pkg/format/pb/pb.go +++ b/pkg/format/pb/pb.go @@ -7,10 +7,11 @@ import ( "io" "net" - "github.com/golang/protobuf/proto" ms "github.com/mitchellh/mapstructure" goflowFormat "github.com/netsampler/goflow2/format/common" goflowpb "github.com/netsampler/goflow2/pb" + "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" ) type PbFormat struct { @@ -31,8 +32,8 @@ func (pbFormat *PbFormat) Next() (map[string]interface{}, error) { if err != nil && err != io.EOF { return nil, err } - len, lenSize := proto.DecodeVarint(lenBuf) - if len == 0 { + len, lenSize := protowire.ConsumeVarint(lenBuf) + if lenSize < 0 { return nil, errors.New("protobuf: Could not parse message length") } From 7346b8e498ee4281e4f1e76e603cfab7dcde3951 Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Thu, 14 Oct 2021 11:42:06 +0200 Subject: [PATCH 6/7] Different code improvements : - literal strings are now defined as constants - netflow format internal channel is now buffered - rename of GoflowDriver to Driver (already in netflow package) - rename of NewDriver function to StartDriver - StartDriver now also has a context parameter instead of creating one - simplification of the internal function of StartDriver --- cmd/kube-enricher/main.go | 14 ++++++++++---- pkg/format/netflow/netflow.go | 21 +++++++++++---------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/cmd/kube-enricher/main.go b/cmd/kube-enricher/main.go index b7c43e60..24e19d03 100644 --- a/cmd/kube-enricher/main.go +++ b/cmd/kube-enricher/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "net/url" @@ -21,6 +22,10 @@ import ( "k8s.io/client-go/rest" ) +const jsonFlagName = "json" +const pbFlagName = "pb" +const netflowScheme = "netflow" + var ( version = "unknown" app = "kube-enricher" @@ -59,9 +64,9 @@ func main() { var in format.Format if *listenAddress == "" { switch *stdinSourceFormat { - case "json": + case jsonFlagName: in = jsonFormat.NewScanner(os.Stdin) - case "pb": + case pbFlagName: in = pbFormat.NewScanner(os.Stdin) default: log.Fatal("Unknown source format: ", stdinSourceFormat) @@ -71,13 +76,14 @@ func main() { if err != nil { log.Fatal(err) } - if listenAddrUrl.Scheme == "netflow" { + if listenAddrUrl.Scheme == netflowScheme { hostname := listenAddrUrl.Hostname() port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64) if err != nil { log.Fatal("Failed reading listening port: ", err) } - in = nfFormat.NewDriver(hostname, int(port)) + ctx := context.Background() + in = nfFormat.StartDriver(ctx, hostname, int(port)) } else { log.Fatal("Unknown listening protocol") } diff --git a/pkg/format/netflow/netflow.go b/pkg/format/netflow/netflow.go index 500bc956..d88bf4fc 100644 --- a/pkg/format/netflow/netflow.go +++ b/pkg/format/netflow/netflow.go @@ -10,18 +10,19 @@ import ( "github.com/sirupsen/logrus" ) -type GoflowDriver struct { +const channelSize = 5 + +type Driver struct { in chan map[string]interface{} } -func NewDriver(hostname string, port int) *GoflowDriver { - gf := GoflowDriver{} - gf.in = make(chan map[string]interface{}) - - go func(in chan map[string]interface{}) { - ctx := context.Background() +// Start a new go routine to handle netflow connections +func StartDriver(ctx context.Context, hostname string, port int) *Driver { + gf := Driver{} + gf.in = make(chan map[string]interface{}, channelSize) - transporter := NewWrapper(in) + go func() { + transporter := NewWrapper(gf.in) formatter, err := goflow2Format.FindFormat(ctx, "pb") if err != nil { @@ -36,12 +37,12 @@ func NewDriver(hostname string, port int) *GoflowDriver { err = sNF.FlowRoutine(1, hostname, port, false) log.Fatal(err) - }(gf.in) + }() return &gf } -func (gf *GoflowDriver) Next() (map[string]interface{}, error) { +func (gf *Driver) Next() (map[string]interface{}, error) { msg := <-gf.in return msg, nil } From d75e6e30b7c0044cdf398d756a07f22cc19c230d Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Thu, 14 Oct 2021 15:22:34 +0200 Subject: [PATCH 7/7] Changing Dockerfile to use ubi8 images --- Dockerfile | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/Dockerfile b/Dockerfile index f3039c5e..d470b584 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,20 @@ -FROM golang:alpine as builder +FROM registry.access.redhat.com/ubi8/go-toolset:1.15.14-10 as builder ARG VERSION="" -RUN apk --update --no-cache add git build-base gcc +WORKDIR /opt/app-root/src + +COPY go.mod ./ +COPY go.sum ./ + +RUN go mod download + +COPY . . -COPY . /build -WORKDIR /build RUN go build -ldflags "-X main.version=${VERSION}" -o kube-enricher cmd/kube-enricher/main.go -FROM alpine:latest +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4-210 -COPY --from=builder /build/kube-enricher / +COPY --from=builder /opt/app-root/src/kube-enricher ./ -ENTRYPOINT ["./goflow2"] +ENTRYPOINT ["./kube-enricher"]