From dbb77e18de5a918dd2cd50cd0c47dc152e75a5fb Mon Sep 17 00:00:00 2001 From: DasSkelett Date: Thu, 28 Mar 2024 22:18:48 +0000 Subject: [PATCH] Handle NetlinkDumpInterrupted, fix worker metrics going stale after exceptions --- wgkex/worker/mqtt.py | 17 ++++++++++++----- wgkex/worker/netlink.py | 18 ++++++++++++++---- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index d5941cd..2f4854d 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -20,9 +20,7 @@ from wgkex.worker.msg_queue import q from wgkex.worker.netlink import ( get_device_data, - link_handler, get_connected_peers_count, - WireGuardClient, ) _HOSTNAME = socket.gethostname() @@ -206,9 +204,15 @@ def publish_metrics_loop( topic = TOPIC_CONNECTED_PEERS.format(domain=domain, worker=_HOSTNAME) while not exit_event.is_set(): - publish_metrics(client, topic, domain) - # This drifts slightly over time, doesn't matter for us - exit_event.wait(_METRICS_SEND_INTERVAL) + try: + publish_metrics(client, topic, domain) + except Exception as e: + # Don't crash the thread when an exception is encountered + logger.error(f"Exception during publish metrics task for {domain}:") + logger.error(e) + finally: + # This drifts slightly over time, doesn't matter for us + exit_event.wait(_METRICS_SEND_INTERVAL) # Set peers metric to -1 to mark worker as offline # Use QoS 1 (at least once) to make sure the broker notices @@ -228,6 +232,9 @@ def publish_metrics(client: mqtt.Client, topic: str, domain: str) -> None: ) return peer_count = get_connected_peers_count(iface) + if peer_count < 0: + # get_connected_peers_count() encountered some error, don't update metrics + return # Publish metrics, retain it at MQTT broker so restarted wgkex broker has metrics right away client.publish(topic, peer_count, retain=True) diff --git a/wgkex/worker/netlink.py b/wgkex/worker/netlink.py index 057e110..1197b5f 100644 --- a/wgkex/worker/netlink.py +++ b/wgkex/worker/netlink.py @@ -9,8 +9,7 @@ from textwrap import wrap from typing import Any, Dict, List, Tuple -import pyroute2 -import pyroute2.netlink +import pyroute2, pyroute2.netlink, pyroute2.netlink.exceptions from wgkex.common.utils import mac2eui64 from wgkex.common import logger @@ -218,12 +217,23 @@ def get_connected_peers_count(wg_interface: str) -> int: wg_interface: The WireGuard interface to query. Returns: - # The number of peers which have recently seen a handshake. + The number of peers which have recently seen a handshake. + -1 if an internal error has been encountered. """ three_mins_ago_in_secs = int((datetime.now() - timedelta(minutes=3)).timestamp()) logger.info("Counting connected wireguard peers for interface %s.", wg_interface) with pyroute2.WireGuard() as wg: - msgs = wg.info(wg_interface) + try: + msgs = wg.info(wg_interface) + except pyroute2.netlink.exceptions.NetlinkDumpInterrupted: + # Normal behaviour, data has changed while it was being returned by netlink. + # Retry once, otherwise return -1 and let the caller handle it. + # See https://github.com/svinota/pyroute2/issues/874 + try: + msgs = wg.info(wg_interface) + except pyroute2.netlink.exceptions.NetlinkDumpInterrupted: + return -1 + logger.debug("Got infos for connected peers: %s.", msgs) count = 0 for msg in msgs: