From 93f85d00fdbd29e715859b1d79b02a380642dcb0 Mon Sep 17 00:00:00 2001 From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com> Date: Tue, 6 Dec 2022 16:37:59 -0500 Subject: [PATCH 1/2] Handle all the types of expected error and retry connection. Correct sleep_delay logic to make it back off exponentially. Add small random jitter to sleep_delay so reconnects don't all happen at the same time when SPV goes down. --- lbry/wallet/network.py | 54 +++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 55b8b145b3..3e9b19e531 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -309,47 +309,46 @@ async def connect_to_fastest(self) -> Optional[ClientSession]: return async def network_loop(self): - sleep_delay = 30 + sleep_delay = 15 while self.running: await asyncio.wait( - [asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED + [asyncio.sleep(sleep_delay), self._urgent_need_reconnect.wait()], + return_when=asyncio.FIRST_COMPLETED ) if self._urgent_need_reconnect.is_set(): - sleep_delay = 30 + sleep_delay = 10 + random.uniform(0, 5) self._urgent_need_reconnect.clear() if not self.is_connected: client = await self.connect_to_fastest() if not client: - log.warning("failed to connect to any spv servers, retrying later") sleep_delay *= 2 - sleep_delay = min(sleep_delay, 300) + sleep_delay = min(sleep_delay, 120) + log.warning("failed to connect to any spv servers, retrying after %.2fs", sleep_delay) continue - log.debug("get spv server features %s:%i", *client.server) - features = await client.send_request('server.features', []) - self.client, self.server_features = client, features - log.debug("discover other hubs %s:%i", *client.server) - await self._update_hubs(await client.send_request('server.peers.subscribe', [])) - log.info("subscribe to headers %s:%i", *client.server) - self._update_remote_height((await self.subscribe_headers(),)) - self._on_connected_controller.add(True) - server_str = "%s:%i" % client.server - log.info("maintaining connection to spv server %s", server_str) - self._keepalive_task = asyncio.create_task(self.client.keepalive_loop()) try: - if not self._urgent_need_reconnect.is_set(): - await asyncio.wait( - [self._keepalive_task, self._urgent_need_reconnect.wait()], - return_when=asyncio.FIRST_COMPLETED - ) - else: - await self._keepalive_task + log.debug("get spv server features %s:%i", *client.server) + features = await client.send_request('server.features', []) + self.client, self.server_features = client, features + log.debug("discover other hubs %s:%i", *client.server) + await self._update_hubs(await client.send_request('server.peers.subscribe', [])) + log.info("subscribe to headers %s:%i", *client.server) + self._update_remote_height((await self.subscribe_headers(),)) + self._on_connected_controller.add(True) + sleep_delay = 15 + server_str = "%s:%i" % client.server + log.info("maintaining connection to spv server %s", server_str) + self._keepalive_task = asyncio.create_task(self.client.keepalive_loop()) + await asyncio.wait( + [self._keepalive_task, self._urgent_need_reconnect.wait()], + return_when=asyncio.FIRST_COMPLETED + ) if self._urgent_need_reconnect.is_set(): log.warning("urgent reconnect needed") - if self._keepalive_task and not self._keepalive_task.done(): - self._keepalive_task.cancel() - except asyncio.CancelledError: + except (asyncio.TimeoutError, ConnectionResetError, ConnectionError, RPCError, ProtocolError): pass finally: + if self._keepalive_task and not self._keepalive_task.done(): + self._keepalive_task.cancel() self._keepalive_task = None self.client = None self.server_features = None @@ -381,9 +380,10 @@ def rpc(self, list_or_method, args, restricted=True, session: Optional[ClientSes async def retriable_call(self, function, *args, **kwargs): while self.running: if not self.is_connected: - log.warning("Wallet server unavailable, waiting for it to come back and retry.") + log.warning("%s: Wallet server unavailable, waiting for it to come back and retry.", function.__name__) self._urgent_need_reconnect.set() await self.on_connected.first + log.warning("%s: Wallet server available, proceeding.", function.__name__) try: return await function(*args, **kwargs) except asyncio.TimeoutError: From 154921a0ce55e265d61f7275b25f02ab10b2cdc6 Mon Sep 17 00:00:00 2001 From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com> Date: Wed, 7 Dec 2022 13:14:05 -0500 Subject: [PATCH 2/2] Clear self._urgent_reconnect_needed once more after connection established. Tweak sleep_delay logic so failures during initial RPC sequence also back off. --- lbry/wallet/network.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 3e9b19e531..3664a457d8 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -309,14 +309,14 @@ async def connect_to_fastest(self) -> Optional[ClientSession]: return async def network_loop(self): - sleep_delay = 15 + def reset_sleep(): + return 10 + random.uniform(0, 5) + sleep_delay = reset_sleep() while self.running: await asyncio.wait( [asyncio.sleep(sleep_delay), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED ) - if self._urgent_need_reconnect.is_set(): - sleep_delay = 10 + random.uniform(0, 5) self._urgent_need_reconnect.clear() if not self.is_connected: client = await self.connect_to_fastest() @@ -325,17 +325,24 @@ async def network_loop(self): sleep_delay = min(sleep_delay, 120) log.warning("failed to connect to any spv servers, retrying after %.2fs", sleep_delay) continue + sleep_delay = reset_sleep() + server_str = "%s:%i" % client.server try: + # Perform initial sequence of RPCs. log.debug("get spv server features %s:%i", *client.server) features = await client.send_request('server.features', []) - self.client, self.server_features = client, features log.debug("discover other hubs %s:%i", *client.server) await self._update_hubs(await client.send_request('server.peers.subscribe', [])) log.info("subscribe to headers %s:%i", *client.server) - self._update_remote_height((await self.subscribe_headers(),)) + self._update_remote_height((await client.send_request('blockchain.headers.subscribe', [True]),)) + + # All initial RPCs were successful. We're now connected. + self.client, self.server_features = client, features + self._urgent_need_reconnect.clear() + sleep_delay = reset_sleep() + # Release any waiters. self._on_connected_controller.add(True) - sleep_delay = 15 - server_str = "%s:%i" % client.server + log.info("maintaining connection to spv server %s", server_str) self._keepalive_task = asyncio.create_task(self.client.keepalive_loop()) await asyncio.wait( @@ -345,7 +352,9 @@ async def network_loop(self): if self._urgent_need_reconnect.is_set(): log.warning("urgent reconnect needed") except (asyncio.TimeoutError, ConnectionResetError, ConnectionError, RPCError, ProtocolError): - pass + sleep_delay *= 2 + sleep_delay = min(sleep_delay, 120) + log.warning("failed to connect to spv server %s, retrying after %.2fs", server_str, sleep_delay) finally: if self._keepalive_task and not self._keepalive_task.done(): self._keepalive_task.cancel() @@ -436,9 +445,6 @@ def get_history(self, address): def broadcast(self, raw_transaction): return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True) - def subscribe_headers(self): - return self.rpc('blockchain.headers.subscribe', [True], True) - async def subscribe_address(self, address, *addresses): addresses = list((address, ) + addresses) server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None