Skip to content

Commit

Permalink
update reader timeout setting in subscription (#1387)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Apr 28, 2023
1 parent 722fc61 commit a0799d0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
23 changes: 8 additions & 15 deletions hstream/src/HStream/Server/Core/Subscription.hs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,9 @@ doSubInit ServerContext{..} subId = do
-- create a ldReader for rereading unacked records
Log.info $ "Create a reader for " <> Log.build subId
reader <- S.newLDReader scLDClient maxReadLogs (Just ldReaderBufferSize)
S.readerSetTimeout reader 10 -- 10 milliseconds
-- reader reads the data and delivers it immediately, otherwise it waits up to 1s
S.readerSetTimeout reader 1000 -- 1 seconds
S.readerSetWaitOnlyWhenNoData reader
ldReader <- newMVar reader

trimCkpWorker <- startCompactedWorker (60 * 1000000){- 60s -} $ do
Expand Down Expand Up @@ -732,11 +734,12 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do
<> ", logId=" <> Log.build logId <> ", batchId=" <> Log.build batchId
dataRecord <- withMVar subLdReader $ \reader -> do
S.readerStartReading reader logId batchId batchId
readLoop reader 3
if length dataRecord /= 1
S.readerRead reader 1
if null dataRecord
then do
Log.fatal $ "read error on `readerRead`. Expect 1 record but got " <> Log.build (length dataRecord)
<> ", logId " <> Log.build logId <> ", batchId " <> Log.build batchId
Log.info $ "sub resend reader read empty records from log "
<> Log.build logId <> ", batchId " <> Log.build batchId
<> ", retry next time"
else do
(_, _, _, ReceivedRecord{..}) <- decodeRecordBatch $ head dataRecord
let batchRecords@BatchedRecord{..} = fromJust receivedRecordRecord
Expand All @@ -753,16 +756,6 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do
resendBatch = mkBatchedRecord batchedRecordCompressionType batchedRecordPublishTime (fromIntegral $ V.length records) records
resendRecords = ReceivedRecord ids (Just resendBatch)
void $ sendReceivedRecords logId batchId resendRecordIds resendRecords True
where
readLoop reader n
| n == 0 = return []
| otherwise = do
res <- S.readerRead reader 1
if null res
then do
Log.warning $ "reader read got empty result, logId " <> Log.build logId <> ", batchId " <> Log.build batchId <> ", retry."
readLoop reader (n - 1)
else return res

filterUnackedRecordIds recordIds ackedRanges windowLowerBound =
flip V.filter recordIds $ \recordId ->
Expand Down
2 changes: 2 additions & 0 deletions hstream/test/HStream/RunSQLSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ viewSpecAround = provideRunTest setup clean
clean api (source1, source2, viewName, qName1, qName2) = do
runTerminateSql api $ "TERMINATE QUERY " <> qName1 <> ";"
runTerminateSql api $ "TERMINATE QUERY " <> qName2 <> ";"
-- FIXME: wait the query terminated
threadDelay 10000000
runDropSql api $ "DROP VIEW " <> viewName <> " IF EXISTS;"
runDropSql api $ "DROP STREAM " <> source2 <> " IF EXISTS;"
runDropSql api $ "DROP STREAM " <> source1 <> " IF EXISTS;"
Expand Down

0 comments on commit a0799d0

Please sign in to comment.