From 088dfc36743713caa4ad5bf71f1103946be6a4fb Mon Sep 17 00:00:00 2001 From: YangKian <45479280+YangKian@users.noreply.github.com> Date: Tue, 22 Aug 2023 21:21:32 +0800 Subject: [PATCH] call trim concurrently in trimShards and trimStream (#1571) --- hstream/src/HStream/Server/Core/Stream.hs | 49 +++++++++++++++-------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/hstream/src/HStream/Server/Core/Stream.hs b/hstream/src/HStream/Server/Core/Stream.hs index 122e7b0e0..59dbd51d4 100644 --- a/hstream/src/HStream/Server/Core/Stream.hs +++ b/hstream/src/HStream/Server/Core/Stream.hs @@ -23,11 +23,17 @@ module HStream.Server.Core.Stream , trimShards ) where -import Control.Exception (catch, throwIO) -import Control.Monad (forM, forM_, unless, when) +import Control.Concurrent (getNumCapabilities) +import Control.Concurrent.Async (mapConcurrently) +import Control.Concurrent.QSem (QSem, newQSem, signalQSem, + waitQSem) +import Control.Exception (bracket_, catch, throwIO) +import Control.Monad (forM, forM_, unless, void, + when) import qualified Data.Attoparsec.Text as AP import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL +import Data.Either (partitionEithers) import Data.Functor ((<&>)) import qualified Data.List as L import qualified Data.Map.Strict as M @@ -38,11 +44,6 @@ import qualified Data.Vector as V import Data.Word (Word32, Word64) import GHC.Stack (HasCallStack) import Google.Protobuf.Timestamp (Timestamp) -import qualified Proto3.Suite as PT -import qualified Z.Data.CBytes as CB -import qualified ZooKeeper.Exception as ZK - -import Data.Either (partitionEithers) import HStream.Base.Time (getSystemNsTimestamp) import HStream.Common.Types import qualified HStream.Common.ZookeeperSlotAlloc as Slot @@ -59,6 +60,9 @@ import HStream.Server.Types (ServerContext (..), import qualified HStream.Stats as Stats import qualified HStream.Store as S import HStream.Utils +import qualified Proto3.Suite as PT +import qualified Z.Data.CBytes as CB +import qualified ZooKeeper.Exception as ZK ------------------------------------------------------------------------------- @@ -213,8 +217,8 @@ trimStream ServerContext{..} streamName trimPoint = do Log.info $ "trimStream failed because stream " <> Log.build streamName <> " is not found." throwIO $ HE.StreamNotFound $ "stream " <> T.pack (show streamName) <> " is not found." shards <- M.elems <$> S.listStreamPartitions scLDClient streamId - forM_ shards $ \shardId -> do - getTrimLSN scLDClient shardId trimPoint >>= S.trim scLDClient shardId + concurrentCap <- getNumCapabilities + void $ limitedMapConcuurently (min 8 concurrentCap) (\shardId -> getTrimLSN scLDClient shardId trimPoint >>= S.trim scLDClient shardId) shards where streamId = transToStreamName streamName @@ -267,15 +271,18 @@ trimShards ServerContext{..} streamName recordIds = do let streamId = transToStreamName streamName shards <- M.elems <$> S.listStreamPartitions scLDClient streamId - res <- forM points $ \r@Rid{..} -> do - unless (rShardId `elem` shards) $ - throwIO . HE.ShardNotExists $ "shard " <> show rShardId <> " doesn't belong to stream " <> show streamName - S.trim scLDClient rShardId (rBatchId - 1) - Log.info $ "trim to " <> Log.build (show $ rBatchId - 1) - <> " for shard " <> Log.build (show rShardId) - <> ", stream " <> Log.build streamName - return (rShardId, T.pack . show $ r) + concurrentCap <- getNumCapabilities + res <- limitedMapConcuurently (min 8 concurrentCap) (trim shards) points return $ M.fromList res + where + trim shards r@Rid{..} = do + unless (rShardId `elem` shards) $ + throwIO . HE.ShardNotExists $ "shard " <> show rShardId <> " doesn't belong to stream " <> show streamName + S.trim scLDClient rShardId (rBatchId - 1) + Log.info $ "trim to " <> Log.build (show $ rBatchId - 1) + <> " for shard " <> Log.build (show rShardId) + <> ", stream " <> Log.build streamName + return (rShardId, T.pack . show $ r) getStreamInfo :: ServerContext -> S.StreamId -> IO API.Stream getStreamInfo ServerContext{..} stream = do @@ -479,3 +486,11 @@ getTrimLSN client shardId trimPoint = do OffsetTimestamp API.TimestampOffset{..} -> do let accuracy = if timestampOffsetStrictAccuracy then S.FindKeyStrict else S.FindKeyApproximate S.findTime scLDClient logId timestampOffsetTimestampInMs accuracy + +limitedMapConcuurently :: Int -> (a -> IO b) -> [a] -> IO [b] +limitedMapConcuurently maxConcurrency f inputs = do + sem <- newQSem maxConcurrency + mapConcurrently (limited sem . f) inputs + where + limited :: QSem -> IO c -> IO c + limited sem = bracket_ (waitQSem sem) (signalQSem sem)