Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create new aptos collector #62

Merged
merged 9 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,45 @@ def block_height(self):
def latency(self):
"""Returns connection latency."""
return self.interface.latest_query_latency


class AptosCollector():
    """Collects metrics from an Aptos REST API endpoint."""

    def __init__(self, url, labels, chain_id, **client_parameters):
        open_timeout = client_parameters.get('open_timeout')
        ping_timeout = client_parameters.get('ping_timeout')
        self.labels = labels
        self.chain_id = chain_id
        self.interface = HttpsInterface(url, open_timeout, ping_timeout)
        self._logger_metadata = {
            'component': 'AptosCollector',
            'url': strip_url(url)
        }

    def alive(self):
        """Return True when the endpoint answers the cached REST query.

        The same cached response later serves block height and client
        version, so checking liveness costs no extra request per run.
        """
        return self.interface.cached_rest_api_rpc_get() is not None

    def block_height(self):
        """Return the endpoint's block height as a number (cached query)."""
        blockchain_info = self.interface.cached_rest_api_rpc_get()
        return validate_dict_and_return_key_value(
            blockchain_info, 'block_height', self._logger_metadata, to_number=True)

    def client_version(self):
        """Return {'client_version': <git hash>} for the node, or None
        when the cached query fails or lacks the git_hash key."""
        blockchain_info = self.interface.cached_rest_api_rpc_get()
        version = validate_dict_and_return_key_value(
            blockchain_info, 'git_hash', self._logger_metadata, stringify=True)
        return None if version is None else {"client_version": version}

    def latency(self):
        """Return the latency of the most recent endpoint query."""
        return self.interface.latest_query_latency
2 changes: 1 addition & 1 deletion src/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def endpoints(self):
def _load_configuration(self):
allowed_providers = self._load_validation_file()
supported_collectors = ('evm', 'cardano', 'conflux', 'solana',
'bitcoin', 'doge', 'filecoin', 'starknet')
'bitcoin', 'doge', 'filecoin', 'starknet', 'aptos')

configuration_schema = Schema({
'blockchain':
Expand Down
34 changes: 25 additions & 9 deletions src/helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Module for providing useful functions accessible globally."""

import urllib.parse
import json
from json.decoder import JSONDecodeError
from jsonrpcclient import Ok, parse_json

from log import logger


Expand All @@ -13,34 +13,50 @@ def strip_url(url) -> str:
return urllib.parse.urlparse(url).hostname


def return_and_validate_rpc_json_result(message: str, logger_metadata) -> dict:
def return_and_validate_json_result(message: str, json_type: str,logger_metadata) -> dict:
"""Loads json rpc response text and validates the response
as per JSON-RPC 2.0 Specification. In case the message is
as per JSON-RPC 2.0 Specification or JSON parsable if type is REST. In case the message is
not valid it returns None. This method is used by both HTTPS and
Websocket Interface."""
try:
parsed = parse_json(message)
if isinstance(parsed, Ok): # pylint: disable=no-else-return
return parsed.result
if json_type=='RPC':
parsed = parse_json(message)
if isinstance(parsed, Ok): # pylint: disable=no-else-return
return parsed.result
else:
logger.error('Error in RPC message.',
message=message, **logger_metadata)
else:
logger.error('Error in RPC message.',
message=message, **logger_metadata)
parsed = json.loads(message)
return parsed
except (JSONDecodeError, KeyError) as error:
logger.error('Invalid JSON RPC object in RPC message.',
message=message,
error=error,
**logger_metadata)
return None

def return_and_validate_rpc_json_result(message: str, logger_metadata) -> dict:
    """Validate *message* as a JSON-RPC 2.0 response and return its result.

    Thin wrapper around return_and_validate_json_result with
    json_type='RPC'; returns None when the message is not valid.
    """
    return return_and_validate_json_result(message, json_type='RPC',
                                           logger_metadata=logger_metadata)

def return_and_validate_rest_api_json_result(message: str, logger_metadata) -> dict:
    """Validate that *message* is parsable JSON and return the parsed object.

    Thin wrapper around return_and_validate_json_result with
    json_type='REST'; returns None when the message is not valid.
    """
    return return_and_validate_json_result(message, json_type='REST',
                                           logger_metadata=logger_metadata)

def validate_dict_and_return_key_value(data, key, logger_metadata, stringify=False):
def validate_dict_and_return_key_value(data, key, logger_metadata, stringify=False, to_number=False): # pylint: disable=line-too-long
"""Validates that a dict is provided and returns the key value either in
original form or as a string"""
if isinstance(data, dict):
value = data.get(key)
if value is not None:
if stringify:
return str(value)
if to_number:
try:
return float(value)
except ValueError:
return None
return value
logger.error("Provided data is not a dict or has no value for key",
key=key,
Expand Down
81 changes: 54 additions & 27 deletions src/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import requests
from urllib3 import Timeout

from helpers import strip_url, return_and_validate_rpc_json_result
from helpers import strip_url, return_and_validate_rpc_json_result, return_and_validate_rest_api_json_result # pylint: disable=line-too-long
from cache import Cache
from log import logger

Expand Down Expand Up @@ -37,37 +37,43 @@ def latest_query_latency(self):
self._latest_query_latency = None
return latency

def _return_and_validate_post_request(self, payload: dict) -> str:
"""Sends a POST request and validates the http response code. If
response code is OK, it returns the response.text, otherwise
it returns None.
"""
def _return_and_validate_request(self, method='GET', payload=None, params=None):
"""Sends a GET or POST request and validates the http response code."""
with self.session as ses:
try:
self._logger.debug("Querying endpoint.",
payload=payload,
**self._logger_metadata)
self._logger.debug(f"Querying endpoint with {method}.",
payload=payload,
params=params,
**self._logger_metadata)
start_time = perf_counter()
req = ses.post(self.url,
json=payload,
timeout=Timeout(connect=self.connect_timeout,
read=self.response_timeout))
if req.status_code == requests.codes.ok: # pylint: disable=no-member
if method.upper() == 'GET':
req = ses.get(self.url,
params=params,
timeout=Timeout(connect=self.connect_timeout,
read=self.response_timeout))
elif method.upper() == 'POST':
req = ses.post(self.url,
json=payload,
timeout=Timeout(connect=self.connect_timeout,
read=self.response_timeout))
else:
raise ValueError(f"Unsupported HTTP method: {method}")

if req.status_code == requests.codes.ok: # pylint: disable=no-member
self._latest_query_latency = perf_counter() - start_time
return req.text
except (IOError, requests.HTTPError,
json.decoder.JSONDecodeError) as error:
self._logger.error("Problem while sending a post request.",
payload=payload,
error=error,
**self._logger_metadata)
return None
except (IOError, requests.HTTPError, json.decoder.JSONDecodeError, ValueError) as error:
self._logger.error(f"Problem while sending a {method} request.",
payload=payload,
params=params,
error=error,
**self._logger_metadata)
return None

def json_rpc_post(self, payload):
"""Checks the validity of a successful json-rpc response. If any of the
validations fail, the method returns type None. """
response = self._return_and_validate_post_request(payload)
response = self._return_and_validate_request(method='POST', payload=payload)
if response is not None:
result = return_and_validate_rpc_json_result(
response, self._logger_metadata)
Expand All @@ -76,20 +82,41 @@ def json_rpc_post(self, payload):
return None

def cached_json_rpc_post(self, payload: dict):
"""Calls json_rpc_post and stores the result in in-memory
cache, by using payload as key.Method will always return
cached value after the first call. Cache never expires."""
cache_key = str(payload)
"""Calls json_rpc_post and stores the result in in-memory cache."""
cache_key = f"rpc:{str(payload)}"

if self.cache.is_cached(cache_key):
return_value = self.cache.retrieve_key_value(cache_key)
return return_value

value = self.json_rpc_post(payload)
value = self.json_rpc_post(payload=payload)
if value is not None:
self.cache.store_key_value(cache_key, value)
return value

def json_rest_api_get(self, params: dict = None):
    """Performs a GET request against the REST API endpoint and checks
    that the response body is parsable JSON. Returns the parsed result,
    or None if the request or the JSON validation fails."""
    response = self._return_and_validate_request(method='GET', params=params)
    if response is not None:
        result = return_and_validate_rest_api_json_result(
            response, self._logger_metadata)
        if result is not None:
            return result
    return None

def cached_rest_api_rpc_get(self, params: dict = None):
    """Calls json_rest_api_get and stores the result in the in-memory
    cache, using the params as part of the key. After the first
    successful call the cached value is always returned; the cache
    never expires. Failed calls (None) are not cached, so they are
    retried on the next invocation."""
    # "rest:" prefix keeps REST keys distinct from "rpc:" keys used by
    # cached_json_rpc_post in the same cache.
    cache_key = f"rest:{str(params)}"

    if self.cache.is_cached(cache_key):
        return_value = self.cache.retrieve_key_value(cache_key)
        return return_value

    value = self.json_rest_api_get(params)
    if value is not None:
        self.cache.store_key_value(cache_key, value)
    return value

class WebsocketSubscription(threading.Thread): # pylint: disable=too-many-instance-attributes
"""A thread class used to subscribe and track
Expand Down
2 changes: 2 additions & 0 deletions src/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def get_collector_registry(self) -> list:
collector = collectors.SolanaCollector
case "starknet", "starknet":
collector = collectors.StarknetCollector
case "aptos", "aptos":
collector = collectors.AptosCollector
case "evm", other: # pylint: disable=unused-variable
collector = collectors.EvmCollector
if collector is None:
Expand Down
84 changes: 84 additions & 0 deletions src/test_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,3 +650,87 @@ def test_latency(self):
"""Tests that the latency is obtained from the interface based on latest_query_latency"""
self.mocked_connection.return_value.latest_query_latency = 0.123
self.assertEqual(0.123, self.starknet_collector.latency())

class TestAptosCollector(TestCase):
    """Tests the Aptos collector class"""

    def setUp(self):
        # Common fixture values reused by every test below.
        self.url = "https://test.com"
        self.labels = ["dummy", "labels"]
        self.chain_id = 123
        self.open_timeout = 8
        self.ping_timeout = 9
        self.client_params = {
            "open_timeout": self.open_timeout, "ping_timeout": self.ping_timeout}
        # Patch HttpsInterface so constructing the collector opens no
        # real connection; keep the mock so tests can inspect calls and
        # stub return values on its return_value.
        with mock.patch('collectors.HttpsInterface') as mocked_connection:
            self.aptos_collector = collectors.AptosCollector(
                self.url, self.labels, self.chain_id, **self.client_params)
            self.mocked_connection = mocked_connection

    def test_logger_metadata(self):
        """Validate logger metadata. Makes sure url is stripped by helpers.strip_url function."""
        expected_metadata = {
            'component': 'AptosCollector', 'url': 'test.com'}
        self.assertEqual(expected_metadata,
                         self.aptos_collector._logger_metadata)

    def test_https_interface_created(self):
        """Tests that the Aptos collector calls the https interface with the correct args"""
        self.mocked_connection.assert_called_once_with(
            self.url, self.open_timeout, self.ping_timeout)

    def test_interface_attribute_exists(self):
        """Tests that the interface attribute exists."""
        self.assertTrue(hasattr(self.aptos_collector, 'interface'))

    def test_alive_call(self):
        """Tests the alive function uses the correct call"""
        self.aptos_collector.alive()
        self.mocked_connection.return_value.cached_rest_api_rpc_get.assert_called_once()

    def test_alive_false(self):
        """Tests the alive function returns false when get returns None"""
        self.mocked_connection.return_value.cached_rest_api_rpc_get.return_value = None
        result = self.aptos_collector.alive()
        self.assertFalse(result)

    def test_block_height(self):
        """Tests the block_height function uses the correct call to get block height"""
        self.aptos_collector.block_height()
        self.mocked_connection.return_value.cached_rest_api_rpc_get.assert_called_once()

    def test_block_height_returns_none(self):
        """Tests that the block height returns None if cached_rest_api_rpc_get returns None"""
        self.mocked_connection.return_value.cached_rest_api_rpc_get.return_value = None
        result = self.aptos_collector.block_height()
        self.assertIsNone(result)

    def test_client_version(self):
        """Tests the client_version function uses the correct call to get client version"""
        self.aptos_collector.client_version()
        self.mocked_connection.return_value.cached_rest_api_rpc_get.assert_called_once()

    def test_client_version_get_git_hash(self):
        """Tests that the client version is returned as a string with the git_hash key"""
        # The collector wraps the git_hash value in a dict keyed by
        # "client_version" for the metric labels.
        self.mocked_connection.return_value.cached_rest_api_rpc_get.return_value = {
            "git_hash": "abcdef123"}
        result = self.aptos_collector.client_version()
        self.assertEqual({"client_version": "abcdef123"}, result)

    def test_client_version_key_error_returns_none(self):
        """Tests that the client_version returns None on KeyError"""
        self.mocked_connection.return_value.cached_rest_api_rpc_get.return_value = {
            "dummy_key": "value"}
        result = self.aptos_collector.client_version()
        self.assertIsNone(result)

    def test_client_version_returns_none(self):
        """Tests that the client_version returns None if cached_rest_api_rpc_get returns None"""
        self.mocked_connection.return_value.cached_rest_api_rpc_get.return_value = None
        result = self.aptos_collector.client_version()
        self.assertIsNone(result)

    def test_latency(self):
        """Tests that the latency is obtained from the interface based on latest_query_latency"""
        self.mocked_connection.return_value.latest_query_latency = 0.123
        self.assertEqual(0.123, self.aptos_collector.latency())
Loading