diff --git a/examples/thread_safe_substrate_interface.py b/examples/thread_safe_substrate_interface.py new file mode 100644 index 0000000..66de025 --- /dev/null +++ b/examples/thread_safe_substrate_interface.py @@ -0,0 +1,128 @@ +# Copyright 2021 Vincent Texier +# +# This software is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import logging +import time +from queue import Queue +from threading import Thread + +from substrateinterface import SubstrateInterface + + +class ThreadSafe(Thread): + + queue: Queue = Queue() + + def __init__(self, *args, **kwargs): + """ + Init a SubstrateInterface client adapter instance as a thread + + :param args: Positional arguments + :param kwargs: Keywords arguments + """ + super().__init__(*args, **kwargs) + + def run(self): + """ + Started asynchronously with Thread.start() + + :return: + """ + while True: + # print("loop...") + call, method, args, result_handler, result = self.queue.get() + result_ = dict() + # print(call, method, args, result_handler, result) + if call == "--close--": + logging.debug("Close queue thread on substrate_interface") + break + + try: + # logging.debug(f"threadsafe call to rpc method {method}") + result_ = call(method, args, result_handler) + except Exception as exception: + logging.error(method) + logging.error(args) + # logging.exception(exception) + result.put(exception) + # print(call.__name__, " put result ", result_) + result.put(result_) + # print("reloop...") + + logging.debug("SubstrateInterface connection closed and thread terminated.") + + def close(self): + """ + Close connection + + :return: + """ + # Closing the connection + self.queue.put(("--close--", None, None, None, None)) + + +class ThreadSafeSubstrateInterface(SubstrateInterface): + """ + Override substrate_interface client class with a queue to be thread safe + + """ + + def __init__(self, *args, **kwargs): + """ + Init a SubstrateInterface client adapter instance as a thread + + :param args: Positional arguments + :param kwargs: Keywords arguments + """ + # create and start thread before calling parent init (which makes a rpc_request!) + self.thread = ThreadSafe() + self.thread.start() + + super().__init__(*args, **kwargs) + + def rpc_request(self, method, params, result_handler=None) -> dict: + """ + Override rpc_request method to use threadsafe queue + + :param method: Name of the RPC method + :param params: Params of the RPC method + :param result_handler: Optional variable to receive results, default to None + :return: + """ + result: Queue = Queue() + self.thread.queue.put( + (super().rpc_request, method, params, result_handler, result) + ) + # print(self.thread.queue.get()) + # print('done calling %s' % method) + return_ = result.get() + if isinstance(return_, Exception): + raise return_ + return return_ + + def close(self): + logging.debug("Close RPC connection thread") + self.thread.close() + + +if __name__ == '__main__': + # start Substrate instance as a thread... + substrate = ThreadSafeSubstrateInterface( + url="ws://127.0.0.1:9944" + ) + while True: + substrate.get_block() + number = substrate.get_block_number(substrate.block_hash) + print(number, substrate.block_hash, substrate.version, substrate.runtime_version) + time.sleep(6) diff --git a/substrateinterface/base.py b/substrateinterface/base.py index ae4d497..fcf6ec6 100644 --- a/substrateinterface/base.py +++ b/substrateinterface/base.py @@ -254,7 +254,31 @@ def rpc_request(self, method, params, result_handler=None): ------- a dict with the parsed result of the request. """ + if self.websocket: + json_body = self.rpc_request_websocket(method, params, result_handler) + else: + if result_handler: + raise ConfigurationError("Result handlers only available for websockets (ws://) connections") + + json_body = self.rpc_request_http(method, params) + + return json_body + + def rpc_request_websocket(self, method, params, result_handler=None): + """ + Method that handles the actual RPC request to the Substrate node. The other implemented functions eventually + use this method to perform the request. + + Parameters + ---------- + result_handler: Callback function that processes the result received from the node + method: method of the JSONRPC request + params: a list containing the parameters of the JSONRPC request + Returns + ------- + a dict with the parsed result of the request. + """ request_id = self.request_id self.request_id += 1 @@ -267,83 +291,101 @@ def rpc_request(self, method, params, result_handler=None): self.debug_message('RPC request #{}: "{}"'.format(request_id, method)) - if self.websocket: - try: - self.websocket.send(json.dumps(payload)) - except WebSocketConnectionClosedException: - if self.config.get('auto_reconnect') and self.url: - # Try to reconnect websocket and retry rpc_request - self.debug_message("Connection Closed; Trying to reconnecting...") - self.connect_websocket() - - return self.rpc_request(method=method, params=params, result_handler=result_handler) - else: - # websocket connection is externally created, re-raise exception - raise + try: + self.websocket.send(json.dumps(payload)) + except WebSocketConnectionClosedException: + if self.config.get('auto_reconnect') and self.url: + # Try to reconnect websocket and retry rpc_request + self.debug_message("Connection Closed; Trying to reconnecting...") + self.connect_websocket() + + return self.rpc_request(method=method, params=params, result_handler=result_handler) + else: + # websocket connection is externally created, re-raise exception + raise - update_nr = 0 - json_body = None - subscription_id = None + update_nr = 0 + json_body = None + subscription_id = None - while json_body is None: - # Search for subscriptions - for message, remove_message in list_remove_iter(self.__rpc_message_queue): + while json_body is None: + # Search for subscriptions + for message, remove_message in list_remove_iter(self.__rpc_message_queue): - # Check if result message is matching request ID - if 'id' in message and message['id'] == request_id: + # Check if result message is matching request ID + if 'id' in message and message['id'] == request_id: + remove_message() - remove_message() + # Check if response has error + if 'error' in message: + raise SubstrateRequestException(message['error']) - # Check if response has error - if 'error' in message: - raise SubstrateRequestException(message['error']) + # If result handler is set, pass result through and loop until handler return value is set + if callable(result_handler): - # If result handler is set, pass result through and loop until handler return value is set - if callable(result_handler): + # Set subscription ID and only listen to messages containing this ID + subscription_id = message['result'] + self.debug_message(f"Websocket subscription [{subscription_id}] created") - # Set subscription ID and only listen to messages containing this ID - subscription_id = message['result'] - self.debug_message(f"Websocket subscription [{subscription_id}] created") + else: + json_body = message - else: - json_body = message + # Process subscription updates + for message, remove_message in list_remove_iter(self.__rpc_message_queue): + # Check if message is meant for this subscription + if 'params' in message and message['params']['subscription'] == subscription_id: - # Process subscription updates - for message, remove_message in list_remove_iter(self.__rpc_message_queue): - # Check if message is meant for this subscription - if 'params' in message and message['params']['subscription'] == subscription_id: + remove_message() - remove_message() + self.debug_message(f"Websocket result [{subscription_id} #{update_nr}]: {message}") - self.debug_message(f"Websocket result [{subscription_id} #{update_nr}]: {message}") + # Call result_handler with message for processing + callback_result = result_handler(message, update_nr, subscription_id) + if callback_result is not None: + json_body = callback_result - # Call result_handler with message for processing - callback_result = result_handler(message, update_nr, subscription_id) - if callback_result is not None: - json_body = callback_result + update_nr += 1 - update_nr += 1 + # Read one more message to queue + if json_body is None: + self.__rpc_message_queue.append(json.loads(self.websocket.recv())) - # Read one more message to queue - if json_body is None: - self.__rpc_message_queue.append(json.loads(self.websocket.recv())) + return json_body - else: + def rpc_request_http(self, method, params): + """ + Method that handles the actual RPC request to the Substrate node with HTTP. - if result_handler: - raise ConfigurationError("Result handlers only available for websockets (ws://) connections") + Parameters + ---------- + method: method of the JSONRPC request + params: a list containing the parameters of the JSONRPC request + + Returns + ------- + a dict with the parsed result of the request. + """ + request_id = self.request_id + self.request_id += 1 + + payload = { + "jsonrpc": "2.0", + "method": method, + "params": params, + "id": request_id + } - response = self.session.request("POST", self.url, data=json.dumps(payload), headers=self.default_headers) + response = self.session.request("POST", self.url, data=json.dumps(payload), headers=self.default_headers) - if response.status_code != 200: - raise SubstrateRequestException( - "RPC request failed with HTTP status code {}".format(response.status_code)) + if response.status_code != 200: + raise SubstrateRequestException( + "RPC request failed with HTTP status code {}".format(response.status_code)) - json_body = response.json() + json_body = response.json() - # Check if response has error - if 'error' in json_body: - raise SubstrateRequestException(json_body['error']) + # Check if response has error + if 'error' in json_body: + raise SubstrateRequestException(json_body['error']) return json_body @@ -1783,14 +1825,14 @@ def result_handler(message, update_nr, subscription_id): message_result = {k.lower(): v for k, v in message['params']['result'].items()} if 'finalized' in message_result and wait_for_finalization: - self.rpc_request('author_unwatchExtrinsic', [subscription_id]) + self.rpc_request_websocket('author_unwatchExtrinsic', [subscription_id]) return { 'block_hash': message_result['finalized'], 'extrinsic_hash': '0x{}'.format(extrinsic.extrinsic_hash.hex()), 'finalized': True } elif 'inblock' in message_result and wait_for_inclusion and not wait_for_finalization: - self.rpc_request('author_unwatchExtrinsic', [subscription_id]) + self.rpc_request_websocket('author_unwatchExtrinsic', [subscription_id]) return { 'block_hash': message_result['inblock'], 'extrinsic_hash': '0x{}'.format(extrinsic.extrinsic_hash.hex()),