Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test/store v3 #38

Merged
merged 25 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
log/
.vscode
allure-results/
postgresql

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pytest-rerunfailures==13.0
pytest-timeout==2.2.0
pytest-xdist==3.5.0
python-dotenv==1.0.1
pytest-dependency==0.6.0
PyYAML==6.0.1
requests==2.31.0
setuptools==69.0.3
Expand Down
2 changes: 2 additions & 0 deletions src/env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def get_env_var(var_name, default=None):
NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68")
API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 20)
RLN_CREDENTIALS = get_env_var("RLN_CREDENTIALS")
PG_USER = get_env_var("POSTGRES_USER", "postgres")
PG_PASS = get_env_var("POSTGRES_PASSWORD", "test123")

# example for .env file
# RLN_CREDENTIALS = {"rln-relay-cred-password": "password", "rln-relay-eth-client-address": "wss://sepolia.infura.io/ws/v3/api_key", "rln-relay-eth-contract-address": "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4", "rln-relay-eth-private-key-1": "1111111111111111111111111111111111111111111111111111111111111111", "rln-relay-eth-private-key-2": "1111111111111111111111111111111111111111111111111111111111111111"}
30 changes: 15 additions & 15 deletions src/node/api_clients/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,29 @@ def get_filter_messages(self, content_topic, pubsub_topic=None):
return get_messages_response.json()

def get_store_messages(
self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs
self, peer_addr, include_data, pubsub_topic, content_topics, start_time, end_time, hashes, cursor, page_size, ascending, store_v, **kwargs
):
base_url = f"store/{store_v}/messages"
params = []

if peerAddr is not None:
params.append(f"peerAddr={quote(peerAddr, safe='')}")
if includeData is not None:
params.append(f"includeData={includeData}")
if pubsubTopic is not None:
params.append(f"pubsubTopic={quote(pubsubTopic, safe='')}")
if contentTopics is not None:
params.append(f"contentTopics={quote(contentTopics, safe='')}")
if startTime is not None:
params.append(f"startTime={startTime}")
if endTime is not None:
params.append(f"endTime={endTime}")
if peer_addr is not None:
params.append(f"peerAddr={quote(peer_addr, safe='')}")
if include_data is not None:
params.append(f"includeData={include_data}")
if pubsub_topic is not None:
params.append(f"pubsubTopic={quote(pubsub_topic, safe='')}")
if content_topics is not None:
params.append(f"contentTopics={quote(content_topics, safe='')}")
if start_time is not None:
params.append(f"startTime={start_time}")
if end_time is not None:
params.append(f"endTime={end_time}")
if hashes is not None:
params.append(f"hashes={quote(hashes, safe='')}")
if cursor is not None:
params.append(f"cursor={quote(cursor, safe='')}")
if pageSize is not None:
params.append(f"pageSize={pageSize}")
if page_size is not None:
params.append(f"pageSize={page_size}")
if ascending is not None:
params.append(f"ascending={ascending}")

Expand Down
4 changes: 2 additions & 2 deletions src/node/docker_mananger.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def create_network(self, network_name=NETWORK_NAME):
logger.debug(f"Network {network_name} created")
return network

def start_container(self, image_name, ports, args, log_path, container_ip, volumes):
def start_container(self, image_name, ports, args, log_path, container_ip, volumes, remove_container=True):
cli_args = []
for key, value in args.items():
if isinstance(value, list): # Check if value is a list
Expand All @@ -46,7 +46,7 @@ def start_container(self, image_name, ports, args, log_path, container_ip, volum
cli_args_str_for_log = " ".join(cli_args)
logger.debug(f"docker run -i -t {port_bindings_for_log} {image_name} {cli_args_str_for_log}")
container = self._client.containers.run(
image_name, command=cli_args, ports=port_bindings, detach=True, remove=True, auto_remove=True, volumes=volumes
image_name, command=cli_args, ports=port_bindings, detach=True, remove=remove_container, auto_remove=remove_container, volumes=volumes
)

network = self._client.networks.get(NETWORK_NAME)
Expand Down
93 changes: 93 additions & 0 deletions src/node/store_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
class StoreResponse:
def __init__(self, store_response, node):
self.response = store_response
self.node = node

@property
def request_id(self):
try:
if self.node.is_nwaku():
return self.response.get("requestId")
else:
return self.response.get("request_id")
except:
return None

@property
def status_code(self):
try:
if self.node.is_nwaku():
return self.response.get("statusCode")
else:
return self.response.get("status_code")
except:
return None

@property
def status_desc(self):
try:
if self.node.is_nwaku():
return self.response.get("statusDesc")
else:
return self.response.get("status_desc")
except:
return None

@property
def messages(self):
try:
return self.response.get("messages")
except:
return None

@property
def pagination_cursor(self):
try:
if self.node.is_nwaku():
return self.response.get("paginationCursor")
else:
return self.response.get("pagination_cursor")
except:
return None

def message_hash(self, index):
if self.messages is not None:
if self.node.is_nwaku():
return self.messages[index]["messageHash"]
else:
return self.messages[index]["message_hash"]
else:
return None

def message_payload(self, index):
try:
if self.messages is not None:
payload = self.messages[index]["message"]["payload"]
return payload
else:
return None
except IndexError:
return None

def message_at(self, index):
try:
if self.messages is not None:
message = self.messages[index]["message"]
return message
else:
return None
except IndexError:
return None

def message_pubsub_topic(self, index):
if self.messages is not None:
if self.node.is_nwaku():
return self.messages[index]["pubsubTopic"]
else:
return self.messages[index]["pubsub_topic"]
else:
return None

@property
def resp_json(self):
return self.response
1 change: 1 addition & 0 deletions src/node/waku_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class MessageRpcResponse:
timestamp: Optional[int]
ephemeral: Optional[bool]
meta: Optional[str]
rateLimitProof: Optional[str] = field(default_factory=dict)
rate_limit_proof: Optional[dict] = field(default_factory=dict)


Expand Down
50 changes: 41 additions & 9 deletions src/node/waku_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ def start(self, wait_for_node_sec=20, **kwargs):
else:
raise NotImplementedError("Not implemented for this node type")

if "remove_container" in kwargs:
remove_container = kwargs["remove_container"]
del kwargs["remove_container"]
else:
remove_container = True

default_args.update(sanitize_docker_flags(kwargs))

rln_args, rln_creds_set, keystore_path = self.parse_rln_credentials(default_args, False)
Expand All @@ -116,7 +122,13 @@ def start(self, wait_for_node_sec=20, **kwargs):
logger.debug(f"Using volumes {self._volumes}")

self._container = self._docker_manager.start_container(
self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip, self._volumes
self._docker_manager.image,
ports=self._ports,
args=default_args,
log_path=self._log_path,
container_ip=self._ext_ip,
volumes=self._volumes,
remove_container=remove_container,
)

logger.debug(f"Started container from image {self._image_name}. REST: {self._rest_port}")
Expand Down Expand Up @@ -168,6 +180,10 @@ def stop(self):
if self._container:
logger.debug(f"Stopping container with id {self._container.short_id}")
self._container.stop()
try:
self._container.remove()
except:
pass
self._container = None
logger.debug("Container stopped.")

Expand Down Expand Up @@ -291,18 +307,30 @@ def get_filter_messages(self, content_topic, pubsub_topic=None):
return self._api.get_filter_messages(content_topic, pubsub_topic)

def get_store_messages(
self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs
self,
peer_addr=None,
include_data=None,
pubsub_topic=None,
content_topics=None,
start_time=None,
end_time=None,
hashes=None,
cursor=None,
page_size=None,
ascending=None,
store_v="v3",
**kwargs,
):
return self._api.get_store_messages(
peerAddr=peerAddr,
includeData=includeData,
pubsubTopic=pubsubTopic,
contentTopics=contentTopics,
startTime=startTime,
endTime=endTime,
peer_addr=peer_addr,
include_data=include_data,
pubsub_topic=pubsub_topic,
content_topics=content_topics,
start_time=start_time,
end_time=end_time,
hashes=hashes,
cursor=cursor,
pageSize=pageSize,
page_size=page_size,
ascending=ascending,
store_v=store_v,
**kwargs,
Expand Down Expand Up @@ -393,3 +421,7 @@ def parse_rln_credentials(self, default_args, is_registration):
raise NotImplementedError("Not implemented for type other than Nim Waku ")

return rln_args, True, keystore_path

@property
def container(self):
return self._container
44 changes: 44 additions & 0 deletions src/postgres_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import docker
import os
from src.env_vars import NETWORK_NAME, PG_PASS, PG_USER
from src.libs.custom_logger import get_custom_logger

logger = get_custom_logger(__name__)


def start_postgres():
pg_env = {"POSTGRES_USER": PG_USER, "POSTGRES_PASSWORD": PG_PASS}

base_path = os.path.abspath(".")
volumes = {os.path.join(base_path, "postgresql"): {"bind": "/var/lib/postgresql/data", "mode": "Z"}}

client = docker.from_env()

postgres_container = client.containers.run(
"postgres:15.4-alpine3.18",
name="postgres",
environment=pg_env,
volumes=volumes,
command="postgres",
ports={"5432/tcp": ("127.0.0.1", 5432)},
restart_policy={"Name": "on-failure", "MaximumRetryCount": 5},
healthcheck={
"Test": ["CMD-SHELL", "pg_isready -U postgres -d postgres"],
"Interval": 30000000000, # 30 seconds in nanoseconds
"Timeout": 60000000000, # 60 seconds in nanoseconds
"Retries": 5,
"StartPeriod": 80000000000, # 80 seconds in nanoseconds
},
detach=True,
network_mode=NETWORK_NAME,
)

logger.debug("Postgres container started")

return postgres_container


def stop_postgres(postgres_container):
postgres_container.stop()
postgres_container.remove()
logger.debug("Postgres container stopped and removed.")
14 changes: 14 additions & 0 deletions src/steps/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import base64
import hashlib
import inspect
from time import time
import allure
Expand Down Expand Up @@ -34,3 +36,15 @@ def create_message(self, **kwargs):
message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)}
message.update(kwargs)
return message

@allure.step
def compute_message_hash(self, pubsub_topic, msg):
ctx = hashlib.sha256()
ctx.update(pubsub_topic.encode("utf-8"))
ctx.update(base64.b64decode(msg["payload"]))
ctx.update(msg["contentTopic"].encode("utf-8"))
if "meta" in msg:
ctx.update(base64.b64decode(msg["meta"]))
ctx.update(int(msg["timestamp"]).to_bytes(8, byteorder="big"))
hash_bytes = ctx.digest()
return base64.b64encode(hash_bytes).decode("utf-8")
Loading
Loading