Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add websocket ping-pong to keep connections alive #54

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 32 additions & 29 deletions src/Nostr/RelayConnection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Effectful.State.Static.Shared (State, get, gets, modify)
import Effectful.TH
import Network.URI (URI(..), parseURI, uriAuthority, uriPort, uriRegName, uriScheme)
import Network.WebSockets qualified as WS
import Network.WebSockets.Connection.PingPong (defaultPingPongOptions, withPingPong)
import Wuss qualified as Wuss

import QtQuick
Expand Down Expand Up @@ -172,47 +173,49 @@ nostrClient :: RelayConnectionEff es => TMVar Bool -> RelayURI -> TChan Request
nostrClient connectionMVar r requestChan runE conn = runE $ do
logDebug $ "Connected to " <> r

modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust
(\d -> d { connectionState = Connected
, requestChannel = requestChan
})
r
(activeConnections st)
}
notifyRelayStatus

void $ atomically $ putTMVar connectionMVar True

updateQueue <- newTQueueIO
receiveThread <- async $ receiveLoop updateQueue
sendThread <- async $ sendLoop
void $ waitAnyCancel [receiveThread, sendThread]
modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust (\d -> d { connectionState = Disconnected }) r (activeConnections st) }
notifyRelayStatus
liftIO $ withPingPong defaultPingPongOptions conn $ \conn' -> runE $ do
modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust
(\d -> d { connectionState = Connected
, requestChannel = requestChan
})
r
(activeConnections st)
}
notifyRelayStatus

void $ atomically $ putTMVar connectionMVar True

updateQueue <- newTQueueIO
receiveThread <- async $ receiveLoop conn' updateQueue
sendThread <- async $ sendLoop conn'
void $ waitAnyCancel [receiveThread, sendThread]
modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust (\d -> d { connectionState = Disconnected }) r (activeConnections st) }
notifyRelayStatus

where
receiveLoop q = do
msg <- liftIO (try (WS.receiveData conn) :: IO (Either SomeException BSL.ByteString))
receiveLoop conn' q = do
msg <- liftIO (try (WS.receiveData conn') :: IO (Either SomeException BSL.ByteString))
case msg of
Left _ -> return () -- Exit the loop on error
Right msg' -> case eitherDecode msg' of
Right response -> do
updates <- handleResponse r response
atomically $ writeTQueue q updates
receiveLoop q
receiveLoop conn' q
Left err -> do
logError $ "Could not decode server response from " <> r <> ": " <> T.pack err
receiveLoop q
receiveLoop conn' q

sendLoop = do
sendLoop conn' = do
msg <- atomically $ readTChan requestChan
case msg of
NT.Disconnect -> do
liftIO $ WS.sendClose conn (T.pack "Bye!")
liftIO $ WS.sendClose conn' (T.pack "Bye!")
return ()
NT.SendEvent event -> do
result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg
result <- liftIO $ try @SomeException $ WS.sendTextData conn' $ encode msg
case result of
Left ex -> do
logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex)
Expand All @@ -225,14 +228,14 @@ nostrClient connectionMVar r requestChan runE conn = runE $ do
r
(activeConnections st)
}
sendLoop
sendLoop conn'
_ -> do
result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg
result <- liftIO $ try @SomeException $ WS.sendTextData conn' $ encode msg
case result of
Left ex -> do
logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex)
return ()
Right _ -> sendLoop
Right _ -> sendLoop conn'


-- | Handle responses.
Expand Down
Loading