-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgraphbuilder.py
212 lines (191 loc) · 6.03 KB
/
graphbuilder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# -*- coding: utf-8 -*-
from disassembler import Disassembler
from interpreter import BasicInterpreter
from image import Image
from structures import ExternalFunction
from opcodes import exit_ops
from resolver import BasicResolver
import sys, os
import logging
# Key under which GraphBuilder stores the fallback function in
# ``external_functions`` (and the signature passed to its ExternalFunction).
FALLBACK_SIGNATURE = 0xffffffff
class GraphBuilder(Disassembler):
    """Build the control-flow graph of a binary and split it into functions.

    Constructing an instance runs the whole pipeline: the resolver is
    initialised, the graph is explored once, blocks carrying a function
    signature are recorded, the graph is simplified, explored a second time,
    and finally one ``ExternalFunction`` per signature plus a fallback
    function are created.

    Attributes set during construction:
        resolver: the ``BasicResolver`` shared by every exploration.
        graph / tracker: result of the latest ``explore_control_flow_graph``.
        indirect_jumps: the interpreter's ``ambiguous_blocks`` from the
            latest exploration.
        external_functions: dict mapping signature -> ``ExternalFunction``.
    """

    def __init__(self, binary):
        """Disassemble ``binary`` and run the full graph-building pipeline.

        :param binary: forwarded unchanged to ``Disassembler.__init__``.
        """
        logging.info("图像生成器初始化")
        Disassembler.__init__(self, binary)
        self.__init_resolver()
        # Initial build of the graph from the disassembler's blocks.
        self.__build_graph(self.get_blocks())
        self.__mark_signature_blocks()
        # Simplification is delegated to the graph itself; it is given the
        # signature blocks and the resolver.
        self.graph.simplify(self.__signature_blocks, self.resolver)
        # Build again because we need the indirect jumps of the simplified
        # graph.
        self.__build_graph(self.graph.get_blocks())
        self.__create_external_functions()
        self.__create_fallback_function()

    def __init_resolver(self):
        """Create the resolver and register natural successors.

        Every block whose exit opcode is neither in ``exit_ops`` nor
        ``"JUMP"`` gets ``block_id + 1`` registered as its natural successor.

        :return: None
        """
        logging.info("图像生成器:解析器初始化")
        self.resolver = BasicResolver(self.jump_dests)
        # Loop-invariant: opcodes after which no natural successor is set.
        no_successor_ops = exit_ops | {"JUMP"}
        for block in self.get_blocks().values():
            if block.get_exit_bytecode().opcode in no_successor_ops:
                continue
            block_id = block.get_id()
            self.resolver.set_natural_successor(block_id, block_id + 1)

    def __build_graph(self, blocks):
        """Explore the control flow of ``blocks`` starting at block 0.

        Stores the resulting graph and tracker on ``self`` and records the
        interpreter's ambiguous blocks as ``self.indirect_jumps``.

        :param blocks: block collection handed to ``BasicInterpreter``.
        :return: None
        """
        logging.info("图像生成器:构建图像")
        interpreter = BasicInterpreter(blocks, self.resolver)
        self.graph, self.tracker = interpreter.explore_control_flow_graph(
            0, Image(-1)
        )
        self.indirect_jumps = interpreter.ambiguous_blocks

    def __mark_signature_blocks(self):
        """Record which graph blocks carry a function signature.

        Builds ``self.__signature_blocks`` as ``{block_id: signature}`` for
        every block whose ``get_function_signature()`` is not ``-1``.

        :return: None
        """
        logging.info("图像生成器:标记签名块")
        self.__signature_blocks = dict()
        for block in self.graph.get_blocks().values():
            signature = block.get_function_signature()
            if signature != -1:
                self.__signature_blocks[block.get_id()] = signature

    def __create_external_functions(self):
        """Create one ``ExternalFunction`` per recorded signature block.

        The results are stored in ``self.external_functions``, keyed by
        signature.

        :return: None
        """
        logging.info("图像生成器:循环创建外部函数")
        self.external_functions = dict()
        for cur_id, signature in self.__signature_blocks.items():
            self.external_functions[signature] = \
                self.__create_external_function(cur_id, signature)

    def __create_external_function(self, cur_id, signature):
        """Explore the function reached from block ``cur_id``.

        The entry block is the successor of ``cur_id`` with the largest id.
        Exploration starts there with the image the tracker observed for
        that entry block, using a fresh interpreter over the current graph.

        :param cur_id: id of the block whose successors contain the entry.
        :param signature: signature given to the created function.
        :return: the new ``ExternalFunction``.
        :raises ValueError: if ``cur_id`` has no successors (``max`` of an
            empty sequence).
        """
        # Lazy %-style arguments; the emitted text matches the previous
        # eagerly formatted message.
        logging.info(
            "图像生成器:创建外部函数:cur_id:%#x signature:%#x",
            cur_id, signature,
        )
        entry_id = max(int(i) for i in self.graph.get_successor_ids(cur_id))
        interpreter = BasicInterpreter(self.graph.get_blocks(), self.resolver)
        image = self.tracker.get_observed_image(entry_id)
        graph, trackers = interpreter.explore_control_flow_graph(
            entry_id, image
        )
        return ExternalFunction(signature, graph, trackers, (entry_id, None))

    def __create_fallback_function(self):
        """Register the function stored under ``FALLBACK_SIGNATURE``.

        * No signature blocks at all: the whole graph, entered at block 0,
          is the fallback.
        * Otherwise, if block 0 has exactly one successor that is not a
          signature block and block 0 is not itself a signature block, the
          fallback is explored from block 0.
        * If neither case registered a fallback, the whole graph entered at
          block 1 is used, adding block 1 to the graph first if it is
          missing.

        :return: None
        """
        logging.info("图像生成器:创建回调函数")
        if not self.__signature_blocks:
            self.external_functions[FALLBACK_SIGNATURE] = ExternalFunction(
                FALLBACK_SIGNATURE, self.graph, self.tracker, (0, None)
            )
        else:
            suc_ids = self.graph.get_successor_ids(0)
            suc_ids = suc_ids - set(self.__signature_blocks)
            if len(suc_ids) == 1 and 0 not in self.__signature_blocks:
                self.external_functions[FALLBACK_SIGNATURE] = \
                    self.__create_external_function(0, FALLBACK_SIGNATURE)

        # Last resort when nothing above registered a fallback function.
        if FALLBACK_SIGNATURE not in self.external_functions:
            if not self.graph.has_block(1):
                self.graph.add_block(self.get_blocks()[1])
            self.external_functions[FALLBACK_SIGNATURE] = ExternalFunction(
                FALLBACK_SIGNATURE, self.graph, self.tracker, (1, None)
            )

    def validate_execution_path(self, program_counters):
        """Check that a trace of program counters is a path in the graph.

        The counters are converted with ``get_block_trace`` and the result
        is handed to the graph's ``validate_path_exists``.

        :param program_counters: forwarded to ``get_block_trace``.
        :return: None
        """
        logging.info("图像生成器:验证执行路径")
        path = self.get_block_trace(program_counters)
        self.graph.validate_path_exists(path)

    def visualize_contract(self, out_file=None):
        """Render the graph to ``temp/<out_file>`` as a PDF via Graphviz.

        The graph writes its dot description through
        ``visualize_contract_dot("temp/temp.dot", out_file)``; the ``dot``
        executable is then invoked to produce the PDF.

        :param out_file: file name of the PDF inside the ``temp`` directory;
            also forwarded to ``visualize_contract_dot``.
        :return: None
        """
        # Local import keeps this method self-contained.
        import subprocess

        logging.info("图像生成器:画图")
        # Both the dot file and the PDF live in ./temp, so make sure the
        # directory exists (presumably visualize_contract_dot writes to the
        # given path and does not create it -- harmless if it does).
        if not os.path.isdir("temp"):
            os.makedirs("temp")
        self.graph.visualize_contract_dot("temp/temp.dot", out_file)
        # Argument list instead of a shell string: file names containing
        # spaces or shell metacharacters are passed to dot verbatim.
        command = ["dot", "-Tpdf", "temp/temp.dot", "-o", "temp/%s" % out_file]
        try:
            subprocess.call(command)
        except OSError:
            # os.system never raised when dot was unavailable; stay
            # best-effort but leave a trace in the log.
            logging.exception("could not run Graphviz 'dot'")

    def debug_function_bytecodes(self):
        """Call ``debug_function()`` on every created external function.

        :return: None
        """
        for func in self.external_functions.values():
            func.debug_function()
if __name__ == "__main__":
    # Usage: graphbuilder.py INPUT_FILE [-v] [-d]
    #   -v  render the graph to PDF files under ./temp
    #   -d  call debug_function() on every external function
    if len(sys.argv) < 2:
        sys.exit("usage: %s INPUT_FILE [-v] [-d]" % sys.argv[0])

    # Only the first line of the input file is used; the context manager
    # closes the file even if reading fails.
    with open(sys.argv[1]) as input_file:
        line = input_file.readline().strip()
    # If the line contains spaces, the second space-separated field is
    # what gets passed to GraphBuilder.
    if " " in line:
        line = line.split(" ")[1]

    builder = GraphBuilder(line)
    if "-v" in sys.argv:
        builder.visualize_contract("0.CFG.pdf")
        builder.visualize_contract("1.GraphBuilder.pdf")
    if "-d" in sys.argv:
        builder.debug_function_bytecodes()