Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2E_PART2 #83

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def get_env_var(var_name, default=None):
# Configuration constants. Need to be upercase to appear in reports
DEFAULT_NWAKU = "wakuorg/nwaku:latest"
DEFAULT_GOWAKU = "wakuorg/go-waku:latest"
STRESS_ENABLED = False
NODE_1 = get_env_var("NODE_1", DEFAULT_NWAKU)
NODE_2 = get_env_var("NODE_2", DEFAULT_NWAKU)
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NWAKU},{DEFAULT_GOWAKU},{DEFAULT_NWAKU}")
Expand All @@ -26,7 +27,7 @@ def get_env_var(var_name, default=None):
IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24")
GATEWAY = get_env_var("GATEWAY", "172.18.0.1")
RUNNING_IN_CI = get_env_var("CI")
API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 20)
API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 50)
RLN_CREDENTIALS = get_env_var("RLN_CREDENTIALS")
PG_USER = get_env_var("POSTGRES_USER", "postgres")
PG_PASS = get_env_var("POSTGRES_PASSWORD", "test123")
Expand Down
2 changes: 2 additions & 0 deletions src/node/waku_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ def start(self, wait_for_node_sec=20, **kwargs):
"rln-creds-id": None,
"rln-creds-source": None,
"nodekey": self.generate_random_nodekey(),
# "max-connections": "1000",
# "filter-subscription-timeout": "600",
}

if self.is_gowaku():
Expand Down
130 changes: 129 additions & 1 deletion tests/e2e/test_e2e.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from src.env_vars import NODE_1, NODE_2
from src.env_vars import NODE_1, NODE_2, STRESS_ENABLED
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode
Expand Down Expand Up @@ -177,3 +177,131 @@ def test_chain_of_relay_nodes(self):

# self.node1 relays and we check that self.node10 receives the message
self.check_published_message_reaches_relay_peer(sender=self.node1, peer_list=[self.node10], message_propagation_delay=1)

@pytest.mark.timeout(60 * 7)
def test_filter_20_senders_1_receiver(self):
total_senders = 20
if "go-waku" in NODE_2:
total_senders = 10
node_list = []

logger.debug(f"Start {total_senders} nodes to publish messages ")
self.node1.start(relay="true")
node_list.append(self.node1)
for i in range(total_senders - 1):
node_list.append(WakuNode(NODE_2, f"nodes{i + 1}_{self.test_id}"))
delay(1)
node_list[i + 1].start(relay="true", discv5_bootstrap_node=node_list[i].get_enr_uri())
delay(3)

logger.debug(f"Start filter node and subscribed filter node ")
self.node21 = WakuNode(NODE_1, f"node21_{self.test_id}")
self.node22 = WakuNode(NODE_1, f"node22_{self.test_id}")
self.node21.start(relay="true", filter="true", store="false", discv5_bootstrap_node=node_list[total_senders - 1].get_enr_uri())
self.node22.start(
relay="false", filter="true", filternode=self.node21.get_multiaddr_with_id(), store="false", discv5_bootstrap_node=self.node21
)

node_list.append(self.node21)

logger.debug(f"Subscribe nodes to relay pubsub topic {self.test_pubsub_topic}")
for node in node_list:
node.set_relay_subscriptions([self.test_pubsub_topic])
logger.debug(f"Node22 make filter request to pubsubtopic {self.test_pubsub_topic} and content topic {self.test_content_topic}")
self.node22.set_filter_subscriptions({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
self.wait_for_autoconnection(node_list, hard_wait=80)

logger.debug(f"{total_senders} Nodes publish {total_senders} messages")
for node in node_list[:-1]:
self.publish_message(sender=node, pubsub_topic=self.test_pubsub_topic, message=self.create_message())
delay(1)

logger.debug("Node 22 requests messages of subscribed filter topic")
messages_response = self.get_filter_messages(self.test_content_topic, pubsub_topic=self.test_pubsub_topic, node=self.node22)

logger.debug(f"Total number received messages for node 22 is {len(messages_response)}")
assert len(messages_response) == total_senders, f"Received messages != published which is {total_senders} !!"

@pytest.mark.timeout(60 * 7)
def test_filter_3_senders_multiple_msg_1_receiver(self):
messages_num = 3
total_senders = 3
self.node4 = WakuNode(NODE_1, f"node4_{self.test_id}")
self.node5 = WakuNode(NODE_1, f"node5_{self.test_id}")
node_list = []

logger.debug("Start 5 nodes")
self.node1.start(relay="true", store="false")
self.node2.start(relay="true", store="false", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", store="false", filter="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", filter="true", store="false", discv5_bootstrap_node=self.node3.get_enr_uri())
self.node5.start(
relay="false", filter="true", filternode=self.node4.get_multiaddr_with_id(), store="false", discv5_bootstrap_node=self.node3.get_enr_uri()
)

logger.debug(f"Subscribe nodes to relay pubsub topic {self.test_pubsub_topic}")
node_list = [self.node1, self.node2, self.node3, self.node4]
for node in node_list:
node.set_relay_subscriptions([self.test_pubsub_topic])

logger.debug(f"Node5 makes filter request pubsubtopic {self.test_pubsub_topic} and content topic {self.test_content_topic}")
self.node5.set_filter_subscriptions({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
node_list.append(self.node5)
self.wait_for_autoconnection(node_list, hard_wait=60)

logger.debug(f" {total_senders} Nodes publish {messages_num} message")
for node in node_list[:-2]:
for i in range(messages_num // total_senders):
self.publish_message(sender=node, pubsub_topic=self.test_pubsub_topic, message=self.create_message())
delay(2)

logger.debug(f"Node5 requests messages of subscribed filter topic {self.test_pubsub_topic}")
messages_response = self.get_filter_messages(self.test_content_topic, pubsub_topic=self.test_pubsub_topic, node=self.node5)
logger.debug(f"Response for node 5 is {messages_response}")
assert len(messages_response) == messages_num, f"Received messages != published which is{messages_num} !!"

@pytest.mark.timeout(60 * 5)
def test_filter_many_subscribed_nodes(self):
max_subscribed_nodes = 2
if STRESS_ENABLED:
max_subscribed_nodes = 50
node_list = []
response_list = []
logger.debug("Start 2 nodes")
self.node1.start(relay="true", store="false")
self.node2.start(relay="true", filter="true", store="false", discv5_bootstrap_node=self.node1.get_enr_uri())

logger.debug(f"Subscribe nodes to relay pubsub topic {self.test_pubsub_topic}")
node_list_relay = [self.node1, self.node2]
for node in node_list_relay:
node.set_relay_subscriptions([self.test_pubsub_topic])

node_list.append(self.node2)
logger.debug(f"{max_subscribed_nodes} Node start and making filter requests to node2")
for i in range(max_subscribed_nodes):
node_list.append(WakuNode(NODE_2, f"node{i+2}_{self.test_id}"))
delay(0.1)
node_list[i + 1].start(
relay="false",
filter="true",
filternode=self.node2.get_multiaddr_with_id(),
discv5_bootstrap_node=node_list[i].get_enr_uri(),
store="false",
)
delay(1)
node_list[i + 1].set_filter_subscriptions(
{"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}
)
self.wait_for_autoconnection(node_list_relay, hard_wait=100)

logger.debug("Node1 publish message")
self.publish_message(sender=self.node1, pubsub_topic=self.test_pubsub_topic, message=self.create_message())
delay(4)

logger.debug(f"{max_subscribed_nodes} Node requests the published message of subscribed filter topic")
for i in range(max_subscribed_nodes):
messages_response = self.get_filter_messages(self.test_content_topic, pubsub_topic=self.test_pubsub_topic, node=node_list[i + 1])
logger.debug(f"Response for node {i+1} is {messages_response}")
response_list.append(messages_response)

assert len(response_list) == max_subscribed_nodes, "Received message count doesn't match sent "
Loading