-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparse_massdns.py
executable file
·70 lines (60 loc) · 2.4 KB
/
parse_massdns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python3
import csv
import io
import itertools
import json
import logging
import os.path
import sys
import pathlib
from typing import List
import click
from contextlib import ExitStack
@click.command()
@click.option('--output', type=click.Path(file_okay=False, dir_okay=True), required=True)
@click.argument('input-file', type=click.File('r'), default=sys.stdin)
def extract_domains(output: str, input_file: io.TextIOWrapper):
types = [('A', 'resolved-A.ipdomain'), ('AAAA', 'resolved-AAAA.ipdomain') ]
pathlib.Path(output).mkdir()
logging.basicConfig(filename=os.path.join(output, 'massdns_parse.log'))
with ExitStack() as stack:
out_files = {fname[0]: csv.writer(stack.enter_context(open(os.path.join(output, fname[1]), 'w'))) for fname in
types}
for line in input_file:
try:
parsed = json.loads(line)
parent_name = parsed['name']
if 'data' in parsed:
data = parsed['data']
if 'answers' in data:
answers = data['answers']
parse_records(out_files, answers, parent_name)
except Exception as e:
logging.exception(f'Exception occurred at {line}', exc_info=e)
def follow_cnames(name, cname, answers, out_files, stack=[]):
if cname not in stack:
stack.append(cname)
for answer in answers:
sub_name = answer['name'].rstrip('. ')
if cname == sub_name:
data = answer['data'].rstrip('. ')
t = answer['type']
if t in ['A', 'AAAA']:
out_files[t].writerow([data, name])
elif t == 'CNAME':
follow_cnames(name, data, answers, out_files, stack=stack + [])
else:
logging.error(f'Unknown type for {name}: {answer}')
def parse_records(out_files: List[csv.writer], answers, parent_name: str):
for answer in answers:
t = answer['type']
data = answer['data'].rstrip('. ')
name = answer['name'].rstrip('. ')
if t in ['A', 'AAAA']:
out_files[t].writerow([data, name])
elif t == 'CNAME':
follow_cnames(name, data, answers, out_files, stack=[name])
else:
logging.error(f'Unknown type for {name}: {answer}')
if __name__ == '__main__':
extract_domains()