RFC: Reuse PolymarketWebSocketClient during non-initial subscription #2238

Open
ryantam626 opened this issue Jan 23, 2025 · 2 comments
Labels
enhancement New feature or request RFC A request for comment

Comments

@ryantam626
Contributor

Background

I use Nautilus to record Polymarket data. I subscribe to the order books of new instruments as I become aware of them, which can happen during a reload of instruments (i.e. not during init).

If the new batch of instruments is large, a correspondingly large number of new PolymarketWebSocketClient instances is created, and it is quite easy to hit a rate limit error.

Relevant code

async def _subscribe_asset_book(self, instrument_id):
    token_id = get_polymarket_token_id(instrument_id)
    if not self._ws_client.is_connected():
        ws_client = self._ws_client
        if token_id in ws_client.asset_subscriptions():
            return  # Already subscribed
        ws_client.subscribe_book(asset=token_id)
        if not self._main_ws_delay:
            await ws_client.connect()
    else:
        ws_client = self._create_websocket_client()
        if token_id in ws_client.asset_subscriptions():
            return  # Already subscribed
        self._ws_clients[instrument_id] = ws_client
        ws_client.subscribe_book(asset=token_id)
        await ws_client.connect()

Notice that once self._ws_client is established and connected, every further subscription takes the else branch: it always creates a new client and subscribes over a separate WebSocket connection, one per instrument.
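
To make the effect concrete, here is a minimal simulation of the branching above. It is only a sketch: `FakeWsClient`, `CurrentBehaviour` and `count_clients` are hypothetical names made up for illustration, no network I/O happens, and the initial delay is assumed to have already elapsed.

```python
import asyncio


class FakeWsClient:
    """Hypothetical stand-in for PolymarketWebSocketClient (no network I/O)."""

    def __init__(self) -> None:
        self._connected = False
        self._assets: list[str] = []

    def is_connected(self) -> bool:
        return self._connected

    def asset_subscriptions(self) -> list[str]:
        return self._assets

    def subscribe_book(self, asset: str) -> None:
        self._assets.append(asset)

    async def connect(self) -> None:
        self._connected = True


class CurrentBehaviour:
    """Mirrors the branching of _subscribe_asset_book quoted above."""

    def __init__(self) -> None:
        self.clients_created = 0
        self._ws_client = self._create_websocket_client()
        self._ws_clients: dict[str, FakeWsClient] = {}
        self._main_ws_delay = False  # assumption: the initial delay has elapsed

    def _create_websocket_client(self) -> FakeWsClient:
        self.clients_created += 1
        return FakeWsClient()

    async def subscribe(self, token_id: str) -> None:
        if not self._ws_client.is_connected():
            ws_client = self._ws_client
            if token_id in ws_client.asset_subscriptions():
                return  # Already subscribed
            ws_client.subscribe_book(asset=token_id)
            if not self._main_ws_delay:
                await ws_client.connect()
        else:
            # Main client is connected: one brand new client per subscription
            ws_client = self._create_websocket_client()
            self._ws_clients[token_id] = ws_client
            ws_client.subscribe_book(asset=token_id)
            await ws_client.connect()


async def count_clients(n_late: int) -> int:
    data_client = CurrentBehaviour()
    await data_client.subscribe("initial-token")  # connects the main client
    for i in range(n_late):
        await data_client.subscribe(f"late-token-{i}")
    return data_client.clients_created


if __name__ == "__main__":
    print(asyncio.run(count_clients(100)))  # 1 main client + 100 late clients = 101
```

In this simulation, 100 late subscriptions lead to 100 additional connection attempts in quick succession, which is the pattern that trips the rate limit.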

Proposal

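Change the meaning of ws_connection_delay_secs: make every new client, not just the initial one, wait this many seconds before connecting to the WS, so that subscriptions arriving within that window are grouped onto a single connection.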
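
A rough sketch of that idea, separate from the adapter itself: subscriptions that arrive while a client is still waiting to connect are added to that same client, and only a subscription arriving after the connection opens a new batch. `FakeWsClient`, `BatchingSubscriber` and `count_clients` are hypothetical names for illustration, and the fake client does no network I/O.

```python
import asyncio


class FakeWsClient:
    """Hypothetical stand-in for PolymarketWebSocketClient (no network I/O)."""

    def __init__(self) -> None:
        self.assets: list[str] = []
        self.connected = False

    def subscribe_book(self, asset: str) -> None:
        self.assets.append(asset)

    async def connect(self) -> None:
        self.connected = True


class BatchingSubscriber:
    def __init__(self, delay_secs: float) -> None:
        self._delay_secs = delay_secs
        self._pending: FakeWsClient | None = None
        self._tasks: set[asyncio.Task] = set()
        self.clients_created = 0

    async def _connect_after_delay(self, ws_client: FakeWsClient) -> None:
        try:
            await asyncio.sleep(self._delay_secs)
            await ws_client.connect()
        finally:
            self._pending = None  # the next subscription opens a new batch

    def subscribe(self, token_id: str) -> None:
        if self._pending is None:
            # First subscription of a batch: create the client, schedule its connect
            self._pending = FakeWsClient()
            self.clients_created += 1
            task = asyncio.get_running_loop().create_task(
                self._connect_after_delay(self._pending),
            )
            self._tasks.add(task)  # keep a strong reference until the task is done
            task.add_done_callback(self._tasks.discard)
        self._pending.subscribe_book(asset=token_id)


async def count_clients(n_late: int, delay_secs: float = 0.05) -> int:
    subscriber = BatchingSubscriber(delay_secs)
    for i in range(n_late):
        subscriber.subscribe(f"late-token-{i}")  # all land in the first batch
    await asyncio.sleep(delay_secs * 4)  # let the first batch connect
    subscriber.subscribe("straggler")  # arrives later, so it opens a second batch
    await asyncio.sleep(delay_secs * 4)
    return subscriber.clients_created


if __name__ == "__main__":
    print(asyncio.run(count_clients(100)))  # 2
```

With this grouping, the number of connections depends on how many delay windows the subscriptions span rather than on the number of instruments.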

@ryantam626 ryantam626 added the enhancement New feature or request label Jan 23, 2025
@ryantam626
Contributor Author

Something along the lines of this patch:

From 125e4e8ab8cee63784ecf4ba989225f4e1f36c00 Mon Sep 17 00:00:00 2001
From: Ryan Tam <[email protected]>
Date: Thu, 23 Jan 2025 20:13:42 +0000
Subject: [PATCH] feat: Gate and group polymarket subscription by configurable
 value

This is to avoid overwhelming polymarket by establishing many WS at
once, since polymarket will throw 429 at us.
---
 nautilus_trader/adapters/polymarket/config.py |  6 +-
 nautilus_trader/adapters/polymarket/data.py   | 80 +++++++++----------
 2 files changed, 40 insertions(+), 46 deletions(-)

diff --git a/nautilus_trader/adapters/polymarket/config.py b/nautilus_trader/adapters/polymarket/config.py
index 249625b3a..adfd02453 100644
--- a/nautilus_trader/adapters/polymarket/config.py
+++ b/nautilus_trader/adapters/polymarket/config.py
@@ -50,8 +50,8 @@ class PolymarketDataClientConfig(LiveDataClientConfig, frozen=True):
         The HTTP client custom endpoint override.
     base_url_ws : str, optional
         The WebSocket client custom endpoint override.
-    ws_connection_delay_secs : PositiveInt, default 5
-        The delay (seconds) prior to main websocket connection to allow initial subscriptions to arrive.
+    ws_connection_delay_secs : PositiveFloat, default 0.1
+        The delay (seconds) prior to websocket connection to allow subscriptions to arrive.
     update_instruments_interval_mins: PositiveInt or None, default 60
         The interval (minutes) between updating Polymarket instruments.
     compute_effective_deltas : bool, default False
@@ -69,7 +69,7 @@ class PolymarketDataClientConfig(LiveDataClientConfig, frozen=True):
     passphrase: str | None = None
     base_url_http: str | None = None
     base_url_ws: str | None = None
-    ws_connection_delay_secs: PositiveInt = 5
+    ws_connection_delay_secs: PositiveFloat = 0.1
     update_instruments_interval_mins: PositiveInt | None = 60
     compute_effective_deltas: bool = False
 
diff --git a/nautilus_trader/adapters/polymarket/data.py b/nautilus_trader/adapters/polymarket/data.py
index 15060a3b6..48f89c76d 100644
--- a/nautilus_trader/adapters/polymarket/data.py
+++ b/nautilus_trader/adapters/polymarket/data.py
@@ -107,15 +107,15 @@ class PolymarketDataClient(LiveMarketDataClient):
 
         # WebSocket API
         self._ws_base_url = self._config.base_url_ws
-        self._ws_client: PolymarketWebSocketClient = self._create_websocket_client()
-        self._ws_clients: dict[InstrumentId, PolymarketWebSocketClient] = {}
+        self._ws_clients: list[PolymarketWebSocketClient] = []
+        self._ws_client_pending_connection: PolymarketWebSocketClient | None = None
+
         self._decoder_market_msg = msgspec.json.Decoder(MARKET_WS_MESSAGE)
 
         # Tasks
         self._update_instruments_interval_mins: int | None = config.update_instruments_interval_mins
         self._update_instruments_task: asyncio.Task | None = None
-        self._main_ws_connect_task: asyncio.Task | None = None
-        self._main_ws_delay = True
+        self._delayed_ws_client_connection_task: asyncio.Task | None = None
 
         # Hot caches
         self._last_quotes: dict[InstrumentId, QuoteTick] = {}
@@ -131,34 +131,27 @@ class PolymarketDataClient(LiveMarketDataClient):
                 self._update_instruments(self._update_instruments_interval_mins),
             )
 
-        self._main_ws_connect_task = self.create_task(self._connect_main_ws_after_delay())
-
     async def _disconnect(self) -> None:
         if self._update_instruments_task:
             self._log.debug("Canceling task 'update_instruments'")
             self._update_instruments_task.cancel()
             self._update_instruments_task = None
 
-        if self._main_ws_connect_task:
-            self._log.debug("Canceling task 'connect_main_ws_after_delay'")
-            self._main_ws_connect_task.cancel()
-            self._main_ws_connect_task = None
+        if self._delayed_ws_client_connection_task:
+            self._log.debug("Canceling task 'delayed_ws_client_connection'")
+            self._delayed_ws_client_connection_task.cancel()
+            self._delayed_ws_client_connection_task = None
 
         # Shutdown websockets
         tasks: set[Coroutine[Any, Any, None]] = set()
 
-        if self._ws_client.is_connected():
-            tasks.add(self._ws_client.disconnect())
-
-        for ws_client in self._ws_clients.values():
+        for ws_client in self._ws_clients:
             if ws_client.is_connected():
                 tasks.add(ws_client.disconnect())
 
         if tasks:
             await asyncio.gather(*tasks)
 
-        self._main_ws_delay = True
-
     def _create_websocket_client(self) -> PolymarketWebSocketClient:
         self._log.info("Creating new PolymarketWebSocketClient", LogColor.MAGENTA)
         return PolymarketWebSocketClient(
@@ -170,18 +163,6 @@ class PolymarketDataClient(LiveMarketDataClient):
             loop=self._loop,
         )
 
-    async def _connect_main_ws_after_delay(self) -> None:
-        delay_secs = self._config.ws_connection_delay_secs
-        self._log.info(
-            f"Awaiting initial websocket connection delay ({delay_secs}s)...",
-            LogColor.BLUE,
-        )
-        await asyncio.sleep(delay_secs)
-        if self._ws_client.asset_subscriptions():
-            await self._ws_client.connect()
-
-        self._main_ws_delay = False
-
     def _send_all_instruments_to_data_engine(self) -> None:
         for instrument in self._instrument_provider.get_all().values():
             self._handle_data(instrument)
@@ -201,23 +182,36 @@ class PolymarketDataClient(LiveMarketDataClient):
         except asyncio.CancelledError:
             self._log.debug("Canceled task 'update_instruments'")
 
+    async def _delayed_ws_client_connection(
+        self,
+        ws_client: PolymarketWebSocketClient,
+        sleep_secs: float,
+    ) -> None:
+        try:
+            await asyncio.sleep(sleep_secs)
+            await ws_client.connect()
+        finally:
+            self._ws_client_pending_connection = None
+            self._delayed_ws_client_connection_task = None
+
     async def _subscribe_asset_book(self, instrument_id):
-        token_id = get_polymarket_token_id(instrument_id)
+        create_connect_task = False
+        if self._ws_client_pending_connection is None:
+            self._ws_client_pending_connection = self._create_websocket_client()
+            create_connect_task = True
 
-        if not self._ws_client.is_connected():
-            ws_client = self._ws_client
-            if token_id in ws_client.asset_subscriptions():
-                return  # Already subscribed
-            ws_client.subscribe_book(asset=token_id)
-            if not self._main_ws_delay:
-                await ws_client.connect()
-        else:
-            ws_client = self._create_websocket_client()
-            if token_id in ws_client.asset_subscriptions():
-                return  # Already subscribed
-            self._ws_clients[instrument_id] = ws_client
-            ws_client.subscribe_book(asset=token_id)
-            await ws_client.connect()
+        token_id = get_polymarket_token_id(instrument_id)
+        self._ws_client_pending_connection.subscribe_book(token_id)
+
+        if create_connect_task:
+            self._delayed_ws_client_connection_task = self.create_task(
+                self._delayed_ws_client_connection(
+                    self._ws_client_pending_connection,
+                    self._config.ws_connection_delay_secs,
+                ),
+                log_msg="Delayed start PolymarketWebSocketClient connection",
+                success_msg="Finished delaying start of PolymarketWebSocketClient connection",
+            )
 
     async def _subscribe_order_book_deltas(
         self,
-- 
2.43.0
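
One detail worth checking in the patch as posted: `_disconnect` iterates over `self._ws_clients`, but no hunk shown here appends a client to that list. Below is a minimal sketch of one way to track clients once they connect and to cancel a still-pending connection on disconnect. It is an assumption about the intended lifecycle, not the adapter's actual code; `FakeWsClient`, `TrackedConnections` and `demo` are hypothetical names.

```python
import asyncio


class FakeWsClient:
    """Hypothetical stand-in for PolymarketWebSocketClient (no network I/O)."""

    def __init__(self) -> None:
        self._connected = False

    def is_connected(self) -> bool:
        return self._connected

    async def connect(self) -> None:
        self._connected = True

    async def disconnect(self) -> None:
        self._connected = False


class TrackedConnections:
    def __init__(self) -> None:
        self.ws_clients: list[FakeWsClient] = []
        self.pending: FakeWsClient | None = None
        self.task: asyncio.Task | None = None

    async def _delayed_connect(self, ws_client: FakeWsClient, sleep_secs: float) -> None:
        try:
            await asyncio.sleep(sleep_secs)
            await ws_client.connect()
            # Assumption: record the client only once it is connected
            self.ws_clients.append(ws_client)
        finally:
            self.pending = None
            self.task = None

    def schedule(self, sleep_secs: float) -> None:
        self.pending = FakeWsClient()
        self.task = asyncio.get_running_loop().create_task(
            self._delayed_connect(self.pending, sleep_secs),
        )

    async def disconnect(self) -> None:
        task = self.task
        if task is not None:
            task.cancel()
            # Wait for the cancellation to land; the CancelledError is returned, not raised
            await asyncio.gather(task, return_exceptions=True)
        # Reset explicitly: a task cancelled before its first step never reaches `finally`
        self.pending = None
        self.task = None
        connected = [c for c in self.ws_clients if c.is_connected()]
        await asyncio.gather(*(c.disconnect() for c in connected))


async def demo() -> tuple[int, bool, bool]:
    conns = TrackedConnections()
    conns.schedule(0.01)
    await asyncio.sleep(0.1)  # the first client connects and is tracked
    conns.schedule(10.0)  # a second client is still waiting to connect
    await conns.disconnect()  # cancels the second, closes the first
    still_connected = any(c.is_connected() for c in conns.ws_clients)
    return len(conns.ws_clients), conns.pending is None, still_connected


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (1, True, False)
```

Appending only after `connect()` succeeds keeps never-connected clients out of the list; appending at creation time would work too, since the disconnect loop already checks `is_connected()`.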

@ryantam626
Contributor Author

(P.S. there is no RFC template so I used the feature request template)
