Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qemu_guest_agent: Add new api support 'guest-network-get-route' #4130

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions qemu/tests/cfg/qemu_guest_agent.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@
image_snapshot = no
cmd_disable_network = wmic path win32_networkadapter where "NetConnectionID='%s'" call disable
cmd_enable_network = wmic path win32_networkadapter where "NetConnectionID='%s'" call enable
- check_get_network_route:
only Linux
gagent_check_type = get_network_route
- check_fsfreeze:
gagent_fs_test_cmd = "echo foo > %s/foo"
delete_temp_file = "rm -rf ${mountpoint_def}/foo"
Expand Down
137 changes: 137 additions & 0 deletions qemu/tests/qemu_guest_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,143 @@ def ip_addr_check(session, mac_addr, ret_list, if_index, if_name):
if session_serial:
session_serial.close()

@error_context.context_aware
def gagent_check_get_network_route(self, test, params, env):
"""
Execute "guest-network-get-route" command to guest agent

Steps:
1) Login to guest via serial session.
2) Get route information from guest using the qga "guest-network-get-route" command.
3) Get the same route information from within the guest.
4) Compare both sets of route data to ensure they match.

:param test: kvm test object
:param params: Dictionary with the test parameters
:param env: Dictionary with test environment.
"""

def parse_proc_net_route():
"""
Parse /proc/net/route file and return a list of dictionaries
with route information.
"""
route_info_list = []
with open("/proc/net/route") as f:
next(f) # Skip header
for line in f:
fields = line.strip().split()
route_info = {
"iface": fields[0],
"destination": convert_hex_to_ip(fields[1]),
"gateway": convert_hex_to_ip(fields[2]),
"flags": int(fields[3], 16),
"mask": convert_hex_to_ip(fields[7]),
"metric": int(fields[6]),
}
route_info_list.append(route_info)
return route_info_list

def convert_hex_to_ip(hex_str):
"""
Convert hex string to dotted-decimal IP address.
"""
hex_str = hex_str.zfill(8) # Ensure it's 8 characters long
return ".".join(str(int(hex_str[i:i + 2], 16)) for i in range(6, -2, -2))

def parse_proc_net_ipv6_route():
"""
Parse /proc/net/ipv6_route file and return a list of dictionaries
with IPv6 route information.
"""
ipv6_route_info_list = []
with open("/proc/net/ipv6_route") as f:
for line in f:
fields = line.strip().split()
ipv6_route_info = {
"destination": convert_hex_to_ipv6(fields[0]),
"prefix_length": int(fields[1], 16),
"gateway": convert_hex_to_ipv6(fields[4]),
"metric": int(fields[6], 16),
"iface": fields[9],
}
ipv6_route_info_list.append(ipv6_route_info)
return ipv6_route_info_list

def convert_hex_to_ipv6(hex_str):
"""
Convert hex string to standard IPv6 address format.
"""
hex_str = hex_str.zfill(32)
return ":".join(hex_str[i:i + 4] for i in range(0, 32, 4))

def parse_route_output(route_str, version):
"""
Parse the route string from the guest's 'ip route' or 'ip -6 route' output.

:param route_str: Route string from the guest.
:param version: IP version (4 or 6).
:return: A dictionary with parsed route information.
"""
route_dict = {"version": version}
# This parsing logic can be customized according to the format of 'ip route' output
tokens = route_str.split()
route_dict["destination"] = tokens[0]
if "via" in tokens:
route_dict["gateway"] = tokens[tokens.index("via") + 1]
else:
route_dict["gateway"] = "0.0.0.0" if version == 4 else "::"

# More fields can be added as needed
return route_dict

session = self._get_session(params, self.vm)
self._open_session_list.append(session)

# Step 1: Get route info from guest-agent API
error_context.context("Getting route info via guest-agent API.", LOG_JOB.info)
self.gagent.get_network_route()
qga_route_info = self.gagent.get_network_route()

# Step 2: Get route info from inside the guest
error_context.context("Getting route info from inside the guest.", LOG_JOB.info)
ipv4_routes = parse_proc_net_route()
ipv6_routes = parse_proc_net_ipv6_route()

# Combine them into one list
combined_routes = ipv4_routes + ipv6_routes
print("++++++++++++++++++++++++++++++++++++++")
print("guest_route_info is: {}".format(combined_routes))

# Step 3: Compare the route info from guest-agent and guest
error_context.context("Comparing the route info from guest-agent and guest.", LOG_JOB.info)
# if qga_route_info != combined_routes:
# test.fail("Route information mismatch:\nQGA route info: %s\nGuest route info: %s"
# % (qga_route_info, combined_routes))
for qga_route in combined_routes:
qga_destination = qga_route["destination"]
matching_guest_route = None

# Find matching destination in guest_route_info
for guest_route in combined_routes:
if guest_route["destination"] == qga_destination:
matching_guest_route = guest_route
break

if matching_guest_route:
# Compare other fields like gateway, metric, etc.
if (matching_guest_route["gateway"] != qga_route["gateway"] or
matching_guest_route["metric"] != qga_route["metric"]):
print(f"-------------------------Mismatch for destination {qga_destination}:")
print(f"-------------------------Guest gateway: {matching_guest_route['gateway']}, "
f"QGA gateway: {qga_route['gateway']}")
print(f"-------------------------Guest metric: {matching_guest_route['metric']}, "
f"QGA metric: {qga_route['metric']}")
else:
print(f"-----------------------No matching destination found in guest routes for {qga_destination}")
if session:
session.close()

@error_context.context_aware
def gagent_check_reboot_shutdown(self, test, params, env):
"""
Expand Down
Loading