diff --git a/src/Nostr/RelayConnection.hs b/src/Nostr/RelayConnection.hs index fe38253..3afbb50 100644 --- a/src/Nostr/RelayConnection.hs +++ b/src/Nostr/RelayConnection.hs @@ -20,6 +20,7 @@ import Effectful.State.Static.Shared (State, get, gets, modify) import Effectful.TH import Network.URI (URI(..), parseURI, uriAuthority, uriPort, uriRegName, uriScheme) import Network.WebSockets qualified as WS +import Network.WebSockets.Connection.PingPong (defaultPingPongOptions, withPingPong) import Wuss qualified as Wuss import QtQuick @@ -172,47 +173,49 @@ nostrClient :: RelayConnectionEff es => TMVar Bool -> RelayURI -> TChan Request nostrClient connectionMVar r requestChan runE conn = runE $ do logDebug $ "Connected to " <> r - modify @RelayPoolState $ \st -> - st { activeConnections = Map.adjust - (\d -> d { connectionState = Connected - , requestChannel = requestChan - }) - r - (activeConnections st) - } - notifyRelayStatus - - void $ atomically $ putTMVar connectionMVar True - - updateQueue <- newTQueueIO - receiveThread <- async $ receiveLoop updateQueue - sendThread <- async $ sendLoop - void $ waitAnyCancel [receiveThread, sendThread] - modify @RelayPoolState $ \st -> - st { activeConnections = Map.adjust (\d -> d { connectionState = Disconnected }) r (activeConnections st) } - notifyRelayStatus + liftIO $ withPingPong defaultPingPongOptions conn $ \conn' -> runE $ do + modify @RelayPoolState $ \st -> + st { activeConnections = Map.adjust + (\d -> d { connectionState = Connected + , requestChannel = requestChan + }) + r + (activeConnections st) + } + notifyRelayStatus + + void $ atomically $ putTMVar connectionMVar True + + updateQueue <- newTQueueIO + receiveThread <- async $ receiveLoop conn' updateQueue + sendThread <- async $ sendLoop conn' + void $ waitAnyCancel [receiveThread, sendThread] + modify @RelayPoolState $ \st -> + st { activeConnections = Map.adjust (\d -> d { connectionState = Disconnected }) r (activeConnections st) } + notifyRelayStatus + where - receiveLoop q = do - msg <- liftIO (try (WS.receiveData conn) :: IO (Either SomeException BSL.ByteString)) + receiveLoop conn' q = do + msg <- liftIO (try (WS.receiveData conn') :: IO (Either SomeException BSL.ByteString)) case msg of Left _ -> return () -- Exit the loop on error Right msg' -> case eitherDecode msg' of Right response -> do updates <- handleResponse r response atomically $ writeTQueue q updates - receiveLoop q + receiveLoop conn' q Left err -> do logError $ "Could not decode server response from " <> r <> ": " <> T.pack err - receiveLoop q + receiveLoop conn' q - sendLoop = do + sendLoop conn' = do msg <- atomically $ readTChan requestChan case msg of NT.Disconnect -> do - liftIO $ WS.sendClose conn (T.pack "Bye!") + liftIO $ WS.sendClose conn' (T.pack "Bye!") return () NT.SendEvent event -> do - result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg + result <- liftIO $ try @SomeException $ WS.sendTextData conn' $ encode msg case result of Left ex -> do logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex) @@ -225,14 +228,14 @@ nostrClient connectionMVar r requestChan runE conn = runE $ do r (activeConnections st) } - sendLoop + sendLoop conn' _ -> do - result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg + result <- liftIO $ try @SomeException $ WS.sendTextData conn' $ encode msg case result of Left ex -> do logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex) return () - Right _ -> sendLoop + Right _ -> sendLoop conn' -- | Handle responses.