-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathkademlia_dht.py
228 lines (171 loc) · 8 KB
/
kademlia_dht.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import asyncio
import logging
import socket
import pickle
from routing_table import RoutingTable
from rpc_protocol import DatagramRPCProtocol, rpc
from utils import sha1_int, random_id
# Module-level logger used by every KademliaNode method in this file;
# all messages are emitted under the logger name 'node'.
logger = logging.getLogger('node')
class KademliaNode(DatagramRPCProtocol):
def __init__(self, alpha=3, k=20, identifier=None):
    """Set up the node's identity, protocol constants and empty state.

    Args:
        alpha: Stored as ``self.alpha``; ``lookup_node`` queries at most
            this many of the closest uncontacted peers per round.
        k: Stored as ``self.k``; handed to the routing table and used by
            ``lookup_node`` as the maximum number of peers it returns.
        identifier: Node identifier. When ``None`` a fresh one is drawn
            from ``random_id()``.
    """
    # Initialize DatagramRPCProtocol
    super(KademliaNode, self).__init__()

    # TODO: Make the node id a function of node's public key
    # Just like Bitcoin wallet IDs use HASH160
    self.identifier = random_id() if identifier is None else identifier

    # Constants from the kademlia protocol
    self.alpha = alpha
    self.k = k

    # Each node has its own key -> value dictionary
    self.storage = {}

    # The k-bucket based kademlia routing table
    self.routing_table = RoutingTable(self.identifier, k=self.k)
@rpc
def ping(self, peer, peer_identifier):
    """Handle an incoming ping by answering with this node's identifier.

    Args:
        peer: The caller, as supplied by the RPC layer.
        peer_identifier: Identifier the caller sent along with the ping.

    Returns:
        A 2-tuple holding this node's identifier twice.
    """
    logger.info('handling ping(%r, %r)', peer, peer_identifier)

    own_identifier = self.identifier
    # The 1st identifier is consumed by kademlia,
    # while the 2nd is sent as a reply back to the caller.
    return (own_identifier, own_identifier)
@rpc
def store(self, peer, peer_identifier, key, value):
    """Handle an incoming store request by saving ``value`` under ``key``.

    Any value already held under ``key`` in ``self.storage`` is overwritten.

    Args:
        peer: The caller, as supplied by the RPC layer.
        peer_identifier: Identifier the caller sent along with the request.
        key: Key to store under.
        value: Value to store.

    Returns:
        ``(self.identifier, True)``.
    """
    logger.info(
        'handling store(%r, %r, %r, %r)', peer, peer_identifier, key, value
    )
    self.storage[key] = value

    acknowledgement = True
    return (self.identifier, acknowledgement)
@rpc
def find_node(self, peer, peer_identifier, key):
    """Handle an incoming find_node request.

    Args:
        peer: The caller, as supplied by the RPC layer.
        peer_identifier: Identifier of the caller; passed to the routing
            table as ``excluding`` so the caller is left out of the answer.
        key: Identifier the caller is searching for.

    Returns:
        ``(self.identifier, peers)`` where ``peers`` is whatever
        ``routing_table.find_closest_peers`` returns for ``key``.
    """
    logger.info(
        'handling find_node(from=%r, peer_id=%r, find=%r)',
        peer, peer_identifier, key,
    )
    return (
        self.identifier,
        self.routing_table.find_closest_peers(key, excluding=peer_identifier),
    )
@rpc
def find_value(self, peer, peer_identifier, key):
    """Handle an incoming find_value request.

    Args:
        peer: The caller, as supplied by the RPC layer.
        peer_identifier: Identifier of the caller; excluded from the peer
            list returned on a miss.
        key: Key the caller is looking up.

    Returns:
        ``(self.identifier, ('found', value))`` when ``key`` is in
        ``self.storage``; otherwise ``(self.identifier, ('notfound', peers))``
        where ``peers`` comes from ``routing_table.find_closest_peers``.
    """
    logger.info('handling find_value(%r, %r, %r)', peer, peer_identifier, key)

    if key not in self.storage:
        # Miss: point the caller at the peers we know closest to the key.
        nearby = self.routing_table.find_closest_peers(
            key, excluding=peer_identifier
        )
        return (self.identifier, ('notfound', nearby))

    return (self.identifier, ('found', self.storage[key]))
@asyncio.coroutine
def ping_all_neighbors(self):
    """Ping every peer currently in the routing table, one after another.

    A neighbor whose ping times out is logged and dropped from the routing
    table (the same treatment ``lookup_node`` gives unresponsive peers), and
    the sweep continues with the remaining neighbors. Previously the first
    ``socket.timeout`` aborted the whole sweep and propagated to the caller.
    """
    # Iterate over a snapshot: the routing table is mutated below when a
    # peer is forgotten, and may also change while this coroutine is
    # suspended waiting for a reply.
    for node_id, peer in list(self.routing_table):
        try:
            yield from self.ping(peer, self.identifier)
        except socket.timeout:
            logger.warning("Could not ping neighbor %r", peer)
            self.routing_table.forget_peer(node_id)
@asyncio.coroutine
def join(self, known_node):
"""
Run by a node when it wants to join the network.
http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html#join

Steps, in order: ping ``known_node`` (returning early if that ping
times out), look up this node's own identifier, ping every neighbor,
look up this node's identifier as a key and ``put`` its
``(socket_addr, pub_key)`` when that lookup raises ``KeyError``, and
exchange ledger data with ``known_node``.

Arguments:
known_node : peer to bootstrap from; it is pinged first and is later
             used as the target of ``add_tx_to_ledger`` / ``get_ledger``

NOTE(review): ``self.socket_addr``, ``self.pub_key``, ``self.ledger``,
``self.add_tx_to_ledger`` and ``self.get_ledger`` are not defined in this
class - presumably they come from the base class or a subclass; confirm.
"""
# When a new node is created, ping some known_node
logger.info("Pinging %r", known_node)
try:
yield from self.ping(known_node, self.identifier)
except socket.timeout:
# Bootstrap peer unreachable: give up on joining without raising.
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning.
logger.warn("Could not ping %r", known_node)
return
# Try to find all peers close to myself
# (this'll update my routing table)
yield from self.lookup_node(self.identifier)
# Pinging all neighbors will update their routing tables
logger.info("Pinging all neighbors")
yield from self.ping_all_neighbors()
try:
# Check if my public key is already in the network
# (get() raises KeyError when the key cannot be found)
yield from self.get(self.identifier)
except KeyError:
# Store my information onto the network
# (allowing others to find me)
yield from self.put(self.identifier, (self.socket_addr, self.pub_key))
# Ledger bootstrap: hand my genesis transaction to the bootstrapper,
# adopt the bootstrapper's ledger record, then broadcast the genesis
# transaction.
logger.info("Sending my genesis transaction %r", self.ledger.genesis_tx)
yield from self.add_tx_to_ledger(known_node, self.identifier, self.ledger.genesis_tx) # add it to the bootstrapper's ledger
ledger_bootstrap = yield from self.get_ledger(known_node, self.identifier) # get the bootstrapper's ledger
logger.info("Got Ledger %r", ledger_bootstrap)
self.ledger.record = ledger_bootstrap.record # replace my ledger record with the bootstrapper's
yield from self.broadcast(random_id(), 'add_tx_to_ledger', self.identifier, self.ledger.genesis_tx) # broadcast my genesis transaction; a fresh random_id() serves as the message id
# TODO: Refactor the hashed part
@asyncio.coroutine
def put(self, raw_key, value, hashed=True):
    """Store ``value`` on the peers closest to a key.

    Args:
        raw_key: The key. Used unchanged when ``hashed`` is true;
            otherwise it is first converted with ``sha1_int``.
        value: Value to store.
        hashed: True when ``raw_key`` is already a hashed DHT key
            (e.g. a node id), False when it still needs hashing.

    Returns:
        The number of ``store`` calls whose result was exactly ``True``.
        Failed calls are collected as exceptions by ``asyncio.gather``
        and are simply not counted.

    Raises:
        KeyError: Propagated from ``lookup_node`` when it has no peers
            to start from.
    """
    hashed_key = raw_key if hashed else sha1_int(raw_key)

    nearest_peers = yield from self.lookup_node(hashed_key, find_value=False)

    pending_stores = []
    for _, peer in nearest_peers:
        pending_stores.append(
            self.store(peer, self.identifier, hashed_key, value)
        )

    outcomes = yield from asyncio.gather(
        *pending_stores, return_exceptions=True
    )
    return sum(1 for outcome in outcomes if outcome is True)
@asyncio.coroutine
def get(self, raw_key, hashed=True):
    """Return the value stored under a key, checking local storage first.

    Args:
        raw_key: The key. Used unchanged when ``hashed`` is true;
            otherwise it is first converted with ``sha1_int``.
        hashed: True when ``raw_key`` is already a hashed DHT key,
            False when it still needs hashing.

    Returns:
        The value from ``self.storage`` when present locally, otherwise
        the value returned by ``lookup_node(..., find_value=True)``.

    Raises:
        KeyError: Propagated unchanged from ``lookup_node`` when there are
            no peers to ask or none of them has the key.
    """
    hashed_key = raw_key if hashed else sha1_int(raw_key)

    if hashed_key in self.storage:
        return self.storage[hashed_key]

    # No local copy: ask the network. The former
    # ``except KeyError as e: raise e`` wrapper was a no-op, so the
    # lookup's KeyError is simply allowed to propagate.
    response = yield from self.lookup_node(hashed_key, find_value=True)
    return response
@asyncio.coroutine
def lookup_node(self, hashed_key, find_value=False):
    """Iteratively query peers for ``hashed_key``.

    Starts from the routing table's closest peers and, round by round,
    queries up to ``self.alpha`` of the closest not-yet-contacted
    candidates (one at a time), adding every peer they report to the
    candidate set. The loop ends once every known candidate has been
    contacted.

    Args:
        hashed_key: Key / identifier to look for; XOR-ed with peer
            identifiers to rank candidates.
        find_value: When true, peers are asked via ``find_value`` and the
            stored value is returned as soon as one reports ``'found'``.
            When false, peers are asked via ``find_node``.

    Returns:
        With ``find_value`` true: the value reported by the first peer
        that has it. Otherwise: up to ``self.k`` ``(identifier, peer)``
        pairs, closest first, excluding peers that timed out.

    Raises:
        KeyError: When the routing table yields no starting peers, or when
            ``find_value`` is true and no contacted peer has the key.
    """
    def xor_distance(entry):
        return entry[0] ^ hashed_key

    queried = set()
    unreachable = set()
    candidates = {
        (candidate_id, candidate_peer)
        for candidate_id, candidate_peer in
        self.routing_table.find_closest_peers(hashed_key)
    }

    if not candidates:
        raise KeyError(hashed_key, 'No peers available.')

    pending = candidates - queried
    while pending:
        for peer_identifier, peer in sorted(pending, key=xor_distance)[:self.alpha]:
            entry = (peer_identifier, peer)
            # Mark as contacted up front so a failing peer is never retried.
            queried.add(entry)

            try:
                if find_value:
                    status, payload = yield from self.find_value(
                        peer, self.identifier, hashed_key)
                    if status == 'found':
                        return payload
                else:
                    payload = yield from self.find_node(
                        peer, self.identifier, hashed_key)
            except socket.timeout:
                # Unresponsive peer: drop it from the routing table and
                # keep it out of the final result.
                self.routing_table.forget_peer(peer_identifier)
                unreachable.add(entry)
                continue

            # Merge the peers this contact told us about, skipping ourselves.
            for reported_id, reported_peer in payload:
                if reported_id != self.identifier:
                    candidates.add((reported_id, reported_peer))

        pending = candidates - queried

    if find_value:
        raise KeyError(hashed_key, 'Not found among any available peers.')
    return sorted(candidates - unreachable, key=xor_distance)[:self.k]
@asyncio.coroutine
def broadcast(self, message_identifier, procedure_name, *args, **kwargs):
    """
    Send a 'broadcast' datagram naming a procedure to every peer in the
    routing table. Receivers are expected to execute that procedure; the
    receiving side is not part of this class.

    Arguments:
        message_identifier : unique msg id for each broadcast; recorded in
                             self.broadcast_list if not already there
        procedure_name     : name of the remote procedure to be executed
        args               : parameters for that procedure
        kwargs             : accepted but not included in the message

    NOTE(review): self.broadcast_list is not initialised in this class -
    presumably the base class or a subclass provides it; confirm.
    """
    logger.info("sending a broadcast of procedure %r transaction: %r", procedure_name, args[1:])

    # Remember this message id (once) in the list of broadcasts seen.
    already_recorded = message_identifier in self.broadcast_list
    if not already_recorded:
        self.broadcast_list.append(message_identifier)

    # Create a message with its type, message id, procedure_name and args
    payload = ('broadcast', message_identifier, procedure_name) + tuple(args)
    # NOTE(review): the datagram is a pickle, so receivers presumably
    # unpickle data that arrived from the network - unsafe with untrusted
    # peers.
    datagram = pickle.dumps(payload, protocol=0)

    # Send the msg to each connected peer
    for _, peer_address in self.routing_table:
        self.transport.sendto(datagram, peer_address)