Skip to content

Commit

Permalink
Handle NetlinkDumpInterrupted, fix worker metrics going stale after exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
DasSkelett committed Mar 28, 2024
1 parent d06f8ea commit dbb77e1
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
17 changes: 12 additions & 5 deletions wgkex/worker/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
from wgkex.worker.msg_queue import q
from wgkex.worker.netlink import (
get_device_data,
link_handler,
get_connected_peers_count,
WireGuardClient,
)

_HOSTNAME = socket.gethostname()
Expand Down Expand Up @@ -206,9 +204,15 @@ def publish_metrics_loop(
topic = TOPIC_CONNECTED_PEERS.format(domain=domain, worker=_HOSTNAME)

while not exit_event.is_set():
publish_metrics(client, topic, domain)
# This drifts slightly over time, doesn't matter for us
exit_event.wait(_METRICS_SEND_INTERVAL)
try:
publish_metrics(client, topic, domain)
except Exception as e:
# Don't crash the thread when an exception is encountered
logger.error(f"Exception during publish metrics task for {domain}:")
logger.error(e)
finally:
# This drifts slightly over time, doesn't matter for us
exit_event.wait(_METRICS_SEND_INTERVAL)

# Set peers metric to -1 to mark worker as offline
# Use QoS 1 (at least once) to make sure the broker notices
Expand All @@ -228,6 +232,9 @@ def publish_metrics(client: mqtt.Client, topic: str, domain: str) -> None:
)
return
peer_count = get_connected_peers_count(iface)
if peer_count < 0:
# get_connected_peers_count() encountered some error, don't update metrics
return

# Publish metrics, retain it at MQTT broker so restarted wgkex broker has metrics right away
client.publish(topic, peer_count, retain=True)
Expand Down
18 changes: 14 additions & 4 deletions wgkex/worker/netlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
from textwrap import wrap
from typing import Any, Dict, List, Tuple

import pyroute2
import pyroute2.netlink
import pyroute2, pyroute2.netlink, pyroute2.netlink.exceptions

from wgkex.common.utils import mac2eui64
from wgkex.common import logger
Expand Down Expand Up @@ -218,12 +217,23 @@ def get_connected_peers_count(wg_interface: str) -> int:
wg_interface: The WireGuard interface to query.
Returns:
# The number of peers which have recently seen a handshake.
The number of peers which have recently seen a handshake.
-1 if an internal error has been encountered.
"""
three_mins_ago_in_secs = int((datetime.now() - timedelta(minutes=3)).timestamp())
logger.info("Counting connected wireguard peers for interface %s.", wg_interface)
with pyroute2.WireGuard() as wg:
msgs = wg.info(wg_interface)
try:
msgs = wg.info(wg_interface)
except pyroute2.netlink.exceptions.NetlinkDumpInterrupted:
# Normal behaviour, data has changed while it was being returned by netlink.
# Retry once, otherwise return -1 and let the caller handle it.
# See https://github.com/svinota/pyroute2/issues/874
try:
msgs = wg.info(wg_interface)
except pyroute2.netlink.exceptions.NetlinkDumpInterrupted:
return -1

logger.debug("Got infos for connected peers: %s.", msgs)
count = 0
for msg in msgs:
Expand Down

0 comments on commit dbb77e1

Please sign in to comment.