diff --git a/hstream-processing/src/HStream/Processing/Processor.hs b/hstream-processing/src/HStream/Processing/Processor.hs index 5d4f8f950..1e5bd303d 100644 --- a/hstream-processing/src/HStream/Processing/Processor.hs +++ b/hstream-processing/src/HStream/Processing/Processor.hs @@ -129,6 +129,9 @@ taskBuilderWithName builder taskName = { ttcName = taskName } +-- | Run a task. This function will block the current thread. +-- And this function will throw an exception if any (unrecoverable) +-- error occurs, see the comments at 'go' following. runTask :: (ChangeLogger h1, Snapshotter h2) => StatsHolder @@ -174,13 +177,35 @@ runTask statsHolder SourceConnectorWithoutCkp {..} sinkConnector taskBuilder@Tas False -> subscribeToStreamWithoutCkp stream API.SpecialOffsetLATEST chan <- newTChanIO + + -- Note: Start many threads of 'f' and one thread of 'g': + -- 1. 'f' is responsible for reading records from source streams and + -- writing them to the channel. + -- 2. 'g' is responsible for reading records from the channel and doing + -- the actual processing + + -- [important] we use 'forConcurrently_' to ensure other threads are + -- cancelled when one of them throws an exception. + -- [important] we use 'waitEitherCancel' to ensure both threads are + -- cleaned up when one of them throws an exception. + -- The 'catch' and re-'throwIO' here is just for setting + -- 'connectorClosed'. Otherwise, the consumers will not + -- exit and the subscriptions will not be deleted. We + -- do not do any error handling here and re-thow it to + -- the top level. + + -- 'forConcurrently_' here: important! See the comment above. withAsync (forConcurrently_ sourceStreamNames (f chan connectorClosed)) $ \a -> withAsync (g task ctx chan) $ \b -> - waitEither_ a b `finally` do + -- 'catch' here: important! See the comment above. + void (waitEitherCancel a b) `catch` \(e :: SomeException) -> do + atomically $ writeTVar connectorClosed True forM_ sourceStreamNames (\stream -> do isSubscribedToStreamWithoutCkp stream >>= \case True -> unSubscribeToStreamWithoutCkp stream False -> return () ) + -- 'throwIO' here: important! See the comment above. + throwIO e where f :: TChan ([SourceRecord], MVar ()) -> TVar Bool -> T.Text -> IO () f chan consumerClosed sourceStreamName = @@ -195,30 +220,44 @@ runTask statsHolder SourceConnectorWithoutCkp {..} sinkConnector taskBuilder@Tas g :: Task -> TaskContext -> TChan ([SourceRecord], MVar ()) -> IO () g task@Task{..} ctx chan = do timer <- newIORef False - tid <- forkIO . forever $ do + -- Start two threads here. One is used to set the timer of snapshotting + -- and the other is used to do the **snapshotting** and **processing**. + + -- [important] we use 'waitEitherCancel' to ensure both threads are + -- cancelled when one of them throws an exception. + withAsync (forever $ do Control.Concurrent.threadDelay $ 10 * 1000 * 1000 - atomicWriteIORef timer True - forever (do - readIORef timer >>= \case - False -> go - True -> do - doSnapshot task - atomicWriteIORef timer False - ) `onException` (Control.Concurrent.killThread tid) + atomicWriteIORef timer True) $ \a -> withAsync (forever $ do + readIORef timer >>= \case + False -> go + True -> do + doSnapshot task + atomicWriteIORef timer False) $ \b -> void (waitEitherCancel a b) where go = do (sourceRecords, mvar) <- atomically $ readTChan chan - runRIO ctx $ forM_ sourceRecords $ \r@SourceRecord {..} -> do + runRIO ctx $ forM_ sourceRecords $ \SourceRecord {..} -> do let acSourceName = iSourceName (taskSourceConfig HM'.! srcStream) let (sourceEProcessor, _) = taskTopologyForward HM'.! acSourceName liftIO $ updateTimestampInTaskContext ctx srcTimestamp + -- [WARNING] [FIXME] + -- The following code means that we only consider 'StreamNotFound' + -- as a fatal error so we re-throw it. This causes all threads + -- related to this task to exit. As for other errors, we just + -- log them and continue. + -- HOWEVER, this should be re-considered. Which errors are actually + -- fatal? catches (runEP sourceEProcessor (mkERecord Record {recordKey = srcKey, recordValue = srcValue, recordTimestamp = srcTimestamp})) [ Handler $ \(err :: HE.StreamNotFound) -> do liftIO $ query_stat_add_total_execute_errors statsHolder (textToCBytes qid) 1 + -- 'throw' here: very important! Or the threads related to the + -- task will not be cleaned up. See above. throw err , Handler $ \(err :: SomeException) -> do liftIO $ Log.warning $ Log.buildString' err liftIO $ query_stat_add_total_execute_errors statsHolder (textToCBytes qid) 1 + -- No 'throw' here: very important! Just omit the error and + -- continue processing. See above. ] liftIO $ query_stat_add_total_output_records statsHolder (textToCBytes qid) 1 -- NOTE: tell the server that we have processed this "batch" of records diff --git a/hstream/src/HStream/Server/Handler/Common.hs b/hstream/src/HStream/Server/Handler/Common.hs index 261e930c1..8014949b8 100644 --- a/hstream/src/HStream/Server/Handler/Common.hs +++ b/hstream/src/HStream/Server/Handler/Common.hs @@ -348,6 +348,7 @@ doSnapshot h1 h2 Task{..} = do -------------------------------------------------------------------------------- +-- This function may throw exceptions just like 'runTask'. runTaskWrapper :: ServerContext -> SourceConnectorWithoutCkp -> SinkConnector -> TaskBuilder -> Text -> S.C_LogID -> IO () runTaskWrapper ServerContext{..} sourceConnector sinkConnector taskBuilder queryId logId = do -- RUN TASK