-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreplay.py
150 lines (130 loc) · 4.49 KB
/
replay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# we disable profiling before importing tango
import os
os.environ['TANGO_NO_PROFILE'] = 'y'
from tango.exceptions import LoadedException, ChannelBrokenException, \
ProcessTerminatedException, ProcessCrashedException
from tango.core import FuzzerConfig
from tango.raw import RawInput
from tango.ptrace.errors import PtraceError
import asyncio
import argparse
import logging
from subprocess import PIPE
from typing import Iterable
from ast import literal_eval
from tango.core.tracker import *
class EmptyTracker(BaseTracker):
    """
    A tracker whose hooks are all no-ops.

    Every method and property below returns ``None`` without touching any
    state. NOTE(review): presumably this is the tracker selected by the
    ``{'type': 'empty'}`` override that ``main`` installs -- confirm against
    the tango component registry.
    """

    async def finalize(self, owner):
        """
        Finalization hook; intentionally does nothing.
        """
        return None

    @property
    def entry_state(self) -> AbstractState:
        """
        The state of the target when it is first launched (with no inputs
        sent). This stub tracks nothing and always yields ``None``.

        :returns: ``None``
        """
        return None

    @property
    def current_state(self) -> AbstractState:
        """
        The state the target is currently in; always ``None`` here.
        """
        return None

    @property
    def state_graph(self) -> AbstractStateGraph:
        """
        The graph of observed states; always ``None`` here.
        """
        return None

    def peek(self, default_source: AbstractState,
            expected_destination: AbstractState) -> AbstractState:
        """
        State-peeking hook; ignores both arguments and returns ``None``.
        """
        return None

    def reset_state(self, state: AbstractState):
        """
        Called to inform the tracker that the loader has reset the target
        into a state. Nothing is recorded.
        """
        return None

    def out_edges(self, state: AbstractState) -> Iterable[Transition]:
        """
        Outgoing transitions of ``state``; this stub returns ``None``.
        """
        return None

    def in_edges(self, state: AbstractState) -> Iterable[Transition]:
        """
        Incoming transitions of ``state``; this stub returns ``None``.
        """
        return None
def parse_args():
    """
    Parse the command line of the replay tool.

    :returns: The populated namespace with the attributes ``config`` (path
        to the fuzz.json file), ``file`` (path to the input file),
        ``override`` (a list of ``[name, value]`` pairs, or ``None`` when no
        ``-o`` was given) and ``verbose`` (the number of ``-v`` flags).
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser(description=(
        "Replays an input file to a TangoFuzz target."
    ))
    parser.add_argument("config",
        help="The path to the TangoFuzz fuzz.json file.")
    parser.add_argument("file",
        help="The path to the input file.")
    parser.add_argument('-o', '--override', action='append', nargs=2,
        metavar=('NAME', 'VALUE'),
        help=("Overrides a configuration entry. NAME is a dot-separated "
              "path into the configuration. May be given multiple times.")
    )
    # The count must start at 0: with action='count' every -v adds one, so
    # a default of -1 made the flag-less invocation yield -1 (which has no
    # logging level mapped to it) and shifted -v/-vv one step below what
    # the help text promises.
    parser.add_argument('-v', '--verbose', action='count', default=0,
        help=("Controls the verbosity of messages. "
            "-v prints info. -vv prints debug. Default: warnings and higher.")
    )
    return parser.parse_args()
def configure_verbosity(level):
    """
    Set the root logger's level from a verbosity count.

    ``0`` selects warnings and higher, ``1`` info, ``2`` debug. Counts
    outside that range are clamped to the nearest supported value instead of
    failing with a bare ``KeyError``, so that e.g. ``-vvv`` behaves like
    ``-vv``.

    :param level: The verbosity count, typically the number of ``-v`` flags.
    :type level: int
    """
    levels = (logging.WARNING, logging.INFO, logging.DEBUG)
    clamped = min(max(level, 0), len(levels) - 1)
    logging.getLogger().setLevel(levels[clamped])
async def replay_load(config, file):
    """
    Prepare everything needed to replay ``file``.

    Instantiates the configured generator and uses it to load the input,
    instantiates the loader and advances its ``load_state(None)`` async
    generator by one step, then instantiates the driver.

    :param config: The fuzzer configuration used to instantiate components.
    :param file: The path to the input file to load.
    :returns: A ``(driver, input)`` tuple.
    """
    generator = await config.instantiate('generator')
    loaded_input = generator.load_input(file)

    loader = await config.instantiate('loader')
    try:
        # a single step is requested; the generator finishing early is fine
        await loader.load_state(None).asend(None)
    except StopAsyncIteration:
        pass

    driver = await config.instantiate('driver')
    return driver, loaded_input
import signal
async def send_eof(channel, proper_signal):
    """
    Detach from the channel's root and then signal it.

    Calls ``channel.root.detach()``; if that raises ``PtraceError`` nothing
    else is done. Otherwise waits 0.1 seconds and calls
    ``channel.root.kill(proper_signal)``.

    :param channel: An object exposing ``root.detach()`` and ``root.kill()``.
    :param proper_signal: The signal passed on to ``kill``.
    """
    try:
        channel.root.detach()
    except PtraceError:
        # no such process
        pass
    else:
        await asyncio.sleep(0.1)
        channel.root.kill(proper_signal)
async def replay(config, file):
    """
    Replay the input stored in ``file`` and then signal the target.

    A ``LoadedException`` raised during execution is swallowed only when it
    wraps a crash, a termination, or a broken channel; any other one is
    re-raised. Afterwards the signal named by the driver's
    ``signal_to_exit`` configuration entry (``SIGTERM`` when the entry is
    absent) is handed to ``send_eof``.

    :param config: The fuzzer configuration.
    :param file: The path to the input file.
    """
    drv, inp = await replay_load(config, file)

    try:
        await drv.execute_input(inp)
    except LoadedException as ex:
        tolerated = (
            ProcessCrashedException,
            ProcessTerminatedException,
            ChannelBrokenException,
        )
        if not isinstance(ex._ex, tolerated):
            raise

    # no explicit exit signal configured: fall back to SIGTERM
    if "signal_to_exit" not in config._config["driver"]:
        await send_eof(drv._channel, signal.SIGTERM)
        return

    exit_signal_name = config._config["driver"]["signal_to_exit"]
    exit_signal = getattr(signal, exit_signal_name)
    await send_eof(drv._channel, signal.Signals(exit_signal))
def _coerce_override_value(value):
    """Convert the raw text of an ``--override`` value to bool/int/dict/str."""
    lowered = value.lower()
    if lowered in ('true', 'false'):
        # compare the lowered text, so 'True' or 'TRUE' are not read as False
        return lowered == 'true'
    if value.isdecimal():
        # int() tolerates leading zeros, which literal_eval rejects
        return int(value)
    if value.startswith('{'):
        import json
        return json.loads(value)
    return value


def _set_nested(tree, dotted_name, value):
    """Store ``value`` in ``tree`` under the dot-separated ``dotted_name``."""
    *parents, leaf = dotted_name.split('.')
    node = tree
    for part in parents:
        # a missing or non-dict entry (e.g. a scalar set by an earlier
        # override) is replaced so that the descent cannot fail
        if not isinstance(node.get(part), dict):
            node[part] = dict()
        node = node[part]
    node[leaf] = value


def main():
    """
    Entry point: parse the command line, build the configuration and replay.

    Each ``-o NAME VALUE`` pair is turned into a nested override, where NAME
    is a dot-separated path. VALUE is interpreted as a boolean when it reads
    ``true``/``false`` (case-insensitively), as an integer when it consists
    of decimal digits, as JSON when it starts with ``{``, and as a plain
    string otherwise. The tracker override is always forced to
    ``{'type': 'empty'}`` and the target's stdout/stderr are set to
    ``"inherit"`` before the replay coroutine is run.
    """
    args = parse_args()
    configure_verbosity(args.verbose)

    overrides = dict()
    for name, value in args.override or ():
        _set_nested(overrides, name, _coerce_override_value(value))

    # assigned after the user overrides, so it wins over any -o tracker.*
    overrides['tracker'] = {'type': 'empty'}
    config = FuzzerConfig(args.config, overrides)
    config._config["driver"]["exec"]["stdout"] = "inherit"
    config._config["driver"]["exec"]["stderr"] = "inherit"
    asyncio.run(replay(config, args.file))
# Run the replay tool only when this file is executed as a script.
if __name__ == "__main__":
    main()