-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstructurer.py
226 lines (205 loc) · 5.67 KB
/
structurer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# -*- coding: utf-8 -*-
from aggregator import Aggregator
from structures import *
import sys
import logging
class Structurer(Aggregator):
    """Collapse each function's control-flow graph into structured blocks.

    After the ``Aggregator`` base class has been initialized, every function
    returned by ``get_all_functions()`` is analyzed: its graph is repeatedly
    pattern-matched against sequence, if-then, if-then-else and loop shapes,
    and each match is replaced in the graph by a single composite block
    (``Seq``, ``IfThen``, ``IfThenElse`` or ``Loop``).
    """

    def __init__(self, binary):
        """Initialize the structurer and analyze every function.

        :param binary: bytecode, passed through to ``Aggregator.__init__``
        """
        logging.info("构建器初始化")
        Aggregator.__init__(self, binary)  # base-class (aggregator) setup
        for func in self.get_all_functions():
            self.__analyze_function(func)

    def __analyze_function(self, func):
        """Structure the graph of a single function in place.

        Functions whose graph contains an indirect jump are left untouched.
        Otherwise every block id, in depth-first order from the entry block,
        is used as a starting point for structure matching. Afterwards
        ``func.entry_id`` is re-pointed at the block that now contains the
        original entry block, and the function is visualized.

        :param func: function object exposing ``signature``, ``graph``,
            ``entry_id`` and ``visualize_function()``
        :return: None
        """
        logging.info("构建器:分析函数开始:"+'{:#x}'.format(func.signature))
        graph = func.graph
        if self.__has_indirect_jumps(graph):
            return

        sorted_ids = graph.depth_first_search(func.entry_id)
        for block_id in sorted_ids:
            # Ids already swallowed by an earlier match are skipped inside
            # __match_structures via graph.has_block().
            self.__match_structures(block_id, graph)

        # The original entry block may now be nested inside a composite
        # block; make the function's entry point refer to that container.
        entry_id = func.entry_id
        for block in graph:
            result = block.get_block(entry_id)
            if result is not None:
                func.entry_id = block.get_id()
        func.visualize_function()

    def __has_indirect_jumps(self, graph):
        """Report whether the graph contains an indirect jump.

        A block counts as an indirect jump when its exit expression is
        ``JUMP`` and it has more than one successor.

        :param graph: control-flow graph to inspect (iterable over blocks)
        :return: ``True`` if at least one such block exists, else ``False``
        """
        logging.info("构建器:有间接跳转")
        for block in graph:
            # NOTE(review): the block object (not its id) is passed to
            # get_successor_ids, unlike the id-based graph calls elsewhere
            # in this class -- confirm this matches the graph API.
            if block.check_exit_expression("JUMP") \
                    and len(graph.get_successor_ids(block)) > 1:
                return True
        return False

    def __match_structures(self, block_id, graph):
        """Apply all structure matchers at ``block_id`` until nothing changes.

        Each matcher returns the id of the block that now stands at the
        matched position (a new id when it collapsed something, otherwise the
        id it was given). The loop stops once a full pass leaves the id
        unchanged.

        :param block_id: id of the block to start matching from
        :param graph: control-flow graph, modified in place
        :return: None
        """
        logging.info("构建器:匹配结构体")
        if not graph.has_block(block_id):
            return
        cur_id = -1
        while cur_id != block_id:
            cur_id = block_id
            block_id = self.__match_ifthen(block_id, graph)
            block_id = self.__match_sequence(block_id, graph)
            block_id = self.__match_ifthenelse(block_id, graph)
            block_id = self.__match_loop(block_id, graph)

    def __match_sequence(self, a0, graph):
        """Collapse a straight-line chain of blocks starting at ``a0``.

        The chain is extended while the current block has a single successor
        whose single predecessor is the current block and which has no edge
        back to it. A chain of two or more blocks is replaced by one ``Seq``
        block that inherits the predecessors of the first block and the
        successors of the last.

        :param a0: id of the first block of the candidate chain
        :param graph: control-flow graph, modified in place
        :return: id of the new ``Seq`` block, or ``a0`` if nothing matched
        """
        logging.info("构建器:匹配序列")
        sequence = [a0]
        prev_id = a0
        while True:
            cur_id = graph.get_single_successor(prev_id)
            if cur_id is None:
                break
            if graph.get_single_predecessor(cur_id) != prev_id:
                break
            if graph.has_edge(cur_id, prev_id):
                # A back edge means this is a loop body, not a sequence.
                break
            sequence.append(cur_id)
            prev_id = cur_id

        if len(sequence) == 1:
            return a0

        an = sequence[-1]
        new_id = graph.allocate_id()
        blocks = [graph[i] for i in sequence]
        block = Seq(new_id, graph[an].get_exit_address(), blocks)
        graph.add_block(block)
        graph.transfer_predecessors(a0, new_id)
        graph.transfer_successors(an, new_id)
        graph.remove_blocks(sequence)
        # Return the id of the block that replaced the chain: a0 has just
        # been removed from the graph, so handing it back would make the
        # remaining matchers operate on a block that no longer exists.
        return new_id

    def __match_ifthen(self, a0, graph):
        """Collapse an if-then shape rooted at ``a0``.

        Matches when ``a0`` has two successors and the single successor of
        its natural successor ``a1`` is one of those two successors. ``a0``
        and ``a1`` are replaced by an ``IfThen`` block with one edge to the
        join block ``a2``.

        :param a0: id of the candidate condition block
        :param graph: control-flow graph, modified in place
        :return: id of the new ``IfThen`` block, or ``a0`` if nothing matched
        """
        logging.info("构建器:匹配 ifthen")
        suc_ids = graph.get_dual_successors(a0)
        if suc_ids is None:
            return a0
        a1 = graph.get_natural_successor(a0)
        a2 = graph.get_single_successor(a1)
        # NOTE(review): a1 is not checked to have a0 as its only
        # predecessor before being removed -- confirm the graph guarantees it.
        if a2 not in suc_ids:
            return a0

        new_id = graph.allocate_id()
        block = IfThen(new_id, graph[a2].get_entry_address(), graph[a0], graph[a1])
        graph.add_block(block)
        graph.transfer_predecessors(a0, new_id)
        graph.remove_blocks({a0, a1})
        graph.add_edge(new_id, a2)
        return new_id

    def __match_ifthenelse(self, a0, graph):
        """Collapse an if-then-else shape rooted at ``a0``.

        Matches when ``a0`` has two successors ``a1`` (the natural successor)
        and ``a2``, both report the same single predecessor, and both have
        the same single successor ``a3``. ``a0``, ``a1`` and ``a2`` are
        replaced by an ``IfThenElse`` block with one edge to ``a3``.

        :param a0: id of the candidate condition block
        :param graph: control-flow graph, modified in place
        :return: id of the new ``IfThenElse`` block, or ``a0`` if nothing
            matched
        """
        logging.info("构建器:匹配 ifthenelse")
        suc_ids = graph.get_dual_successors(a0)
        if suc_ids is None:
            return a0
        a1 = graph.get_natural_successor(a0)
        a2 = (suc_ids - {a1}).pop()  # the other (non-natural) successor
        if graph.get_single_predecessor(a1) != graph.get_single_predecessor(a2):
            return a0
        a3 = graph.get_single_successor(a1)
        if a3 is None or a3 != graph.get_single_successor(a2):
            return a0

        suc_address = graph[a3].get_entry_address()
        new_id = graph.allocate_id()
        block = IfThenElse(new_id, suc_address, graph[a0], graph[a1], graph[a2])
        graph.add_block(block)
        graph.transfer_predecessors(a0, new_id)
        graph.add_edge(new_id, a3)
        graph.remove_blocks({a0, a1, a2})
        return new_id

    def __match_loop(self, a0, graph):
        """Collapse a loop shape rooted at ``a0``.

        Matches when ``a0`` has two successors and one of them (``a1``) has
        ``a0`` as both its single successor and its single predecessor. The
        other successor ``a2`` is where the loop exits. ``a0`` and ``a1`` are
        replaced by a ``Loop`` block with one edge to ``a2``.

        :param a0: id of the candidate loop-header block
        :param graph: control-flow graph, modified in place
        :return: id of the new ``Loop`` block, or ``a0`` if nothing matched
        """
        logging.info("构建器:匹配循环")
        suc_ids = graph.get_dual_successors(a0)
        if suc_ids is None:
            return a0
        a1, a2 = suc_ids
        # Normalize so that a1 is the candidate body (the successor that
        # jumps straight back to a0) and a2 is the exit.
        if graph.get_single_successor(a2) == a0:
            a1, a2 = a2, a1
        if graph.get_single_successor(a1) != a0 \
                or graph.get_single_predecessor(a1) != a0:
            return a0

        new_id = graph.allocate_id()
        suc_address = graph[a2].get_entry_address()
        block = Loop(new_id, suc_address, graph[a0], graph[a1])
        graph.add_block(block)
        graph.transfer_predecessors(a0, new_id)
        graph.add_edge(new_id, a2)
        graph.remove_blocks({a0, a1})
        return new_id

    def visualize_functions(self):
        """Call ``visualize_function()`` on every function.

        :return: None
        """
        logging.info("构建器:函数可视化")
        for func in self.get_all_functions():
            func.visualize_function()

    def debug_functions(self):
        """Call ``debug_function()`` on every function.

        :return: None
        """
        for func in self.get_all_functions():
            func.debug_function()
if __name__ == "__main__":
    # Program entry point: read bytecode from the file named by the first
    # command-line argument, structure it, then optionally visualize (-v)
    # and/or debug-dump (-d) every function.
    LOG_FORMAT = "%(filename)s:%(lineno)d %(levelname)s %(funcName)s: %(message)s"
    # Uncomment to see the INFO-level progress messages:
    # logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

    if len(sys.argv) < 2:
        sys.exit("usage: structurer.py <bytecode-file> [-v] [-d]")

    # Only the first line of the file is used; the context manager closes
    # the file even if reading fails.
    with open(sys.argv[1]) as input_file:
        line = input_file.readline().strip()
    if " " in line:
        # When the line contains spaces, the second space-separated field
        # is taken as the bytecode.
        line = line.split(" ")[1]

    # Feed the bytecode into the structurer.
    a = Structurer(line)
    if "-v" in sys.argv:
        logging.info("visualize")
        a.visualize_functions()
    if "-d" in sys.argv:
        a.debug_functions()