diff --git a/.dockerignore b/.dockerignore
index 667960d..8710772 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -2,3 +2,4 @@
 # SPDX-License-Identifier: Apache-2.0
 
 *
+!requirements.txt
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 5710ee7..cc7496e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -23,7 +23,8 @@ FROM ubuntu:20.04 as runtime
 ARG TREX_EXT_LIBS
 ARG TREX_LIBS
 RUN apt update && apt install -y dumb-init python3-dev build-essential python3-pip
-RUN pip3 install scipy==1.5.4 numpy==1.19.4 matplotlib==3.3.3
+ADD requirements.txt /
+RUN pip3 install -r requirements.txt
 ENV TREX_EXT_LIBS=${TREX_EXT_LIBS}
 ENV PYTHONPATH=/workspace/trex-scripts:${TREX_EXT_LIBS}:${TREX_LIBS}
 COPY --from=builder /output /
diff --git a/requirements.txt b/requirements.txt
index ffc89a4..316685d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,3 +7,4 @@ scapy==2.4.3
 scipy==1.5.4
 numpy==1.19.4
 matplotlib==3.3.3
+p4runtime==1.3.0
diff --git a/trex-scripts/control.py b/trex-scripts/control.py
index 05f4b4f..03053ae 100755
--- a/trex-scripts/control.py
+++ b/trex-scripts/control.py
@@ -220,6 +220,7 @@ def main() -> int:
         trex_client.stop()
         trex_client.release()
         trex_client.disconnect()
+        test.stop()
     except ConnectionRefusedError:
         logging.error(
             "Unable to connect to server %s.\n" + "Did you start the Trex daemon?",
diff --git a/trex-scripts/lib/base_test.py b/trex-scripts/lib/base_test.py
index 5bef3f6..0c74322 100644
--- a/trex-scripts/lib/base_test.py
+++ b/trex-scripts/lib/base_test.py
@@ -19,6 +19,14 @@ def start(self, args: dict = {}) -> None:
         """
         pass
 
+    @abstractmethod
+    def stop(self) -> None:
+        """
+        Stop the test. This will always be called after the test finishes,
+        so connection and resource clean-up belongs here. Takes no arguments.
+        """
+        pass
+
     @abstractclassmethod
     def test_type(cls) -> str:
         """
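For context, a minimal test under the new lifecycle looks like the sketch below: control.py calls start() first and, with this change, always calls stop() once the TRex client has been torn down. NoopTest and the "stateless" type string are hypothetical; only StatelessTest and the abstract methods come from base_test.py.

    from argparse import ArgumentParser

    from lib.base_test import StatelessTest


    class NoopTest(StatelessTest):
        @classmethod
        def setup_subparser(cls, parser: ArgumentParser) -> None:
            pass  # no extra CLI arguments

        @classmethod
        def test_type(cls) -> str:
            return "stateless"  # assumed value; see base_test.py for valid types

        def start(self, args: dict) -> None:
            pass  # generate traffic here

        def stop(self) -> None:
            pass  # release resources acquired in start()
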
diff --git a/trex-scripts/lib/fabric_tna.py b/trex-scripts/lib/fabric_tna.py
new file mode 100644
index 0000000..93fb04f
--- /dev/null
+++ b/trex-scripts/lib/fabric_tna.py
@@ -0,0 +1,299 @@
+# SPDX-FileCopyrightText: Copyright 2020-present Open Networking Foundation.
+# SPDX-License-Identifier: Apache-2.0
+
+# Utilities to generate table entries and groups for the fabric-tna pipeline
+
+from argparse import ArgumentParser
+
+from lib.p4r import P4RuntimeTest
+from lib.utils import ipv4_to_binary, mac_to_binary, stringify
+
+DEFAULT_PRIORITY = 10
+MAC_MASK = ":".join(["ff"] * 6)
+ETH_TYPE_IPV4 = 0x0800
+ETH_TYPE_MPLS_UNICAST = 0x8847
+FORWARDING_TYPE_UNICAST_IPV4 = 2
+MIRROR_TYPE_INT_REPORT = 1
+BRIDGED_MD_TYPE_EGRESS_MIRROR = 2
+INT_REPORT_TYPE_LOCAL = 1
+
+
+class FabricTnaTest(P4RuntimeTest):
+    @classmethod
+    def setup_subparser(cls, parser: ArgumentParser) -> None:
+        P4RuntimeTest.setup_subparser(parser)
+
+    def connect(
+        self,
+        grpc_addr="localhost:9339",
+        device_id=1,
+        p4info=None,
+        election_id=1,
+        pipeline_config=None,
+    ):
+        super().connect(
+            grpc_addr=grpc_addr,
+            device_id=device_id,
+            p4info=p4info,
+            election_id=election_id,
+            pipeline_config=pipeline_config,
+        )
+        self.next_mbr_id = 1
+
+    def set_ingress_port_vlan(
+        self,
+        ingress_port,
+        vlan_valid=False,
+        vlan_id=0,
+        internal_vlan_id=0,
+        inner_vlan_id=None,
+        priority=DEFAULT_PRIORITY,
+    ):
+        ingress_port_ = stringify(ingress_port, 2)
+        vlan_valid_ = b"\x01" if vlan_valid else b"\x00"
+        vlan_id_ = stringify(vlan_id, 2)
+        vlan_id_mask_ = stringify(4095 if vlan_valid else 0, 2)
+        new_vlan_id_ = stringify(internal_vlan_id, 2)
+        action_name = "permit" if vlan_valid else "permit_with_internal_vlan"
+        action_params = [] if vlan_valid else [("vlan_id", new_vlan_id_)]
+        matches = [
+            self.Exact("ig_port", ingress_port_),
+            self.Exact("vlan_is_valid", vlan_valid_),
+        ]
+        if vlan_id_mask_ != b"\x00\x00":
+            matches.append(self.Ternary("vlan_id", vlan_id_, vlan_id_mask_))
+        if inner_vlan_id is not None:
+            # Match on inner_vlan only when explicitly requested
+            inner_vlan_id_ = stringify(inner_vlan_id, 2)
+            inner_vlan_id_mask_ = stringify(4095, 2)
+            matches.append(
+                self.Ternary("inner_vlan_id", inner_vlan_id_, inner_vlan_id_mask_)
+            )
+
+        return self.send_request_add_entry_to_action(
+            "filtering.ingress_port_vlan",
+            matches,
+            "filtering." + action_name,
+            action_params,
+            priority,
+        )
+
+    def set_egress_vlan_pop(self, egress_port, vlan_id):
+        egress_port = stringify(egress_port, 2)
+        vlan_id = stringify(vlan_id, 2)
+        self.send_request_add_entry_to_action(
+            "egress_next.egress_vlan",
+            [self.Exact("vlan_id", vlan_id), self.Exact("eg_port", egress_port)],
+            "egress_next.pop_vlan",
+            [],
+        )
+
+    def set_up_port(
+        self, port_id, vlan_id, tagged=False, double_tagged=False, inner_vlan_id=0
+    ):
+        if double_tagged:
+            self.set_ingress_port_vlan(
+                ingress_port=port_id,
+                vlan_id=vlan_id,
+                vlan_valid=True,
+                inner_vlan_id=inner_vlan_id,
+            )
+        elif tagged:
+            self.set_ingress_port_vlan(
+                ingress_port=port_id, vlan_id=vlan_id, vlan_valid=True
+            )
+        else:
+            self.set_ingress_port_vlan(
+                ingress_port=port_id, vlan_valid=False, internal_vlan_id=vlan_id
+            )
+            self.set_egress_vlan_pop(egress_port=port_id, vlan_id=vlan_id)
+
+    def set_forwarding_type(
+        self,
+        ingress_port,
+        eth_dstAddr,
+        eth_dstMask=MAC_MASK,
+        ethertype=ETH_TYPE_IPV4,
+        fwd_type=FORWARDING_TYPE_UNICAST_IPV4,
+    ):
+        ingress_port_ = stringify(ingress_port, 2)
+        eth_dstAddr_ = mac_to_binary(eth_dstAddr)
+        eth_mask_ = mac_to_binary(eth_dstMask)
+        if ethertype == ETH_TYPE_IPV4:
+            ethertype_ = stringify(0, 2)
+            ethertype_mask_ = stringify(0, 2)
+            ip_eth_type = stringify(ethertype, 2)
+        elif ethertype == ETH_TYPE_MPLS_UNICAST:
+            ethertype_ = stringify(ETH_TYPE_MPLS_UNICAST, 2)
+            ethertype_mask_ = stringify(0xFFFF, 2)
+            # FIXME: this will work only for MPLS+IPv4 traffic
+            ip_eth_type = stringify(ETH_TYPE_IPV4, 2)
+        else:
+            # TODO: what should we match on? We should never reach this point.
+            return
+        fwd_type_ = stringify(fwd_type, 1)
+        matches = [
+            self.Exact("ig_port", ingress_port_),
+            self.Ternary("eth_dst", eth_dstAddr_, eth_mask_),
+            self.Exact("ip_eth_type", ip_eth_type),
+        ]
+        if ethertype_mask_ != b"\x00\x00":
+            matches.append(self.Ternary("eth_type", ethertype_, ethertype_mask_))
+        self.send_request_add_entry_to_action(
+            "filtering.fwd_classifier",
+            matches,
+            "filtering.set_forwarding_type",
+            [("fwd_type", fwd_type_)],
+            priority=DEFAULT_PRIORITY,
+        )
+
+    def add_forwarding_routing_v4_entry(self, ipv4_dstAddr, ipv4_pLen, next_id):
+        ipv4_dstAddr_ = ipv4_to_binary(ipv4_dstAddr)
+        next_id_ = stringify(next_id, 4)
+        self.send_request_add_entry_to_action(
+            "forwarding.routing_v4",
+            [self.Lpm("ipv4_dst", ipv4_dstAddr_, ipv4_pLen)],
+            "forwarding.set_next_id_routing_v4",
+            [("next_id", next_id_)],
+        )
+
+    def add_next_routing(self, next_id, egress_port, smac, dmac):
+        egress_port_ = stringify(egress_port, 2)
+        smac_ = mac_to_binary(smac)
+        dmac_ = mac_to_binary(dmac)
+        self.add_next_hashed_group_action(
+            next_id,
+            egress_port,
+            [
+                [
+                    "next.routing_hashed",
+                    [("port_num", egress_port_), ("smac", smac_), ("dmac", dmac_)],
+                ]
+            ],
+        )
+
+    def add_next_vlan(self, next_id, new_vlan_id):
+        next_id_ = stringify(next_id, 4)
+        vlan_id_ = stringify(new_vlan_id, 2)
+        self.send_request_add_entry_to_action(
+            "next.next_vlan",
+            [self.Exact("next_id", next_id_)],
+            "next.set_vlan",
+            [("vlan_id", vlan_id_)],
+        )
+
+    def set_up_report_flow(
+        self, src_mac, mon_mac, src_ip, mon_ip, mon_port, mon_label=None
+    ):
+        action = "do_report_encap"
+        action_params = [
+            ("src_mac", mac_to_binary(src_mac)),
+            ("mon_mac", mac_to_binary(mon_mac)),
+            ("src_ip", ipv4_to_binary(src_ip)),
+            ("mon_ip", ipv4_to_binary(mon_ip)),
+            ("mon_port", stringify(mon_port, 2)),
+        ]
+        if mon_label:
+            action = "do_report_encap_mpls"
+            action_params.append(("mon_label", stringify(mon_label, 3)))
+
+        self.send_request_add_entry_to_action(
+            "report",
+            [
+                self.Exact("bmd_type", stringify(BRIDGED_MD_TYPE_EGRESS_MIRROR, 1)),
+                self.Exact("mirror_type", stringify(MIRROR_TYPE_INT_REPORT, 1)),
+                self.Exact("int_report_type", stringify(INT_REPORT_TYPE_LOCAL, 1)),
+            ],
+            action,
+            action_params,
+        )
+
+    def set_up_report_mirror_flow(self, mirror_id, port):
+        self.add_clone_group(mirror_id, [port])
+
+    def set_up_flow_report_filter_config(self, hop_latency_mask, timestamp_mask):
+        self.send_request_add_entry_to_action(
+            "FabricEgress.int_egress.flow_report_filter.config",
+            [],
+            "FabricEgress.int_egress.flow_report_filter.set_config",
+            [
+                ("hop_latency_mask", stringify(hop_latency_mask, 4)),
+                ("timestamp_mask", stringify(timestamp_mask, 6)),
+            ],
+        )
+
+    def set_up_watchlist_flow(
+        self,
+        ipv4_src,
+        ipv4_dst,
+        ipv4_src_mask="255.255.255.255",
+        ipv4_dst_mask="255.255.255.255",
+        sport=None,
+        dport=None,
+    ):
+        ipv4_src_ = ipv4_to_binary(ipv4_src)
+        ipv4_dst_ = ipv4_to_binary(ipv4_dst)
+        ipv4_src_mask_ = ipv4_to_binary(ipv4_src_mask)
+        ipv4_dst_mask_ = ipv4_to_binary(ipv4_dst_mask)
+        # Use the full range of TCP/UDP ports by default.
+        sport_low = stringify(0, 2)
+        sport_high = stringify(0xFFFF, 2)
+        dport_low = stringify(0, 2)
+        dport_high = stringify(0xFFFF, 2)
+
+        if sport:
+            sport_low = stringify(sport, 2)
+            sport_high = stringify(sport, 2)
+
+        if dport:
+            dport_low = stringify(dport, 2)
+            dport_high = stringify(dport, 2)
+
+        self.send_request_add_entry_to_action(
+            "watchlist",
+            [
+                self.Ternary("ipv4_src", ipv4_src_, ipv4_src_mask_),
+                self.Ternary("ipv4_dst", ipv4_dst_, ipv4_dst_mask_),
+                self.Range("l4_sport", sport_low, sport_high),
+                self.Range("l4_dport", dport_low, dport_high),
+            ],
+            "mark_to_report",
+            [],
+            priority=DEFAULT_PRIORITY,
+        )
+
+    def set_up_int_mirror_flow(self, switch_id, report_type=INT_REPORT_TYPE_LOCAL):
+        switch_id_ = stringify(switch_id, 4)
+        report_type_ = stringify(report_type, 1)
+        self.send_request_add_entry_to_action(
+            "int_metadata",
+            [self.Exact("int_report_type", report_type_)],
+            "set_metadata",
+            [("switch_id", switch_id_)],
+        )
+
+    # actions is a sequence of (action_name, param_tuples) pairs;
+    # param_tuples contains one (param_name, param_value) tuple per parameter.
+    def add_next_hashed_group_action(self, next_id, grp_id, actions=()):
+        next_id_ = stringify(next_id, 4)
+        mbr_ids = []
+        for action in actions:
+            mbr_id = self.get_next_mbr_id()
+            mbr_ids.append(mbr_id)
+            self.send_request_add_member(
+                "FabricIngress.next.hashed_profile", mbr_id, *action
+            )
+        self.send_request_add_group(
+            "FabricIngress.next.hashed_profile",
+            grp_id,
+            grp_size=len(mbr_ids),
+            mbr_ids=mbr_ids,
+        )
+        self.send_request_add_entry_to_group(
+            "next.hashed", [self.Exact("next_id", next_id_)], grp_id
+        )
+
+    def get_next_mbr_id(self):
+        mbr_id = self.next_mbr_id
+        self.next_mbr_id += 1
+        return mbr_id
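All match and action parameters above travel as big-endian byte strings. A quick sanity check of the encoding helpers this module imports (they are added to lib/utils.py later in this patch):

    from lib.utils import ipv4_to_binary, mac_to_binary, stringify

    assert stringify(4095, 2) == b"\x0f\xff"  # 12-bit VLAN ID padded to 2 bytes
    assert mac_to_binary("c0:ff:ee:c0:ff:ee") == b"\xc0\xff\xee\xc0\xff\xee"
    assert ipv4_to_binary("192.168.40.1") == b"\xc0\xa8\x28\x01"
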
diff --git a/trex-scripts/lib/p4r.py b/trex-scripts/lib/p4r.py
new file mode 100644
index 0000000..e7c397f
--- /dev/null
+++ b/trex-scripts/lib/p4r.py
@@ -0,0 +1,722 @@
+# SPDX-FileCopyrightText: Copyright 2020-present Open Networking Foundation.
+# SPDX-License-Identifier: Apache-2.0
+
+# A module that includes all P4Runtime-related utilities.
+
+import logging
+import os
+import queue
+import threading
+import time
+from argparse import ArgumentParser
+from collections import Counter
+from functools import partial
+
+import grpc
+from google.protobuf import text_format
+from google.rpc import code_pb2, status_pb2
+from lib.utils import stringify
+from p4.config.v1 import p4info_pb2
+from p4.v1 import p4runtime_pb2, p4runtime_pb2_grpc
+
+log = logging.getLogger("P4Runtime Client")
+
+
+# Used to indicate that the gRPC error Status object returned by the server has
+# an incorrect format.
+class P4RuntimeErrorFormatException(Exception):
+    def __init__(self, message):
+        super(P4RuntimeErrorFormatException, self).__init__(message)
+
+
+# Used to iterate over the p4.Error messages in a gRPC error Status object
+class P4RuntimeErrorIterator:
+    def __init__(self, grpc_error):
+        assert grpc_error.code() == grpc.StatusCode.UNKNOWN
+        self.grpc_error = grpc_error
+
+        error = None
+        # The gRPC Python package does not have a convenient way to access the
+        # binary details for the error: they are treated as trailing metadata.
+        for meta in self.grpc_error.trailing_metadata():
+            if meta[0] == "grpc-status-details-bin":
+                error = status_pb2.Status()
+                error.ParseFromString(meta[1])
+                break
+        if error is None:
+            raise P4RuntimeErrorFormatException("No binary details field")
+
+        self.errors = error.details
+        self.idx = 0
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        while self.idx < len(self.errors):
+            p4_error = p4runtime_pb2.Error()
+            one_error_any = self.errors[self.idx]
+            if not one_error_any.Unpack(p4_error):
+                raise P4RuntimeErrorFormatException(
+                    "Cannot convert Any message to p4.Error"
+                )
+            if p4_error.canonical_code == code_pb2.OK:
+                # Skip OK codes, advancing the index so we do not loop forever.
+                self.idx += 1
+                continue
+            v = self.idx, p4_error
+            self.idx += 1
+            return v
+        raise StopIteration
+
+
+# P4Runtime uses a 3-level message in case of an error during the processing of
+# a write batch. This means that if we do not wrap the grpc.RpcError inside a
+# custom exception, we can end up with an unhelpful exception message in case
+# of failure, as only the first level will be printed. In this custom exception
+# class, we extract the nested error message (one for each operation included
+# in the batch) in order to print the error code + user-facing message.
+# See the P4Runtime documentation for more details on error reporting.
+class P4RuntimeException(Exception):
+    def __init__(self, grpc_error):
+        assert grpc_error.code() == grpc.StatusCode.UNKNOWN
+        super(P4RuntimeException, self).__init__()
+        self.grpc_error = grpc_error
+        self.errors = []
+        try:
+            error_iterator = P4RuntimeErrorIterator(grpc_error)
+            for error_tuple in error_iterator:
+                self.errors.append(error_tuple)
+        except P4RuntimeErrorFormatException:
+            raise  # just propagate the exception for now
+
+    def __str__(self):
+        message = "Error(s) during RPC: {} {}\n".format(
+            self.grpc_error.code(), self.grpc_error.details()
+        )
+        for idx, p4_error in self.errors:
+            code_name = code_pb2._CODE.values_by_number[p4_error.canonical_code].name
+            message += "\t* At index {}: {}, '{}'\n".format(
+                idx, code_name, p4_error.message
+            )
+        return message
+
+
+class P4RuntimeTest:
+    @classmethod
+    def setup_subparser(cls, parser: ArgumentParser) -> None:
+        parser.add_argument(
+            "--set-up-flows",
+            help="Set up flows on the switch",
+            action="store_true",
+            default=False,
+        )
+        parser.add_argument(
+            "--switch-addr",
+            type=str,
+            help="P4Runtime server address",
+            default="localhost:9339",
+        )
+        parser.add_argument("--p4info", type=str, help="P4Info file", default="")
+        parser.add_argument(
+            "--pipeline-config", type=str, help="Pipeline config file", default=""
+        )
+
+    def connect(
+        self,
+        grpc_addr="localhost:9339",
+        device_id=1,
+        p4info=None,
+        election_id=1,
+        pipeline_config=None,
+    ):
+        if not p4info or not os.path.exists(p4info):
+            raise RuntimeError("P4Info file %s does not exist." % p4info)
+        if not pipeline_config or not os.path.exists(pipeline_config):
+            raise RuntimeError(
+                "Pipeline config file %s does not exist." % pipeline_config
+            )
+        self.p4info = p4info_pb2.P4Info()
+        with open(p4info, "rb") as fin:
+            text_format.Merge(fin.read(), self.p4info)
+        self.channel = grpc.insecure_channel(grpc_addr)
+        self.stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)
+        self.device_id = device_id
+        self.election_id = election_id
+
+        # Set up the stream channel
+        self.stream_out_q = queue.Queue()
+        self.stream_in_q = queue.Queue()
+
+        def stream_req_iterator():
+            while True:
+                p = self.stream_out_q.get()
+                if p is None:
+                    break
+                yield p
+
+        def stream_recv(stream):
+            for p in stream:
+                self.stream_in_q.put(p)
+
+        self.stream = self.stub.StreamChannel(stream_req_iterator())
+        self.stream_recv_thread = threading.Thread(
+            target=stream_recv, args=(self.stream,)
+        )
+        self.stream_recv_thread.start()
+
+        # Handshake with the device
+        req = p4runtime_pb2.StreamMessageRequest()
+        arbitration = req.arbitration
+        arbitration.device_id = self.device_id
+        election_id = arbitration.election_id
+        election_id.high = 0
+        election_id.low = self.election_id
+        self.stream_out_q.put(req)
+
+        rep = self.get_stream_packet("arbitration", timeout=2)
+        if rep is None:
+            self.disconnect()
+            raise RuntimeError("Failed to establish handshake")
+
+        # Push the pipeline config
+        req = p4runtime_pb2.SetForwardingPipelineConfigRequest()
+        req.device_id = device_id
+        req.election_id.high = 0
+        req.election_id.low = self.election_id
+        req.config.p4info.CopyFrom(self.p4info)
+        with open(pipeline_config, "rb") as pipeline_config_f:
+            req.config.p4_device_config = pipeline_config_f.read()
+        req.action = p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
+
+        try:
+            self.stub.SetForwardingPipelineConfig(req)
+        except Exception as e:
+            self.disconnect()
+            raise RuntimeError("Failed to push pipeline config") from e
+
+        self.import_p4info_names()
+
+    def disconnect(self):
+        # connect() may never have been called (e.g., --set-up-flows not set),
+        # so tear down the stream only if it exists.
+        if not hasattr(self, "stream_out_q"):
+            return
+        self.stream_out_q.put(None)
+        self.stream_recv_thread.join()
+
+    def import_p4info_names(self):
+        """
+        Import all names and IDs from the given P4Info.
+        """
+        self.p4info_obj_map = {}
+        self.p4info_id_to_name = {}
+        suffix_count = Counter()
+        for p4_obj_type in [
+            "tables",
+            "action_profiles",
+            "actions",
+            "counters",
+            "direct_counters",
+        ]:
+            for obj in getattr(self.p4info, p4_obj_type):
+                pre = obj.preamble
+                suffix = None
+                for s in reversed(pre.name.split(".")):
+                    suffix = s if suffix is None else s + "." + suffix
+                    key = (p4_obj_type, suffix)
+                    self.p4info_obj_map[key] = obj
+                    suffix_count[key] += 1
+                self.p4info_id_to_name[pre.id] = pre.name
+        for key, c in suffix_count.items():
+            if c > 1:
+                del self.p4info_obj_map[key]
+
+        # Add p4info object and object id "getters" for each object type; these
+        # are just wrappers around self.get_obj and self.get_obj_id.
+        # For example: self.get_table(x) and self.get_table_id(x) respectively
+        # call get_obj("tables", x) and get_obj_id("tables", x)
+        for obj_type, nickname in [
+            ("tables", "table"),
+            ("action_profiles", "ap"),
+            ("actions", "action"),
+            ("counters", "counter"),
+            ("direct_counters", "direct_counter"),
+        ]:
+            name = "_".join(["get", nickname])
+            setattr(self, name, partial(self.get_obj, obj_type))
+            name = "_".join(["get", nickname, "id"])
+            setattr(self, name, partial(self.get_obj_id, obj_type))
+
+    def get_obj(self, p4_obj_type, p4_name):
+        key = (p4_obj_type, p4_name)
+        obj = self.p4info_obj_map.get(key, None)
+        if obj is None:
+            raise Exception(
+                "Unable to find {} '{}' in p4info".format(p4_obj_type, p4_name)
+            )
+        return obj
+
+    def get_obj_id(self, p4_obj_type, p4_name):
+        obj = self.get_obj(p4_obj_type, p4_name)
+        return obj.preamble.id
+
+    def get_obj_name_from_id(self, p4info_id):
+        return self.p4info_id_to_name[p4info_id]
+
+    def get_param_id(self, action_name, param_name):
+        a = self.get_obj("actions", action_name)
+        for p in a.params:
+            if p.name == param_name:
+                return p.id
+        raise Exception(
+            "Param '%s' not found in action '%s'" % (param_name, action_name)
+        )
+
+    def get_mf_id(self, table_name, mf_name):
+        t = self.get_obj("tables", table_name)
+        if t is None:
+            return None
+        for mf in t.match_fields:
+            if mf.name == mf_name:
+                return mf.id
+        raise Exception(
+            "Match field '%s' not found in table '%s'" % (mf_name, table_name)
+        )
+
+    def get_stream_packet(self, type_, timeout=1):
+        start = time.time()
+        try:
+            while True:
+                remaining = timeout - (time.time() - start)
+                if remaining < 0:
+                    break
+                msg = self.stream_in_q.get(timeout=remaining)
+                if not msg.HasField(type_):
+                    continue
+                return msg
+        except queue.Empty:
+            pass  # timeout expired
+        return None
+
+    # These are attempts at convenience functions aimed at making writing
+    # P4Runtime PTF tests easier.
+
+    class MF(object):
+        def __init__(self, mf_name):
+            self.name = mf_name
+
+    class Exact(MF):
+        def __init__(self, mf_name, v):
+            super(P4RuntimeTest.Exact, self).__init__(mf_name)
+            self.v = v
+
+        def add_to(self, mf_id, mk):
+            mf = mk.add()
+            mf.field_id = mf_id
+            mf.exact.value = self.v
+
+    class Lpm(MF):
+        def __init__(self, mf_name, v, pLen):
+            super(P4RuntimeTest.Lpm, self).__init__(mf_name)
+            self.v = v
+            self.pLen = pLen
+
+        def add_to(self, mf_id, mk):
+            # P4Runtime mandates that the match field should be omitted for
+            # "don't care" LPM matches (i.e. when the prefix length is zero)
+            if self.pLen == 0:
+                return
+            mf = mk.add()
+            mf.field_id = mf_id
+            mf.lpm.prefix_len = self.pLen
+            mf.lpm.value = b""
+
+            # P4Runtime has strict rules regarding ternary matches: in the
+            # case of LPM, trailing bits in the value (after the prefix) must
+            # be set to 0.
+            first_byte_masked = self.pLen // 8
+            for i in range(first_byte_masked):
+                mf.lpm.value += stringify(self.v[i], 1)
+            if first_byte_masked == len(self.v):
+                return
+            r = self.pLen % 8
+            mf.lpm.value += stringify(self.v[first_byte_masked] & (0xFF << (8 - r)), 1)
+            for i in range(first_byte_masked + 1, len(self.v)):
+                mf.lpm.value += b"\x00"
+
+    class Ternary(MF):
+        def __init__(self, mf_name, v, mask):
+            super(P4RuntimeTest.Ternary, self).__init__(mf_name)
+            self.v = v
+            self.mask = mask
+
+        def add_to(self, mf_id, mk):
+            # P4Runtime mandates that the match field should be omitted for
+            # "don't care" ternary matches (i.e. when the mask is zero).
+            # Iterating over a bytes object yields ints in Python 3, hence
+            # the comparison with 0 rather than with b"\x00".
+            if all(c == 0 for c in self.mask):
+                return
+            mf = mk.add()
+            mf.field_id = mf_id
+            assert len(self.mask) == len(self.v)
+            mf.ternary.mask = self.mask
+            mf.ternary.value = b""
+            # P4Runtime has strict rules regarding ternary matches: "don't
+            # care" bits in the value must be set to 0.
+            for i in range(len(self.mask)):
+                mf.ternary.value += stringify(self.v[i] & self.mask[i], 1)
+
+    class Range(MF):
+        def __init__(self, mf_name, low, high):
+            super(P4RuntimeTest.Range, self).__init__(mf_name)
+            self.low = low
+            self.high = high
+
+        def add_to(self, mf_id, mk):
+            # P4Runtime mandates that the match field should be omitted for
+            # "don't care" range matches (i.e. when all possible values are
+            # included in the range)
+            # TODO(antonin): negative values?
+            low_is_zero = all(c == 0 for c in self.low)
+            high_is_max = all(c == 0xFF for c in self.high)
+            if low_is_zero and high_is_max:
+                return
+            mf = mk.add()
+            mf.field_id = mf_id
+            assert len(self.high) == len(self.low)
+            mf.range.low = self.low
+            mf.range.high = self.high
+
+    # Sets the match key for a p4::TableEntry object. mk needs to be an
+    # iterable object of MF instances
+    def set_match_key(self, table_entry, t_name, mk):
+        for mf in mk:
+            mf_id = self.get_mf_id(t_name, mf.name)
+            mf.add_to(mf_id, table_entry.match)
+
+    def set_action(self, action, a_name, params):
+        action.action_id = self.get_action_id(a_name)
+        for p_name, v in params:
+            param = action.params.add()
+            param.param_id = self.get_param_id(a_name, p_name)
+            param.value = v
+
+    # Sets the action & action data for a p4::TableEntry object. params needs
+    # to be an iterable object of 2-tuples (<param_name>, <param_value>).
+    def set_action_entry(self, table_entry, a_name, params):
+        self.set_action(table_entry.action.action, a_name, params)
+
+    def _write(self, req):
+        try:
+            return self.stub.Write(req)
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.UNKNOWN:
+                raise e
+            raise P4RuntimeException(e)
+
+    def read_request(self, req):
+        entities = []
+        try:
+            for resp in self.stub.Read(req):
+                entities.extend(resp.entities)
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.UNKNOWN:
+                raise e
+            raise P4RuntimeException(e)
+        return entities
+
+    def write_request(self, req):
+        rep = self._write(req)
+        return rep
+
+    def get_new_write_request(self):
+        req = p4runtime_pb2.WriteRequest()
+        req.device_id = self.device_id
+        election_id = req.election_id
+        election_id.high = 0
+        election_id.low = self.election_id
+        return req
+
+    def get_new_read_request(self):
+        req = p4runtime_pb2.ReadRequest()
+        req.device_id = self.device_id
+        return req
+
+    def get_new_read_response(self):
+        resp = p4runtime_pb2.ReadResponse()
+        return resp
+
+    #
+    # Convenience functions to build and send P4Runtime write requests
+    #
+
+    def _push_update_member(self, req, ap_name, mbr_id, a_name, params, update_type):
+        update = req.updates.add()
+        update.type = update_type
+        ap_member = update.entity.action_profile_member
+        ap_member.action_profile_id = self.get_ap_id(ap_name)
+        ap_member.member_id = mbr_id
+        self.set_action(ap_member.action, a_name, params)
+
+    def push_update_add_member(self, req, ap_name, mbr_id, a_name, params):
+        self._push_update_member(
+            req, ap_name, mbr_id, a_name, params, p4runtime_pb2.Update.INSERT
+        )
+
+    def push_update_modify_member(self, req, ap_name, mbr_id, a_name, params):
+        self._push_update_member(
+            req, ap_name, mbr_id, a_name, params, p4runtime_pb2.Update.MODIFY
+        )
+
+    def push_update_modify_group(self, req, ap_name, grp_id, grp_size, mbr_ids):
+        update = req.updates.add()
+        update.type = p4runtime_pb2.Update.MODIFY
+        ap_group = update.entity.action_profile_group
+        ap_group.action_profile_id = self.get_ap_id(ap_name)
+        ap_group.group_id = grp_id
+        for mbr_id in mbr_ids:
+            member = ap_group.members.add()
+            member.member_id = mbr_id
+            member.weight = 1
+        ap_group.max_size = grp_size
+
+    def push_update_add_group(self, req, ap_name, grp_id, grp_size=32, mbr_ids=()):
+        update = req.updates.add()
+        update.type = p4runtime_pb2.Update.INSERT
+        ap_group = update.entity.action_profile_group
+        ap_group.action_profile_id = self.get_ap_id(ap_name)
+        ap_group.group_id = grp_id
+        ap_group.max_size = grp_size
+        for mbr_id in mbr_ids:
+            member = ap_group.members.add()
+            member.member_id = mbr_id
+            member.weight = 1
+
+    def push_update_set_group_membership(self, req, ap_name, grp_id, mbr_ids=()):
+        update = req.updates.add()
+        update.type = p4runtime_pb2.Update.MODIFY
+        ap_group = update.entity.action_profile_group
+        ap_group.action_profile_id = self.get_ap_id(ap_name)
+        ap_group.group_id = grp_id
+        for mbr_id in mbr_ids:
+            member = ap_group.members.add()
+            member.member_id = mbr_id
+
+    def push_update_add_entry_to_action(
+        self, req, t_name, mk, a_name, params, priority=0
+    ):
+        update = req.updates.add()
+        table_entry = update.entity.table_entry
+        table_entry.table_id = self.get_table_id(t_name)
+        table_entry.priority = priority
+        if mk is None or len(mk) == 0:
+            table_entry.is_default_action = True
+            update.type = p4runtime_pb2.Update.MODIFY
+        else:
+            update.type = p4runtime_pb2.Update.INSERT
+            self.set_match_key(table_entry, t_name, mk)
+        self.set_action_entry(table_entry, a_name, params)
+
+    def push_update_add_entry_to_member(self, req, t_name, mk, mbr_id):
+        update = req.updates.add()
+        update.type = p4runtime_pb2.Update.INSERT
+        table_entry = update.entity.table_entry
+        table_entry.table_id = self.get_table_id(t_name)
+        self.set_match_key(table_entry, t_name, mk)
+        table_entry.action.action_profile_member_id = mbr_id
+
+    def push_update_add_entry_to_group(self, req, t_name, mk, grp_id):
+        update = req.updates.add()
+        update.type = p4runtime_pb2.Update.INSERT
+        table_entry = update.entity.table_entry
+        table_entry.table_id = self.get_table_id(t_name)
+        self.set_match_key(table_entry, t_name, mk)
+        table_entry.action.action_profile_group_id = grp_id
+
+    def send_request_add_member(self, ap_name, mbr_id, a_name, params):
+        req = self.get_new_write_request()
+        self.push_update_add_member(req, ap_name, mbr_id, a_name, params)
+        return req, self.write_request(req)
+
+    def send_request_modify_member(self, ap_name, mbr_id, a_name, params):
+        req = self.get_new_write_request()
+        self.push_update_modify_member(req, ap_name, mbr_id, a_name, params)
+        return req, self.write_request(req)
+
+    def send_request_modify_group(self, ap_name, grp_id, grp_size=32, mbr_ids=()):
+        req = self.get_new_write_request()
+        self.push_update_modify_group(req, ap_name, grp_id, grp_size, mbr_ids)
+        return req, self.write_request(req)
+
+    def send_request_add_group(self, ap_name, grp_id, grp_size=32, mbr_ids=()):
+        req = self.get_new_write_request()
+        self.push_update_add_group(req, ap_name, grp_id, grp_size, mbr_ids)
+        return req, self.write_request(req)
+
+    def send_request_set_group_membership(self, ap_name, grp_id, mbr_ids=()):
+        req = self.get_new_write_request()
+        self.push_update_set_group_membership(req, ap_name, grp_id, mbr_ids)
+        return req, self.write_request(req)
+
+    def send_request_add_entry_to_action(self, t_name, mk, a_name, params, priority=0):
+        req = self.get_new_write_request()
+        self.push_update_add_entry_to_action(req, t_name, mk, a_name, params, priority)
+        return req, self.write_request(req)
+
+    def send_request_add_entry_to_member(self, t_name, mk, mbr_id):
+        req = self.get_new_write_request()
+        self.push_update_add_entry_to_member(req, t_name, mk, mbr_id)
+        return req, self.write_request(req)
+
+    def send_request_add_entry_to_group(self, t_name, mk, grp_id):
+        req = self.get_new_write_request()
+        self.push_update_add_entry_to_group(req, t_name, mk, grp_id)
+        return req, self.write_request(req)
+
+    def read_direct_counter(self, table_entry):
+        req = self.get_new_read_request()
+        entity = req.entities.add()
+        direct_counter_entry = entity.direct_counter_entry
+        direct_counter_entry.table_entry.CopyFrom(table_entry)
+
+        for entity in self.read_request(req):
+            if entity.HasField("direct_counter_entry"):
+                return entity.direct_counter_entry
+        return None
+
+    def write_direct_counter(self, table_entry, byte_count, packet_count):
+        req = self.get_new_write_request()
+        update = req.updates.add()
+        update.type = p4runtime_pb2.Update.MODIFY
+        direct_counter_entry = update.entity.direct_counter_entry
+        direct_counter_entry.table_entry.CopyFrom(table_entry)
+        direct_counter_entry.data.byte_count = byte_count
+        direct_counter_entry.data.packet_count = packet_count
+        return req, self.write_request(req)
+
+    def read_indirect_counter(self, c_name, c_index, typ):
+        # Check the counter type against the P4Info
+        counter = self.get_counter(c_name)
+        counter_type_unit = p4info_pb2.CounterSpec.Unit.Name(counter.spec.unit)
+        if counter_type_unit != "BOTH" and counter_type_unit != typ:
+            raise Exception(
+                "Counter %s is of type %s, but requested: %s"
+                % (c_name, counter_type_unit, typ)
+            )
+        req = self.get_new_read_request()
+        entity = req.entities.add()
+        counter_entry = entity.counter_entry
+        c_id = self.get_counter_id(c_name)
+        counter_entry.counter_id = c_id
+        index = counter_entry.index
+        index.index = c_index
+
+        for entity in self.read_request(req):
+            if entity.HasField("counter_entry"):
+                return entity.counter_entry
+        return None
+
+    def write_indirect_counter(
+        self, c_name, c_index, byte_count=None, packet_count=None
+    ):
+        # Get the counter type from the P4Info
+        counter = self.get_counter(c_name)
+        counter_type_unit = p4info_pb2.CounterSpec.Unit.Name(counter.spec.unit)
+
+        req = self.get_new_write_request()
+        update = req.updates.add()
+        update.type = p4runtime_pb2.Update.MODIFY
+        counter_entry = update.entity.counter_entry
+
+        c_id = self.get_counter_id(c_name)
+        counter_entry.counter_id = c_id
+        index = counter_entry.index
+        index.index = c_index
+
+        counter_data = counter_entry.data
+
+        if counter_type_unit == "BOTH" or counter_type_unit == "BYTES":
+            if byte_count is None:
+                raise Exception(
+                    "Counter %s is of type %s, byte_count cannot be None"
+                    % (c_name, counter_type_unit)
+                )
+            counter_data.byte_count = byte_count
+        if counter_type_unit == "BOTH" or counter_type_unit == "PACKETS":
+            if packet_count is None:
+                raise Exception(
+                    "Counter %s is of type %s, packet_count cannot be None"
+                    % (c_name, counter_type_unit)
+                )
+            counter_data.packet_count = packet_count
+        return req, self.write_request(req)
+
+    def read_table_entry(self, t_name, mk, priority=0):
+        req = self.get_new_read_request()
+        entity = req.entities.add()
+        table_entry = entity.table_entry
+        table_entry.table_id = self.get_table_id(t_name)
+        table_entry.priority = priority
+        if mk is None or len(mk) == 0:
+            table_entry.is_default_action = True
+        else:
+            self.set_match_key(table_entry, t_name, mk)
+
+        for entity in self.read_request(req):
+            if entity.HasField("table_entry"):
+                return entity.table_entry
+        return None
+
+    def read_action_profile_member(self, ap_name, mbr_id):
+        req = self.get_new_read_request()
+        entity = req.entities.add()
+        action_profile_member = entity.action_profile_member
+        action_profile_member.action_profile_id = self.get_ap_id(ap_name)
+        action_profile_member.member_id = mbr_id
+
+        for entity in self.read_request(req):
+            if entity.HasField("action_profile_member"):
+                return entity.action_profile_member
+        return None
+
+    def read_action_profile_group(self, ap_name, grp_id):
+        req = self.get_new_read_request()
+        entity = req.entities.add()
+        action_profile_group = entity.action_profile_group
+        action_profile_group.action_profile_id = self.get_ap_id(ap_name)
+        action_profile_group.group_id = grp_id
+
+        for entity in self.read_request(req):
+            if entity.HasField("action_profile_group"):
+                return entity.action_profile_group
+        return None
+
+    def is_default_action_update(self, update):
+        return (
+            update.type == p4runtime_pb2.Update.MODIFY
+            and update.entity.WhichOneof("entity") == "table_entry"
+            and update.entity.table_entry.is_default_action
+        )
+
+    def add_clone_group(self, clone_id, ports):
+        req = self.get_new_write_request()
+        update = req.updates.add()
+        update.type = p4runtime_pb2.Update.INSERT
+        pre_entry = update.entity.packet_replication_engine_entry
+        clone_entry = pre_entry.clone_session_entry
+        clone_entry.session_id = clone_id
+        clone_entry.class_of_service = 0
+        clone_entry.packet_length_bytes = 0
+        for port in ports:
+            replica = clone_entry.replicas.add()
+            replica.egress_port = port
+            replica.instance = 0  # set to 0 because we don't support it yet
+        return req, self.write_request(req)
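Putting the two modules together, installing a single IPv4 route looks roughly like the sketch below. This is illustrative only: the gRPC address matches the module defaults, but the p4info and pipeline-config paths are placeholders.

    from lib.fabric_tna import FabricTnaTest
    from lib.utils import ipv4_to_binary, stringify

    t = FabricTnaTest()
    t.connect(
        grpc_addr="localhost:9339",
        p4info="/path/to/p4info.txt",              # placeholder path
        pipeline_config="/path/to/pipeline.conf",  # placeholder path
    )
    # Equivalent to t.add_forwarding_routing_v4_entry("10.0.0.0", 8, 100):
    t.send_request_add_entry_to_action(
        "forwarding.routing_v4",
        [t.Lpm("ipv4_dst", ipv4_to_binary("10.0.0.0"), 8)],
        "forwarding.set_next_id_routing_v4",
        [("next_id", stringify(100, 4))],
    )
    t.disconnect()
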
diff --git a/trex-scripts/lib/utils.py b/trex-scripts/lib/utils.py
index 0d81f89..9df3033 100644
--- a/trex-scripts/lib/utils.py
+++ b/trex-scripts/lib/utils.py
@@ -117,3 +117,19 @@ def __call__(self, parser, namespace, value, option_string=None):
     key = value.split("=")[0]
     val = value.split("=")[1]
     namespace.test_args[key] = val
+
+# Used by ipv4_to_binary below; harmless if socket is already imported above.
+import socket
+
+
+# Convert an integer of the given byte length to a big-endian byte string
+def stringify(n, length):
+    return n.to_bytes(length, byteorder="big")
+
+
+def ipv4_to_binary(addr):
+    return socket.inet_aton(addr)
+
+
+def mac_to_binary(addr):
+    return bytes.fromhex(addr.replace(":", ""))
diff --git a/trex-scripts/tests/int_single_flow.py b/trex-scripts/tests/int_single_flow.py
index 099d55a..10dfb0b 100644
--- a/trex-scripts/tests/int_single_flow.py
+++ b/trex-scripts/tests/int_single_flow.py
@@ -6,6 +6,7 @@
 from datetime import datetime
 
 from lib.base_test import StatelessTest
+from lib.fabric_tna import *
 from lib.gtpu import GTPU
 from lib.utils import list_port_status
 from lib.xnt import analysis_report_pcap
@@ -14,17 +15,28 @@
 SOURCE_MAC = "00:00:00:00:00:01"
 DEST_MAC = "00:00:00:00:00:03"
+COL_MAC = "00:00:00:00:00:04"
+COL_IP = "192.168.40.1"
+SWITCH_MAC = "c0:ff:ee:c0:ff:ee"
 SOURCE_IP = "192.168.10.1"
 DEST_IP = "192.168.30.1"
+SWITCH_IP = "192.168.40.254"
 INNER_SRC_IP = "10.240.0.1"
 INNER_DEST_IP = "8.8.8.8"
+IP_PREFIX = 32
 
 SENDER_PORTS = [0]
 INT_COLLECTPR_PORTS = [3]
+SWITCH_PORTS = [272, 280, 256, 264]  # 29, 30, 31, 32
+DEFAULT_VLAN = 10
+SWITCH_ID = 1
+INT_REPORT_MIRROR_IDS = [300, 301, 302, 303]
+RECIRC_PORTS = [68, 196, 324, 452]
 
 
-class IntSingleFlow(StatelessTest):
+class IntSingleFlow(StatelessTest, FabricTnaTest):
     @classmethod
     def setup_subparser(cls, parser: ArgumentParser) -> None:
+        FabricTnaTest.setup_subparser(parser)
         parser.add_argument("--duration", type=int, help="Test duration", default=5)
         parser.add_argument(
             "--mult", type=str, help="Traffic multiplier", default="1pps"
@@ -33,10 +45,15 @@ def setup_subparser(cls, parser: ArgumentParser) -> None:
 
     def get_sample_packet(self, pkt_type):
         if pkt_type == "tcp":
-            return Ether() / IP(src=SOURCE_IP, dst=DEST_IP) / TCP() / ("*" * 1500)
+            return (
+                Ether(src=SOURCE_MAC, dst=SWITCH_MAC)
+                / IP(src=SOURCE_IP, dst=DEST_IP)
+                / TCP()
+                / ("*" * 1500)
+            )
         elif pkt_type == "gtpu-udp":
             return (
-                Ether()
+                Ether(src=SOURCE_MAC, dst=SWITCH_MAC)
                 / IP(src=SOURCE_IP, dst=DEST_IP)
                 / UDP()
                 / GTPU()
@@ -45,9 +62,51 @@ def get_sample_packet(self, pkt_type):
                 / ("*" * 1500)
             )
         else:
-            return Ether() / IP(src=SOURCE_IP, dst=DEST_IP) / UDP() / ("*" * 1500)
+            # UDP
+            return (
+                Ether(src=SOURCE_MAC, dst=SWITCH_MAC)
+                / IP(src=SOURCE_IP, dst=DEST_IP)
+                / UDP()
+                / ("*" * 1500)
+            )
+
+    def set_up_flows(self) -> None:
+        # Filtering rules
+        for i in range(0, 4):
+            self.set_up_port(SWITCH_PORTS[i], DEFAULT_VLAN)
+            self.set_forwarding_type(
+                SWITCH_PORTS[i],
+                SWITCH_MAC,
+                ethertype=ETH_TYPE_IPV4,
+                fwd_type=FORWARDING_TYPE_UNICAST_IPV4,
+            )
+        # Forwarding rules
+        self.add_forwarding_routing_v4_entry(DEST_IP, IP_PREFIX, 100)
+        self.add_forwarding_routing_v4_entry(COL_IP, IP_PREFIX, 101)
+
+        # Next rules
+        # Send to the dest host
+        self.add_next_routing(100, SWITCH_PORTS[1], SWITCH_MAC, DEST_MAC)
+        # Send to the collector
+        self.add_next_routing(101, SWITCH_PORTS[3], SWITCH_MAC, COL_MAC)
+        self.add_next_vlan(100, DEFAULT_VLAN)
+        self.add_next_vlan(101, DEFAULT_VLAN)
+        # INT rules
+        self.set_up_watchlist_flow(SOURCE_IP, DEST_IP)
+        self.set_up_int_mirror_flow(SWITCH_ID)
+        self.set_up_report_flow(SWITCH_MAC, COL_MAC, SWITCH_IP, COL_IP, SWITCH_PORTS[3])
+
+        for i in range(0, 4):
+            self.set_up_report_mirror_flow(INT_REPORT_MIRROR_IDS[i], RECIRC_PORTS[i])
 
     def start(self, args) -> None:
+        if args.set_up_flows:
+            self.connect(
+                grpc_addr=args.switch_addr,
+                p4info=args.p4info,
+                pipeline_config=args.pipeline_config,
+            )
+            self.set_up_flows()
         pkt = self.get_sample_packet(args.pkt_type)
         if not pkt:
             return 1
@@ -82,3 +141,6 @@ def start(self, args) -> None:
         self.client.stop_capture(capture["id"], output)
         analysis_report_pcap(output)
         list_port_status(self.client.get_stats())
+
+    def stop(self):
+        self.disconnect()
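In the RemotePcap flow setup below, two 1-bit prefixes stand in for a default route: 0.0.0.0/1 and 128.0.0.0/1 together cover the entire IPv4 space, and the watchlist entries likewise split it into a low half and a high half. A small check of that arithmetic:

    import ipaddress

    low = ipaddress.ip_network("0.0.0.0/1")
    high = ipaddress.ip_network("128.0.0.0/1")
    assert low.num_addresses + high.num_addresses == 2 ** 32
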
diff --git a/trex-scripts/tests/remote_pcap.py b/trex-scripts/tests/remote_pcap.py
index 54b5752..ebdcd20 100644
--- a/trex-scripts/tests/remote_pcap.py
+++ b/trex-scripts/tests/remote_pcap.py
@@ -7,14 +7,27 @@
 from datetime import datetime
 
 from lib.base_test import StatelessTest
+from lib.fabric_tna import *
 from lib.utils import list_port_status
 from lib.xnt import analysis_report_pcap
 
+UPSTREAM_ROUTER_MAC = "00:00:00:00:00:03"
+COLLECTOR_MAC = "00:00:00:00:00:04"
+COLLECTOR_IP = "192.168.40.1"
+SWITCH_MAC = "c0:ff:ee:c0:ff:ee"
+SOURCE_IP = "192.168.10.1"
+SWITCH_IP = "192.168.40.254"
+
 SENDER_PORTS = [0]
 INT_COLLECTPR_PORTS = [3]
+SWITCH_PORTS = [272, 280, 256, 264]  # 29, 30, 31, 32
+DEFAULT_VLAN = 10
+SWITCH_ID = 1
+INT_REPORT_MIRROR_IDS = [300, 301, 302, 303]
+RECIRC_PORTS = [68, 196, 324, 452]
 
 
-class RemotePcap(StatelessTest):
+class RemotePcap(StatelessTest, FabricTnaTest):
 
     # setup_subparser is an optional class method
     # You can implement this method if you want to add additional command line
@@ -23,6 +36,7 @@ class RemotePcap(StatelessTest):
     # "args" argument.
     @classmethod
     def setup_subparser(cls, parser: ArgumentParser) -> None:
+        FabricTnaTest.setup_subparser(parser)
         parser.add_argument(
             "--remote-pcap-file-dir",
             type=str,
@@ -51,8 +65,59 @@ def setup_subparser(cls, parser: ArgumentParser) -> None:
             + "analysis the accuracy score",
         )
 
+    def set_up_flows(self) -> None:
+        # Filtering rules
+        for i in range(0, 4):
+            self.set_up_port(SWITCH_PORTS[i], DEFAULT_VLAN)
+            self.set_forwarding_type(
+                SWITCH_PORTS[i],
+                SWITCH_MAC,
+                ethertype=ETH_TYPE_IPV4,
+                fwd_type=FORWARDING_TYPE_UNICAST_IPV4,
+            )
+        # Forwarding rules
+        self.add_forwarding_routing_v4_entry("0.0.0.0", 1, 100)
+        self.add_forwarding_routing_v4_entry("128.0.0.0", 1, 100)
+        self.add_forwarding_routing_v4_entry(COLLECTOR_IP, 32, 101)
+
+        # Next rules
+        # Send to the upstream router
+        self.add_next_routing(100, SWITCH_PORTS[2], SWITCH_MAC, UPSTREAM_ROUTER_MAC)
+        # Send to the collector
+        self.add_next_routing(101, SWITCH_PORTS[3], SWITCH_MAC, COLLECTOR_MAC)
+        self.add_next_vlan(100, DEFAULT_VLAN)
+        self.add_next_vlan(101, DEFAULT_VLAN)
+        # INT rules
+        self.set_up_watchlist_flow(
+            ipv4_src="0.0.0.0",
+            ipv4_src_mask="128.0.0.0",
+            ipv4_dst="0.0.0.0",
+            ipv4_dst_mask="128.0.0.0",
+        )
+        self.set_up_watchlist_flow(
+            ipv4_src="128.0.0.0",
+            ipv4_src_mask="128.0.0.0",
+            ipv4_dst="128.0.0.0",
+            ipv4_dst_mask="128.0.0.0",
+        )
+        self.set_up_int_mirror_flow(SWITCH_ID)
+        self.set_up_report_flow(
+            SWITCH_MAC, COLLECTOR_MAC, SWITCH_IP, COLLECTOR_IP, SWITCH_PORTS[3]
+        )
+
+        for i in range(0, 4):
+            self.set_up_report_mirror_flow(INT_REPORT_MIRROR_IDS[i], RECIRC_PORTS[i])
+
     # The entrypoint of a test
     def start(self, args: dict) -> None:
+        if args.set_up_flows:
+            self.connect(
+                grpc_addr=args.switch_addr,
+                p4info=args.p4info,
+                pipeline_config=args.pipeline_config,
+            )
+            self.set_up_flows()
+
         logging.info(
             "Start capturing first %s RX packet from INT collector", args.capture_limit
         )
@@ -94,3 +159,6 @@ def start(self, args: dict) -> None:
         logging.info("INT report pcap file stored in {}".format(output))
         logging.info("Analyzing report pcap file...")
         analysis_report_pcap(output, args.total_flows)
+
+    def stop(self):
+        self.disconnect()
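One design note on the control.py change: test.stop() runs on the success path only, so if start() raises, the P4Runtime stream would stay open. A variant (not what this patch does) that guarantees clean-up would wrap the call in try/finally:

    try:
        test.start(args)
    finally:
        test.stop()  # disconnect() is a no-op if connect() was never called
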