From c7bc55a2d835b81f0f91144ad84b273689efd26e Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Sun, 16 Jul 2023 10:02:51 +0800 Subject: [PATCH] =?UTF-8?q?Fix(sub):=20interrupt=20all=20consumers=20when?= =?UTF-8?q?=20a=20subscription=20is=20deleting=20or=20=E2=80=A6=20(#1513)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/HStream/Server/Core/Subscription.hs | 24 ++++++++++++++----- hstream/src/HStream/Server/Types.hs | 4 +++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/hstream/src/HStream/Server/Core/Subscription.hs b/hstream/src/HStream/Server/Core/Subscription.hs index dc92d68c7..3de7b3e8b 100644 --- a/hstream/src/HStream/Server/Core/Subscription.hs +++ b/hstream/src/HStream/Server/Core/Subscription.hs @@ -11,8 +11,8 @@ module HStream.Server.Core.Subscription where import Control.Concurrent import Control.Concurrent.Async (async, link, wait, withAsync) import Control.Concurrent.STM -import Control.Exception (catch, fromException, handle, - onException, throwIO) +import Control.Exception (Exception, catch, fromException, + handle, onException, throwIO) import Control.Monad import qualified Data.ByteString as BS import Data.Foldable (foldl') @@ -369,12 +369,11 @@ initSub serverCtx@ServerContext {..} subId = M.getMeta subId metaHandle >>= \cas putTMVar scnwContext subCtx writeTVar scnwState SubscribeStateRunning return SubscribeContextWrapper {scwState = scnwState, scwContext = subCtx} - subTid <- myThreadId let errHandler = \case Left e -> do let ex = fromException e :: Maybe HE.SubscriptionIsDeleting case ex of - Just _ -> throwTo subTid e + Just _ -> throwToAllConsumers subCtx e Nothing -> do Log.fatal $ Log.buildString "An unexpected error happened while subscription " <> Log.build subId <> " running: " <> Log.build (show e) @@ -382,7 +381,7 @@ initSub serverCtx@ServerContext {..} subId = M.getMeta subId metaHandle >>= \cas atomically $ writeTVar scnwState SubscribeStateFailed atomically $ removeSubFromCtx serverCtx subId stopCompactedWorker subLdTrimCkpWorker - throwTo subTid e + throwToAllConsumers subCtx e Right _ -> pure () tid <- forkFinally (sendRecords serverCtx scwState scwContext) errHandler return (wrapper, Just tid) @@ -515,18 +514,21 @@ initConsumer -> IO ConsumerContext initConsumer SubscribeContext {subAssignment = Assignment{..}, ..} consumerName uri agent streamSend = do sender <- newMVar streamSend + tid <- myThreadId res <- atomically $ do cMap <- readTVar subConsumerContexts when (HM.member consumerName cMap) $ throwSTM (HE.ConsumerExists $ T.unpack consumerName) modifyTVar' waitingConsumers (\consumers -> consumers ++ [consumerName]) isValid <- newTVar True + let cc = ConsumerContext { ccConsumerName = consumerName, ccConsumerUri = uri, ccConsumerAgent = agent, ccIsValid = isValid, - ccStreamSend = sender + ccStreamSend = sender, + ccThreadId = tid } writeTVar subConsumerContexts (HM.insert consumerName cc cMap) return cc @@ -1238,3 +1240,13 @@ addUnackedRecords SubscribeContext {..} count = do checkSubscriptionExist :: ServerContext -> Text -> IO Bool checkSubscriptionExist ServerContext{..} sid = M.checkMetaExists @SubscriptionWrap sid metaHandle + +throwToAllConsumers :: Exception e => SubscribeContext -> e -> IO () +throwToAllConsumers SubscribeContext{..} e = do + subMasterTid <- myThreadId + consumerCtxs <- atomically $ do + consumerCtxs <- readTVar subConsumerContexts + return $ HM.elems consumerCtxs + forM_ consumerCtxs (\ConsumerContext{..} -> when (ccThreadId /= subMasterTid) $ throwTo ccThreadId e ) + throwIO e + diff --git a/hstream/src/HStream/Server/Types.hs b/hstream/src/HStream/Server/Types.hs index 57e40dba0..14e25a90c 100644 --- a/hstream/src/HStream/Server/Types.hs +++ b/hstream/src/HStream/Server/Types.hs @@ -180,7 +180,9 @@ data ConsumerContext = ConsumerContext ccIsValid :: TVar Bool, -- use MVar for streamSend because only on thread can use streamSend at the -- same time - ccStreamSend :: MVar (StreamSend StreamingFetchResponse) + ccStreamSend :: MVar (StreamSend StreamingFetchResponse), + -- threadId of the thread handling streamingFetchRequest for this consumer + ccThreadId :: ThreadId } data SubscribeShardContext = SubscribeShardContext