Skip to content

Commit

Permalink
add support for multiple flow monitoring protocols and for IPv6 endpo…
Browse files Browse the repository at this point in the history
…int in existing netflow test

- Existing netflow export test now supports netflow v5, ipfix, sflow.
- The original test only validated the export of netflow v5 (legacy netflow) data, which does not support IPv6 by design. In order to validate flow data export in an IPv6 cluster, we can now choose between IPFIX and sflow; also, the test now supports an IPv6 endpoint;
- For the time being we're only testing netflow v5 in IPv4 (as before) and sflow in IPv6, because:
  (1) The test itself is long (~5 minutes);
  (2) The current implementation of the flow collector container (cloudfare/goflow) doesn't support IPFIX data generated by OVS (cloudflare/goflow#99)

Signed-off-by: Riccardo Ravaioli <[email protected]>
  • Loading branch information
ricky-rav committed Oct 29, 2021
1 parent 24d24d5 commit adf3db7
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 89 deletions.
207 changes: 122 additions & 85 deletions test/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -2541,114 +2541,151 @@ var _ = ginkgo.Describe("e2e ingress gateway traffic validation", func() {
})
})

// This test validates OVS exports NetFlow data from br-int to an external collector
var _ = ginkgo.Describe("e2e br-int NetFlow export validation", func() {
// This test validates that OVS exports flow monitoring data from br-int to an external collector
var _ = ginkgo.Describe("e2e br-int flow monitoring export validation", func() {
type flowMonitoringProtocol string

const (
svcname string = "netflow-test"
ovnNs string = "ovn-kubernetes"
netFlowCollectorContainer string = "netflow-collector"
ciNetworkName string = "kind"
netflow_v5 flowMonitoringProtocol = "netflow"
ipfix flowMonitoringProtocol = "ipfix"
sflow flowMonitoringProtocol = "sflow"

svcname string = "netflow-test"
ovnNs string = "ovn-kubernetes"
collectorContainer string = "netflow-collector"
ciNetworkName string = "kind"
)

f := framework.NewDefaultFramework(svcname)
keywordInLogs := map[flowMonitoringProtocol]string{
netflow_v5: "NETFLOW_V5", ipfix: "IPFIX", sflow: "SFLOW_5"}

f := framework.NewDefaultFramework(svcname)
ginkgo.AfterEach(func() {
// tear down the NetFlow container
if cid, _ := runCommand("docker", "ps", "-qaf", fmt.Sprintf("name=%s", netFlowCollectorContainer)); cid != "" {
if _, err := runCommand("docker", "rm", "-f", netFlowCollectorContainer); err != nil {
framework.Logf("failed to delete the netFlow collector test container %s %v", netFlowCollectorContainer, err)
// tear down the collector container
if cid, _ := runCommand("docker", "ps", "-qaf", fmt.Sprintf("name=%s", collectorContainer)); cid != "" {
if _, err := runCommand("docker", "rm", "-f", collectorContainer); err != nil {
framework.Logf("failed to delete the collector test container %s %v",
collectorContainer, err)
}
}
})

ginkgo.It("Should validate NetFlow data of br-int is sent to an external gateway and unset NetFlow Targets", func() {
var (
ciNetworkFlag = "{{ .NetworkSettings.Networks.kind.IPAddress }}"
)
ginkgo.By("Starting a netflow collector container")
// start the NetFlow collector container that will receive data
_, err := runCommand("docker", "run", "-itd", "--privileged", "--network", ciNetworkName, "--name", netFlowCollectorContainer, "cloudflare/goflow", "-kafka=false")
if err != nil {
framework.Failf("failed to start NetFlow collector test container %s: %v", netFlowCollectorContainer, err)
}
// retrieve the container ip of the NetFlow collector container
netFlowCollectorIp, err := runCommand("docker", "inspect", "-f", ciNetworkFlag, netFlowCollectorContainer)
if err != nil {
framework.Failf("failed to start NetFlow collector test container: %v", err)
}
// trim newline from the inspect output
netFlowCollectorIp = strings.TrimSuffix(netFlowCollectorIp, "\n")
if ip := net.ParseIP(netFlowCollectorIp); ip == nil {
framework.Failf("Unable to retrieve a valid address from container %s with inspect output of %s", netFlowCollectorContainer, netFlowCollectorIp)
}
table.DescribeTable("Should validate flow data of br-int is sent to an external gateway",
func(protocol flowMonitoringProtocol, collectorPort uint16) {
protocolStr := string(protocol)
ipField := "IPAddress"
isIpv6 := IsIPv6Cluster(f.ClientSet)
if isIpv6 {
ipField = "GlobalIPv6Address"
}
ciNetworkFlag := fmt.Sprintf("{{ .NetworkSettings.Networks.kind.%s }}", ipField)

ginkgo.By("Configuring ovnkube-node to use the new netflow collector target")
framework.Logf("Setting OVN_NETFLOW_TARGETS environment variable value to NetFlow collector IP %s", netFlowCollectorIp)
framework.RunKubectlOrDie(ovnNs, "set", "env", "daemonset/ovnkube-node", "-c", "ovnkube-node", "OVN_NETFLOW_TARGETS="+netFlowCollectorIp+":2056")
ginkgo.By("Starting a flow collector container")
// start the collector container that will receive data
_, err := runCommand("docker", "run", "-itd", "--privileged", "--network", ciNetworkName,
"--name", collectorContainer, "cloudflare/goflow", "-kafka=false")
if err != nil {
framework.Failf("failed to start flow collector container %s: %v", collectorContainer, err)
}
ovnEnvVar := fmt.Sprintf("OVN_%s_TARGETS", strings.ToUpper(protocolStr))
// retrieve the ip of the collector container
collectorIP, err := runCommand("docker", "inspect", "-f", ciNetworkFlag, collectorContainer)
if err != nil {
framework.Failf("could not retrieve IP address of collector container: %v", err)
}
// trim newline from the inspect output
collectorIP = strings.TrimSpace(collectorIP)
if net.ParseIP(collectorIP) == nil {
framework.Failf("Unable to retrieve a valid address from container %s with inspect output of %s",
collectorContainer, collectorIP)
}
addressAndPort, err1 := formatAddressAndPort(collectorIP, collectorPort)
if err1 != nil {
framework.Failf("Unable to correctly format value for input env variable %s with "+
"IP address %s and port %d", ovnEnvVar, collectorIP, collectorPort)
}
ginkgo.By(fmt.Sprintf("Configuring ovnkube-node to use the new %s collector target", protocolStr))
framework.Logf("Setting %s environment variable to %s",
ovnEnvVar, addressAndPort)

// Make sure the updated daemonset has rolled out, verify it's completion 10 times
// TODO (Change this to use the exported upstream function)
err = waitForDaemonSetUpdate(f.ClientSet, ovnNs, "ovnkube-node", 0, dsRestartTimeout)
framework.ExpectNoError(err)
framework.RunKubectlOrDie(ovnNs, "set", "env", "daemonset/ovnkube-node", "-c", "ovnkube-node",
fmt.Sprintf("%s=%s", ovnEnvVar, addressAndPort))

ginkgo.By("Checking that the collector container received netflow")
netFlowCollectorContainerLogsTest := func() wait.ConditionFunc {
return func() (bool, error) {
netFlowCollectorContainerLogs, err := runCommand("docker", "logs", netFlowCollectorContainer)
if err != nil {
framework.Logf("failed to inspect logs in test container: %v", err)
// Make sure the updated daemonset has rolled out
// TODO (Change this to use the exported upstream function)
err = waitForDaemonSetUpdate(f.ClientSet, ovnNs, "ovnkube-node", 0, dsRestartTimeout)
framework.ExpectNoError(err)
ginkgo.By(fmt.Sprintf("Checking that the collector container received %s data", protocolStr))
keyword := keywordInLogs[protocol]
collectorContainerLogsTest := func() wait.ConditionFunc {
return func() (bool, error) {
collectorContainerLogs, err := runCommand("docker", "logs", collectorContainer)
if err != nil {
framework.Logf("failed to inspect logs in test container: %v", err)
return false, nil
}
collectorContainerLogs = strings.TrimSuffix(collectorContainerLogs, "\n")
logLines := strings.Split(collectorContainerLogs, "\n")
lastLine := logLines[len(logLines)-1]
// check that flow monitoring traffic has been logged
if strings.Contains(lastLine, keyword) {
framework.Logf("Successfully found string %s in last log line of"+
" the collector: %s", keyword, lastLine)
return true, nil
}
framework.Logf("%s not found in last log line: %s", keyword, lastLine)
return false, nil
}
netFlowCollectorContainerLogs = strings.TrimSuffix(netFlowCollectorContainerLogs, "\n")
logLines := strings.Split(netFlowCollectorContainerLogs, "\n")
lastLine := logLines[len(logLines)-1]
// check that NetFlow traffic has been logged.
if strings.Contains(lastLine, "NETFLOW_V5") {
framework.Logf("the NetFlow collector received NetFlow data, last logs: %s", logLines[len(logLines)-1])
return true, nil
}
return false, nil
}
}

err = wait.PollImmediate(retryInterval, retryTimeout, netFlowCollectorContainerLogsTest())
framework.ExpectNoError(err, "failed to verify that NetFlow collector container received NetFlow data from br-int")
err = wait.PollImmediate(retryInterval, retryTimeout, collectorContainerLogsTest())
framework.ExpectNoError(err, fmt.Sprintf("failed to verify that collector container "+
"received %s data from br-int: string %s not found in logs",
protocolStr, keyword))

ginkgo.By("Unsetting the OVN_NETFLOW_TARGETS variable in the ovnkube-node daemonset")
framework.RunKubectlOrDie(ovnNs, "set", "env", "daemonset/ovnkube-node", "-c", "ovnkube-node", "OVN_NETFLOW_TARGETS-")
ginkgo.By(fmt.Sprintf("Unsetting %s variable in ovnkube-node daemonset", ovnEnvVar))
framework.RunKubectlOrDie(ovnNs, "set", "env", "daemonset/ovnkube-node", "-c", "ovnkube-node",
fmt.Sprintf("%s-", ovnEnvVar))

// Make sure the updated daemonset has rolled out, verify it's completion 10 times
// TODO (Change this to use the exported upstream function)
err = waitForDaemonSetUpdate(f.ClientSet, ovnNs, "ovnkube-node", 0, dsRestartTimeout)
framework.ExpectNoError(err)
// Make sure the updated daemonset has rolled out
// TODO (Change this to use the exported upstream function)
err = waitForDaemonSetUpdate(f.ClientSet, ovnNs, "ovnkube-node", 0, dsRestartTimeout)
framework.ExpectNoError(err)

ovnKubeNodePods, err := f.ClientSet.CoreV1().Pods(ovnNs).List(context.TODO(), metav1.ListOptions{
LabelSelector: "name=ovnkube-node",
})
if err != nil {
framework.Failf("could not get ovnkube-node pods: %v", err)
}
ovnKubeNodePods, err := f.ClientSet.CoreV1().Pods(ovnNs).List(context.TODO(), metav1.ListOptions{
LabelSelector: "name=ovnkube-node",
})
if err != nil {
framework.Failf("could not get ovnkube-node pods: %v", err)
}

for _, ovnKubeNodePod := range ovnKubeNodePods.Items {
for _, ovnKubeNodePod := range ovnKubeNodePods.Items {

execOptions := framework.ExecOptions{
Command: []string{"ovs-vsctl", "find", "netflow"},
Namespace: ovnNs,
PodName: ovnKubeNodePod.Name,
ContainerName: "ovnkube-node",
CaptureStdout: true,
CaptureStderr: true,
}
execOptions := framework.ExecOptions{
Command: []string{"ovs-vsctl", "find", strings.ToLower(protocolStr)},
Namespace: ovnNs,
PodName: ovnKubeNodePod.Name,
ContainerName: "ovnkube-node",
CaptureStdout: true,
CaptureStderr: true,
}

targets, stderr, _ := f.ExecWithOptions(execOptions)
framework.Logf("execOptions are %v", execOptions)
if err != nil {
framework.Failf("could not lookup ovs netflow targets: %v", stderr)
targets, stderr, _ := f.ExecWithOptions(execOptions)
framework.Logf("execOptions are %v", execOptions)
if err != nil {
framework.Failf("could not lookup ovs %s targets: %v", protocolStr, stderr)
}
framework.ExpectEmpty(targets)
}
framework.ExpectEmpty(targets)
}
},
// This is a long test (~5 minutes per run), so let's just validate netflow v5
// in an IPv4 cluster and sflow in IPv6 cluster
table.Entry("with netflow v5", netflow_v5, uint16(2056)),
// goflow doesn't currently support OVS ipfix:
// https://github.com/cloudflare/goflow/issues/99
// table.Entry("ipfix", ipfix, uint16(2055)),
table.Entry("with sflow", sflow, uint16(6343)),
)

})
})

func getNodePodCIDR(nodeName string) (string, error) {
Expand Down
15 changes: 15 additions & 0 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,21 @@ func addressIsIP(address v1.NodeAddress) bool {
return true
}

func formatAddressAndPort(addr string, port uint16) (string, error) {
var (
err error
addrAndPort string
)
if utilnet.IsIPv4String(addr) {
addrAndPort = fmt.Sprintf("%s:%d", addr, port)
} else if utilnet.IsIPv6String(addr) {
addrAndPort = fmt.Sprintf("[%s]:%d", addr, port)
} else {
err = error(fmt.Errorf("IP address %s is not valid", addr))
}
return addrAndPort, err
}

// Returns pod's ipv4 and ipv6 addresses IN ORDER
func getPodAddresses(pod *v1.Pod) (string, string) {
var ipv4Res, ipv6Res string
Expand Down
16 changes: 12 additions & 4 deletions test/scripts/e2e-cp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,23 @@ export KUBERNETES_CONFORMANCE_TEST=y
export KUBECONFIG=${HOME}/admin.conf

# Skip tests which are not IPv6 ready yet (see description of https://github.com/ovn-org/ovn-kubernetes/pull/2276)
# (Note that netflow v5 is IPv4 only)
IPV6_SKIPPED_TESTS="Should be allowed by externalip services|\
should provide connection to external host by DNS name from a pod|\
Should validate NetFlow data of br-int is sent to an external gateway|\
Should validate flow data of br-int is sent to an external gateway with netflow v5|\
test tainting a node according to its defaults interface MTU size"

SKIPPED_TESTS=""
if [ "$KIND_IPV4_SUPPORT" == true ] && [ "$KIND_IPV6_SUPPORT" == true ]; then
# No support for these features in dual-stack yet
SKIPPED_TESTS="hybrid.overlay|external.gateway"

if [ "$KIND_IPV4_SUPPORT" == true ]; then
if [ "$KIND_IPV6_SUPPORT" == true ]; then
# No support for these features in dual-stack yet
SKIPPED_TESTS="hybrid.overlay|external.gateway"
else
# Skip sflow in IPv4 since it's a long test (~5 minutes)
# We're validating netflow v5 with an ipv4 cluster, sflow with an ipv6 cluster
SKIPPED_TESTS="Should validate flow data of br-int is sent to an external gateway with sflow"
fi
fi

if [ "$OVN_HA" == false ]; then
Expand Down

0 comments on commit adf3db7

Please sign in to comment.