Skip to content

Commit

Permalink
[script/event_graph_bindings] increase #node, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
zerin committed Feb 11, 2025
1 parent 209b9bf commit 33960df
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 93 deletions.
68 changes: 32 additions & 36 deletions script/event_graph_bindings/broadcast_event.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,20 @@
from darkfi_eventgraph_py import event_graph as eg, p2p, sled
from consts import *
from utils import start_p2p, stop_p2p, get_fut_p2p, is_connected
import asyncio
import random
import time
import subprocess
import os
import threading
# number of nodes
N = 3
P2PDATASTORE_PATH = '/tmp/p2pdatastore'
SLED_DB_PATH = '/tmp/sleddb'
STARTING_PORT = 54321
os.system("rm -rf " + P2PDATASTORE_PATH+"*")
os.system("rm -rf " + SLED_DB_PATH+"*")
OUTBOUND_TIMEOUT = 2
CH_HANDSHAKE_TIMEOUT = 15
CH_HEARTBEAT_INTERVAL = 15
DISCOVERY_COOLOFF = 15
DISCOVERY_ATTEMPT = 5
REFINERY_INTERVAL = 15
WHITE_CONNECT_PERCENT = 70
GOLD_CONNECT_COUNT = 2
TIME_NO_CON = 60
W8_TIME = 60
W8_TIME_4_CON = 40
def is_connected(node):
return p2p.is_connected(node)

def get_random_node_idx():
return int(random.random()*(N-1))

async def start_p2p(w8_time, node):
await p2p.start_p2p(w8_time, node)

async def get_fut_p2p(settings):
return await p2p.new_p2p(settings)

def get_new_eg(node, sled_db):
return eg.new_event_graph(node, sled_db, P2PDATASTORE_PATH, False, 'dag', 1)

async def register_protocol(p2p_node, eg_node):
await p2p.register_protocol_p2p(p2p_node, eg_node)

# create p2p node
def get_seed_node(starting_port=STARTING_PORT):
inbound_port = starting_port
Expand Down Expand Up @@ -105,7 +80,6 @@ def new_nodes(seed_addr, starting_port=STARTING_PORT):
p2ps = []
event_graphs = []
for i in range(1, N):

inbound_port = starting_port + i
external_port = starting_port + i
node_id = str(inbound_port)
Expand Down Expand Up @@ -197,6 +171,7 @@ def event_id(event):
start_ts += [seed_t]
seed_register_t = threading.Thread(target=asyncio.run, args=(register_protocol(seed_p2p_ptr, seed_event_graph),))
seed_register_t.start()

############################
# create N nodes #
############################
Expand All @@ -220,6 +195,7 @@ def event_id(event):
eg_t = threading.Thread(target=asyncio.run, args=(register_protocol(node, egs[idx]),))
eg_t.start()
register_ts += [eg_t]

for t in register_ts:
t.join()

Expand All @@ -234,18 +210,22 @@ def event_id(event):
node_t.start()
start_ts += [node_t]

print("wait {} secs for nodes to connect".format(W8_TIME_4_CON))
time.sleep(W8_TIME_4_CON)

# insert event at random node
ids = insert_events(random_node, [event])
print('inserted event ids: {}'.format(str(ids[0])))
# wait for nodes to conenct
print("wait {} secs for nodes to connect".format(W8_TIME_4_CON))
time.sleep(W8_TIME_4_CON)

# broadcast the new event
random_node_p2p = p2ps[rnd_idx]
print('broadcasting event: {} through node: {}'.format(event, random_node_p2p))
asyncio.run(broadcast_event_onp2p(15, random_node_p2p, event))

for t in start_ts:
t.join()

for node in p2ps:
assert(is_connected(node))
print('node: {} is connected successfully'.format(node))
Expand All @@ -256,12 +236,28 @@ def event_id(event):
broadcasted_event_id = str(event_id(event))
received_event_id = str(event_id(received_event))
assert broadcasted_event_id == received_event_id, '{}, {}'.format(broadcasted_event_id, received_event_id)

time.sleep(30)
time.sleep(5)
# assert event is broadcast to all nodes
for evg in egs:
assert(evg.dag_len()==(N-1))
print('event graph: {} received broadcasted event'.format(evg))
# events + 1 = 2
assert(evg.dag_len()==2)
event = asyncio.run(get_event_by_id(evg, ids[0]))

print("Success! joining threads.")
for t in start_ts:
print("========================================")
print("= stop nodes =")
print("========================================")

stop_ts = []
seed_t = threading.Thread(target=asyncio.run, args=(stop_p2p(1, seed_p2p_ptr),))
seed_t.start()
stop_ts += [seed_t]

# stop the nodes first, then the seed.
for node in p2ps:
node_t = threading.Thread(target=asyncio.run, args=(stop_p2p(2, node),))
node_t.start()
stop_ts+=[node_t]

for t in stop_ts:
t.join()
19 changes: 19 additions & 0 deletions script/event_graph_bindings/consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import os
# number of nodes
N = 10
P2PDATASTORE_PATH = '/tmp/p2pdatastore'
SLED_DB_PATH = '/tmp/sleddb'
STARTING_PORT = 53412
os.system("rm -rf " + P2PDATASTORE_PATH+"*")
os.system("rm -rf " + SLED_DB_PATH+"*")
OUTBOUND_TIMEOUT = 2
CH_HANDSHAKE_TIMEOUT = 15
CH_HEARTBEAT_INTERVAL = 15
DISCOVERY_COOLOFF = 15
DISCOVERY_ATTEMPT = 5
REFINERY_INTERVAL = 15
WHITE_CONNECT_PERCENT = 70
GOLD_CONNECT_COUNT = 2
TIME_NO_CON = 60
W8_TIME = 60
W8_TIME_4_CON = 40
90 changes: 33 additions & 57 deletions script/event_graph_bindings/start_stop_p2p.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,24 @@
from darkfi_eventgraph_py import p2p, sled
from darkfi_eventgraph_py import p2p
from consts import *
from utils import start_p2p, stop_p2p, get_fut_p2p, is_connected
import asyncio
import random
import time
import subprocess
import os
import threading
import time
# number of nodes
N = 3
P2PDATASTORE_PATH = '/tmp/p2pdatastore'
STARTING_PORT = 53412
os.system("rm -rf " + P2PDATASTORE_PATH+"*")
OUTBOUND_TIMEOUT = 2
CH_HANDSHAKE_TIMEOUT = 15
CH_HEARTBEAT_INTERVAL = 15
DISCOVERY_COOLOFF = 15
DISCOVERY_ATTEMPT = 5
REFINERY_INTERVAL = 15
WHITE_CONNECT_PERCENT = 70
GOLD_CONNECT_COUNT = 2
TIME_NO_CON = 60
W8_TIME = 60
async def start_p2p(w8_time, node):
await p2p.start_p2p(w8_time, node)

async def stop_p2p(w8_time, node):
await p2p.stop_p2p(w8_time, node)

async def get_fut_p2p(settings):
node = await p2p.new_p2p(settings)
return node

def get_seed_node(starting_port=STARTING_PORT):
inbound_port = starting_port
def get_peer_node(i, seed_addr, starting_port=STARTING_PORT):
inbound_port = starting_port + i
external_port = starting_port + i
print("node with port: {}".format(inbound_port))
addrs = p2p.Url("tcp://127.0.0.1:{}".format(inbound_port))
inbound_addrs = [addrs]
external_addrs = [addrs]
node_id = str(inbound_port)
print("seed with port: {}".format(inbound_port))
seed_addr = p2p.Url("tcp://127.0.0.1:{}".format(inbound_port))
inbound_addrs = [seed_addr]
external_addrs = [seed_addr]
peers = []
seeds = []
seeds = [seed_addr]
app_version = p2p.new_version(0, 1, 1, '')
allowed_transports = ['tcp']
transport_mixing = True
outbound_connections = 0
outbound_connections = 100
inbound_connections = 10000
outbound_connect_timeout = OUTBOUND_TIMEOUT
channel_handshake_timeout = CH_HANDSHAKE_TIMEOUT
Expand Down Expand Up @@ -87,23 +62,22 @@ def get_seed_node(starting_port=STARTING_PORT):
blacklist,
ban_policy
)
seed_p2p_ptr = asyncio.run(get_fut_p2p(settings))
return seed_p2p_ptr, seed_addr
p2p_ptr = asyncio.run(get_fut_p2p(settings))
return p2p_ptr

def get_peer_node(i, seed_addr, starting_port=STARTING_PORT):
inbound_port = starting_port + i
external_port = starting_port + i
print("node with port: {}".format(inbound_port))
addrs = p2p.Url("tcp://127.0.0.1:{}".format(inbound_port))
inbound_addrs = [addrs]
external_addrs = [addrs]
def get_seed_node(starting_port=STARTING_PORT):
inbound_port = starting_port
node_id = str(inbound_port)
print("seed with port: {}".format(inbound_port))
seed_addr = p2p.Url("tcp://127.0.0.1:{}".format(inbound_port))
inbound_addrs = [seed_addr]
external_addrs = [seed_addr]
peers = []
seeds = [seed_addr]
seeds = []
app_version = p2p.new_version(0, 1, 1, '')
allowed_transports = ['tcp']
transport_mixing = True
outbound_connections = 100
outbound_connections = 0
inbound_connections = 10000
outbound_connect_timeout = OUTBOUND_TIMEOUT
channel_handshake_timeout = CH_HANDSHAKE_TIMEOUT
Expand Down Expand Up @@ -147,8 +121,8 @@ def get_peer_node(i, seed_addr, starting_port=STARTING_PORT):
blacklist,
ban_policy
)
p2p_ptr = asyncio.run(get_fut_p2p(settings))
return p2p_ptr
seed_p2p_ptr = asyncio.run(get_fut_p2p(settings))
return seed_p2p_ptr, seed_addr

# create p2p node
def new_nodes(seed_addr, starting_port=STARTING_PORT):
Expand All @@ -162,30 +136,31 @@ def new_nodes(seed_addr, starting_port=STARTING_PORT):
nodes+=[p2p_ptr]
return nodes

def is_connected(node):
return p2p.is_connected(node)

# create N nodes
seed_p2p_ptr, seed_addr = get_seed_node()
print("=====================================")
print("starting seed node on {}".format(seed_addr))
print("=====================================")

ts = []
start_ts = []
seed_t = threading.Thread(target=asyncio.run, args=(start_p2p(W8_TIME, seed_p2p_ptr),))
seed_t.start()
ts+=[seed_t]
start_ts+=[seed_t]
p2ps = new_nodes(seed_addr)
for idx, node in enumerate(p2ps):
print("========================================")
print("starting node: {}".format(node))
print("========================================")
node_t = threading.Thread(target=asyncio.run, args=(start_p2p(W8_TIME, node),))
node_t.start()
ts+=[node_t]
start_ts+=[node_t]

# wait for peers to connect
time.sleep(40)
print("wait {} secs for nodes to connect".format(W8_TIME_4_CON))
time.sleep(W8_TIME_4_CON)

for t in start_ts:
t.join()

for node in p2ps:
assert(is_connected(node))
Expand All @@ -198,6 +173,7 @@ def is_connected(node):
seed_t = threading.Thread(target=asyncio.run, args=(stop_p2p(1, seed_p2p_ptr),))
seed_t.start()
stop_ts += [seed_t]

# stop the nodes first, then the seed.
for node in p2ps:
node_t = threading.Thread(target=asyncio.run, args=(stop_p2p(2, node),))
Expand Down
14 changes: 14 additions & 0 deletions script/event_graph_bindings/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from darkfi_eventgraph_py import p2p

async def start_p2p(w8_time, node):
await p2p.start_p2p(w8_time, node)

async def stop_p2p(w8_time, node):
await p2p.stop_p2p(w8_time, node)

async def get_fut_p2p(settings):
node = await p2p.new_p2p(settings)
return node

def is_connected(node):
return p2p.is_connected(node)

0 comments on commit 33960df

Please sign in to comment.