Skip to content

Commit

Permalink
Create new aptos collector (#62)
Browse files Browse the repository at this point in the history
* Refactor the HTTP interface to include get request

* Fix test interfaces, fix pylint

* Add first version of Aptos collector

* Add Aptos to the registries

* Working version of Aptos

* Fix styling

* Add test cases for Aptos

* Fix name for cached_rest_api_get

* Fix name for cached_json_rest_api_get
  • Loading branch information
tonynguyen-ccl authored Aug 26, 2024
1 parent 2a73342 commit 62b68f9
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 39 deletions.
42 changes: 42 additions & 0 deletions src/collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,45 @@ def block_height(self):
def latency(self):
"""Returns connection latency."""
return self.interface.latest_query_latency


class AptosCollector():
"""A collector to fetch information about Aptos endpoints."""

def __init__(self, url, labels, chain_id, **client_parameters):

self.labels = labels
self.chain_id = chain_id
self.interface = HttpsInterface(url, client_parameters.get('open_timeout'),
client_parameters.get('ping_timeout'))

self._logger_metadata = {
'component': 'AptosCollector',
'url': strip_url(url)
}

def alive(self):
"""Returns true if endpoint is alive, false if not."""
# Run cached query because we can also fetch client version from this
# later on. This will save us an RPC call per run.
return self.interface.cached_json_rest_api_get() is not None

def block_height(self):
"""Runs a cached query to return block height"""
blockchain_info = self.interface.cached_json_rest_api_get()
return validate_dict_and_return_key_value(
blockchain_info, 'block_height', self._logger_metadata, to_number=True)

def client_version(self):
"""Runs a cached query to return client version."""
blockchain_info = self.interface.cached_json_rest_api_get()
version = validate_dict_and_return_key_value(
blockchain_info, 'git_hash', self._logger_metadata, stringify=True)
if version is None:
return None
client_version = {"client_version": version}
return client_version

def latency(self):
"""Returns connection latency."""
return self.interface.latest_query_latency
2 changes: 1 addition & 1 deletion src/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def endpoints(self):
def _load_configuration(self):
allowed_providers = self._load_validation_file()
supported_collectors = ('evm', 'cardano', 'conflux', 'solana',
'bitcoin', 'doge', 'filecoin', 'starknet')
'bitcoin', 'doge', 'filecoin', 'starknet', 'aptos')

configuration_schema = Schema({
'blockchain':
Expand Down
34 changes: 25 additions & 9 deletions src/helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Module for providing useful functions accessible globally."""

import urllib.parse
import json
from json.decoder import JSONDecodeError
from jsonrpcclient import Ok, parse_json

from log import logger


Expand All @@ -13,34 +13,50 @@ def strip_url(url) -> str:
return urllib.parse.urlparse(url).hostname


def return_and_validate_rpc_json_result(message: str, logger_metadata) -> dict:
def return_and_validate_json_result(message: str, json_type: str,logger_metadata) -> dict:
"""Loads json rpc response text and validates the response
as per JSON-RPC 2.0 Specification. In case the message is
as per JSON-RPC 2.0 Specification or JSON parsable if type is REST. In case the message is
not valid it returns None. This method is used by both HTTPS and
Websocket Interface."""
try:
parsed = parse_json(message)
if isinstance(parsed, Ok): # pylint: disable=no-else-return
return parsed.result
if json_type=='RPC':
parsed = parse_json(message)
if isinstance(parsed, Ok): # pylint: disable=no-else-return
return parsed.result
else:
logger.error('Error in RPC message.',
message=message, **logger_metadata)
else:
logger.error('Error in RPC message.',
message=message, **logger_metadata)
parsed = json.loads(message)
return parsed
except (JSONDecodeError, KeyError) as error:
logger.error('Invalid JSON RPC object in RPC message.',
message=message,
error=error,
**logger_metadata)
return None

def return_and_validate_rpc_json_result(message: str, logger_metadata) -> dict:
"""Validate that message is JSON parsable and per JSON-RPC specs"""
return return_and_validate_json_result(message,json_type='RPC',logger_metadata=logger_metadata)

def return_and_validate_rest_api_json_result(message: str, logger_metadata) -> dict:
"""Validate that message is JSON parsable"""
return return_and_validate_json_result(message,json_type='REST',logger_metadata=logger_metadata)

def validate_dict_and_return_key_value(data, key, logger_metadata, stringify=False):
def validate_dict_and_return_key_value(data, key, logger_metadata, stringify=False, to_number=False): # pylint: disable=line-too-long
"""Validates that a dict is provided and returns the key value either in
original form or as a string"""
if isinstance(data, dict):
value = data.get(key)
if value is not None:
if stringify:
return str(value)
if to_number:
try:
return float(value)
except ValueError:
return None
return value
logger.error("Provided data is not a dict or has no value for key",
key=key,
Expand Down
81 changes: 54 additions & 27 deletions src/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import requests
from urllib3 import Timeout

from helpers import strip_url, return_and_validate_rpc_json_result
from helpers import strip_url, return_and_validate_rpc_json_result, return_and_validate_rest_api_json_result # pylint: disable=line-too-long
from cache import Cache
from log import logger

Expand Down Expand Up @@ -37,37 +37,43 @@ def latest_query_latency(self):
self._latest_query_latency = None
return latency

def _return_and_validate_post_request(self, payload: dict) -> str:
"""Sends a POST request and validates the http response code. If
response code is OK, it returns the response.text, otherwise
it returns None.
"""
def _return_and_validate_request(self, method='GET', payload=None, params=None):
"""Sends a GET or POST request and validates the http response code."""
with self.session as ses:
try:
self._logger.debug("Querying endpoint.",
payload=payload,
**self._logger_metadata)
self._logger.debug(f"Querying endpoint with {method}.",
payload=payload,
params=params,
**self._logger_metadata)
start_time = perf_counter()
req = ses.post(self.url,
json=payload,
timeout=Timeout(connect=self.connect_timeout,
read=self.response_timeout))
if req.status_code == requests.codes.ok: # pylint: disable=no-member
if method.upper() == 'GET':
req = ses.get(self.url,
params=params,
timeout=Timeout(connect=self.connect_timeout,
read=self.response_timeout))
elif method.upper() == 'POST':
req = ses.post(self.url,
json=payload,
timeout=Timeout(connect=self.connect_timeout,
read=self.response_timeout))
else:
raise ValueError(f"Unsupported HTTP method: {method}")

if req.status_code == requests.codes.ok: # pylint: disable=no-member
self._latest_query_latency = perf_counter() - start_time
return req.text
except (IOError, requests.HTTPError,
json.decoder.JSONDecodeError) as error:
self._logger.error("Problem while sending a post request.",
payload=payload,
error=error,
**self._logger_metadata)
return None
except (IOError, requests.HTTPError, json.decoder.JSONDecodeError, ValueError) as error:
self._logger.error(f"Problem while sending a {method} request.",
payload=payload,
params=params,
error=error,
**self._logger_metadata)
return None

def json_rpc_post(self, payload):
"""Checks the validity of a successful json-rpc response. If any of the
validations fail, the method returns type None. """
response = self._return_and_validate_post_request(payload)
response = self._return_and_validate_request(method='POST', payload=payload)
if response is not None:
result = return_and_validate_rpc_json_result(
response, self._logger_metadata)
Expand All @@ -76,20 +82,41 @@ def json_rpc_post(self, payload):
return None

def cached_json_rpc_post(self, payload: dict):
"""Calls json_rpc_post and stores the result in in-memory
cache, by using payload as key.Method will always return
cached value after the first call. Cache never expires."""
cache_key = str(payload)
"""Calls json_rpc_post and stores the result in in-memory cache."""
cache_key = f"rpc:{str(payload)}"

if self.cache.is_cached(cache_key):
return_value = self.cache.retrieve_key_value(cache_key)
return return_value

value = self.json_rpc_post(payload)
value = self.json_rpc_post(payload=payload)
if value is not None:
self.cache.store_key_value(cache_key, value)
return value

def json_rest_api_get(self, params: dict = None):
"""Checks the validity of a successful json-rpc response. If any of the
validations fail, the method returns type None. """
response = self._return_and_validate_request(method='GET', params=params)
if response is not None:
result = return_and_validate_rest_api_json_result(
response, self._logger_metadata)
if result is not None:
return result
return None

def cached_json_rest_api_get(self, params: dict = None):
"""Calls json_rest_api_get and stores the result in in-memory cache."""
cache_key = f"rest:{str(params)}"

if self.cache.is_cached(cache_key):
return_value = self.cache.retrieve_key_value(cache_key)
return return_value

value = self.json_rest_api_get(params)
if value is not None:
self.cache.store_key_value(cache_key, value)
return value

class WebsocketSubscription(threading.Thread): # pylint: disable=too-many-instance-attributes
"""A thread class used to subscribe and track
Expand Down
2 changes: 2 additions & 0 deletions src/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def get_collector_registry(self) -> list:
collector = collectors.SolanaCollector
case "starknet", "starknet":
collector = collectors.StarknetCollector
case "aptos", "aptos":
collector = collectors.AptosCollector
case "evm", other: # pylint: disable=unused-variable
collector = collectors.EvmCollector
if collector is None:
Expand Down
84 changes: 84 additions & 0 deletions src/test_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,3 +650,87 @@ def test_latency(self):
"""Tests that the latency is obtained from the interface based on latest_query_latency"""
self.mocked_connection.return_value.latest_query_latency = 0.123
self.assertEqual(0.123, self.starknet_collector.latency())

class TestAptosCollector(TestCase):
"""Tests the Aptos collector class"""

def setUp(self):
self.url = "https://test.com"
self.labels = ["dummy", "labels"]
self.chain_id = 123
self.open_timeout = 8
self.ping_timeout = 9
self.client_params = {
"open_timeout": self.open_timeout, "ping_timeout": self.ping_timeout}
with mock.patch('collectors.HttpsInterface') as mocked_connection:
self.aptos_collector = collectors.AptosCollector(
self.url, self.labels, self.chain_id, **self.client_params)
self.mocked_connection = mocked_connection

def test_logger_metadata(self):
"""Validate logger metadata. Makes sure url is stripped by helpers.strip_url function."""
expected_metadata = {
'component': 'AptosCollector', 'url': 'test.com'}
self.assertEqual(expected_metadata,
self.aptos_collector._logger_metadata)

def test_https_interface_created(self):
"""Tests that the Aptos collector calls the https interface with the correct args"""
self.mocked_connection.assert_called_once_with(
self.url, self.open_timeout, self.ping_timeout)

def test_interface_attribute_exists(self):
"""Tests that the interface attribute exists."""
self.assertTrue(hasattr(self.aptos_collector, 'interface'))

def test_alive_call(self):
"""Tests the alive function uses the correct call"""
self.aptos_collector.alive()
self.mocked_connection.return_value.cached_json_rest_api_get.assert_called_once()

def test_alive_false(self):
"""Tests the alive function returns false when get returns None"""
self.mocked_connection.return_value.cached_json_rest_api_get.return_value = None
result = self.aptos_collector.alive()
self.assertFalse(result)

def test_block_height(self):
"""Tests the block_height function uses the correct call to get block height"""
self.aptos_collector.block_height()
self.mocked_connection.return_value.cached_json_rest_api_get.assert_called_once()

def test_block_height_returns_none(self):
"""Tests that the block height returns None if cached_json_rest_api_get returns None"""
self.mocked_connection.return_value.cached_json_rest_api_get.return_value = None
result = self.aptos_collector.block_height()
self.assertIsNone(result)

def test_client_version(self):
"""Tests the client_version function uses the correct call to get client version"""
self.aptos_collector.client_version()
self.mocked_connection.return_value.cached_json_rest_api_get.assert_called_once()

def test_client_version_get_git_hash(self):
"""Tests that the client version is returned as a string with the git_hash key"""
self.mocked_connection.return_value.cached_json_rest_api_get.return_value = {
"git_hash": "abcdef123"}
result = self.aptos_collector.client_version()
self.assertEqual({"client_version": "abcdef123"}, result)

def test_client_version_key_error_returns_none(self):
"""Tests that the client_version returns None on KeyError"""
self.mocked_connection.return_value.cached_json_rest_api_get.return_value = {
"dummy_key": "value"}
result = self.aptos_collector.client_version()
self.assertIsNone(result)

def test_client_version_returns_none(self):
"""Tests that the client_version returns None if cached_json_rest_api_get returns None"""
self.mocked_connection.return_value.cached_json_rest_api_get.return_value = None
result = self.aptos_collector.client_version()
self.assertIsNone(result)

def test_latency(self):
"""Tests that the latency is obtained from the interface based on latest_query_latency"""
self.mocked_connection.return_value.latest_query_latency = 0.123
self.assertEqual(0.123, self.aptos_collector.latency())
Loading

0 comments on commit 62b68f9

Please sign in to comment.