#!/usr/bin/env python
import subprocess as sp, contextlib as cl, datetime as dt, functools as ft
import os, sys, io, re, time, logging, fnmatch, pathlib as pl, collections as cs
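# External tools used: "xz" to decompress archived logs,
#  and coreutils "date" as a fallback date/time parser.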
def rel_path(p):
p0, p = pl.Path('.').resolve(), p.resolve()
return p.relative_to(p0) if p.is_relative_to(p0) else p
def chan_name(name):
return name.lstrip('#&').lower()
_td_days = dict(
y=365.2422, yr=365.2422, year=365.2422,
mo=30.5, month=30.5, w=7, week=7, d=1, day=1 )
_td_s = dict( h=3600, hr=3600, hour=3600,
m=60, min=60, minute=60, s=1, sec=1, second=1 )
_td_usort = lambda d: sorted(
d.items(), key=lambda kv: (kv[1], len(kv[0])), reverse=True )
_td_re = re.compile('(?i)^[-+]?' + ''.join( fr'(?P<{k}>\d+{k}\s*)?'
for k, v in [*_td_usort(_td_days), *_td_usort(_td_s)] ) + '$')
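# Timestamp formats accepted by parse_timestamp(), tried in order:
#  relative offsets like "3d" or "2w 5d 10h", dates/times like
#  "2024-05-01", "24-05-01_13:40" or bare "13:40" (24h clock, today),
#  with anything else handed off to coreutils date(1) as a fallback.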
def parse_timestamp(ts_str):
ts, ts_str = None, ts_str.replace('_', ' ')
if not ts and ( # short time offset like "3d 5h"
(m := _td_re.search(ts_str)) and any(m.groups()) ):
delta = list()
for units in _td_days, _td_s:
val = 0
for k, v in units.items():
if not m.group(k): continue
val += v * int(''.join(filter(str.isdigit, m.group(k))) or 1)
delta.append(val)
ts = dt.datetime.now() - dt.timedelta(*delta)
if not ts and (m := re.search( # common BE format
r'^(?P<date>(?:\d{2}|(?P<Y>\d{4}))-\d{2}-\d{2})'
r'(?:[ T](?P<time>\d{2}(?::\d{2}(?::\d{2})?)?)?)?$', ts_str )):
tpl = 'y' if not m.group('Y') else 'Y'
tpl, tss = f'%{tpl}-%m-%d', m.group('date')
if m.group('time'):
tpl_time = ['%H', '%M', '%S']
tss += ' ' + ':'.join(tss_time := m.group('time').split(':'))
tpl += ' ' + ':'.join(tpl_time[:len(tss_time)])
try: ts = dt.datetime.strptime(tss, tpl)
except ValueError: pass
if not ts and (m := re.search( # just time without AM/PM - treat as 24h format
r'^\d{1,2}:\d{2}(?::\d{2}(?P<us>\.\d+)?)?$', ts_str )):
us, tpl = 0, ':'.join(['%H', '%M', '%S'][:len(ts_str.split(':'))])
if m.group('us'):
ts_str, us = ts_str.rsplit('.', 1)
us = us[:6] + '0'*max(0, 6 - len(us))
try: ts = dt.datetime.strptime(ts_str, tpl)
except ValueError: pass
else:
ts = dt.datetime.now().replace( hour=ts.hour,
minute=ts.minute, second=ts.second, microsecond=int(us) )
if not ts: # coreutils' "date" parses everything, but is more expensive to use
while True:
res = sp.run( ['date', '+%s', '-d', ts_str],
stdout=sp.PIPE, stderr=sp.DEVNULL )
if res.returncode:
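				# date(1) can fail on comma-separated inputs - retry with commas stripped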
if ',' in ts_str: ts_str = ts_str.replace(',', ' '); continue
else: ts = dt.datetime.fromtimestamp(int(res.stdout.strip()))
break
if ts: return ts
raise ValueError(f'Failed to parse date/time string: {ts_str}')
class IterLogError(Exception): pass
class IterLogLine(str):
__slots__ = 'ts',
def __new__(cls, line, ts):
line = line.rstrip('\0\r\n')
(s := super().__new__(cls, line)).ts = ts
return s
class IterLog:
'Iterator for lines-within-time-range in maybe-compressed ZNC logs'
xz_kill_timeout = 1.0
peek_bs = 100 * 2**10 # 100 KiB
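	# Per-file state: src/xz - input file/decompressor streams,
	#  ts0/ts1 - first/last timestamps in file, ts_line - ts of last-read line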
src = xz = ts0 = ts1 = ts_line = None
def __init__(self, p, p_date, ts_a=None, ts_b=None):
self.p, self.date, self.ts_a, self.ts_b = p, p_date, ts_a, ts_b
self.dt = dt.datetime(self.date.year, self.date.month, self.date.day)
self.log = logging.getLogger('zlr.iter')
def open(self):
if self.src: return
self.src = self.p.open('rb')
if self.p.name.endswith('.xz'):
self.xz = sp.Popen(['xz', '-d'], stdin=self.src, stdout=sp.PIPE)
self.src = self.xz.stdout
def close(self):
if self.xz:
try:
if (err := self.xz.poll()) is None:
self.xz.terminate()
try: self.xz.wait(self.xz_kill_timeout)
except sp.TimeoutExpired: self.xz.kill()
err = self.xz.poll()
except ProcessLookupError: pass
finally:
self.xz = None
				if err and err > 0: # err can be None if xz process had to be killed
					self.log.warning(f'xz failed (exit={err}) for path [ {rel_path(self.p)} ]')
if self.src: self.src = self.src.close()
def __enter__(self): self.open(); return self
def __exit__(self, *err): self.close()
def __del__(self): self.close()
def get_iter(self):
		'Return iterator for lines within time-range, or None if there are none'
proc, p_log = self.open(), rel_path(self.p)
try:
			if not (d := self.date).monthly: # daily log - day bounds known from filename date
self.ts0 = dt.datetime(d.year, d.month, d.day)
self.ts1 = self.ts0.replace(
hour=23, minute=59, second=59, microsecond=int(1e6-1) )
else:
self.ts0 = self.get_first_ts(
self.src.peek(self.peek_bs)[:self.peek_bs],
lambda buff: buff.split(b'\n', 1) )
if self.ts_b:
if not self.ts0: raise IterLogError(f'Failed to match BOF timestamp [ {p_log} ]')
if self.ts_a:
buff = self.src.read(sz := 512 * 2**20) # read/xzcat whole file into buffer
if len(buff) >= sz: raise IOError(f'File-peek failed - file too large [ {p_log} ]')
self.src.close()
self.src, buff = io.BytesIO(buff), buff[-self.peek_bs:]
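					# Match last timestamp in the buffered tail, scanning lines in reverse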
self.ts1 = self.get_first_ts(buff, lambda buff: buff.rsplit(b'\n', 1)[::-1])
if not self.ts1: raise IterLogError(f'Failed to match EOF timestamp [ {p_log} ]')
skip = (
' skip' if (self.ts_b and self.ts0 > self.ts_b)
or (self.ts_a and self.ts1 < self.ts_a) else '' )
self.log.debug(f'File{skip} [ {p_log} ]: [ {self.ts0} ] - [ {self.ts1} ]')
if skip: return
proc = True
finally:
if not proc: self.close()
return iter(self)
def match_ts(self, line):
try: ts = re.search(r'^[\s\0]*\[([^]]+)\][\s\0]+', line).group(1)
		except AttributeError: return # no [timestamp] prefix on this line
try: return dt.datetime.strptime(ts, '%y-%m-%d %H:%M:%S')
except ValueError:
ts = dt.datetime.strptime(ts, '%H:%M:%S')
return self.dt.replace(hour=ts.hour, minute=ts.minute, second=ts.second)
def get_first_ts(self, buff, line_split_func):
while buff:
line = (split := line_split_func(buff))[0].strip(b'\0').decode()
if ts := self.match_ts(line): return ts
buff = split[1] if len(split) > 1 else b''
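	# Iterating yields IterLogLine strings - seeks to first line >= ts_a,
	#  stops at first line with a timestamp past ts_b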
def __iter__(self):
self._proc_st = None # None -> True -> False
		if not isinstance(self.src, io.TextIOWrapper):
			if self.src.seekable(): self.src.seek(0) # pipes from xz can't seek
			self.src = io.TextIOWrapper(self.src, errors='backslashreplace')
return self
def __next__(self):
if self._proc_st is False: raise StopIteration
try: line = self._next_line()
except StopIteration: self._proc_st = False; raise StopIteration
return line
def _next_line(self):
line = None
if self._proc_st is None: # seek to first line within timespan
if self.ts_a:
for line in self.src:
if (ts := self.match_ts(line)) and ts >= self.ts_a: break
else: raise StopIteration
self._proc_st = True
			if line is not None:
				if self.ts_b and ts > self.ts_b: raise StopIteration # empty ts_a..ts_b range
				line = IterLogLine(line, ts)
if not line:
ts = self.match_ts(line := next(self.src))
			if self.ts_b and ts and ts > self.ts_b: raise StopIteration
line = IterLogLine(line, ts)
if not line.ts: line.ts = self.ts_line
self.ts_line = line.ts
return line
class LogDate(dt.date): monthly = False # day=00 in source filename means month-long log
def find_logs(p_znc, p_logs, log_fn_tpl):
logs = dict()
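	# Resulting structure: logs[net][chan] = [(LogDate, abs_path), ...]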
def logs_append(user, net, chan, ts_str, p):
if net == 'default': net = user
if net not in logs: logs[net] = dict()
if chan not in logs[net]: logs[net][chan] = list()
ts = LogDate(*map(int, [
ts_str[:4], ts_str[4:6], (day := int(ts_str[6:8])) or 1 ]))
ts.monthly = not day
if not (len(ts_str) == 8 and 2050 > ts.year >= 2000):
raise AssertionError(ts_str)
logs[net][chan].append((ts, p.resolve()))
def re_chk(re_str, p, *groups):
if not (m := re.search(re_str, str(p))): raise AssertionError(re_str, p)
if groups: return (m.group(k) for k in groups)
return m
# Prehistoric logs
for p in p_znc.glob('users/*/moddata/log/*.log'):
net, chan, ts = re_chk(
r'/users/(?P<net>[^/]+)/moddata/'
r'log/(?P<chan>[^/]+)_(?P<ts>\d{8})\.log$',
p, 'net', 'chan', 'ts' )
logs_append(None, net, chan, ts, p)
# Logs for old-style setup with user=network, multi-network users
for p in p_znc.glob('moddata/log/*.log'):
user, net, chan, ts = re_chk(
r'/moddata/log/(?P<user>[^/]+?)'
r'_(?P<net>[^/]+?)_(?P<chan>[^/]+)_(?P<ts>\d{8})\.log$',
p, 'user', 'net', 'chan', 'ts' )
if '_' in f'{user}{net}': raise AssertionError(net, p)
logs_append(user, net, chan, ts, p)
# Modern logs enabled globally for multi-network users
# Can also be enabled by user: users/*/moddata/log/*/*/*.log
# Can also be enabled by network: users/*/networks/*/moddata/log/*/*.log
# All these are redundant with globally-enabled module, so not used here
for p in p_znc.glob('moddata/log/*/*/*/*.log'):
user, net, chan, ts = re_chk(
r'/moddata/log/(?P<user>[^/]+?)/'
r'(?P<net>[^/]+?)/(?P<chan>[^/]+)/(?P<ts>\d{4}-\d{2}-\d{2})\.log$',
p, 'user', 'net', 'chan', 'ts' )
if '_' in f'{user}{net}': raise AssertionError(net, p)
logs_append(user, net, chan, ts.replace('-', ''), p)
# Archived logs
	# fnmatch.translate escapes braces - undo that so str.format keys work below
	log_fn_re = fnmatch.translate(log_fn_tpl).replace(r'\{', '{').replace(r'\}', '}')
log_fn_re = re.compile(log_fn_re.format( net=r'(?P<net>[^/]+?)',
chan=r'(?P<chan>[^/]+)', yy=r'(?P<yy>\d{2})', mm=r'(?P<mm>\d{2})' ))
log_fn_wc = log_fn_tpl.format(net='*', chan='*', yy='[0-9][0-9]', mm='[0-9][0-9]')
for p in p_logs.glob(log_fn_wc):
mg = log_fn_re.search(str(p.relative_to(p_logs))).groupdict()
net, chan, yy, mm = (mg.get(k) for k in ['net', 'chan', 'yy', 'mm'])
		logs_append(None, net, chan, f'20{int(yy or 0):02d}{int(mm or 0):02d}00', p)
return logs
def open_log_iterators(ctx, log_map, ts_a, ts_b):
	'''Filter/open logfiles and return a list of (net, chan, line_iter)
		tuples for all matched logs, in the order they should be printed.'''
log = logging.getLogger('zlr.open')
logs, log_nets, log_chans = list(), set(), set()
for net, chans in log_map.items():
for chan, files in chans.items():
chan = chan_name(chan)
for ts, p in reversed(sorted(files)): # newest-first
# Discard all further way-too-old xz archives (should hold one month at most)
if ts_a and (ts + dt.timedelta(63)) < ts_a:
log.debug(f'Skipping too-old logs: {ts} - [ {rel_path(p)} ] and older')
break
# Skip way-too-new xz archives
if ts_b and (ts - dt.timedelta(63)) > ts_b:
log.debug(f'Skipping too-new logfile: {ts} [ {rel_path(p)} ]')
continue
# Create line-by-line iterators on file(s)
if not (line_iter := IterLog(p, ts, ts_a, ts_b).get_iter()): continue
ctx.enter_context(line_iter)
				# id() tie-breaker makes sure sort never compares line_iter objects
logs.append((net or '', chan or '', line_iter.ts0, id(line_iter), line_iter))
logs.sort()
return list((net, chan, line_iter) for net, chan, ts0, iter_id, line_iter in logs)
def main(argv=None):
import argparse, textwrap
dd = lambda text: re.sub( r' \t+', ' ',
textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
usage='%(prog)s [opts] [channels...]',
description='Concatenate and print ZNC logs for matching nets/chans to stdout.')
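	# Illustrative usage examples (network/channel names here are hypothetical):
	#  znc-log-reader mychan - last 18h of logs from channels matching "mychan"
	#  znc-log-reader -x -n libera -f 3d -c 200 '#python' - exact chan on one network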
	parser.add_argument('channels', nargs='*', help=dd('''
		Channel(s) to print logs for, matched as substrings of names unless -x/--exact is used.
		With no names specified, logs for all channels are read (see also -d/--conf option).'''))
group = parser.add_argument_group('Selection options')
group.add_argument('-n', '--network', metavar='net',
help='IRC network to filter logs by (only needed on channel name conflicts).')
group.add_argument('-d', '--conf', action='store_true', help=dd('''
Use/match only non-detached channels in znc.conf
These are added to a list of channels specified as arguments, if any.'''))
group.add_argument('-x', '--exact', action='store_true',
help='Channel name must match given string(s) exactly.')
group.add_argument('-s', '--service-msgs', action='store_true',
help='Do not remove service messages (renames, joins/leaves, etc).')
group = parser.add_argument_group('Limits/ranges')
group.add_argument('-c', '--count', metavar='n', type=int,
help='Max number of latest lines to print per channel.')
group.add_argument('-f', '--ts-from', metavar='date/time', default='18h',
help='Print lines since that time (can be relative, default: %(default)s).')
group.add_argument('-t', '--ts-to', metavar='date/time',
help='Print lines until that time (can be relative, default: now).')
group = parser.add_argument_group('Paths')
group.add_argument('--znc-home', metavar='dir', default='~znc',
help='Path to ZNC home directory (default: %(default)s).')
	group.add_argument('--log-dir', metavar='dir', default='~znc/logs',
		help='Path to directory with archived/aggregated logs to read (default: %(default)s).')
group.add_argument('--log-dir-file-pat',
metavar='pat', default='{net}/*/{chan}__{yy}-{mm}.log.xz', help=dd('''
fnmatch/glob pattern for relative paths of archived files in --log-dir.
		Should use the following python str.format keys: net, chan, yy, mm.
		Values for keys missing from the pattern won't match the corresponding filter options.
Default: %(default)s'''))
parser.add_argument('--debug', action='store_true', help='Verbose operation mode.')
opts = parser.parse_args()
logging.basicConfig(
format='-- %(name)s %(levelname)s :: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG if opts.debug else logging.INFO )
log = logging.getLogger('zlr.main')
p_znc, p_logs = (
pl.Path(p).expanduser().resolve()
for p in [opts.znc_home, opts.log_dir] )
	os.chdir(p_znc) # so that rel_path in log messages is relative to znc home
## Full net-chan-files mapping
log_map = find_logs(p_znc, p_logs, opts.log_dir_file_pat)
## Filter by network
if opts.network:
try: log_map = {opts.network: log_map[opts.network]}
except KeyError:
parser.error(f'No logs for specified network: {opts.network}')
## Filter channels
if not opts.channels: chans = None
else: chans = list(chan_name(name) for name in opts.channels)
if opts.conf: # find subset of channels to work with, if requested
chans, conf = chans or list(), pl.Path('configs/znc.conf').read_text()
for chan, conf in re.findall(r'<Chan (?P<name>\S+)>(.*?)</Chan>', conf, re.DOTALL):
if not re.search(r'\bDetached\s*=\s*true\b', conf): chans.append(chan_name(chan))
if chans:
chans = sorted(set(chans))
log.debug(f'Channels: {" ".join(chans)}')
if chans is not None:
for net, net_chans in log_map.items():
for chan in list(net_chans):
name = chan_name(chan)
if opts.exact:
if name not in chans: name = None
elif not any(pat in name for pat in chans): name = None
if not name: del net_chans[chan]
	if not log_map or not any(log_map.values()): # chan filters can empty per-net dicts
		parser.error('No logs found matching channel/network filters')
## Parse from/to timestamps
def _parse_ts_arg(k, ts_str):
if not ts_str: return
ts = parse_timestamp(ts_str)
log.debug(f'Timestamp option {k} [ {ts_str} ] parsed to [ {ts} ]')
return ts
ts_a, ts_b = (_parse_ts_arg(k, getattr( opts, k.split('/', 1)[-1]
.strip('-').replace('-', '_') )) for k in ['-f/--ts-from', '-t/--ts-to'])
if ts_a and ts_b and ts_a > ts_b:
parser.error(f'Parsed -f/--ts-from [ {ts_a} ] value is after -t/--ts-to [ {ts_b} ]')
with cl.ExitStack() as ctx:
## Build a list of relevant net/chan line-iterators to print in order
logs = open_log_iterators(ctx, log_map, ts_a, ts_b)
## Check if net/chan prefix is needed
print_net, print_chan = set(), set()
for net, chan, log_iter in logs:
print_net.add(net); print_chan.add(chan)
print_net, print_chan = len(print_net) > 1, len(print_chan) > 1
## Print lines from iterators
src, src_prefix = None, ''
if opts.count: # buffer "last N lines per channel" and print separately below
chan_buffs = cs.defaultdict(lambda: cs.deque(maxlen=opts.count))
for net, chan, line_iter in logs:
			if src != (net, chan) and (print_net or print_chan):
				src = net, chan # rebuild prefix only when source net/chan changes
				src_prefix = '/'.join([*[net]*print_net, *[chan]*print_chan]) + ' '
log.debug(f'Processing log-file [{net} {chan}]: {rel_path(line_iter.p)}')
sys.stderr.flush()
for line in line_iter:
# Skip service msgs
if not opts.service_msgs and re.search(r'^\s*\[[^]]+\]\s+[*]{3}\s+', line): continue
# Prepend full date, if missing in timestamp
if not re.search(r'^\s*\[\d{2}-\d{2}-\d{2}\s+[^]]+\]\s+', line) and (ts := line.ts):
line, n = re.subn( r'^(\s*\[)(\d{2}:\d{2}:\d{2}\]\s+)',
fr'\g<1>{ts.year%100:02}-{ts.month:02}-{ts.day:02} \g<2>', line )
if not n: line = f'[{ts.strftime("%y-%m-%d %H:%M:%S")}] {line}'
line = f'{src_prefix}{line}'
if not opts.count: print(line)
else: chan_buffs[net, chan].append(line)
sys.stdout.flush()
if opts.count:
for buff in chan_buffs.values():
for line in buff: print(line)
if __name__ == '__main__':
try: sys.exit(main())
except BrokenPipeError: # stdout pipe closed
os.dup2(os.open(os.devnull, os.O_WRONLY), sys.stdout.fileno())
sys.exit(1)