forked from qtumproject/qtum
-
Notifications
You must be signed in to change notification settings - Fork 0
/
qtum_faulty_header_chain.py
executable file
·136 lines (117 loc) · 5.93 KB
/
qtum_faulty_header_chain.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
from test_framework.script import *
from test_framework.mininode import *
from test_framework.messages import *
from test_framework.qtum import *
import time
import io
class QtumHeaderSpamTest(BitcoinTestFramework):
    """Functional test for header handling of proof-of-stake blocks.

    The test checks three things against node 0:

    1. Sending a header first and the full block afterwards is accepted.
    2. A header that reuses the same ``prevoutStake`` but has a different
       block hash (only the nonce differs) is not accepted.
    3. The node can still reorganize when a peer presents a longer chain.
    """

    def set_test_params(self):
        """Run two nodes, starting from an empty chain."""
        self.setup_clean_chain = True
        self.num_nodes = 2

    def skip_test_if_missing_module(self):
        """Skip the test when the node was built without wallet support."""
        self.skip_if_no_wallet()

    def start_p2p_connection(self):
        """Attach a P2P test peer to ``self.node`` and keep it in ``self.p2p_node``."""
        self.p2p_node = self.node.add_p2p_connection(P2PInterface())

    def _standard_block_sig_key(self):
        """Return the fixed ECKey derived from ``hash256(pack('<I', 0))``.

        The same secret is imported into the node's wallet in ``run_test``.
        """
        block_sig_key = ECKey()
        block_sig_key.set(hash256(struct.pack('<I', 0)), False)
        return block_sig_key

    def _remove_from_staking_prevouts(self, block):
        """Drop the first entry of ``self.staking_prevouts`` staked by ``block``.

        Entries are compared by the serialized outpoint stored at index 0 of
        each entry. Nothing happens when no entry matches.
        """
        wanted = block.prevoutStake.serialize()
        for index, prevout in enumerate(self.staking_prevouts):
            if prevout[0].serialize() == wanted:
                # Safe to mutate here: the loop ends right after the pop.
                self.staking_prevouts.pop(index)
                break

    def sign_block_with_standard_private_key(self, block):
        """Sign ``block`` in place with the fixed test key."""
        block.sign_block(self._standard_block_sig_key())

    def create_pos_block(self, staking_prevouts, parent_block, parent_block_stake_modifier, block_height, block_reward=2000400000000):
        """Build and sign a proof-of-stake block on top of ``parent_block``.

        Args:
            staking_prevouts: candidate prevouts handed to ``solve_stake``.
            parent_block: block to build on; its ``sha256`` and ``nTime`` are used.
            parent_block_stake_modifier: stake modifier handed to ``solve_stake``.
            block_height: height of ``parent_block``; the coinbase is created
                for ``block_height + 1``.
            block_reward: value of the second output of the stake transaction.

        Returns:
            The signed block, or ``None`` when ``solve_stake`` fails.
        """
        # The coinbase carries an empty output.
        coinbase = create_coinbase(block_height + 1)
        coinbase.vout[0].nValue = 0
        coinbase.vout[0].scriptPubKey = b""
        coinbase.rehash()

        # Advance to the next timestamp whose low 4 bits are zero.
        block_time = (parent_block.nTime + 0x10) & 0xfffffff0
        block = create_block(parent_block.sha256, coinbase, block_time)
        if not block.solve_stake(parent_block_stake_modifier, staking_prevouts):
            return None

        # Key used both for the stake output script and for the block signature.
        block_sig_key = self._standard_block_sig_key()
        pubkey = block_sig_key.get_pubkey().get_bytes()
        scriptPubKey = CScript([pubkey, OP_CHECKSIG])

        # Stake transaction: spends the chosen prevout, first output empty,
        # second output pays the reward to the signing key.
        stake_tx_unsigned = CTransaction()
        stake_tx_unsigned.vin.append(CTxIn(block.prevoutStake))
        stake_tx_unsigned.vout.append(CTxOut())
        stake_tx_unsigned.vout.append(CTxOut(block_reward, scriptPubKey))

        # Let the node's wallet sign the input, then parse the result back.
        stake_tx_signed_raw_hex = self.node.signrawtransactionwithwallet(
            bytes_to_hex_str(stake_tx_unsigned.serialize()))['hex']
        stake_tx_signed = CTransaction()
        stake_tx_signed.deserialize(io.BytesIO(hex_str_to_bytes(stake_tx_signed_raw_hex)))

        block.vtx.append(stake_tx_signed)
        block.hashMerkleRoot = block.calc_merkle_root()
        block.sign_block(block_sig_key)
        return block

    def calculate_stake_modifier(self, parent_block_modifier, current_block):
        """Return the stake modifier that follows ``current_block``.

        The result is ``hash256`` over the block's kernel followed by the parent
        modifier. The kernel is the block hash when ``prevoutStake`` equals
        ``COutPoint(0, 0xffffffff)``, otherwise it is ``prevoutStake.hash``.
        """
        null_prevout = COutPoint(0, 0xffffffff).serialize()
        if current_block.prevoutStake.serialize() == null_prevout:
            kernel = current_block.sha256
        else:
            kernel = current_block.prevoutStake.hash
        data = ser_uint256(kernel) + ser_uint256(parent_block_modifier)
        return uint256_from_str(hash256(data))

    def run_test(self):
        """Drive the three scenarios described in the class docstring."""
        self.node = self.nodes[0]
        self.alt_node = self.nodes[1]

        # Import the fixed test key so the wallet can sign stake transactions.
        privkey = byte_to_base58(hash256(struct.pack('<I', 0)), 239)
        self.node.importprivkey(privkey)
        self.start_p2p_connection()

        # Mine the initial chain with the clock set back, then share it.
        # NOTE(review): the address presumably belongs to the key imported
        # above - confirm.
        self.node.setmocktime(int(time.time()) - 10000)
        self.node.generatetoaddress(600, "qSrM9K6FMhZ29Vkp8Rdk8Jp66bbfpjFETq")
        self.sync_all()
        disconnect_nodes(self.node, 1)
        self.node.setmocktime(0)

        self.staking_prevouts = collect_prevouts(self.node)
        tip = self.node.getblock(self.node.getbestblockhash())
        stake_modifier = int(tip['modifier'], 16)
        block_height = tip['height']

        # Rebuild the tip as a CBlock so it can serve as the parent.
        block_raw_hex = self.node.getblock(self.node.getbestblockhash(), False)
        block = CBlock()
        block.deserialize(io.BytesIO(hex_str_to_bytes(block_raw_hex)))
        block.rehash()

        # Make sure that first sending a header and then announcing its block succeeds
        block = self.create_pos_block(self.staking_prevouts, block, stake_modifier, block_height)
        assert block is not None, "could not solve a stake for the first block"
        self._remove_from_staking_prevouts(block)
        block.rehash()
        self.p2p_node.send_message(msg_headers([CBlockHeader(block)]))
        # NOTE(review): fixed sleeps may be flaky on slow machines; kept
        # because the peer may be penalised after the invalid header below.
        time.sleep(0.05)
        assert self.node.getblockheader(block.hash)
        assert_raises_rpc_error(-1, "Block not found on disk", self.node.getblock, block.hash)
        self.p2p_node.send_message(msg_block(block))
        time.sleep(0.05)
        assert self.node.getblockheader(block.hash)
        assert self.node.getblock(block.hash)

        # Make sure that the identical block with only a modified nonce
        # (i.e. the same prevoutStake but a different block hash) is not accepted
        block.nNonce += 1
        self.sign_block_with_standard_private_key(block)
        self.p2p_node.send_message(msg_headers([CBlockHeader(block)]))
        time.sleep(0.05)
        block.rehash()
        assert_raises_rpc_error(-5, "Block not found", self.node.getblockheader, block.hash)
        assert_raises_rpc_error(-5, "Block not found", self.node.getblock, block.hash)

        # Make sure that the chain is still reorgable if presented with a longer chain
        child_stake_modifier = self.calculate_stake_modifier(stake_modifier, block)
        child_block = self.create_pos_block(self.staking_prevouts, block, child_stake_modifier, block_height + 1)
        assert child_block is not None, "could not solve a stake for the child block"
        self._remove_from_staking_prevouts(child_block)
        child_block.rehash()

        # Feed the alternative chain to the second node and log its verdicts.
        self.log.info("alt_node submitblock(block): %s",
                      self.alt_node.submitblock(bytes_to_hex_str(block.serialize())))
        self.log.info("alt_node submitblock(child_block): %s",
                      self.alt_node.submitblock(bytes_to_hex_str(child_block.serialize())))

        self.node.setmocktime(child_block.nTime - 16)
        connect_nodes(self.node, 1)
        time.sleep(1)
        self.node.setmocktime(0)
        self.alt_node.generate(1)
        self.sync_all()
# Run the test only when this file is executed as a script.
if __name__ == "__main__":
    QtumHeaderSpamTest().main()