-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlogparser.py
174 lines (124 loc) · 4.49 KB
/
logparser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import argparse
from collections import deque
import re
# Partition functions
def _all(fd):
    """Return every line of *fd* as a list, with line terminators removed."""
    content = fd.read()
    return content.splitlines()
def _first(fd, n):
    """Return up to the first *n* lines of *fd*, without trailing newlines.

    Args:
        fd: A text stream supporting ``readline()``.
        n: Maximum number of lines to read.

    Returns:
        A list of at most *n* strings. Fewer are returned when the stream
        ends early.
    """
    lines = []
    for _ in range(n):
        line = fd.readline()
        # readline() yields '' only at end of stream; a blank line is '\n'.
        if not line:
            break
        # Strip just the newline so a final unterminated line keeps its
        # last character.
        lines.append(line[:-1] if line.endswith('\n') else line)
    return lines
def _last(fd, n):
    """Return a deque holding the final *n* lines of *fd*, newline-stripped."""
    # A deque bounded by maxlen drops its oldest entry on overflow, so only
    # the most recent n lines survive a single pass over the stream.
    stripped = (row[:-1] if row[-1] == '\n' else row for row in fd)
    return deque(stripped, maxlen=n)
def partition(fd, f, l):
    """Pick the slice of *fd* that should be processed.

    Args:
        fd: Text stream to read from.
        f: Number of leading lines to keep, or a falsy value.
        l: Number of trailing lines to keep, or a falsy value.

    Returns:
        The first *f* lines, the last *l* lines, or every line when
        neither limit is given.

    Raises:
        argparse.ArgumentTypeError: If both *f* and *l* are given.
    """
    if f and l:
        raise argparse.ArgumentTypeError('I think this is not the expected usage.')
    if f:
        return _first(fd, f)
    if l:
        return _last(fd, l)
    return _all(fd)
# Input selection
def _get_fd(path=None):
    """Return a readable text stream: the file at *path*, or stdin if none is given."""
    return open(path, 'r') if path else sys.stdin
# Regex
# Dotted-quad IPv4: four octets in 0-255. The \b anchors stop a match from
# starting or ending in the middle of a longer run of word characters.
_ipv4_regex = (
    r"\b(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\."
    r"(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\."
    r"(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\."
    r"(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\b"
)
# Full (uncompressed) IPv6: eight colon-separated groups of exactly four hex
# digits. The hex class is spelled out explicitly because a range such as
# [0-f] covers every code point between '0' and 'f', which also lets through
# ':', '@', 'G'-'Z', '_' and other non-hex characters.
_ipv6_regex = r'([0-9A-Fa-f]{4}:){7}[0-9A-Fa-f]{4}'
# Filters
def _filter_timestamp(line):
    """Tell whether *line* contains an HH:MM:SS time with hours 00-23."""
    hh_mm_ss = r'(0[0-9]|1[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]'
    found = re.search(hh_mm_ss, line)
    return found is not None
def _filter_ipv4(line):
    """Tell whether *line* contains a match for the module's IPv4 pattern."""
    found = re.search(_ipv4_regex, line)
    return found is not None
def _filter_ipv6(line):
    """Tell whether *line* contains a match for the module's IPv6 pattern."""
    found = re.search(_ipv6_regex, line)
    return found is not None
def _predicates_list(filters):
    """Translate filter flags into the predicate functions they select.

    Recognised flags are 't' (timestamp), 'i' (IPv4) and 'I' (IPv6).
    Unknown flags are skipped; order and repetitions in *filters* are kept.
    """
    by_flag = {
        't': _filter_timestamp,
        'i': _filter_ipv4,
        'I': _filter_ipv6,
    }
    return [by_flag[flag] for flag in filters if flag in by_flag]
# Highlighters
def _highlight_str(color, s):
    """Wrap *s* between the escape sequence for *color* and a reset sequence."""
    return '\033[{}m{}\033[m'.format(color, s)
def _red_highlight(s):
    """Return *s* wrapped by ``_highlight_str`` using the ';31' color code."""
    red = ';31'
    return _highlight_str(red, s)
def _highlight(regex, line):
    """Return *line* with every match of *regex* highlighted in red.

    Args:
        regex: Pattern string (or compiled pattern) to look for.
        line: Text to search.

    Returns:
        A copy of *line* in which each match is wrapped by
        ``_red_highlight``. The line is returned unchanged when nothing
        matches.
    """
    # One substitution pass with a callable replacement wraps each match
    # exactly once. The matched text is never reinterpreted as a pattern, so
    # a '.' inside an address stays literal and an address that occurs
    # several times on the line is not wrapped repeatedly.
    return re.sub(regex, lambda match: _red_highlight(match.group(0)), line)
def _highlight_ipv4(line):
    """Return *line* with matches of the module's IPv4 pattern highlighted."""
    pattern = _ipv4_regex
    return _highlight(pattern, line)
def _highlight_ipv6(line):
    """Return *line* with matches of the module's IPv6 pattern highlighted."""
    pattern = _ipv6_regex
    return _highlight(pattern, line)
def _highlighters_list(highs):
    """Translate flags into the highlighter functions they select.

    Recognised flags are 'i' (IPv4) and 'I' (IPv6). Any other flag is
    skipped; order and repetitions in *highs* are kept.
    """
    by_flag = {
        'i': _highlight_ipv4,
        'I': _highlight_ipv6,
    }
    return [by_flag[flag] for flag in highs if flag in by_flag]
# Main
def highlight_log(lines, highlighters):
    """Run each line through every highlighter, in order.

    Args:
        lines: Iterable of text lines.
        highlighters: Callables taking a line and returning a line.

    Returns:
        *lines* itself when there are no highlighters, otherwise a new list
        of the transformed lines.
    """
    if not highlighters:
        return lines

    def apply_all(text):
        # Each highlighter receives the output of the previous one.
        for paint in highlighters:
            text = paint(text)
        return text

    return [apply_all(entry) for entry in lines]
def filter_log(lines, predicates):
    """Keep the lines for which every predicate is true.

    Args:
        lines: Iterable of text lines.
        predicates: Callables taking a line and returning a truth value.

    Returns:
        A list of the accepted lines in their original order. With no
        predicates every line is kept.
    """
    # A generator lets all() stop at the first failing predicate instead of
    # evaluating them all and building a temporary list for every line.
    return [line for line in lines if all(pred(line) for pred in predicates)]
def parse_log(file_, f, l, filters):
    """Read a log, select a slice of it, then filter and highlight the lines.

    Args:
        file_: Path of the log file, or a falsy value to read stdin.
        f: Keep only the first *f* lines (falsy to disable).
        l: Keep only the last *l* lines (falsy to disable).
        filters: Flags passed to ``_predicates_list`` and
            ``_highlighters_list`` to choose filters and highlighters.

    Returns:
        The selected lines, filtered and highlighted.
    """
    fd = _get_fd(file_)
    try:
        # partition() reads everything it needs into memory, so the stream
        # can be released as soon as it returns (or raises).
        lines = partition(fd, f, l)
    finally:
        # Close only a file opened for this call; stdin belongs to the process.
        if fd is not sys.stdin:
            fd.close()
    predicates = _predicates_list(filters)
    highlighters = _highlighters_list(filters)
    return highlight_log(filter_log(lines, predicates), highlighters)
# Args Handlers
def positive_int(value):
    """Convert *value* to an int, accepting only values greater than zero.

    Raises:
        argparse.ArgumentTypeError: If the converted number is zero or negative.
        ValueError: If *value* cannot be converted by ``int``.
    """
    number = int(value)
    if number <= 0:
        raise argparse.ArgumentTypeError('only positive numbers.')
    return number
def parse_args(args):
    """Parse the command-line arguments of the log parser.

    Args:
        args: Argument strings, without the program name.

    Returns:
        An ``argparse.Namespace`` with ``file`` (path or None), ``first`` and
        ``last`` (positive ints or None) and ``filters`` (list of the flag
        characters 't', 'i', 'I' in the order given; empty when none).
    """
    parser = argparse.ArgumentParser(description='Parse logs.')
    parser.add_argument('file', metavar='FILE', nargs='?',
                        help='Log file to parse (reads standard input when omitted)')
    parser.add_argument('-f', '--first', metavar='NUM', type=positive_int,
                        help='Print first NUM lines')
    parser.add_argument('-l', '--last', metavar='NUM', type=positive_int,
                        help='Print last NUM lines')
    # The three filter switches append their flag to one shared list so the
    # caller sees a single ordered collection of the requested filters.
    parser.add_argument('-t', '--timestamps', dest='filters', action='append_const', const='t',
                        help='Print lines that contain a timestamp in HH:MM:SS format')
    parser.add_argument('-i', '--ipv4', dest='filters', action='append_const', const='i',
                        help='Print lines that contain an IPv4 address, matching IPs are highlighted')
    parser.add_argument('-I', '--ipv6', dest='filters', action='append_const', const='I',
                        help='Print lines that contain an IPv6 address (standard notation), matching IPs are highlighted')
    parser.set_defaults(filters=[])
    return parser.parse_args(args)
def main():
    """Command-line entry point: parse ``sys.argv`` and print the resulting lines."""
    options = parse_args(sys.argv[1:])
    output = parse_log(options.file, options.first, options.last, options.filters)
    print('\n'.join(output))
# Run the command-line interface only when executed as a script, not on import.
if __name__ == '__main__':
    main()