Skip to content

Commit

Permalink
Fix(sub): interrupt all consumers when a subscription is deleting or … (
Browse files Browse the repository at this point in the history
  • Loading branch information
daleiz authored Jul 16, 2023
1 parent cdafea5 commit c7bc55a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
24 changes: 18 additions & 6 deletions hstream/src/HStream/Server/Core/Subscription.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ module HStream.Server.Core.Subscription where
import Control.Concurrent
import Control.Concurrent.Async (async, link, wait, withAsync)
import Control.Concurrent.STM
import Control.Exception (catch, fromException, handle,
onException, throwIO)
import Control.Exception (Exception, catch, fromException,
handle, onException, throwIO)
import Control.Monad
import qualified Data.ByteString as BS
import Data.Foldable (foldl')
Expand Down Expand Up @@ -369,20 +369,19 @@ initSub serverCtx@ServerContext {..} subId = M.getMeta subId metaHandle >>= \cas
putTMVar scnwContext subCtx
writeTVar scnwState SubscribeStateRunning
return SubscribeContextWrapper {scwState = scnwState, scwContext = subCtx}
subTid <- myThreadId
let errHandler = \case
Left e -> do
let ex = fromException e :: Maybe HE.SubscriptionIsDeleting
case ex of
Just _ -> throwTo subTid e
Just _ -> throwToAllConsumers subCtx e
Nothing -> do
Log.fatal $ Log.buildString "An unexpected error happened while subscription "
<> Log.build subId <> " running: " <> Log.build (show e)
let SubscribeContext{..} = subCtx
atomically $ writeTVar scnwState SubscribeStateFailed
atomically $ removeSubFromCtx serverCtx subId
stopCompactedWorker subLdTrimCkpWorker
throwTo subTid e
throwToAllConsumers subCtx e
Right _ -> pure ()
tid <- forkFinally (sendRecords serverCtx scwState scwContext) errHandler
return (wrapper, Just tid)
Expand Down Expand Up @@ -515,18 +514,21 @@ initConsumer
-> IO ConsumerContext
initConsumer SubscribeContext {subAssignment = Assignment{..}, ..} consumerName uri agent streamSend = do
sender <- newMVar streamSend
tid <- myThreadId
res <- atomically $ do
cMap <- readTVar subConsumerContexts
when (HM.member consumerName cMap) $ throwSTM (HE.ConsumerExists $ T.unpack consumerName)
modifyTVar' waitingConsumers (\consumers -> consumers ++ [consumerName])

isValid <- newTVar True

let cc = ConsumerContext
{ ccConsumerName = consumerName,
ccConsumerUri = uri,
ccConsumerAgent = agent,
ccIsValid = isValid,
ccStreamSend = sender
ccStreamSend = sender,
ccThreadId = tid
}
writeTVar subConsumerContexts (HM.insert consumerName cc cMap)
return cc
Expand Down Expand Up @@ -1238,3 +1240,13 @@ addUnackedRecords SubscribeContext {..} count = do
checkSubscriptionExist :: ServerContext -> Text -> IO Bool
checkSubscriptionExist ServerContext{..} sid =
M.checkMetaExists @SubscriptionWrap sid metaHandle

throwToAllConsumers :: Exception e => SubscribeContext -> e -> IO ()
throwToAllConsumers SubscribeContext{..} e = do
subMasterTid <- myThreadId
consumerCtxs <- atomically $ do
consumerCtxs <- readTVar subConsumerContexts
return $ HM.elems consumerCtxs
forM_ consumerCtxs (\ConsumerContext{..} -> when (ccThreadId /= subMasterTid) $ throwTo ccThreadId e )
throwIO e

4 changes: 3 additions & 1 deletion hstream/src/HStream/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ data ConsumerContext = ConsumerContext
ccIsValid :: TVar Bool,
-- use MVar for streamSend because only on thread can use streamSend at the
-- same time
ccStreamSend :: MVar (StreamSend StreamingFetchResponse)
ccStreamSend :: MVar (StreamSend StreamingFetchResponse),
-- threadId of the thread handling streamingFetchRequest for this consumer
ccThreadId :: ThreadId
}

data SubscribeShardContext = SubscribeShardContext
Expand Down

0 comments on commit c7bc55a

Please sign in to comment.