Skip to content

Commit

Permalink
Merge branch 'antrea-io:main' into antreaTest
Browse files Browse the repository at this point in the history
  • Loading branch information
devc007 authored Jan 11, 2025
2 parents 62dbfce + 5ee28ec commit 9bd3f70
Show file tree
Hide file tree
Showing 8 changed files with 426 additions and 45 deletions.
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/google/btree v1.1.3
github.com/google/uuid v1.6.0
github.com/gopacket/gopacket v1.3.1
github.com/hashicorp/memberlist v0.5.1
github.com/hashicorp/memberlist v0.5.2
github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.3.0
github.com/k8snetworkplumbingwg/sriov-cni v2.1.0+incompatible
github.com/kevinburke/ssh_config v1.2.0
Expand All @@ -42,12 +42,12 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/onsi/ginkgo/v2 v2.22.2
github.com/onsi/gomega v1.36.2
github.com/osrg/gobgp/v3 v3.32.0
github.com/osrg/gobgp/v3 v3.33.0
github.com/pkg/sftp v1.13.7
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/common v0.61.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/afero v1.11.0
github.com/spf13/afero v1.12.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -96,7 +96,7 @@ require (
github.com/alexflint/go-filemutex v1.2.0 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/armon/go-metrics v0.4.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.12.12 // indirect
Expand Down Expand Up @@ -155,6 +155,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-metrics v0.5.4 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.0 // indirect
Expand Down Expand Up @@ -218,8 +219,8 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect
go.etcd.io/etcd/client/v3 v3.5.14 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect
Expand All @@ -231,14 +232,13 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/oauth2 v0.25.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.zx2c4.com/wireguard v0.0.0-20210427022245-097af6e1351b // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
62 changes: 42 additions & 20 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ const (
egressIP,
appProtocolName,
httpVals,
egressNodeName)
egressNodeName)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?)`
Expand Down
4 changes: 3 additions & 1 deletion pkg/flowaggregator/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func (e *IPFIXExporter) sendRecord(record ipfixentities.Record, isRecordIPv6 boo
if err != nil {
return err
}
klog.V(4).InfoS("Data set sent successfully", "bytes sent", sentBytes)
if klog.V(7).Enabled() {
klog.InfoS("Data set sent successfully", "bytes sent", sentBytes)
}
return nil
}

Expand Down
84 changes: 76 additions & 8 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type flowAggregator struct {
clusterUUID uuid.UUID
aggregatorTransportProtocol flowaggregatorconfig.AggregatorTransportProtocol
collectingProcess ipfix.IPFIXCollectingProcess
preprocessor *preprocessor
aggregationProcess ipfix.IPFIXAggregationProcess
activeFlowRecordTimeout time.Duration
inactiveFlowRecordTimeout time.Duration
Expand Down Expand Up @@ -175,13 +176,18 @@ func NewFlowAggregator(
APIServer: opt.Config.APIServer,
logTickerDuration: time.Minute,
}
err = fa.InitCollectingProcess()
if err != nil {
return nil, fmt.Errorf("error when creating collecting process: %v", err)
if err := fa.InitCollectingProcess(); err != nil {
return nil, fmt.Errorf("error when creating collecting process: %w", err)
}
err = fa.InitAggregationProcess()
if err != nil {
return nil, fmt.Errorf("error when creating aggregation process: %v", err)
// Use a buffered channel which ideally should be large enough to accommodate all the records
// included in a given IPFIX message. It would be unusual to have more than 128 records in
// an IPFIX message.
recordCh := make(chan ipfixentities.Record, 128)
if err := fa.InitPreprocessor(recordCh); err != nil {
return nil, fmt.Errorf("error when creating preprocessor: %w", err)
}
if err := fa.InitAggregationProcess(recordCh); err != nil {
return nil, fmt.Errorf("error when creating aggregation process: %w", err)
}
if opt.Config.ClickHouse.Enable {
var err error
Expand Down Expand Up @@ -261,15 +267,72 @@ func (fa *flowAggregator) InitCollectingProcess() error {
len(infoelements.AntreaFlowEndSecondsElementList) + len(infoelements.AntreaThroughputElementList) + len(infoelements.AntreaSourceThroughputElementList) + len(infoelements.AntreaDestinationThroughputElementList)
// clusterId
cpInput.NumExtraElements += 1
// Tell the collector to accept IEs which are not part of the IPFIX registry (hardcoded in
// the go-ipfix library). The preprocessor will take care of removing these elements.
cpInput.DecodingMode = collector.DecodingModeLenientKeepUnknown
var err error
fa.collectingProcess, err = collector.InitCollectingProcess(cpInput)
return err
}

func (fa *flowAggregator) InitAggregationProcess() error {
func (fa *flowAggregator) InitPreprocessor(recordCh chan<- ipfixentities.Record) error {
getInfoElementFromRegistry := func(ieName string, enterpriseID uint32) (*ipfixentities.InfoElement, error) {
ie, err := fa.registry.GetInfoElement(ieName, enterpriseID)
if err != nil {
return nil, fmt.Errorf("error when looking up IE %q in registry: %w", ieName, err)
}
return ie, err
}

getInfoElements := func(isIPv4 bool) ([]*ipfixentities.InfoElement, error) {
ianaInfoElements := infoelements.IANAInfoElementsIPv4
ianaReverseInfoElements := infoelements.IANAReverseInfoElements
antreaInfoElements := infoelements.AntreaInfoElementsIPv4
if !isIPv4 {
ianaInfoElements = infoelements.IANAInfoElementsIPv6
antreaInfoElements = infoelements.AntreaInfoElementsIPv6
}
infoElements := make([]*ipfixentities.InfoElement, 0)
for _, ieName := range ianaInfoElements {
ie, err := getInfoElementFromRegistry(ieName, ipfixregistry.IANAEnterpriseID)
if err != nil {
return nil, err
}
infoElements = append(infoElements, ie)
}
for _, ieName := range ianaReverseInfoElements {
ie, err := getInfoElementFromRegistry(ieName, ipfixregistry.IANAReversedEnterpriseID)
if err != nil {
return nil, err
}
infoElements = append(infoElements, ie)
}
for _, ieName := range antreaInfoElements {
ie, err := getInfoElementFromRegistry(ieName, ipfixregistry.AntreaEnterpriseID)
if err != nil {
return nil, err
}
infoElements = append(infoElements, ie)
}
return infoElements, nil
}

infoElementsIPv4, err := getInfoElements(true)
if err != nil {
return err
}
infoElementsIPv6, err := getInfoElements(false)
if err != nil {
return err
}
fa.preprocessor, err = newPreprocessor(infoElementsIPv4, infoElementsIPv6, fa.collectingProcess.GetMsgChan(), recordCh)
return err
}

func (fa *flowAggregator) InitAggregationProcess(recordCh <-chan ipfixentities.Record) error {
var err error
apInput := ipfixintermediate.AggregationInput{
MessageChan: fa.collectingProcess.GetMsgChan(),
RecordChan: recordCh,
WorkerNum: aggregationWorkerNum,
CorrelateFields: correlateFields,
ActiveExpiryTimeout: fa.activeFlowRecordTimeout,
Expand All @@ -293,6 +356,11 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}) {
fa.collectingProcess.Start()
}()
ipfixProcessesWg.Add(1)
go func() {
defer ipfixProcessesWg.Done()
fa.preprocessor.Run(stopCh)
}()
ipfixProcessesWg.Add(1)
go func() {
// Same comment as above.
defer ipfixProcessesWg.Done()
Expand Down
11 changes: 6 additions & 5 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ func TestFlowAggregator_Run(t *testing.T) {
activeFlowRecordTimeout: 1 * time.Hour,
logTickerDuration: 1 * time.Hour,
collectingProcess: mockCollectingProcess,
preprocessor: &preprocessor{},
aggregationProcess: mockAggregationProcess,
ipfixExporter: mockIPFIXExporter,
configWatcher: configWatcher,
Expand Down Expand Up @@ -858,12 +859,12 @@ func TestFlowAggregator_InitAggregationProcess(t *testing.T) {
activeFlowRecordTimeout: testActiveTimeout,
inactiveFlowRecordTimeout: testInactiveTimeout,
aggregatorTransportProtocol: flowaggregatorconfig.AggregatorTransportProtocolTCP,
registry: ipfix.NewIPFIXRegistry(),
}
err := fa.InitCollectingProcess()
require.NoError(t, err)

err = fa.InitAggregationProcess()
require.NoError(t, err)
require.NoError(t, fa.InitCollectingProcess())
recordCh := make(chan ipfixentities.Record)
require.NoError(t, fa.InitPreprocessor(recordCh))
require.NoError(t, fa.InitAggregationProcess(recordCh))
}

func TestFlowAggregator_fillK8sMetadata(t *testing.T) {
Expand Down
Loading

0 comments on commit 9bd3f70

Please sign in to comment.