Skip to content

Commit

Permalink
processing: clean up threads properly on exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Jul 17, 2023
1 parent c7bc55a commit 8132647
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 11 deletions.
61 changes: 50 additions & 11 deletions hstream-processing/src/HStream/Processing/Processor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ taskBuilderWithName builder taskName =
{ ttcName = taskName
}

-- | Run a task. This function will block the current thread.
-- And this function will throw an exception if any (unrecoverable)
-- error occurs, see the comments at 'go' following.
runTask
:: (ChangeLogger h1, Snapshotter h2)
=> StatsHolder
Expand Down Expand Up @@ -174,13 +177,35 @@ runTask statsHolder SourceConnectorWithoutCkp {..} sinkConnector taskBuilder@Tas
False -> subscribeToStreamWithoutCkp stream API.SpecialOffsetLATEST

chan <- newTChanIO

-- Note: Start many threads of 'f' and one thread of 'g':
-- 1. 'f' is responsible for reading records from source streams and
-- writing them to the channel.
-- 2. 'g' is responsible for reading records from the channel and doing
-- the actual processing

-- [important] we use 'forConcurrently_' to ensure other threads are
-- cancelled when one of them throws an exception.
-- [important] we use 'waitEitherCancel' to ensure both threads are
-- cleaned up when one of them throws an exception.
-- The 'catch' and re-'throwIO' here is just for setting
-- 'connectorClosed'. Otherwise, the consumers will not
-- exit and the subscriptions will not be deleted. We
-- do not do any error handling here and re-thow it to
-- the top level.

-- 'forConcurrently_' here: important! See the comment above.
withAsync (forConcurrently_ sourceStreamNames (f chan connectorClosed)) $ \a ->
withAsync (g task ctx chan) $ \b ->
waitEither_ a b `finally` do
-- 'catch' here: important! See the comment above.
void (waitEitherCancel a b) `catch` \(e :: SomeException) -> do
atomically $ writeTVar connectorClosed True
forM_ sourceStreamNames (\stream -> do
isSubscribedToStreamWithoutCkp stream >>= \case
True -> unSubscribeToStreamWithoutCkp stream
False -> return () )
-- 'throwIO' here: important! See the comment above.
throwIO e
where
f :: TChan ([SourceRecord], MVar ()) -> TVar Bool -> T.Text -> IO ()
f chan consumerClosed sourceStreamName =
Expand All @@ -195,30 +220,44 @@ runTask statsHolder SourceConnectorWithoutCkp {..} sinkConnector taskBuilder@Tas
g :: Task -> TaskContext -> TChan ([SourceRecord], MVar ()) -> IO ()
g task@Task{..} ctx chan = do
timer <- newIORef False
tid <- forkIO . forever $ do
-- Start two threads here. One is used to set the timer of snapshotting
-- and the other is used to do the **snapshotting** and **processing**.

-- [important] we use 'waitEitherCancel' to ensure both threads are
-- cancelled when one of them throws an exception.
withAsync (forever $ do
Control.Concurrent.threadDelay $ 10 * 1000 * 1000
atomicWriteIORef timer True
forever (do
readIORef timer >>= \case
False -> go
True -> do
doSnapshot task
atomicWriteIORef timer False
) `onException` (Control.Concurrent.killThread tid)
atomicWriteIORef timer True) $ \a -> withAsync (forever $ do
readIORef timer >>= \case
False -> go
True -> do
doSnapshot task
atomicWriteIORef timer False) $ \b -> void (waitEitherCancel a b)
where
go = do
(sourceRecords, mvar) <- atomically $ readTChan chan
runRIO ctx $ forM_ sourceRecords $ \r@SourceRecord {..} -> do
runRIO ctx $ forM_ sourceRecords $ \SourceRecord {..} -> do
let acSourceName = iSourceName (taskSourceConfig HM'.! srcStream)
let (sourceEProcessor, _) = taskTopologyForward HM'.! acSourceName
liftIO $ updateTimestampInTaskContext ctx srcTimestamp
-- [WARNING] [FIXME]
-- The following code means that we only consider 'StreamNotFound'
-- as a fatal error so we re-throw it. This causes all threads
-- related to this task to exit. As for other errors, we just
-- log them and continue.
-- HOWEVER, this should be re-considered. Which errors are actually
-- fatal?
catches (runEP sourceEProcessor (mkERecord Record {recordKey = srcKey, recordValue = srcValue, recordTimestamp = srcTimestamp}))
[ Handler $ \(err :: HE.StreamNotFound) -> do
liftIO $ query_stat_add_total_execute_errors statsHolder (textToCBytes qid) 1
-- 'throw' here: very important! Or the threads related to the
-- task will not be cleaned up. See above.
throw err
, Handler $ \(err :: SomeException) -> do
liftIO $ Log.warning $ Log.buildString' err
liftIO $ query_stat_add_total_execute_errors statsHolder (textToCBytes qid) 1
-- No 'throw' here: very important! Just omit the error and
-- continue processing. See above.
]
liftIO $ query_stat_add_total_output_records statsHolder (textToCBytes qid) 1
-- NOTE: tell the server that we have processed this "batch" of records
Expand Down
1 change: 1 addition & 0 deletions hstream/src/HStream/Server/Handler/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ doSnapshot h1 h2 Task{..} = do

--------------------------------------------------------------------------------

-- This function may throw exceptions just like 'runTask'.
runTaskWrapper :: ServerContext -> SourceConnectorWithoutCkp -> SinkConnector -> TaskBuilder -> Text -> S.C_LogID -> IO ()
runTaskWrapper ServerContext{..} sourceConnector sinkConnector taskBuilder queryId logId = do
-- RUN TASK
Expand Down

0 comments on commit 8132647

Please sign in to comment.