#!/usr/bin/env python
import os, sys, re, time, math, enum, heapq, signal, pathlib as pl

_td_days = dict(
	y=365.2422, yr=365.2422, year=365.2422,
	mo=30.5, month=30.5, w=7, week=7, d=1, day=1 )
_td_s = dict( h=3600, hr=3600, hour=3600,
	m=60, min=60, minute=60, s=1, sec=1, second=1 )
_td_usort = lambda d: sorted(
	d.items(), key=lambda kv: (kv[1], len(kv[0])), reverse=True )
_td_re = re.compile('(?i)^[-+]?' + ''.join( fr'(?P<{k}>\d+{k}\s*)?'
	for k, v in [*_td_usort(_td_days), *_td_usort(_td_s)] ) + '$')
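# Resulting regex is one optional named group per unit, ordered from larger
#  units to smaller ones (longer unit names first), so that something
#  like "1d12h5m" matches as d=1, h=12, m=5 in a single pass.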

def td_parse(td_str):
	try: return float(td_str)
	except: td = 0
	if (m := _td_re.search(td_str)) and any(m.groups()):
		# Short time offset like "3d 5h"
		for n, units in enumerate((_td_days, _td_s)):
			tdx = 0
			for k, v in units.items():
				if not m.group(k): continue
				tdx += v * int(''.join(filter(str.isdigit, m.group(k))) or 1)
			td += (24*3600)**(1-n) * tdx
		return td
	if m := re.search(r'^\d{1,2}:\d{2}(?::\d{2}(?P<us>\.\d+)?)?$', td_str):
		# [[HH:]MM:]SS where seconds can be fractional,
		#  left-padded with zeroes so that e.g. "10:30" parses as MM:SS
		vals = td_str.split(':')
		return sum( mult * float(v) for mult, v in
			zip((3600, 60, 1), ['0']*(3 - len(vals)) + vals) )
	raise ValueError(f'Failed to parse time-delta spec: {td_str}')
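# Examples: td_parse('30s') == 30, td_parse('1h 20m') == 4800,
#  td_parse('10:30') == 630.0 (MM:SS), td_parse('1.5') == 1.5 (plain seconds).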

def td_repr( delta, units_max=2, units_res=None, _units=dict(
		h=3600, m=60, s=1, y=365.2422*86400, mo=30.5*86400, w=7*86400, d=1*86400 ) ):
	res, s, n_last = list(), abs(delta), units_max - 1
	units = sorted(_units.items(), key=lambda v: v[1], reverse=True)
	for unit, unit_s in units:
		val = math.floor(s / unit_s)
		if not val:
			if units_res == unit: break
			continue
		if len(res) == n_last or units_res == unit:
			val, n_last = round(s / unit_s), True
		res.append(f'{val:.0f}{unit}')
		if n_last is True: break
		s -= val * unit_s
	return ' '.join(res) if res else '<1s'
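# Examples: td_repr(4000) == '1h 7m' (units_max=2 components,
#  with the last one rounded), td_repr(0.5) == '<1s'.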

def token_bucket(spec, negative_tokens=False):
	'''Spec: { td: float | float_a/float_b }[:burst_float]
		Examples: 1/4:5 (td=0.25s, rate=4/s, burst=5), 5, 0.5:10, 20:30.
		Expects a number of tokens (can be float, default=1) to be sent in,
			and subtracts these, borrowing below zero with negative_tokens set.
		Yields either None if there are enough tokens,
			or a delay (in seconds, float) until when there will be.
		Plain "0" spec means "always None", while "inf" yields "infinite" delays.'''
	try:
		try: interval, burst = spec.rsplit(':', 1)
		except (ValueError, AttributeError): interval, burst = spec, 1.0
		else: burst = float(burst)
		if isinstance(interval, str):
			try: a, b = interval.split('/', 1)
			except ValueError: interval = td_parse(interval)
			else: interval = td_parse(a) / float(b)
		if min(interval, burst) < 0: raise ValueError
	except: raise ValueError(f'Invalid format for rate-limit: {spec!r}')
	if interval <= 0:
		while True: yield None
	elif interval == math.inf:
		while True: yield 2**31 # safe "infinite delay" value
	tokens, rate, ts_sync = max(0, burst - 1), interval**-1, time.monotonic()
	val = (yield) or 1
	while True:
		ts = time.monotonic()
		ts_sync, tokens = ts, min(burst, tokens + (ts - ts_sync) * rate)
		val, tokens = ( (None, tokens - val) if tokens >= val else
			((val - tokens) / rate, (tokens - val) if negative_tokens else tokens) )
		val = (yield val) or 1
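# Usage sketch: tbf = token_bucket('10m/3:5') makes a generator where the
#  first next(tbf) primes it, and every next(tbf) after that returns None
#  while tokens remain (burst of 5, refilled at 3 per 10min here), or a
#  float seconds-delay until the next token once the bucket is drained.
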
p = lambda *a,**kw: print(*a, **kw, flush=True)
p_load = lambda *a,**kw: print('LOAD ::', *a, **kw, file=sys.stderr, flush=True)
p_err = lambda *a,**kw: print('ERROR:', *a, **kw, file=sys.stderr, flush=True) or 1
err_fmt = lambda err: f'[{err.__class__.__name__}] {err}'

class adict(dict):
	def __init__(self, *args, **kws):
		super().__init__(*args, **kws)
		self.__dict__ = self
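# adict allows attr-access to keys (d.x == d['x']),
#  used for check-specs and shared caches below.
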
ct = enum.Enum('CheckType', 'la nproc nft_cnt')

def run_check_la( ts, caches, t, td, cap, tbf, _p_src=pl.Path('/proc/loadavg'),
		_la_fmt=lambda las: '/'.join(f'{la or 0:.2f}'.rstrip('0').rstrip('.') for la in las) ):
	if (la := caches.get(_p_src)) and la[0] == ts: la = la[1]
	else: caches[_p_src] = ts, (la := _p_src.read_text())
	for c, v in zip(cap, la := list(float(v) for v in la.split(' ', 4)[:3])):
		if c and v > c: break # zero = unset cap, never triggers
	else: return
	if not tbf or not next(tbf): p_load( 'load-averages above'
		f' 1/5/15 {"tbf-" if tbf else ""}caps: {_la_fmt(la)} > {_la_fmt(cap)}' )
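# Example logged warning, with hypothetical values and -l 1m:8//12:... opts:
#  LOAD :: load-averages above 1/5/15 tbf-caps: 9.13/4.2/1.55 > 8/0/12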

def run_check_nproc(ts, caches, t, td, cap, tbf, _p_src=pl.Path('/proc/loadavg')):
	if (la := caches.get(_p_src)) and la[0] == ts: la = la[1]
	else: caches[_p_src] = ts, (la := _p_src.read_text())
	if (n := int(la.split(' ', 4)[3].split('/', 1)[-1])) <= cap: return
	if not tbf or not next(tbf): p_load(
		f'total process-count above {"tbf-" if tbf else ""}cap: {n:,d} > {cap:,d}' )
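# Fourth /proc/loadavg field is "runnable/total" kernel tasks, e.g. "2/1371",
#  with the total count compared against the cap here.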

def run_check_nft(ts, caches, t, td, counters, _cmd='nft -j list counters'.split()):
	if (nft := caches.get('nft')) and nft[0] == ts: nft = nft[1]
	else: caches.nft = ts, (nft := json.loads(sp.run( _cmd,
		check=True, stdout=sp.PIPE ).stdout.decode())['nftables'])
	cnt_last = caches.setdefault('nft_cnt', dict())
	for cnt in nft:
		if not ((cnt := cnt.get('counter')) and (cnt := adict(cnt))): continue
		if not (checks := counters.get(cnt.name)): continue
		for chk in checks:
			if chk.table and chk.table != cnt.table: continue
			if chk.af and chk.af != cnt.family: continue
			a = cnt_last.get(chk.id); b = cnt_last[chk.id] = cnt[chk.type]
			if a and (inc := b - a) > chk.cap and (not chk.tbf or not next(chk.tbf)): p_load(
				f'nft counter increment within {td_repr(td)} above {"tbf-" if chk.tbf else ""}cap'
				f' for [ {cnt.table} {cnt.family} {cnt.name} ]: {inc:,d} > {chk.cap:,d} {chk.type}' )
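# "nft -j list counters" returns JSON shaped like the following (abridged):
#  {"nftables": [{"metainfo": {...}}, {"counter": {"family": "inet",
#    "table": "filter", "name": "cnt.bots", "packets": 123, "bytes": 4567}}]}
# json/sp modules for it are imported in main(), and only if -n checks are used.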

def run_checks(td_checks, iter_timeout=5):
	signal.signal(signal.SIGALRM, lambda sig,frm: (
		p_err(f'Check-loop iteration hang-timeout ({iter_timeout}s)'), sys.exit(1) ))
	chk_funcs = { ct.la: run_check_la,
		ct.nproc: run_check_nproc, ct.nft_cnt: run_check_nft }
	q, ts, caches = list(), time.monotonic(), adict()
	for td in td_checks: heapq.heappush(q, (ts + td, td))
	while True:
		ts_chk, td = q[0]
		if delay := max(0, ts_chk - time.monotonic()):
			p(f'--- delay until next check: {delay:.1f}'); time.sleep(delay)
		heapq.heappushpop(q, ((ts := time.monotonic()) + td, td))
		signal.alarm(iter_timeout)
		for chk in td_checks[td]: chk_funcs[chk.t](ts, caches, **chk)
		signal.alarm(0)
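# Scheduling sketch: e.g. with -l 1m:... -p 30s:... opts, heap q holds
#  (deadline, 60.0) and (deadline, 30.0) tuples - each iteration sleeps
#  until the earliest deadline, reschedules it at now+td, and runs all
#  checks registered under that interval with a shared per-timestamp cache.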

def main(argv=None):
	import argparse, textwrap
	dd = lambda text: re.sub( r' \t+', ' ',
		textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
	parser = argparse.ArgumentParser(
		usage='%(prog)s [opts] check-opts...',
		formatter_class=argparse.RawTextHelpFormatter, description=dd('''
			Check various system load values and log a line to stderr
			for any of them that are above the specified thresholds.
			Most threshold options can be specified multiple times, to be checked separately.
			All sampling intervals, and other otherwise-undocumented time values,
			can be specified using short time-delta notation (e.g. "30s", "10min", "1h 20m",
			"1d12h5m"), or the usual [[HH:]MM:]SS format, where seconds can be fractional.'''))
	group = parser.add_argument_group('Optional thresholds to check')
	group.add_argument('-l', '--loadavg', action='append',
		metavar='sample-interval:[max1]/[max5]/[max15][:tbf-interval[/rate][:burst]]', help=dd('''
			Thresholds for any /proc/loadavg numbers, to be checked on each sample-interval.
			If token-bucket parameters are specified at the end,
			load warnings are only logged when that rate-limit is exceeded,
			i.e. when load stays above the thresholds consistently for some time.
			"tbf-interval[/rate]" should represent an interval (in seconds or as time-delta spec),
			e.g.: 1/4:5 (interval=0.25s, rate=4/s, burst=5), 5 (burst=1), 0.5:10, 1h30m/8:30, etc.'''))
	group.add_argument('-p', '--nproc', action='append',
		metavar='sample-interval:max-procs[:tbf-interval[/rate][:burst]]', help=dd('''
			Check the count of total pids in /proc/loadavg on each sample-interval.
			Same optional token-bucket rate-threshold and format as in -l/--loadavg.'''))
	group.add_argument('-n', '--nft-count-inc', action='append',
		metavar='sample-interval:[table/family/]'
			'name[=units]:max-count-inc[:tbf-interval[/rate][:burst]]', help=dd('''
			Query named nftables counter increments over the specified
			interval and log if those are above the specified max value.
			"table/family/" prefix can be something like "filter/inet/",
			and "=units" suffix is either "=packets" or "=bytes" (default=packets).
			Example: 10m:cnt.bots:100 (>100 new packets in cnt.bots counter within last 10m)
			Same optional token-bucket rate-threshold and format as in -l/--loadavg.'''))
	group = parser.add_argument_group('General options')
	group.add_argument('-r', '--max-rate-tbf', metavar='interval[/rate][:burst]', help=dd('''
			Max rate of emitted notifications - any that exceed it will be dropped.
			This is applied across all over-threshold warnings from this script,
			so it is a fairly crude filter, where one spammy check can easily drown out the rest.
			Token-bucket filter parameters are the same as in various checks, see e.g. -l/--loadavg.'''))
	group.add_argument('-d', '--debug', action='store_true', help='Verbose operation mode')
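	# Example invocation, with hypothetical thresholds:
	#  load-check-logger -l 1m:8//12:10m/3:5 -p 30s:2000 -n 10m:filter/inet/cnt.bots:100
	# That checks 1m/15m loadavg against 8/12 every minute (warnings rate-limited
	#  to a burst of 5 and ~3 per 10min after), total pid count >2,000 every 30s,
	#  and the cnt.bots counter gaining >100 packets over each 10min interval.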
	opts = parser.parse_args(argv)
	if not opts.debug: global p; p = lambda *a,**kw: None
	if opts.max_rate_tbf:
		global p_load; p_load_tbf = token_bucket(opts.max_rate_tbf)
		p_load_raw, p_load = p_load, lambda *a,**kw: next(p_load_tbf) or p_load_raw(*a,**kw)
	checks = dict()
	for chk in opts.loadavg or list():
		try:
			td, s, cap = chk.partition(':'); cap, s, tbf = cap.partition(':')
			td, tbf = td_parse(td), tbf and token_bucket(tbf)
			cap = list(float(n) if n else 0 for n in cap.split('/'))
		except Exception as err:
			parser.error(f'Failed to parse -l/--loadavg check [ {chk!r} ]: {err_fmt(err)}')
		checks.setdefault(td, list()).append(adict(t=ct.la, td=td, cap=cap, tbf=tbf))
	for chk in opts.nproc or list():
		try:
			td, s, cap = chk.partition(':'); cap, s, tbf = cap.partition(':')
			td, cap, tbf = td_parse(td), int(cap), tbf and token_bucket(tbf)
		except Exception as err:
			parser.error(f'Failed to parse -p/--nproc check [ {chk!r} ]: {err_fmt(err)}')
		checks.setdefault(td, list()).append(adict(t=ct.nproc, td=td, cap=cap, tbf=tbf))
	nft_counters = set()
	for chk_id, chk in enumerate(opts.nft_count_inc or list()):
		try:
			td, cnt, cap = chk.split(':', 2); cap, s, tbf = cap.partition(':')
			table, s, cnt = cnt.rpartition('/'); cnt, s, cnt_type = cnt.partition('=')
			table, af = table.split('/') if table else ('', '')
			td, cap, tbf = td_parse(td), int(cap), tbf and token_bucket(tbf)
			if cnt_type and cnt_type not in ('packets', 'bytes'): raise ValueError(cnt_type)
		except Exception as err:
			parser.error(f'Failed to parse -n/--nft-count-inc check [ {chk!r} ]: {err_fmt(err)}')
		for chk in checks.setdefault(td, list()):
			if chk.t is ct.nft_cnt: break
		else: checks[td].append(chk := adict(t=ct.nft_cnt, td=td, counters=dict()))
		chk.counters.setdefault(cnt, list()).append(adict( id=chk_id,
			table=table, af=af, type=cnt_type or 'packets', cap=cap, tbf=tbf ))
		nft_counters.add((table, af, cnt))
	if nft_counters:
		p(f'Checking that {len(nft_counters):,d} nftables counter(s) exist...')
		global json, sp; import json, subprocess as sp, itertools as it
		for cnt in json.loads(sp.run( 'nft -j list counters'.split(),
				check=True, stdout=sp.PIPE ).stdout.decode())['nftables']:
			if not ((cnt := cnt.get('counter')) and (cnt := adict(cnt))): continue
			for table, af in it.product(['', cnt.table], ['', cnt.family]):
				nft_counters.discard((table, af, cnt.name))
		if nft_counters:
			nft_counters = ' '.join(( '/'.join(cnt)
				if all(cnt) else cnt[2] ) for cnt in sorted(nft_counters))
			p_err(f'specified nftables counter(s) not found - {nft_counters}')
	if not checks: parser.error('No thresholds/checks specified')
	p(f'Monitoring {sum(len(td_chks) for td_chks in checks.values()):,d} check(s)...')
	run_checks(checks)
	p('Finished')

if __name__ == '__main__': sys.exit(main())