Skip to content

Commit

Permalink
call trim concurrently in trimShards and trimStream (#1571)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Aug 22, 2023
1 parent 531aaab commit 088dfc3
Showing 1 changed file with 32 additions and 17 deletions.
49 changes: 32 additions & 17 deletions hstream/src/HStream/Server/Core/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@ module HStream.Server.Core.Stream
, trimShards
) where

import Control.Exception (catch, throwIO)
import Control.Monad (forM, forM_, unless, when)
import Control.Concurrent (getNumCapabilities)
import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem,
waitQSem)
import Control.Exception (bracket_, catch, throwIO)
import Control.Monad (forM, forM_, unless, void,
when)
import qualified Data.Attoparsec.Text as AP
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Either (partitionEithers)
import Data.Functor ((<&>))
import qualified Data.List as L
import qualified Data.Map.Strict as M
Expand All @@ -38,11 +44,6 @@ import qualified Data.Vector as V
import Data.Word (Word32, Word64)
import GHC.Stack (HasCallStack)
import Google.Protobuf.Timestamp (Timestamp)
import qualified Proto3.Suite as PT
import qualified Z.Data.CBytes as CB
import qualified ZooKeeper.Exception as ZK

import Data.Either (partitionEithers)
import HStream.Base.Time (getSystemNsTimestamp)
import HStream.Common.Types
import qualified HStream.Common.ZookeeperSlotAlloc as Slot
Expand All @@ -59,6 +60,9 @@ import HStream.Server.Types (ServerContext (..),
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import HStream.Utils
import qualified Proto3.Suite as PT
import qualified Z.Data.CBytes as CB
import qualified ZooKeeper.Exception as ZK

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -213,8 +217,8 @@ trimStream ServerContext{..} streamName trimPoint = do
Log.info $ "trimStream failed because stream " <> Log.build streamName <> " is not found."
throwIO $ HE.StreamNotFound $ "stream " <> T.pack (show streamName) <> " is not found."
shards <- M.elems <$> S.listStreamPartitions scLDClient streamId
forM_ shards $ \shardId -> do
getTrimLSN scLDClient shardId trimPoint >>= S.trim scLDClient shardId
concurrentCap <- getNumCapabilities
void $ limitedMapConcuurently (min 8 concurrentCap) (\shardId -> getTrimLSN scLDClient shardId trimPoint >>= S.trim scLDClient shardId) shards
where
streamId = transToStreamName streamName

Expand Down Expand Up @@ -267,15 +271,18 @@ trimShards ServerContext{..} streamName recordIds = do

let streamId = transToStreamName streamName
shards <- M.elems <$> S.listStreamPartitions scLDClient streamId
res <- forM points $ \r@Rid{..} -> do
unless (rShardId `elem` shards) $
throwIO . HE.ShardNotExists $ "shard " <> show rShardId <> " doesn't belong to stream " <> show streamName
S.trim scLDClient rShardId (rBatchId - 1)
Log.info $ "trim to " <> Log.build (show $ rBatchId - 1)
<> " for shard " <> Log.build (show rShardId)
<> ", stream " <> Log.build streamName
return (rShardId, T.pack . show $ r)
concurrentCap <- getNumCapabilities
res <- limitedMapConcuurently (min 8 concurrentCap) (trim shards) points
return $ M.fromList res
where
trim shards r@Rid{..} = do
unless (rShardId `elem` shards) $
throwIO . HE.ShardNotExists $ "shard " <> show rShardId <> " doesn't belong to stream " <> show streamName
S.trim scLDClient rShardId (rBatchId - 1)
Log.info $ "trim to " <> Log.build (show $ rBatchId - 1)
<> " for shard " <> Log.build (show rShardId)
<> ", stream " <> Log.build streamName
return (rShardId, T.pack . show $ r)

getStreamInfo :: ServerContext -> S.StreamId -> IO API.Stream
getStreamInfo ServerContext{..} stream = do
Expand Down Expand Up @@ -479,3 +486,11 @@ getTrimLSN client shardId trimPoint = do
OffsetTimestamp API.TimestampOffset{..} -> do
let accuracy = if timestampOffsetStrictAccuracy then S.FindKeyStrict else S.FindKeyApproximate
S.findTime scLDClient logId timestampOffsetTimestampInMs accuracy

-- | Run @f@ over every element of @inputs@ via 'mapConcurrently', letting at
-- most @maxConcurrency@ of the resulting actions execute at the same time.
--
-- The limit is enforced with a 'QSem': each action has to take one unit
-- before it starts and hands it back when it finishes. The semaphore bounds
-- only how many actions are running @f@ at once; spawning the workers is
-- still left entirely to 'mapConcurrently'.
--
-- A @maxConcurrency@ below 1 is treated as 1, so the actions then run one at
-- a time instead of never running at all.
--
-- The result list and exception behaviour are those of 'mapConcurrently'.
limitedMapConcuurently :: Int -> (a -> IO b) -> [a] -> IO [b]
limitedMapConcuurently maxConcurrency f inputs = do
  -- A semaphore created with 0 units would leave every worker blocked in
  -- 'waitQSem' forever, and a negative quantity makes 'newQSem' fail, so
  -- always provide at least one slot.
  sem <- newQSem (max 1 maxConcurrency)
  mapConcurrently (limited sem . f) inputs
  where
    -- 'bracket_' returns the slot to the semaphore even when the wrapped
    -- action throws, so a failing action cannot permanently shrink the pool.
    limited :: QSem -> IO c -> IO c
    limited sem = bracket_ (waitQSem sem) (signalQSem sem)

0 comments on commit 088dfc3

Please sign in to comment.