-
Notifications
You must be signed in to change notification settings - Fork 4
/
minimizer.py
213 lines (196 loc) · 9.3 KB
/
minimizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
from burp import IBurpExtender, IContextMenuFactory, IContextMenuInvocation
from burp import IParameter, IRequestInfo
from java.net import URL, URLClassLoader
from java.lang import Thread as JavaThread
from javax.swing import JMenuItem
import array
import xmltodict
from threading import Thread
from functools import partial
import json
import time
import copy
import sys
import traceback
# Response attributes that are subtracted from the baseline invariant set in
# Minimizer._minimize, so a change in them never counts as a changed response.
IGNORED_INVARIANTS = {'last_modified_header'}
class Minimizer(object):
    """Reduce one HTTP request to the parts the server's response depends on.

    The request is re-sent with a single parameter, header or JSON/XML
    element removed; the removal is kept when Burp's response-variation
    analysis still reports every attribute that was invariant between two
    baseline ("etalon") responses.
    """

    def __init__(self, callbacks, request):
        """Store the Burp callbacks and the message to minimize.

        :param callbacks: Burp extender callbacks; ``callbacks.helpers`` is
            kept for request/response analysis.
        :param request: sequence of selected messages; only ``request[0]``
            is used.
        """
        self._cb = callbacks
        self._helpers = callbacks.helpers
        self._request = request[0]
        self._httpServ = self._request.getHttpService()

    def _fix_classloader_problems(self):
        """Give the current thread a context class loader rooted at the Jython jar.

        NOTE(review): presumably needed so code running in the worker thread
        can resolve classes shipped inside the Jython jar - confirm.

        :raises Exception: if no ``sys.path`` entry looks like a Jython jar.
        """
        jython_jar = None
        for path in sys.path:
            if '.jar' in path and 'jython' in path.lower():
                # Keep the entry only up to and including the '.jar' suffix.
                jython_jar = path[:path.index('.jar') + 4]
        if jython_jar is None:
            raise Exception("Could not locate jython jar in path!")
        classloader = URLClassLoader(
            [URL("file://" + jython_jar)],
            JavaThread.currentThread().getContextClassLoader()
        )
        JavaThread.currentThread().setContextClassLoader(classloader)

    def compare(self, etalon, response, etalon_invariant):
        """Tell whether ``response`` is equivalent to the reference response.

        :param etalon: reference response bytes.
        :param response: response obtained for the candidate request.
        :param etalon_invariant: attribute names that must stay invariant.
        :returns: True when every name in ``etalon_invariant`` is still
            reported as invariant between ``etalon`` and ``response``.
        """
        variations = self._helpers.analyzeResponseVariations([etalon, response])
        invariant = set(variations.getInvariantAttributes())
        print("Invariant", invariant)
        # Computed once; the original evaluated the same difference twice.
        missing = set(etalon_invariant) - invariant
        print("diff", missing)
        return len(missing) == 0

    def minimize(self, replace, event):
        """Swing action callback: run the minimization on a background thread.

        :param replace: True to write the result back into the message,
            False to send it to Repeater.
        :param event: the action event supplied by Swing (unused).
        """
        Thread(target=self._minimize, args=(replace,)).start()

    def _fix_cookies(self, current_req):
        """ Workaround for a bug in extender,
        see https://support.portswigger.net/customer/portal/questions/17091600

        Drops header lines that consist of nothing but ``Cookie:`` and
        rebuilds the message; returns ``current_req`` untouched when no such
        line is present.
        """
        cur_request_info = self._helpers.analyzeRequest(current_req)
        new_headers = []
        rebuild = False
        for header in cur_request_info.getHeaders():
            if header.strip().lower() != 'cookie:':
                new_headers.append(header)
            else:
                rebuild = True
        if rebuild:
            return self._helpers.buildHttpMessage(
                new_headers, current_req[cur_request_info.getBodyOffset():]
            )
        return current_req

    def removeHeader(self, req, target_header):
        """Return a copy of ``req`` without the header lines equal to ``target_header``.

        :param req: raw request bytes.
        :param target_header: complete header line to drop (exact match).
        :returns: the message rebuilt from the remaining headers and the
            original body.
        """
        req_info = self._helpers.analyzeRequest(req)
        new_headers = []
        headers = req_info.getHeaders()
        print("DEBUG: GetHeaders(): ", headers)
        for header in headers:
            if header != target_header:
                print("DEBUG: Header, target_Header", header, target_header)
                new_headers.append(header)
        return self._helpers.buildHttpMessage(new_headers, req[req_info.getBodyOffset():])

    def _minimize(self, replace):
        """Worker: minimize parameters, then headers, then a JSON/XML body.

        :param replace: True to store the minimized request in the original
            message, False to open it in a Repeater tab named "minimized".

        Any failure is printed as a traceback instead of propagating, since
        this runs on a background thread.
        """
        try:
            self._fix_classloader_problems()
            seen_json = seen_xml = False
            request_info = self._helpers.analyzeRequest(self._request)
            current_req = self._request.getRequest()

            # Two baseline responses tell us which attributes are stable for
            # an unchanged request; only those are compared later on.
            etalon = self._cb.makeHttpRequest(self._httpServ, current_req).getResponse()
            etalon2 = self._cb.makeHttpRequest(self._httpServ, current_req).getResponse()
            invariants = set(
                self._helpers.analyzeResponseVariations([etalon, etalon2]).getInvariantAttributes()
            )
            invariants -= IGNORED_INVARIANTS
            print("Request invariants", invariants)

            # 1. URL / body / cookie parameters.
            for param in request_info.getParameters():
                param_type = param.getType()
                if param_type in [IParameter.PARAM_URL, IParameter.PARAM_BODY, IParameter.PARAM_COOKIE]:
                    print("Trying", param_type, param.getName(), param.getValue())
                    req = self._helpers.removeParameter(current_req, param)
                    resp = self._cb.makeHttpRequest(self._httpServ, req).getResponse()
                    if self.compare(etalon, resp, invariants):
                        print("excluded:", param.getType(), param.getName(), param.getValue())
                        current_req = self._fix_cookies(req)
                else:
                    if param_type == IParameter.PARAM_JSON:
                        seen_json = True
                    elif param_type == IParameter.PARAM_XML:
                        seen_xml = True
                    else:
                        print("Unsupported type:", param.getType())

            # 2. Headers. Read them from the already-reduced request: a Cookie
            # header rewritten in step 1 no longer equals the original line,
            # so iterating the original headers could never remove it.
            # Do not remove GET/POST and Host header -> skip first 2 elements.
            for header in self._helpers.analyzeRequest(current_req).getHeaders()[2:]:
                req = self.removeHeader(current_req, header)
                resp = self._cb.makeHttpRequest(self._httpServ, req).getResponse()
                if self.compare(etalon, resp, invariants):
                    print("excluded: Header ", header)
                    current_req = self._fix_cookies(req)

            # 3. Structured body.
            content_type = request_info.getContentType()
            seen_json = (content_type == IRequestInfo.CONTENT_TYPE_JSON or seen_json)
            seen_xml = (content_type == IRequestInfo.CONTENT_TYPE_XML or seen_xml)
            if seen_json or seen_xml:
                # Split the request minimized so far, not the original one;
                # otherwise the removals from steps 1 and 2 would be thrown
                # away when the final request is assembled below.
                body_offset = self._helpers.analyzeRequest(current_req).getBodyOffset()
                headers = current_req[:body_offset].tostring()
                body = current_req[body_offset:].tostring()

                if seen_json:
                    print('Minimizing json...')
                    dumpmethod = partial(json.dumps, indent=4)
                    loadmethod = json.loads
                elif seen_xml:
                    print('Minimizing XML...')
                    dumpmethod = partial(xmltodict.unparse, pretty=True)
                    loadmethod = xmltodict.parse

                def serialize(tree):
                    """Dump ``tree`` to a byte string."""
                    text = dumpmethod(tree)
                    if not isinstance(text, str):
                        # A unicode result with non-ASCII characters would
                        # make a plain str() conversion raise.
                        text = text.encode('utf-8')
                    return text

                # The minimization routine for both xml and json is the same,
                # the only difference is with load and dump functions
                def check(candidate):
                    """Return True when ``candidate`` still yields an equivalent response."""
                    if len(candidate) == 0 and not seen_json:
                        # XML with no root node is invalid
                        return False
                    candidate = serialize(candidate)
                    req = fix_content_type(headers, candidate)
                    resp = self._cb.makeHttpRequest(self._httpServ, req).getResponse()
                    if self.compare(etalon, resp, invariants):
                        print("Not changed: " + candidate)
                        return True
                    else:
                        print("Changed: " + candidate)
                        return False

                body = loadmethod(body)
                body = bf_search(body, check)
                current_req = fix_content_type(headers, serialize(body))

            if replace:
                self._request.setRequest(current_req)
            else:
                self._cb.sendToRepeater(
                    self._httpServ.getHost(),
                    self._httpServ.getPort(),
                    self._httpServ.getProtocol() == 'https',
                    current_req,
                    "minimized"
                )
        except:  # noqa: E722
            # Deliberately bare: under Jython, Java throwables raised by the
            # Burp API are not subclasses of Python's Exception, and this is
            # the top of a worker thread where everything must be reported.
            print(traceback.format_exc())
def bf_search(body, check_func):
    """Greedily strip a parsed JSON/XML tree down to what ``check_func`` needs.

    Each element of ``body`` is tentatively removed; the removal is kept when
    ``check_func`` returns a true value for the body without it. Elements
    that survive and are themselves lists or dicts are then minimized
    recursively, with ``check_func`` always receiving the complete
    top-level candidate.

    Candidates keep the original element order, and an ``OrderedDict`` stays
    an ``OrderedDict``, so the order of the elements is not scrambled while
    testing.

    :param body: dict, list, or scalar. A scalar has nothing to remove and is
        returned unchanged.
    :param check_func: callable taking a candidate body and returning True
        when the candidate is still acceptable.
    :returns: the minimized body (a new container for dict/list input).
    """
    # Local import: only needed to keep ordered mappings ordered.
    from collections import OrderedDict

    print('Starting to minimize', body)
    if isinstance(body, dict):
        to_test = list(body.items())
        # Original position of every key, used to restore order on assembly.
        rank = dict((key, pos) for pos, (key, _) in enumerate(to_test))
        container = OrderedDict if isinstance(body, OrderedDict) else dict

        def assemble(pairs):
            return container(sorted(pairs, key=lambda pair: rank[pair[0]]))
    elif isinstance(body, list):
        to_test = list(enumerate(body))

        def assemble(pairs):
            return [value for _, value in sorted(pairs, key=lambda pair: pair[0])]
    else:
        # Scalar top-level value (e.g. a bare JSON string or number):
        # previously this fell through with ``to_test`` undefined.
        return body

    # 1. Test all sub-elements
    tested = []
    while to_test:
        current = to_test.pop()
        print('Trying to eliminate', current)
        if not check_func(assemble(to_test + tested)):
            tested.append(current)

    # 2. Recurse into remaining sub-items
    to_test = tested
    tested = []
    while to_test:
        key, value = to_test.pop()
        if isinstance(value, (list, dict)):
            def check_func_rec(sub_body):
                # Re-embed the candidate sub-tree into the full body so the
                # caller's check always sees a complete document.
                return check_func(assemble(to_test + tested + [(key, sub_body)]))
            value = bf_search(value, check_func_rec)
        tested.append((key, value))
    return assemble(tested)
def fix_content_type(headers, body):
    """Glue a header blob and a body into one raw request with a correct Content-Length.

    Despite the name, the header that is rewritten is ``Content-Length``.

    :param headers: everything up to the body, lines separated by ``\\r\\n``
        (including the terminating blank line). Text or bytes.
    :param body: request body. Text is encoded as UTF-8 so the length below
        is a byte count, not a character count.
    :returns: ``array.array('b', ...)`` holding headers followed by body. An
        existing Content-Length line is replaced; none is added when the
        headers do not contain one.
    """
    if not isinstance(headers, bytes):
        headers = headers.encode('utf-8')
    if not isinstance(body, bytes):
        body = body.encode('utf-8')

    length_line = b'Content-Length: ' + str(len(body)).encode('ascii')
    lines = []
    for line in headers.split(b'\r\n'):
        # Compare the exact header name; a prefix test would also rewrite
        # unrelated headers such as "Content-Length-Hint".
        if line.split(b':', 1)[0].strip().lower() == b'content-length':
            line = length_line
        lines.append(line)
    return array.array('b', b'\r\n'.join(lines) + body)
class BurpExtender(IBurpExtender, IContextMenuFactory):
    """Extension entry point: registers itself as a context-menu factory."""

    def registerExtenderCallbacks(self, callbacks):
        """Name the extension, register the menu factory and keep the callbacks."""
        callbacks.setExtensionName("Request minimizer")
        callbacks.registerContextMenuFactory(self)
        self._callbacks = callbacks

    def createMenuItems(self, invocation):
        """Build the two "Minimize ..." items for a request shown in a message editor.

        Returns None for every other invocation context.
        """
        context = invocation.getInvocationContext()
        if context != IContextMenuInvocation.CONTEXT_MESSAGE_EDITOR_REQUEST:
            return None

        # (menu label, replace-in-place flag) for each entry, in display order.
        entries = (
            ("Minimize in current tab", True),
            ("Minimize in a new tab", False),
        )
        items = []
        for label, replace in entries:
            # Each item gets its own Minimizer bound to the current selection.
            worker = Minimizer(self._callbacks, invocation.getSelectedMessages())
            handler = partial(worker.minimize, replace)
            items.append(JMenuItem(label, actionPerformed=handler))
        return items