diff --git a/src/EffectfulQML.hs b/src/EffectfulQML.hs index 94ca5ec..1d268e2 100644 --- a/src/EffectfulQML.hs +++ b/src/EffectfulQML.hs @@ -4,13 +4,13 @@ module EffectfulQML where -import Control.Monad (forM_, void, when) +import Control.Monad (forever, forM_, void, when) import Effectful import Effectful.Concurrent import Effectful.Concurrent.Async (async) import Effectful.Concurrent.STM (TQueue, atomically, flushTQueue, newTQueueIO, readTQueue, writeTQueue) -import Effectful.Dispatch.Dynamic (EffectHandler, interpret) -import Effectful.State.Static.Shared (State, evalState, get, gets, put) +import Effectful.Dispatch.Dynamic (interpret) +import Effectful.State.Static.Shared (State, get, gets, put) import Effectful.TH import Graphics.QML qualified as QML @@ -61,7 +61,7 @@ runEffectfulQML = interpret $ \_ -> \case RunEngineLoop config changeKey ctx -> do q <- newTQueueIO put $ EffectfulQMLState (Just changeKey) (Just ctx) initialUIRefs (Just q) - void $ async $ do + void $ async $ forever $ do uiUpdates <- atomically $ readTQueue q moreUpdates <- atomically $ flushTQueue q let combinedUpdates = uiUpdates <> mconcat moreUpdates diff --git a/src/Nostr/GiftWrap.hs b/src/Nostr/GiftWrap.hs index 20d0ae5..9c1bfcd 100644 --- a/src/Nostr/GiftWrap.hs +++ b/src/Nostr/GiftWrap.hs @@ -46,7 +46,6 @@ type GiftWrapEff es = ( State AppState :> es runGiftWrap :: GiftWrapEff es => Eff (GiftWrap : es) a -> Eff es a runGiftWrap = interpret $ \_ -> \case HandleGiftWrapEvent event' -> do - logDebug $ "Handling gift wrap event: " <> pack (show event') st <- get @AppState case keyPair st of Just kp -> do diff --git a/src/Nostr/RelayConnection.hs b/src/Nostr/RelayConnection.hs index 2feef27..bdcb85e 100644 --- a/src/Nostr/RelayConnection.hs +++ b/src/Nostr/RelayConnection.hs @@ -3,18 +3,17 @@ module Nostr.RelayConnection where import Control.Exception (SomeException, try) -import Control.Monad (forM_, unless, void, when) +import Control.Monad (void, when) import Data.Aeson (eitherDecode, encode) import Data.ByteString.Lazy qualified as BSL import Data.Map.Strict qualified as Map -import Data.Maybe (fromMaybe, isJust) import Data.Text qualified as T import Effectful import Effectful.Concurrent (Concurrent, forkIO) -import Effectful.Concurrent.Async (async) +import Effectful.Concurrent.Async (async, waitAnyCancel) import Effectful.Concurrent.STM ( TChan, TMVar, atomically, newTChanIO, newTQueueIO - , newEmptyTMVarIO, newTMVarIO, putTMVar, readTChan, readTMVar - , tryPutTMVar, tryReadTMVar, takeTMVar, writeTChan, writeTQueue ) + , newEmptyTMVarIO, putTMVar, readTChan + , takeTMVar, writeTChan, writeTQueue ) import Effectful.Dispatch.Dynamic (interpret) import Effectful.State.Static.Shared (State, get, gets, modify) import Effectful.TH @@ -141,7 +140,7 @@ connectWithRetry r maxRetries requestChan = do else WS.runClient (T.unpack $ T.drop 5 r) 80 "/" void $ forkIO $ withEffToIO (ConcUnlift Persistent Unlimited) $ \runE -> do - let runClient = nostrClient connectionMVar r requestChan (maxRetries - attempts - 1) runE + let runClient = nostrClient connectionMVar r requestChan runE result <- try @SomeException $ connectAction runClient case result of Right _ -> return () @@ -164,70 +163,53 @@ connectWithRetry r maxRetries requestChan = do -- | Nostr client for relay connections. -nostrClient :: RelayConnectionEff es => TMVar Bool -> RelayURI -> TChan Request -> Int -> (forall a. Eff es a -> IO a) -> WS.ClientApp () -nostrClient connectionMVar r requestChan remainingRetries runE conn = runE $ do +nostrClient :: RelayConnectionEff es => TMVar Bool -> RelayURI -> TChan Request -> (forall a. Eff es a -> IO a) -> WS.ClientApp () +nostrClient connectionMVar r requestChan runE conn = runE $ do logDebug $ "Connected to " <> r void $ atomically $ putTMVar connectionMVar True modify @RelayPoolState $ \st -> st { activeConnections = Map.adjust (\d -> d { connectionState = Connected }) r (activeConnections st) } notifyRelayStatus updateQueue <- newTQueueIO - terminateThreads <- newTMVarIO Nothing - - -- Websocket reading thread - void $ forkIO $ let - readLoop = do - shouldTerminate <- atomically $ tryReadTMVar terminateThreads - unless (isJust shouldTerminate) $ do - msg <- liftIO (try (WS.receiveData conn) :: IO (Either SomeException BSL.ByteString)) - case msg of - Left ex -> do - logError $ "Error receiving data from " <> r <> ": " <> T.pack (show ex) - void $ atomically $ tryPutTMVar terminateThreads (Just ConnectionFailure) - Right msg' -> case eitherDecode msg' of - Right response -> do - updates <- handleResponse r response - atomically $ writeTQueue updateQueue updates - readLoop - Left err -> do - logError $ "Could not decode server response from " <> r <> ": " <> T.pack err - readLoop - in readLoop - - -- Main message handling loop - let loop = do - msg <- atomically $ readTChan requestChan - case msg of - NT.Disconnect -> do - liftIO $ WS.sendClose conn (T.pack "Bye!") - void $ atomically $ tryPutTMVar terminateThreads (Just UserInitiated) - _ -> do - result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg - case result of - Left ex -> do - logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex) - void $ atomically $ tryPutTMVar terminateThreads (Just ConnectionFailure) - Right _ -> loop - loop - - -- Handle cleanup and potential reconnection - reason <- atomically $ fromMaybe ConnectionFailure <$> readTMVar terminateThreads + + -- Start receive and send loops as async tasks + receiveThread <- async $ receiveLoop updateQueue + sendThread <- async $ sendLoop + + -- Wait for either thread to finish + void $ waitAnyCancel [receiveThread, sendThread] modify @RelayPoolState $ \st -> st { activeConnections = Map.adjust (\d -> d { connectionState = Disconnected }) r (activeConnections st) } notifyRelayStatus - - unless (reason == UserInitiated) $ do - logDebug $ "Reconnecting to: " <> r - -- Only attempt reconnection for non-user-initiated disconnects - void $ connectWithRetry r remainingRetries requestChan - -- Resubscribe active subscriptions - st <- get @RelayPoolState - let relaySubs = case Map.lookup r (activeConnections st) of - Just rd -> Map.elems (activeSubscriptions rd) - Nothing -> [] - forM_ relaySubs $ \sub -> do - let sub' = NT.Subscription (subscriptionId sub) (subscriptionFilters sub) - atomically $ writeTChan requestChan (NT.Subscribe sub') + where + receiveLoop q = do + msg <- liftIO (try (WS.receiveData conn) :: IO (Either SomeException BSL.ByteString)) + case msg of + Left ex -> do + logError $ "Error receiving data from " <> r <> ": " <> T.pack (show ex) + return () -- Exit the loop on error + Right msg' -> case eitherDecode msg' of + Right response -> do + updates <- handleResponse r response + atomically $ writeTQueue q updates + receiveLoop q + Left err -> do + logError $ "Could not decode server response from " <> r <> ": " <> T.pack err + receiveLoop q + + sendLoop = do + msg <- atomically $ readTChan requestChan + case msg of + NT.Disconnect -> do + liftIO $ WS.sendClose conn (T.pack "Bye!") + return () -- Exit the loop after disconnect + _ -> do + result <- liftIO $ try @SomeException $ WS.sendTextData conn $ encode msg + case result of + Left ex -> do + logError $ "Error sending data to " <> r <> ": " <> T.pack (show ex) + return () -- Exit the loop on error + Right _ -> sendLoop -- | Handle responses. diff --git a/src/Nostr/RelayPool.hs b/src/Nostr/RelayPool.hs index 9c64041..3bae1df 100644 --- a/src/Nostr/RelayPool.hs +++ b/src/Nostr/RelayPool.hs @@ -2,12 +2,10 @@ module Nostr.RelayPool where -import Control.Monad (forM, forM_, when) +import Control.Monad (forM_, when) import Data.Map.Strict qualified as Map -import Data.Text qualified as T import Effectful import Effectful.Concurrent (Concurrent, threadDelay) -import Effectful.Concurrent.STM (atomically, flushTQueue, readTQueue, newTVarIO, readTVar, writeTVar) import Effectful.Dispatch.Dynamic (interpret) import Effectful.State.Static.Shared (State, get) import Effectful.TH @@ -20,11 +18,10 @@ import Nostr.Keys (PubKeyXO, keyPairToPubKeyXO) import Nostr.Publisher import Nostr.RelayConnection import Nostr.Subscription -import Nostr.Types ( Event(..), Filter, Kind(..), Relay(..), RelayURI - , getUri, followListFilter, giftWrapFilter, metadataFilter, preferredDMRelaysFilter ) +import Nostr.Types (Event(..), Kind(..), Relay(..), RelayURI) import Nostr.Util -import Types ( AppState(..), ConnectionState(..), Follow(..), RelayData(..) - , RelayPoolState(..), SubscriptionEvent(..), emptyUpdates ) +import Presentation.KeyMgmt (KeyMgmt) +import Types (AppState(..), ConnectionState(..), RelayData(..), RelayPoolState(..)) import RelayMgmt (RelayMgmt) import RelayMgmt qualified as RM @@ -66,11 +63,13 @@ type RelayPoolEff es = , Publisher :> es , RelayMgmt :> es , Subscription :> es + , KeyMgmt :> es , GiftWrap :> es , EffectfulQML :> es , Concurrent :> es , Logging :> es , Util :> es + , IOE :> es ) @@ -132,81 +131,3 @@ runRelayPool = interpret $ \_ -> \case GiftWrap -> publishGiftWrap event pk -- Default case: publish to outbox-capable relays (FollowList, ShortTextNote, etc.) _ -> publishToOutbox event - - --- | Determine relay type and start appropriate subscriptions -handleRelaySubscription :: RelayPoolEff es => RelayURI -> Eff es () -handleRelaySubscription r = do - kp <- getKeyPair - let pk = keyPairToPubKeyXO kp - st <- get @AppState - let followPks = maybe [] (map (\(Follow pk' _ _) -> pk')) $ Map.lookup pk (follows st) - st' <- get @RelayPoolState - - -- Check if it's a DM relay - let isDM = any (\(_, (relays, _)) -> - any (\relay -> getUri relay == r) relays) - (Map.toList $ dmRelays st') - - -- Check if it's an inbox-capable relay - let isInbox = any (\(_, (relays, _)) -> - any (\relay -> case relay of - InboxRelay uri -> uri == r - InboxOutboxRelay uri -> uri == r - _ -> False) relays) - (Map.toList $ generalRelays st') - - -- Start appropriate subscriptions based on relay type - let fs = if isDM then Just $ createDMRelayFilters pk followPks - else if isInbox then Just $ createInboxRelayFilters pk followPks - else Nothing - - logInfo $ "Starting subscription for " <> r <> " with filters " <> T.pack (show fs) - - case fs of - Just fs' -> do - subId' <- newSubscriptionId - mq <- subscribe r subId' (createDMRelayFilters pk followPks) - case mq of - Just q -> do - shouldStop <- newTVarIO False - - let loop = do - e <- atomically $ readTQueue q - es <- atomically $ flushTQueue q - - updates <- fmap mconcat $ forM (e : es) $ \case - EventAppeared event' -> handleEvent r subId' fs' event' - SubscriptionEose -> return emptyUpdates - SubscriptionClosed _ -> do - atomically $ writeTVar shouldStop True - return emptyUpdates - - shouldStopNow <- atomically $ readTVar shouldStop - - if shouldStopNow - then return () - else loop - - notify updates - loop - Nothing -> logWarning $ "Failed to start subscription for " <> r - Nothing -> return () -- Outbox only relay or unknown relay, no subscriptions needed - - - --- | Create DM relay subscription filters -createDMRelayFilters :: PubKeyXO -> [PubKeyXO] -> [Filter] -createDMRelayFilters xo followedPubKeys = - [ metadataFilter (xo : followedPubKeys) - , preferredDMRelaysFilter (xo : followedPubKeys) - , giftWrapFilter xo - ] - --- | Create inbox relay subscription filters -createInboxRelayFilters :: PubKeyXO -> [PubKeyXO] -> [Filter] -createInboxRelayFilters xo followedPubKeys = - [ followListFilter (xo : followedPubKeys) - , metadataFilter (xo : followedPubKeys) - , preferredDMRelaysFilter (xo : followedPubKeys) - ] diff --git a/src/Nostr/Subscription.hs b/src/Nostr/Subscription.hs index a7453e8..ee0a281 100644 --- a/src/Nostr/Subscription.hs +++ b/src/Nostr/Subscription.hs @@ -1,6 +1,6 @@ module Nostr.Subscription where -import Control.Monad (forM_, replicateM, when) +import Control.Monad (forM, forM_, replicateM, when) import Data.Aeson (eitherDecode) import Data.ByteString qualified as BS import Data.ByteString.Base16 qualified as B16 @@ -10,7 +10,8 @@ import Data.Text (pack, unpack) import Data.Text.Encoding (decodeUtf8, encodeUtf8) import Effectful import Effectful.Concurrent -import Effectful.Concurrent.STM (TQueue, atomically, newTQueueIO, writeTChan) +import Effectful.Concurrent.Async (async) +import Effectful.Concurrent.STM (TQueue, atomically, flushTQueue, newTQueueIO, newTVarIO, readTQueue, readTVar, writeTChan, writeTVar) import Effectful.Dispatch.Dynamic (interpret) import Effectful.State.Static.Shared (State, get, modify) import Effectful.TH @@ -23,9 +24,11 @@ import Nostr.Bech32 (pubKeyXOToBech32) import Nostr.Event (validateEvent) import Nostr.GiftWrap (GiftWrap, handleGiftWrapEvent) import Nostr.Keys (PubKeyXO, byteStringToHex, keyPairToPubKeyXO) +import Nostr.RelayConnection import Nostr.Types ( Event(..), EventId(..), Filter, Kind(..), Relay(..) , RelayURI, SubscriptionId, Tag(..), getUri ) import Nostr.Types qualified as NT +import Nostr.Util import Presentation.KeyMgmt (AccountId(..), KeyMgmt, updateProfile) import RelayMgmt import Types @@ -48,8 +51,10 @@ type SubscriptionEff es = ( State AppState :> es , State RelayPoolState :> es , GiftWrap :> es + , RelayConnection :> es , KeyMgmt :> es , RelayMgmt :> es + , Util :> es , Logging :> es , Concurrent :> es , EffectfulQML :> es @@ -62,28 +67,9 @@ runSubscription => Eff (Subscription : es) a -> Eff es a runSubscription = interpret $ \_ -> \case - NewSubscriptionId -> do - bytes <- liftIO $ replicateM 8 randomIO - let byteString = BS.pack bytes - return $ decodeUtf8 $ B16.encode byteString + NewSubscriptionId -> generateRandomSubscriptionId - Subscribe r subId' fs -> do - st <- get @RelayPoolState - case Map.lookup r (activeConnections st) of - Nothing -> do - logWarning $ "Cannot start subscription: no connection found for relay: " <> r - return Nothing - Just rd -> do - let channel = requestChannel rd - atomically $ writeTChan channel (NT.Subscribe $ NT.Subscription subId' fs) - q <- newTQueueIO - modify @RelayPoolState $ \st' -> - st { activeConnections = Map.adjust - (\rd' -> rd' { activeSubscriptions = Map.insert subId' (SubscriptionDetails subId' fs q 0 0) (activeSubscriptions rd') }) - r - (activeConnections st') - } - return $ pure q + Subscribe r subId' fs -> createSubscription r subId' fs StopSubscription subId' -> do st <- get @RelayPoolState @@ -99,116 +85,215 @@ runSubscription = interpret $ \_ -> \case } Nothing -> return () - HandleEvent _ _ _ event' -> do - -- @todo validate event against filters ?? - if not (validateEvent event') - then do - logWarning $ "Invalid event seen: " <> (byteStringToHex $ getEventId (eventId event')) - pure emptyUpdates - else do - logDebug $ "Handling event of kind " <> pack (show $ kind event') <> " with id " <> (byteStringToHex $ getEventId (eventId event')) - case kind event' of - Metadata -> do - case eitherDecode (fromStrict $ encodeUtf8 $ content event') of - Right profile -> do - st <- get @AppState - let isOwnProfile = maybe False (\kp -> pubKey event' == keyPairToPubKeyXO kp) (keyPair st) - - modify $ \s -> s { profiles = Map.insertWith (\new old -> if snd new > snd old then new else old) - (pubKey event') - (profile, createdAt event') - (profiles s) - } - - when isOwnProfile $ do - let aid = AccountId $ pubKeyXOToBech32 (pubKey event') - updateProfile aid profile - - pure $ emptyUpdates { profilesChanged = True } - Left err -> do - logWarning $ "Failed to decode metadata: " <> pack err - pure emptyUpdates - - FollowList -> do - let followList' = [Follow pk (fmap InboxRelay relay') petName' | PTag pk relay' petName' <- tags event'] - modify $ \st -> st { follows = Map.insert (pubKey event') followList' (follows st) } - pure $ emptyUpdates { followsChanged = True } - - GiftWrap -> do - handleGiftWrapEvent event' - pure $ emptyUpdates { chatsChanged = True } - - RelayListMetadata -> do - logDebug $ pack $ show event' - let validRelayTags = [ r' | RelayTag r' <- tags event', isValidRelayURI (getUri r') ] - ts = createdAt event' - pk = pubKey event' - case validRelayTags of - [] -> do - logWarning $ "No valid relay URIs found in RelayListMetadata event from " - <> (pubKeyXOToBech32 pk) - pure emptyUpdates - relays -> do - importGeneralRelays pk relays ts - -- @todo auto connect to new relays, disconnect from old ones - -- IF the event is from our pubkey - logDebug $ "Updated relay list for " <> (pubKeyXOToBech32 pk) - <> " with " <> pack (show $ length relays) <> " relays" - pure $ emptyUpdates { generalRelaysChanged = True } - - PreferredDMRelays -> do - logDebug $ "Handling PreferredDMRelays event: " <> pack (show event') - let validRelayTags = [ r' | RelayTag r' <- tags event', isValidRelayURI (getUri r') ] - case validRelayTags of - [] -> do - logWarning $ "No valid relay URIs found in PreferredDMRelays event from " - <> (pubKeyXOToBech32 $ pubKey event') - pure emptyUpdates - preferredRelays -> do - importDMRelays (pubKey event') preferredRelays (createdAt event') - -- @todo auto connect to new relays, disconnect from old ones - -- IF the event is from our pubkey - pure $ emptyUpdates { dmRelaysChanged = True } - - _ -> do - logDebug $ "Ignoring event of kind: " <> pack (show (kind event')) - pure emptyUpdates - where - isValidRelayURI :: RelayURI -> Bool - isValidRelayURI uriText = - case parseURI (unpack uriText) of - Just uri -> - let scheme = uriScheme uri - authority = uriAuthority uri - in (scheme == "ws:" || scheme == "wss:") && - maybe False (not . null . uriRegName) authority - Nothing -> False - - --- Keep the most recent relay list and timestamp -keepMostRecent :: ([Relay], Int) -> ([Relay], Int) -> ([Relay], Int) -keepMostRecent (newRelays, newTime) (oldRelays, oldTime) - | newTime > oldTime = (newRelays, newTime) - | otherwise = (oldRelays, oldTime) - - -updateGeneralRelays :: SubscriptionEff es => PubKeyXO -> [Relay] -> Int -> Eff es () -updateGeneralRelays pk relays ts = do - modify @RelayPoolState $ \st -> st - { generalRelays = Map.insertWith keepMostRecent - pk - (relays, ts) - (generalRelays st) - } - notifyRelayStatus - - -updateDMRelays :: SubscriptionEff es => PubKeyXO -> [Relay] -> Int -> Eff es () -updateDMRelays pk relays ts = do - modify @RelayPoolState $ \st -> st - { dmRelays = Map.insertWith keepMostRecent - pk - (relays, ts) - (dmRelays st) - } - notifyRelayStatus + HandleEvent _ _ _ event' -> handleEvent' event' + + +handleEvent' :: SubscriptionEff es => Event -> Eff es UIUpdates +handleEvent' event' = do + -- @todo validate event against filters ?? + if not (validateEvent event') + then do + logWarning $ "Invalid event seen: " <> (byteStringToHex $ getEventId (eventId event')) + pure emptyUpdates + else do + logDebug $ "Handling event of kind " <> pack (show $ kind event') <> " with id " <> (byteStringToHex $ getEventId (eventId event')) + case kind event' of + Metadata -> do + case eitherDecode (fromStrict $ encodeUtf8 $ content event') of + Right profile -> do + st <- get @AppState + let isOwnProfile = maybe False (\kp -> pubKey event' == keyPairToPubKeyXO kp) (keyPair st) + + modify $ \s -> s { profiles = Map.insertWith (\new old -> if snd new > snd old then new else old) + (pubKey event') + (profile, createdAt event') + (profiles s) + } + + when isOwnProfile $ do + let aid = AccountId $ pubKeyXOToBech32 (pubKey event') + updateProfile aid profile + + pure $ emptyUpdates { profilesChanged = True } + Left err -> do + logWarning $ "Failed to decode metadata: " <> pack err + pure emptyUpdates + + FollowList -> do + let followList' = [Follow pk (fmap InboxRelay relay') petName' | PTag pk relay' petName' <- tags event'] + modify $ \st -> st { follows = Map.insert (pubKey event') followList' (follows st) } + pure $ emptyUpdates { followsChanged = True } + + GiftWrap -> do + handleGiftWrapEvent event' + pure $ emptyUpdates { chatsChanged = True } + + RelayListMetadata -> do + logDebug $ pack $ show event' + let validRelayTags = [ r' | RelayTag r' <- tags event', isValidRelayURI (getUri r') ] + ts = createdAt event' + pk = pubKey event' + case validRelayTags of + [] -> do + logWarning $ "No valid relay URIs found in RelayListMetadata event from " + <> (pubKeyXOToBech32 pk) + pure emptyUpdates + relays -> do + importGeneralRelays pk relays ts + -- @todo auto connect to new relays, disconnect from old ones + -- IF the event is from our pubkey + logDebug $ "Updated relay list for " <> (pubKeyXOToBech32 pk) + <> " with " <> pack (show $ length relays) <> " relays" + pure $ emptyUpdates { generalRelaysChanged = True } + + PreferredDMRelays -> do + logDebug $ "Handling PreferredDMRelays event: " <> pack (show event') + let validRelayTags = [ r' | RelayTag r' <- tags event', isValidRelayURI (getUri r') ] + case validRelayTags of + [] -> do + logWarning $ "No valid relay URIs found in PreferredDMRelays event from " + <> (pubKeyXOToBech32 $ pubKey event') + pure emptyUpdates + preferredRelays -> do + st <- get @RelayPoolState + case Map.lookup (pubKey event') (dmRelays st) of + Just (existingRelays, ts) -> do + when (ts < createdAt event') $ do + forM_ existingRelays $ \r -> disconnectRelay $ getUri r + importDMRelays (pubKey event') preferredRelays (createdAt event') + forM_ preferredRelays $ \r -> async $ do + connected <- connectRelay $ getUri r + when connected $ handleRelaySubscription $ getUri r + _ -> do + importDMRelays (pubKey event') preferredRelays (createdAt event') + forM_ preferredRelays $ \r -> async $ do + connected <- connectRelay $ getUri r + when connected $ handleRelaySubscription $ getUri r + + pure $ emptyUpdates { dmRelaysChanged = True } + + _ -> do + logDebug $ "Ignoring event of kind: " <> pack (show (kind event')) + pure emptyUpdates + where + isValidRelayURI :: RelayURI -> Bool + isValidRelayURI uriText = + case parseURI (unpack uriText) of + Just uri -> + let scheme = uriScheme uri + authority = uriAuthority uri + in (scheme == "ws:" || scheme == "wss:") && + maybe False (not . null . uriRegName) authority + Nothing -> False + + +-- | Create a subscription +createSubscription :: SubscriptionEff es + => RelayURI + -> SubscriptionId + -> [Filter] + -> Eff es (Maybe (TQueue SubscriptionEvent)) +createSubscription r subId' fs = do + st <- get @RelayPoolState + case Map.lookup r (activeConnections st) of + Nothing -> do + logWarning $ "Cannot start subscription: no connection found for relay: " <> r + return Nothing + Just rd -> do + let channel = requestChannel rd + atomically $ writeTChan channel (NT.Subscribe $ NT.Subscription subId' fs) + q <- newTQueueIO + modify @RelayPoolState $ \st' -> + st { activeConnections = Map.adjust + (\rd' -> rd' { activeSubscriptions = Map.insert subId' (SubscriptionDetails subId' fs q 0 0) (activeSubscriptions rd') }) + r + (activeConnections st') + } + return $ pure q + + +-- | Determine relay type and start appropriate subscriptions +handleRelaySubscription :: SubscriptionEff es => RelayURI -> Eff es () +handleRelaySubscription r = do + kp <- getKeyPair + let pk = keyPairToPubKeyXO kp + st <- get @AppState + let followPks = maybe [] (map (\(Follow pk' _ _) -> pk')) $ Map.lookup pk (follows st) + st' <- get @RelayPoolState + + -- Check if it's a DM relay + let isDM = any (\(_, (relays, _)) -> + any (\relay -> getUri relay == r) relays) + (Map.toList $ dmRelays st') + + -- Check if it's an inbox-capable relay + let isInbox = any (\(_, (relays, _)) -> + any (\relay -> case relay of + InboxRelay uri -> uri == r + InboxOutboxRelay uri -> uri == r + _ -> False) relays) + (Map.toList $ generalRelays st') + + -- Start appropriate subscriptions based on relay type + let fs = if isDM then Just $ createDMRelayFilters pk followPks + else if isInbox then Just $ createInboxRelayFilters pk followPks + else Nothing + + logInfo $ "Starting subscription for " <> r <> " with filters " <> pack (show fs) + + case fs of + Just fs' -> do + subId' <- generateRandomSubscriptionId + mq <- createSubscription r subId' fs' + case mq of + Just q -> do + shouldStop <- newTVarIO False + + let loop = do + e <- atomically $ readTQueue q + es <- atomically $ flushTQueue q + + updates <- fmap mconcat $ forM (e : es) $ \case + EventAppeared event' -> handleEvent' event' + SubscriptionEose -> return emptyUpdates + SubscriptionClosed _ -> do + atomically $ writeTVar shouldStop True + return emptyUpdates + + notify updates + + shouldStopNow <- atomically $ readTVar shouldStop + + if shouldStopNow + then return () + else loop + + loop + Nothing -> logWarning $ "Failed to start subscription for " <> r + Nothing -> return () -- Outbox only relay or unknown relay, no subscriptions needed + + +-- | Create DM relay subscription filters +createDMRelayFilters :: PubKeyXO -> [PubKeyXO] -> [Filter] +createDMRelayFilters xo followedPubKeys = + [ NT.metadataFilter (xo : followedPubKeys) + , NT.preferredDMRelaysFilter (xo : followedPubKeys) + , NT.giftWrapFilter xo + ] + + +-- | Create inbox relay subscription filters +createInboxRelayFilters :: PubKeyXO -> [PubKeyXO] -> [Filter] +createInboxRelayFilters xo followedPubKeys = + [ NT.followListFilter (xo : followedPubKeys) + , NT.metadataFilter (xo : followedPubKeys) + , NT.preferredDMRelaysFilter (xo : followedPubKeys) + ] + + +-- | Generate a random subscription ID +generateRandomSubscriptionId :: SubscriptionEff es => Eff es SubscriptionId +generateRandomSubscriptionId = do + bytes <- liftIO $ replicateM 8 randomIO + let byteString = BS.pack bytes + return $ decodeUtf8 $ B16.encode byteString diff --git a/src/Presentation/RelayMgmt.hs b/src/Presentation/RelayMgmt.hs index 6559f89..d216f70 100644 --- a/src/Presentation/RelayMgmt.hs +++ b/src/Presentation/RelayMgmt.hs @@ -18,6 +18,7 @@ import Effectful.TH import Graphics.QML hiding (fireSignal, runEngineLoop) import EffectfulQML (EffectfulQMLState(..)) +import Logging import Nostr.Keys (keyPairToPubKeyXO) import Nostr.RelayPool import Nostr.Types hiding (displayName, picture) @@ -33,6 +34,7 @@ type RelayMgmgtUIEff es = , State RelayPoolState :> es , State EffectfulQMLState :> es , RelayPool :> es + , Logging :> es , Concurrent :> es , Util :> es , IOE :> es @@ -124,6 +126,7 @@ runRelayMgmtUI action = interpret handleRelayMgmtUI action contextClass <- newClass [ defPropertySigRO' "dmRelays" changeKey $ \obj -> do + runE $ logDebug "dmRelays property" runE $ modify @EffectfulQMLState $ \s -> s { uiRefs = (uiRefs s) { dmRelaysObjRef = Just obj } } @@ -136,6 +139,7 @@ runRelayMgmtUI action = interpret handleRelayMgmtUI action mapM (\(relay, _status) -> getPoolObject dmRelayPool (getUri relay)) relaysWithStatus, defPropertySigRO' "generalRelays" changeKey $ \obj -> do + runE $ logDebug "generalRelays property" runE $ modify @EffectfulQMLState $ \s -> s { uiRefs = (uiRefs s) { generalRelaysObjRef = Just obj } } diff --git a/src/UI.hs b/src/UI.hs index 56d803b..cf6da38 100644 --- a/src/UI.hs +++ b/src/UI.hs @@ -20,7 +20,6 @@ import EffectfulQML import Graphics.QML hiding (fireSignal, runEngineLoop) import Text.Read (readMaybe) -import EffectfulQML (EffectfulQMLState(..)) import Logging import Nostr.Bech32 import Nostr.Event