-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathaqm-alerts
executable file
·284 lines (244 loc) · 12.2 KB
/
aqm-alerts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
#!/usr/bin/env python
import datetime as dt, contextlib as cl, collections as cs, functools as ft
import os, sys, re, time, socket, struct, logging, json, base64, signal
import zmq # pyzmq
err_fmt = lambda err: f'[{err.__class__.__name__}] {err}'
_tds = dict(
y=365.2422 * 86400, yr=365.2422 * 86400, year=365.2422 * 86400,
mo=30.5 * 86400, month=30.5 * 86400,
w=7 * 86400, wk=7 * 86400, week=7 * 86400, d=1 * 86400, day=1 * 86400,
h=3600, hr=3600, hour=3600,
m=60, min=60, minute=60, s=1, sec=1, second=1 )
_td_re = re.compile('(?i)^[-+]?' + ''.join( fr'(?P<{k}>\d+{k}s?\s*)?' for k, v in
sorted(_tds.items(), key=lambda kv: (kv[1], len(kv[0])), reverse=True) ) + '$')
def td_parse_sec(td):
'Parse [[HH:]MM:]SS or e.g. 5h30m and such time-deltas to seconds'
if isinstance(td, (int, float)): return td
if not ((m := _td_re.search(td.strip())) and any(m.groups())):
h, m, s = (v.strip(': ') for v in ('::' + td).rsplit(':', 2))
return float(s or 0) + float(m or 0) * 60 + float(h or 0) * 3600
delta = 0
for k, v in _tds.items():
if not m.group(k): continue
delta += v * int(''.join(filter(str.isdigit, m.group(k))) or 1)
return delta
def rate_limit_iter(spec):
'''This is not token-bucket, i.e. tokens don't pile-up in-between checks.
spec = N / M[smhd], e.g. 10 / 15m(inutes). while next(err_rate_limit): try: ...'''
burst, span = map(str.strip, spec.split('/', 1))
span = td_parse_sec(span)
n, tokens = 0, [-span] * (burst := int(burst) + 1)
while True:
ts = tokens[n] = time.monotonic()
yield ts - tokens[n := (n + 1) % burst] > span
def pkt_enc(s):
s = base64.urlsafe_b64encode(s).decode()
if len(s) > 80: s = s[:72] + f' ...[{len(s):,d}B]'
return s
def pkt_crc16(s, crc=0):
# CRC-16F/5 CRC-16-OpenSafety-A {241,241,241,35,10,8,3}
# See https://users.ece.cmu.edu/~koopman/crc/crc16.html
for c in s:
m = 0x100
while m := m >> 1:
bit = bool(crc & 0x8000) ^ bool(c & m)
crc <<= 1
if bit: crc ^= 0x5935
crc &= 0xffff
return crc & 0xffff
# pm10, pm25, pm40, pm100, rh, t, voc, nox = data
pkt_sample = ( lambda rx,
_ks=(10, 10, 10, 10, 100, 200, 10, 10),
_nx=(0xffff, 0xffff, 0xffff, 0xffff, 0x7fff, 0x7fff, 0x7fff, 0x7fff):
tuple( (v / k if v != nx else None) for v, k, nx in
zip(struct.unpack('>HHHHhhhh', rx), _ks, _nx) ) )
pkt_fake_src = '[src-ip-addr]'
def run_proxy_loop( sock, zmq_send, td_snooze,
rate_threshold=None, rate_threshold_addrs=32, td_exit=0, pkt_queue=None ):
log = logging.getLogger('net')
sample_keys = 'PM1 PM2.5 PM4 PM10 RH% T°C VOC NOx'.split()
sample_keys_alert = ['pm']*4 + 'rh t voc nox'.split()
sample_keys_fmt = ['{:.1f}']*4 + ['{:.0f}', '{:.1f}', '{:.0f}', '{:.0f}']
signal.signal(signal.SIGALRM, lambda sig, frm: sys.exit(0))
if rate_threshold:
rate_threshold = ft.lru_cache(maxsize=rate_threshold_addrs)(
lambda addr,_spec=rate_threshold: rate_limit_iter(_spec) )
while True:
if td_exit > 0: signal.alarm(td_exit)
if pkt_queue: pkt, addr = pkt_queue.pop()
else: pkt, addr = sock.recvfrom(2048)
log.debug(f'Packet from {addr} [ {len(pkt):,d} B]')
try:
if pkt_crc16(s := pkt[:-2]) != int.from_bytes(pkt[-2:], 'big'):
log.debug(' - dropped: crc mismatch :: %s', pkt_enc(pkt))
continue
sample, errs = pkt_sample(s[:16]), s[16:].decode()
except Exception as err:
log.debug(' - dropped: format mismatch - %s :: %s', err_fmt(err), pkt_enc(pkt))
continue
if rate_threshold and next(rate_threshold(addr[0])):
log.debug(' - recv-count-rate skipped [ %s ]: %s', errs, sample)
continue
else: log.debug(' - parsed [ %s ]: %s', errs, sample)
summary = f'AQM Threshold Alert [ <b>{errs.upper()}</b> ]'
errs_set, vals, body = errs.split(), cs.defaultdict(list), [
f'Over-threshold parameter(s): <b>{errs.upper()}</b>',
f'Date/time [local]: <tt>{dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</tt>',
f'Device address: {addr[0]}:{addr[1]}', 'Last sample:' ]
for k, k_err, fmt, v in zip(sample_keys, sample_keys_alert, sample_keys_fmt, sample):
if v is None: continue
v = f'{k}=[ {fmt.format(v)} ]'
if k_err == 'pm': vals[k_err].append(v)
else:
if k_err in errs_set: v = f'<b>{v}</b>'
vals['other'].append(v)
if v := ', '.join(vals['pm']):
if 'pm' in errs_set: v = f'<b>{v}</b>'
body.append(f' {v}')
if v := vals['other']: body.append(' ' + ', '.join(v))
body = '\n'.join(body)
log.debug(' - dispatching notification')
zmq_send(summary, body)
pkt = struct.pack('>d', td_snooze) + errs.encode()
pkt += pkt_crc16(pkt).to_bytes(2, 'big')
log.debug( ' - snooze-packet back to %s:%s'
f' [for {td_snooze:,.0f}s, {len(pkt):,d} B]', addr[0], addr[1] )
if addr[0] != pkt_fake_src:
try: sock.sendto(pkt, addr)
except OSError as err:
log.error( 'Failed to send reply packet to'
' %s:%s - %s', addr[0], addr[1], err_fmt(err) )
def main(args=None):
import argparse, textwrap
dd = lambda text: re.sub( r' \t+', ' ',
textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
parser = argparse.ArgumentParser(
usage='%(prog)s -u/--udp [listen-ip:]port -z/--zmq-sock dst-ip:port [opts]',
formatter_class=argparse.RawTextHelpFormatter, description=dd('''
Receiver for UDP notifications from AQM device
to send those to some notification-thing daemon over zmq pub/sub sockets.
AQM project: https://github.com/mk-fg/rp2040-sen5x-air-quality-webui-monitor
notification-thing daemon: https://github.com/mk-fg/notification-thing'''))
group = parser.add_argument_group('Alert receive/filter/snooze UDP-source options')
group.add_argument('-u', '--udp',
metavar='[ip:]port', help=dd('''
UDP socket to bind/listen on for incoming notifications.
Required, unless socket is received from systemd via LISTEN_FDS mechanism.'''))
group.add_argument('-s', '--udp-snooze',
metavar='duration', default='1h', help=dd('''
Snooze-interval to send in response to alerts,
suppressing repeated ones with same thresholds for that duration.
Can be a human duration spec like "30s", "10min", "1h 20m", or [[HH:]MM:]SS.
Use 0 value to disable this. Default: %(default)s'''))
group.add_argument('-c', '--recv-rate-threshold', metavar='N/M[smhd]', help=dd('''
Rate-limiting filter specification (N within timespan) for alert count that must be
exceeded (for each source IP), in order for alert to be passed through as notification.
Can be used to skip one-off alerts due to e.g. someone smoking near the sensor.
Filter should be specified as "N / M[smhd]", for example
"3 / 10m" to only react to >3 alert packets within last 10 minutes.
Disabled by default - every received alert will generate notification.'''))
group = parser.add_argument_group('ZMQ destination')
group.add_argument('-z', '--zmq-sock',
action='append', metavar='host:port', help=dd('''
Notification-thing (desktop notification daemon) zmq sub socket address -
can be either host:port (assumed to be tcp socket, e.g. 1.2.3.4:5678),
or a full zmq url (e.g. tcp://1.2.3.4:5678).
Can be specified multiple times to deliver message to more than one peer.
Must be specified at least once, unless running in some kind of test-mode.'''))
group.add_argument('-w', '--wait-connect',
type=float, metavar='seconds', default=0.5, help=dd('''
Timeout to wait for peer connections to
establish and unsent messages to linger. Default: %(default)ss'''))
group = parser.add_argument_group('Notification object attributes')
group.add_argument('-n', '--hostname',
metavar='name', default=os.uname()[1],
help='Source name to use for dispatched message. Default: local hostname [%(default)s]')
group.add_argument('-U', '--urgency', metavar='low/normal/critical', help=dd('''
Urgency hint to use for the message - can be either integer
in 0-2 range (level id) or symbolic level name - low/normal/critical.'''))
group.add_argument('-t', '--expire-time', metavar='duration',
help='Timeout (same format as -s/--udp-snooze) to hide generated notification bubbles.')
group.add_argument('-a', '--app-name', metavar='name', default='notify-net',
help='App name for the icon. Default: %(default)s')
group.add_argument('-i', '--icon', action='append', metavar='icon', default=list(),
help='Icon name, path or alias. Can be specified multiple times (for fallback icon names).')
group = parser.add_argument_group('Misc daemon settings')
group.add_argument('-x', '--exit-inactivity-timeout', metavar='duration', default=0, help=dd('''
Exit after specified timeout of inactivity (same format as -s/--udp-snooze).
Useful when starting the script from systemd socket unit file.
Should be higher than -c/--recv-rate-threshold timespan (if any) to track it correctly.'''))
group.add_argument('-T', '--test',
action='store_true', help='Dispatch fake test-alert notification on startup.')
group.add_argument('-X', '--test-send', metavar='host:port',
help='Generate/send single alert packet to specified UDP socket and exit.')
group.add_argument('-d', '--debug', action='store_true', help='Verbose operation mode.')
opts = parser.parse_args(sys.argv[1:] if args is None else args)
logging.basicConfig(level=logging.DEBUG if opts.debug else logging.WARNING)
log = logging.getLogger('main')
note_opts = dict()
if opts.urgency:
try: urgency = int(opts.urgency)
except ValueError:
try: urgency = ['low', 'normal', 'critical'].index(opts.urgency.lower())
except IndexError:
parser.error(f'Unrecognized urgency level name: {opts.urgency}')
else:
if not 0 <= urgency <= 2:
parser.error(f'Urgency level id must be in 0-2 range: {opts.urgency}')
note_opts['urgency'] = urgency
note_opts = dict( hints=note_opts,
app_name=opts.app_name, icon=','.join(opts.icon or list()) )
if opts.expire_time: note_opts['timeout'] = td_parse_sec(opts.expire_time) * 1000
td_snooze = td_parse_sec(opts.udp_snooze)
td_exit = td_parse_sec(opts.exit_inactivity_timeout)
pkt_test = list()
if opts.test or opts.test_send: # generate test-packet
pkt = struct.pack( '>HHHHhhhh',
*map(int, '23 28 31 32 3604 4643 2500 32767'.split()) )
pkt += b'pm voc'
pkt += pkt_crc16(pkt).to_bytes(2, 'big')
if not opts.test_send: pkt_test.append((pkt, (pkt_fake_src, 1234)))
else:
host, port = opts.test_send.split(':')
dst = socket.getaddrinfo( host, int(port),
proto=socket.IPPROTO_UDP, type=socket.SOCK_DGRAM )[0][-1]
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.sendto(pkt, dst)
return log.debug('Dispatched %sB packet to address: %s', len(pkt), dst)
if not opts.zmq_sock: parser.error('At least one -z/--zmq-sock option must be specified')
zmq_ctx, zmq_addr = zmq.Context(), lambda addr: (
f'tcp://{addr}' if not re.search(r'^\w+://', addr) else addr )
sd_pid, sd_fds = (int(os.environ.get(f'LISTEN_{k}', 0)) for k in ['PID', 'FDS'])
if sd_pid == os.getpid() and sd_fds:
if sd_fds > 1:
return log.error( 'More than one socket passed from'
' systemd, exiting (pid=%s fds=%s)', sd_pid, sd_fds ) or 1
log.debug('Using UDP socket received from systemd')
sock_opts = dict(fileno=3)
elif not opts.udp:
parser.error('-u/--udp option is required if socket is not provided by systemd')
else:
bind, _, port = opts.udp.rpartition(':')
bind = socket.getaddrinfo( bind or '0.0.0.0', int(port),
proto=socket.IPPROTO_UDP, type=socket.SOCK_DGRAM )[0][-1]
sock_opts = dict(family=socket.AF_INET, type=socket.SOCK_DGRAM)
with socket.socket(**sock_opts) as sock, \
cl.closing(zmq_ctx.socket(zmq.PUB)) as pub:
if 'fileno' not in sock_opts:
log.debug('Binding listening UDP socket to %s:%s', *bind[:2])
sock.bind(bind)
if hasattr(zmq, 'IPV4ONLY'): pub.setsockopt(zmq.IPV4ONLY, False)
pub.setsockopt(zmq.RECONNECT_IVL_MAX, int(300 * 1000))
pub.setsockopt(zmq.LINGER, int(opts.wait_connect * 1000))
pub.setsockopt(zmq.SNDHWM, 50)
def zmq_send(summary, body):
msg = dict(note_opts, summary=summary, body=body)
msg = '\1' + json.dumps([opts.hostname, time.time(), msg])
try: pub.send_string(msg, zmq.DONTWAIT)
except zmq.ZMQError as err:
if err.errno != zmq.EAGAIN: raise
log.debug('Connecting to %s ZMQ peer(s)', len(opts.zmq_sock))
for addr in opts.zmq_sock: pub.connect(zmq_addr(addr))
time.sleep(opts.wait_connect)
run_proxy_loop( sock, zmq_send, td_snooze,
rate_threshold=opts.recv_rate_threshold, td_exit=round(td_exit), pkt_queue=pkt_test )
if __name__ == '__main__': sys.exit(main())