Skip to content

Commit

Permalink
- Remove redundant thread management in nostrClient
Browse files Browse the repository at this point in the history
- Use async/waitAnyCancel pattern for read/write loops
- Clean up connection termination logic
- Remove unnecessary reconnection logic from nostrClient
- Add proper error handling in send/receive loops
  • Loading branch information
prolic committed Nov 12, 2024
1 parent 84d0389 commit b02b3b8
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 288 deletions.
8 changes: 4 additions & 4 deletions src/EffectfulQML.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

module EffectfulQML where

import Control.Monad (forM_, void, when)
import Control.Monad (forever, forM_, void, when)
import Effectful
import Effectful.Concurrent
import Effectful.Concurrent.Async (async)
import Effectful.Concurrent.STM (TQueue, atomically, flushTQueue, newTQueueIO, readTQueue, writeTQueue)
import Effectful.Dispatch.Dynamic (EffectHandler, interpret)
import Effectful.State.Static.Shared (State, evalState, get, gets, put)
import Effectful.Dispatch.Dynamic (interpret)
import Effectful.State.Static.Shared (State, get, gets, put)
import Effectful.TH
import Graphics.QML qualified as QML

Expand Down Expand Up @@ -61,7 +61,7 @@ runEffectfulQML = interpret $ \_ -> \case
RunEngineLoop config changeKey ctx -> do
q <- newTQueueIO
put $ EffectfulQMLState (Just changeKey) (Just ctx) initialUIRefs (Just q)
void $ async $ do
void $ async $ forever $ do
uiUpdates <- atomically $ readTQueue q
moreUpdates <- atomically $ flushTQueue q
let combinedUpdates = uiUpdates <> mconcat moreUpdates
Expand Down
1 change: 0 additions & 1 deletion src/Nostr/GiftWrap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type GiftWrapEff es = ( State AppState :> es
runGiftWrap :: GiftWrapEff es => Eff (GiftWrap : es) a -> Eff es a
runGiftWrap = interpret $ \_ -> \case
HandleGiftWrapEvent event' -> do
logDebug $ "Handling gift wrap event: " <> pack (show event')
st <- get @AppState
case keyPair st of
Just kp -> do
Expand Down
104 changes: 43 additions & 61 deletions src/Nostr/RelayConnection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@
module Nostr.RelayConnection where

import Control.Exception (SomeException, try)
import Control.Monad (forM_, unless, void, when)
import Control.Monad (void, when)
import Data.Aeson (eitherDecode, encode)
import Data.ByteString.Lazy qualified as BSL
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe, isJust)
import Data.Text qualified as T
import Effectful
import Effectful.Concurrent (Concurrent, forkIO)
import Effectful.Concurrent.Async (async)
import Effectful.Concurrent.Async (async, waitAnyCancel)
import Effectful.Concurrent.STM ( TChan, TMVar, atomically, newTChanIO, newTQueueIO
, newEmptyTMVarIO, newTMVarIO, putTMVar, readTChan, readTMVar
, tryPutTMVar, tryReadTMVar, takeTMVar, writeTChan, writeTQueue )
, newEmptyTMVarIO, putTMVar, readTChan
, takeTMVar, writeTChan, writeTQueue )
import Effectful.Dispatch.Dynamic (interpret)
import Effectful.State.Static.Shared (State, get, gets, modify)
import Effectful.TH
Expand Down Expand Up @@ -141,7 +140,7 @@ connectWithRetry r maxRetries requestChan = do
else WS.runClient (T.unpack $ T.drop 5 r) 80 "/"

void $ forkIO $ withEffToIO (ConcUnlift Persistent Unlimited) $ \runE -> do
let runClient = nostrClient connectionMVar r requestChan (maxRetries - attempts - 1) runE
let runClient = nostrClient connectionMVar r requestChan runE
result <- try @SomeException $ connectAction runClient
case result of
Right _ -> return ()
Expand All @@ -164,70 +163,53 @@ connectWithRetry r maxRetries requestChan = do


-- | Nostr client for relay connections.
nostrClient :: RelayConnectionEff es => TMVar Bool -> RelayURI -> TChan Request -> Int -> (forall a. Eff es a -> IO a) -> WS.ClientApp ()
nostrClient connectionMVar r requestChan remainingRetries runE conn = runE $ do
nostrClient :: RelayConnectionEff es => TMVar Bool -> RelayURI -> TChan Request -> (forall a. Eff es a -> IO a) -> WS.ClientApp ()
nostrClient connectionMVar r requestChan runE conn = runE $ do
logDebug $ "Connected to " <> r
void $ atomically $ putTMVar connectionMVar True
modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust (\d -> d { connectionState = Connected }) r (activeConnections st) }
notifyRelayStatus
updateQueue <- newTQueueIO
terminateThreads <- newTMVarIO Nothing

-- Websocket reading thread
void $ forkIO $ let
readLoop = do
shouldTerminate <- atomically $ tryReadTMVar terminateThreads
unless (isJust shouldTerminate) $ do
msg <- liftIO (try (WS.receiveData conn) :: IO (Either SomeException BSL.ByteString))
case msg of
Left ex -> do
logError $ "Error receiving data from " <> r <> ": " <> T.pack (show ex)
void $ atomically $ tryPutTMVar terminateThreads (Just ConnectionFailure)
Right msg' -> case eitherDecode msg' of
Right response -> do
updates <- handleResponse r response
atomically $ writeTQueue updateQueue updates
readLoop
Left err -> do
logError $ "Could not decode server response from " <> r <> ": " <> T.pack err
readLoop
in readLoop

-- Main message handling loop
let loop = do
msg <- atomically $ readTChan requestChan
case msg of
NT.Disconnect -> do
liftIO $ WS.sendClose conn (T.pack "Bye!")
void $ atomically $ tryPutTMVar terminateThreads (Just UserInitiated)
_ -> do
result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg
case result of
Left ex -> do
logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex)
void $ atomically $ tryPutTMVar terminateThreads (Just ConnectionFailure)
Right _ -> loop
loop

-- Handle cleanup and potential reconnection
reason <- atomically $ fromMaybe ConnectionFailure <$> readTMVar terminateThreads

-- Start receive and send loops as async tasks
receiveThread <- async $ receiveLoop updateQueue
sendThread <- async $ sendLoop

-- Wait for either thread to finish
void $ waitAnyCancel [receiveThread, sendThread]
modify @RelayPoolState $ \st ->
st { activeConnections = Map.adjust (\d -> d { connectionState = Disconnected }) r (activeConnections st) }
notifyRelayStatus

unless (reason == UserInitiated) $ do
logDebug $ "Reconnecting to: " <> r
-- Only attempt reconnection for non-user-initiated disconnects
void $ connectWithRetry r remainingRetries requestChan
-- Resubscribe active subscriptions
st <- get @RelayPoolState
let relaySubs = case Map.lookup r (activeConnections st) of
Just rd -> Map.elems (activeSubscriptions rd)
Nothing -> []
forM_ relaySubs $ \sub -> do
let sub' = NT.Subscription (subscriptionId sub) (subscriptionFilters sub)
atomically $ writeTChan requestChan (NT.Subscribe sub')
where
receiveLoop q = do
msg <- liftIO (try (WS.receiveData conn) :: IO (Either SomeException BSL.ByteString))
case msg of
Left ex -> do
logError $ "Error receiving data from " <> r <> ": " <> T.pack (show ex)
return () -- Exit the loop on error
Right msg' -> case eitherDecode msg' of
Right response -> do
updates <- handleResponse r response
atomically $ writeTQueue q updates
receiveLoop q
Left err -> do
logError $ "Could not decode server response from " <> r <> ": " <> T.pack err
receiveLoop q

sendLoop = do
msg <- atomically $ readTChan requestChan
case msg of
NT.Disconnect -> do
liftIO $ WS.sendClose conn (T.pack "Bye!")
return () -- Exit the loop after disconnect
_ -> do
result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg
case result of
Left ex -> do
logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex)
return () -- Exit the loop on error
Right _ -> sendLoop


-- | Handle responses.
Expand Down
91 changes: 6 additions & 85 deletions src/Nostr/RelayPool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

module Nostr.RelayPool where

import Control.Monad (forM, forM_, when)
import Control.Monad (forM_, when)
import Data.Map.Strict qualified as Map
import Data.Text qualified as T
import Effectful
import Effectful.Concurrent (Concurrent, threadDelay)
import Effectful.Concurrent.STM (atomically, flushTQueue, readTQueue, newTVarIO, readTVar, writeTVar)
import Effectful.Dispatch.Dynamic (interpret)
import Effectful.State.Static.Shared (State, get)
import Effectful.TH
Expand All @@ -20,11 +18,10 @@ import Nostr.Keys (PubKeyXO, keyPairToPubKeyXO)
import Nostr.Publisher
import Nostr.RelayConnection
import Nostr.Subscription
import Nostr.Types ( Event(..), Filter, Kind(..), Relay(..), RelayURI
, getUri, followListFilter, giftWrapFilter, metadataFilter, preferredDMRelaysFilter )
import Nostr.Types (Event(..), Kind(..), Relay(..), RelayURI)
import Nostr.Util
import Types ( AppState(..), ConnectionState(..), Follow(..), RelayData(..)
, RelayPoolState(..), SubscriptionEvent(..), emptyUpdates )
import Presentation.KeyMgmt (KeyMgmt)
import Types (AppState(..), ConnectionState(..), RelayData(..), RelayPoolState(..))
import RelayMgmt (RelayMgmt)
import RelayMgmt qualified as RM

Expand Down Expand Up @@ -66,11 +63,13 @@ type RelayPoolEff es =
, Publisher :> es
, RelayMgmt :> es
, Subscription :> es
, KeyMgmt :> es
, GiftWrap :> es
, EffectfulQML :> es
, Concurrent :> es
, Logging :> es
, Util :> es
, IOE :> es
)


Expand Down Expand Up @@ -132,81 +131,3 @@ runRelayPool = interpret $ \_ -> \case
GiftWrap -> publishGiftWrap event pk
-- Default case: publish to outbox-capable relays (FollowList, ShortTextNote, etc.)
_ -> publishToOutbox event


-- | Determine relay type and start appropriate subscriptions
handleRelaySubscription :: RelayPoolEff es => RelayURI -> Eff es ()
handleRelaySubscription r = do
kp <- getKeyPair
let pk = keyPairToPubKeyXO kp
st <- get @AppState
let followPks = maybe [] (map (\(Follow pk' _ _) -> pk')) $ Map.lookup pk (follows st)
st' <- get @RelayPoolState

-- Check if it's a DM relay
let isDM = any (\(_, (relays, _)) ->
any (\relay -> getUri relay == r) relays)
(Map.toList $ dmRelays st')

-- Check if it's an inbox-capable relay
let isInbox = any (\(_, (relays, _)) ->
any (\relay -> case relay of
InboxRelay uri -> uri == r
InboxOutboxRelay uri -> uri == r
_ -> False) relays)
(Map.toList $ generalRelays st')

-- Start appropriate subscriptions based on relay type
let fs = if isDM then Just $ createDMRelayFilters pk followPks
else if isInbox then Just $ createInboxRelayFilters pk followPks
else Nothing

logInfo $ "Starting subscription for " <> r <> " with filters " <> T.pack (show fs)

case fs of
Just fs' -> do
subId' <- newSubscriptionId
mq <- subscribe r subId' (createDMRelayFilters pk followPks)
case mq of
Just q -> do
shouldStop <- newTVarIO False

let loop = do
e <- atomically $ readTQueue q
es <- atomically $ flushTQueue q

updates <- fmap mconcat $ forM (e : es) $ \case
EventAppeared event' -> handleEvent r subId' fs' event'
SubscriptionEose -> return emptyUpdates
SubscriptionClosed _ -> do
atomically $ writeTVar shouldStop True
return emptyUpdates

shouldStopNow <- atomically $ readTVar shouldStop

if shouldStopNow
then return ()
else loop

notify updates
loop
Nothing -> logWarning $ "Failed to start subscription for " <> r
Nothing -> return () -- Outbox only relay or unknown relay, no subscriptions needed



-- | Create DM relay subscription filters
createDMRelayFilters :: PubKeyXO -> [PubKeyXO] -> [Filter]
createDMRelayFilters xo followedPubKeys =
[ metadataFilter (xo : followedPubKeys)
, preferredDMRelaysFilter (xo : followedPubKeys)
, giftWrapFilter xo
]

-- | Create inbox relay subscription filters
createInboxRelayFilters :: PubKeyXO -> [PubKeyXO] -> [Filter]
createInboxRelayFilters xo followedPubKeys =
[ followListFilter (xo : followedPubKeys)
, metadataFilter (xo : followedPubKeys)
, preferredDMRelaysFilter (xo : followedPubKeys)
]
Loading

0 comments on commit b02b3b8

Please sign in to comment.