tests: add metrics tests
closes #933 #934 #935 #551

Signed-off-by: Evgeniy Zayats <[email protected]>
Evgeniy Zayats committed Jan 30, 2025
1 parent 7eb19b4 commit c339657
Showing 2 changed files with 297 additions and 0 deletions.
pytest_tests/lib/helpers/metrics.py (56 additions, 0 deletions)
@@ -0,0 +1,56 @@
import json
import logging
import re
from collections import defaultdict

import allure
import requests
from helpers.test_control import wait_for_success
from neofs_testlib.env.env import StorageNode

logger = logging.getLogger("NeoLogger")


def parse_prometheus_metrics(metrics_lines: str) -> dict:
    parsed_metrics = defaultdict(list)
    for line in metrics_lines.strip().splitlines():
        if line.startswith("#"):
            continue
        match = re.match(r"^(?P<name>[^\{]+)(\{(?P<params>[^\}]+)\})?\s+(?P<value>\S+)$", line)
        if match:
            name = match.group("name")
            params = match.group("params")
            value = float(match.group("value"))

            params_dict = {}
            if params:
                params_pairs = params.split(",")
                for pair in params_pairs:
                    key, val = pair.split("=", 1)
                    params_dict[key.strip()] = val.strip().strip('"')

            parsed_metrics[name].append({"value": value, "params": params_dict})
    return parsed_metrics


def get_metrics(sn: StorageNode) -> dict:
    resp = requests.get(f"http://{sn.prometheus_address}")
    if resp.status_code != 200:
        raise AssertionError(f"Invalid status code from metrics url: {resp.status_code}; {resp.reason}; {resp.text};")
    return parse_prometheus_metrics(resp.text)


@allure.step("Wait for correct metric value")
@wait_for_success(120, 1)
def wait_for_metric_to_arrive(sn: StorageNode, metric_name: str, expected_value: float):
    metrics = get_metrics(sn)
    allure.attach(
        json.dumps(dict(metrics)),
        "metrics",
        allure.attachment_type.JSON,
    )
    actual_value = metrics[metric_name][0]["value"]
    logger.info(f"Current value of {metric_name} = {actual_value}")
    assert actual_value == expected_value, (
        f"invalid value for {metric_name}, expected: {expected_value}, got: {actual_value}"
    )
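For reference, a minimal sketch of the structure these helpers return. The sample metric line and its labels below are illustrative, not taken from a real node; get_metrics() yields the same shape after fetching the node's Prometheus endpoint, and wait_for_metric_to_arrive() retries that fetch via the wait_for_success decorator (presumably for up to 120 seconds at 1-second intervals). The import assumes pytest_tests/lib is on the path, as the tests below already rely on.

from helpers.metrics import parse_prometheus_metrics

# Illustrative exposition line; the cid and shard_id label names are made-up examples.
sample = 'neofs_node_engine_container_size{cid="ABC",shard_id="1"} 1000'
parsed = parse_prometheus_metrics(sample)
# parsed["neofs_node_engine_container_size"] ==
#     [{"value": 1000.0, "params": {"cid": "ABC", "shard_id": "1"}}]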
pytest_tests/tests/metrics/test_sn_metrics.py (241 additions, 0 deletions)
@@ -0,0 +1,241 @@
import json
import logging
import threading
import time
from importlib.resources import files

import allure
import neofs_env.neofs_epoch as neofs_epoch
import pytest
import yaml
from helpers.container import create_container, delete_container
from helpers.file_helper import generate_file
from helpers.metrics import get_metrics, wait_for_metric_to_arrive
from helpers.neofs_verbs import (
    delete_object,
    get_object,
    get_range,
    get_range_hash,
    head_object,
    put_object,
    search_object,
)
from neofs_testlib.env.env import NeoFSEnv, NodeWallet, StorageNode

logger = logging.getLogger("NeoLogger")


@pytest.fixture(scope="module")
def single_noded_env():
    neofs_env_config = yaml.safe_load(
        files("neofs_testlib.env.templates").joinpath("neofs_env_config.yaml").read_text()
    )
    neofs_env = NeoFSEnv(neofs_env_config=neofs_env_config)
    neofs_env.download_binaries()
    neofs_env.deploy_inner_ring_nodes()
    neofs_env.deploy_storage_nodes(
        count=1,
        node_attrs={
            0: ["UN-LOCODE:RU MOW", "Price:22"],
        },
    )
    neofs_env.neofs_adm().fschain.set_config(
        rpc_endpoint=f"http://{neofs_env.fschain_rpc}",
        alphabet_wallets=neofs_env.alphabet_wallets_dir,
        post_data="ContainerFee=0 ContainerAliasFee=0 MaxObjectSize=524288",
    )
    time.sleep(30)
    yield neofs_env
    neofs_env.kill()


def test_sn_metrics(single_noded_env: NeoFSEnv, default_wallet: NodeWallet):
    simple_object_size = 1000
    sn = single_noded_env.storage_nodes[0]

    cid = create_container(
        default_wallet.path, shell=single_noded_env.shell, endpoint=single_noded_env.sn_rpc, rule="REP 1"
    )

    file_path = generate_file(simple_object_size)

    oid = put_object(
        default_wallet.path,
        file_path,
        cid,
        single_noded_env.shell,
        single_noded_env.sn_rpc,
    )

    get_object(
        default_wallet.path,
        cid,
        oid,
        single_noded_env.shell,
        single_noded_env.sn_rpc,
    )

    head_object(
        default_wallet.path,
        cid,
        oid,
        shell=single_noded_env.shell,
        endpoint=single_noded_env.sn_rpc,
    )

    search_object(
        default_wallet.path,
        cid,
        shell=single_noded_env.shell,
        endpoint=single_noded_env.sn_rpc,
        expected_objects_list=[oid],
        root=True,
    )

    get_range(
        default_wallet.path,
        cid,
        oid,
        shell=single_noded_env.shell,
        endpoint=single_noded_env.sn_rpc,
        range_cut="0:1",
    )

    get_range_hash(
        default_wallet.path,
        cid,
        oid,
        range_cut="0:1",
        shell=single_noded_env.shell,
        endpoint=single_noded_env.sn_rpc,
    )

    time.sleep(30)

    with allure.step("Get metrics"):
        after_metrics = get_metrics(sn)
        allure.attach(json.dumps(dict(after_metrics)), "sn metrics", allure.attachment_type.JSON)

    size_metrics_for_container = next(
        (c for c in after_metrics["neofs_node_engine_container_size"] if c["params"]["cid"] == cid), None
    )
    assert size_metrics_for_container, "no metrics for the created container"
    assert size_metrics_for_container["value"] == simple_object_size, (
        "invalid value for neofs_node_engine_container_size"
    )
    assert after_metrics["neofs_node_object_get_payload"][0]["value"] == simple_object_size, (
        "invalid value for neofs_node_object_get_payload"
    )
    assert after_metrics["neofs_node_object_put_payload"][0]["value"] == simple_object_size, (
        "invalid value for neofs_node_object_put_payload"
    )

    for metric in (
        "neofs_node_engine_put_time_count",
        "neofs_node_engine_get_time_count",
        "neofs_node_engine_search_time_count",
        "neofs_node_object_get_req_count",
        "neofs_node_object_get_req_count_success",
        "neofs_node_object_rpc_get_time_count",
        "neofs_node_object_rpc_range_time_count",
        "neofs_node_object_rpc_search_time_count",
        "neofs_node_object_search_req_count",
        "neofs_node_object_search_req_count_success",
        "neofs_node_engine_head_time_count",
        "neofs_node_object_head_req_count",
        "neofs_node_object_head_req_count_success",
        "neofs_node_object_rpc_head_time_count",
        "neofs_node_object_put_req_count",
        "neofs_node_object_put_req_count_success",
        "neofs_node_object_range_hash_req_count",
        "neofs_node_object_range_hash_req_count_success",
        "neofs_node_object_range_req_count",
        "neofs_node_object_range_req_count_success",
    ):
        assert after_metrics[metric][0]["value"] == 1, f"invalid value for {metric}"

    assert after_metrics["neofs_node_engine_range_time_count"][0]["value"] == 2, (
        "invalid value for neofs_node_engine_range_time_count"
    )

    for metric in (
        "neofs_node_engine_put_time_bucket",
        "neofs_node_engine_get_time_bucket",
        "neofs_node_engine_range_time_bucket",
        "neofs_node_engine_search_time_bucket",
        "neofs_node_object_rpc_get_time_bucket",
        "neofs_node_object_rpc_range_time_bucket",
        "neofs_node_object_rpc_search_time_bucket",
        "neofs_node_object_counter",
        "neofs_node_engine_head_time_bucket",
        "neofs_node_object_rpc_head_time_bucket",
        "neofs_node_object_rpc_range_hash_time_bucket",
        "neofs_node_object_rpc_put_time_bucket",
        "neofs_node_engine_list_objects_time_bucket",
    ):
        assert any(m["value"] >= 1 for m in after_metrics[metric]), f"invalid value for {metric}"

    assert after_metrics["neofs_node_engine_capacity"][0]["value"] == 494384795648.0, (
        "invalid value for neofs_node_engine_capacity"
    )

    delete_object(
        default_wallet.path,
        cid,
        oid,
        shell=single_noded_env.shell,
        endpoint=single_noded_env.sn_rpc,
    )

    time.sleep(30)

    with allure.step("Get metrics after object deletion"):
        after_metrics = get_metrics(sn)
        allure.attach(json.dumps(dict(after_metrics)), "sn metrics object delete", allure.attachment_type.JSON)

    for metric in ("neofs_node_object_delete_req_count", "neofs_node_object_delete_req_count_success"):
        assert after_metrics[metric][0]["value"] == 1, f"invalid value for {metric}"

    for metric in ("neofs_node_object_rpc_delete_time_bucket", "neofs_node_engine_inhume_time_bucket"):
        assert any(m["value"] >= 1 for m in after_metrics[metric]), f"invalid value for {metric}"

    delete_container(
        default_wallet.path, cid, shell=single_noded_env.shell, endpoint=single_noded_env.sn_rpc, await_mode=True
    )


def test_sn_metrics_epoch(single_noded_env: NeoFSEnv):
    fresh_epoch = neofs_epoch.ensure_fresh_epoch(single_noded_env)
    wait_for_metric_to_arrive(single_noded_env.storage_nodes[0], "neofs_node_state_epoch", float(fresh_epoch))


def test_sn_metrics_node_version(single_noded_env: NeoFSEnv):
    node_version = single_noded_env.storage_nodes[0]._get_version()
    metrics = get_metrics(single_noded_env.storage_nodes[0])
    assert metrics["neofs_node_version"][0]["params"]["version"] == node_version, "invalid value for neofs_node_version"


def test_sn_health_metrics(single_noded_env: NeoFSEnv):
    SHUTTING_DOWN = 3.0
    READY = 2.0
    # There are also undefined and starting states, but it is not possible to reliably catch
    # them under regular circumstances

    new_storage_node = StorageNode(single_noded_env, 2, ["UN-LOCODE:RU LED", "Price:33"])
    deploy_thread = threading.Thread(target=new_storage_node.start)
    stopping_thread = threading.Thread(target=new_storage_node.stop)
    try:
        deploy_thread.start()
        wait_for_metric_to_arrive(new_storage_node, "neofs_node_state_health", READY)
        deploy_thread.join()
        stopping_thread.start()
        wait_for_metric_to_arrive(new_storage_node, "neofs_node_state_health", SHUTTING_DOWN)
    finally:
        if deploy_thread.is_alive():
            deploy_thread.join()
        if stopping_thread.is_alive():
            stopping_thread.join()
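For context on how the health checks above converge: wait_for_metric_to_arrive relies on the wait_for_success decorator to keep re-fetching the metrics until the gauge reaches the expected value. A minimal standalone sketch of that polling idea, using only the get_metrics helper added in this commit; the function name poll_metric and its timeout/interval defaults are illustrative, not part of the change.

import time

from helpers.metrics import get_metrics


def poll_metric(sn, metric_name: str, expected: float, timeout: float = 120, interval: float = 1.0) -> None:
    # Naive retry loop, roughly what the @wait_for_success(120, 1) decorator provides in the helpers above.
    deadline = time.monotonic() + timeout
    while True:
        try:
            # The endpoint may be briefly unreachable while the node starts or stops.
            if get_metrics(sn)[metric_name][0]["value"] == expected:
                return
        except Exception:
            pass
        if time.monotonic() > deadline:
            raise AssertionError(f"{metric_name} did not reach {expected} within {timeout}s")
        time.sleep(interval)

Usage would mirror the waits above, e.g. poll_metric(new_storage_node, "neofs_node_state_health", READY).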
