From ab545f31c0b633ddabbbf4c5eaaca90d4dccef31 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Wed, 19 Jul 2023 16:19:40 +0800 Subject: [PATCH] Server: add a stream-v2 experimental feature (#1518) * Support basic writing & reading --- common/base/HStream/Base/Time.hs | 21 ++- common/hstream/HStream/Utils/RPC.hs | 19 ++- common/hstream/HStream/Utils/Time.hs | 27 +--- hstream-sql/hstream-sql.cabal | 5 +- hstream/app/server.hs | 42 +++-- hstream/hstream.cabal | 2 + hstream/src/HStream/Server/Config.hs | 36 ++++- hstream/src/HStream/Server/Core/Stream.hs | 149 +++++++++++++---- .../src/HStream/Server/Core/Subscription.hs | 5 +- hstream/src/HStream/Server/Experimental.hs | 5 + .../HStream/Server/Experimental/StreamV2.hs | 153 ++++++++++++++++++ hstream/src/HStream/Server/Handler/Stream.hs | 54 +++++-- hstream/src/HStream/Server/Shard.hs | 81 +++++----- hstream/test/HStream/ConfigSpec.hs | 5 + 14 files changed, 481 insertions(+), 123 deletions(-) create mode 100644 hstream/src/HStream/Server/Experimental.hs create mode 100644 hstream/src/HStream/Server/Experimental/StreamV2.hs diff --git a/common/base/HStream/Base/Time.hs b/common/base/HStream/Base/Time.hs index 3f5749589..01fdc5702 100644 --- a/common/base/HStream/Base/Time.hs +++ b/common/base/HStream/Base/Time.hs @@ -1,3 +1,6 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE NumericUnderscores #-} + module HStream.Base.Time ( UT.UnixTime (..) , UT.Format @@ -13,12 +16,15 @@ module HStream.Base.Time , formatSystemTimeGMT , parseSystemTime , parseSystemTimeGMT - , CTime (CTime) + , getSystemMsTimestamp + , getSystemNsTimestamp -- Re-export + , CTime (CTime) , module Data.Time.Clock.System ) where import Data.ByteString (ByteString) +import Data.Int import Data.Time.Clock.System import qualified Data.UnixTime as UT import Foreign.C.Types (CTime (CTime)) @@ -91,6 +97,19 @@ parseSystemTimeGMT fmt str = unixtime2sys $ UT.parseUnixTimeGMT fmt str ------------------------------------------------------------------------------- +getSystemMsTimestamp :: IO Int64 +getSystemMsTimestamp = do + MkSystemTime sec nano <- getSystemTime + let !ts = floor @Double $ (fromIntegral sec * 1e3) + (fromIntegral nano / 1e6) + return ts + +getSystemNsTimestamp :: IO Int64 +getSystemNsTimestamp = do + MkSystemTime sec nano <- getSystemTime + return $ sec * 1_000_000_000 + (fromIntegral nano) + +------------------------------------------------------------------------------- + systime2unix :: SystemTime -> UT.UnixTime systime2unix (MkSystemTime sec _nano) = UT.UnixTime (CTime sec) 0 {-# INLINABLE systime2unix #-} diff --git a/common/hstream/HStream/Utils/RPC.hs b/common/hstream/HStream/Utils/RPC.hs index 3c05745f9..9b5e94827 100644 --- a/common/hstream/HStream/Utils/RPC.hs +++ b/common/hstream/HStream/Utils/RPC.hs @@ -1,10 +1,11 @@ {-# OPTIONS_GHC -Wno-orphans #-} -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE DeriveAnyClass #-} -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE PatternSynonyms #-} module HStream.Utils.RPC ( HStreamClientApi @@ -25,6 +26,7 @@ module HStream.Utils.RPC , getServerResp , getServerRespPure , getProtoTimestamp + , nsTimestampToProto , msTimestampToProto , timestampToMsTimestamp , isSuccessful @@ -171,6 +173,11 @@ getProtoTimestamp = do MkSystemTime sec nano <- getSystemTime return $ Timestamp sec (fromIntegral nano) +nsTimestampToProto :: Int64 -> Timestamp +nsTimestampToProto ns = + let (sec, nano) = ns `divMod` 1_000_000_000 + in Timestamp sec (fromIntegral nano) + msTimestampToProto :: Int64 -> Timestamp msTimestampToProto millis = let (sec, remain) = millis `divMod` 1000 diff --git a/common/hstream/HStream/Utils/Time.hs b/common/hstream/HStream/Utils/Time.hs index d40e99962..76f389f80 100644 --- a/common/hstream/HStream/Utils/Time.hs +++ b/common/hstream/HStream/Utils/Time.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE BangPatterns #-} - module HStream.Utils.Time ( Interval (..) , parserInterval @@ -12,20 +10,17 @@ module HStream.Utils.Time -- * Re-export , getPOSIXTime - , getCurrentMsTimestamp ) where -import Control.Applicative ((<|>)) -import Data.Attoparsec.Text (Parser, choice, endOfInput, parseOnly, - rational, string) -import Data.Int (Int64) -import qualified Data.Text as T -import Data.Time.Clock (NominalDiffTime) -import Data.Time.Clock.POSIX (getPOSIXTime) +import Control.Applicative ((<|>)) +import Data.Attoparsec.Text (Parser, choice, endOfInput, parseOnly, + rational, string) +import Data.Int (Int64) +import qualified Data.Text as T +import Data.Time.Clock (NominalDiffTime) +import Data.Time.Clock.POSIX (getPOSIXTime) -import Data.Time.Clock.System (SystemTime (MkSystemTime), - getSystemTime) -import HStream.Base (rmTrailingZeros) +import HStream.Base (rmTrailingZeros) data Interval = Milliseconds Double @@ -80,9 +75,3 @@ msecSince start = floor . (* 1e3) <$> diffTimeSince start secSince :: NominalDiffTime -> IO Int64 secSince start = floor <$> diffTimeSince start {-# INLINE secSince #-} - -getCurrentMsTimestamp :: IO Int64 -getCurrentMsTimestamp = do - MkSystemTime sec nano <- getSystemTime - let !ts = floor @Double $ (fromIntegral sec * 1e3) + (fromIntegral nano / 1e6) - return ts diff --git a/hstream-sql/hstream-sql.cabal b/hstream-sql/hstream-sql.cabal index 770abdb46..32591df77 100644 --- a/hstream-sql/hstream-sql.cabal +++ b/hstream-sql/hstream-sql.cabal @@ -37,9 +37,8 @@ flag hstream_use_v2_engine common shared-properties ghc-options: - -Wall -Wextra -Wcompat - -Widentities -Wincomplete-record-updates -Wincomplete-uni-patterns - -Wpartial-fields -Wredundant-constraints + -Wall -Wextra -Wcompat -Widentities -Wincomplete-record-updates + -Wincomplete-uni-patterns -Wpartial-fields -Wredundant-constraints if flag(releasebuild) ghc-options: diff --git a/hstream/app/server.hs b/hstream/app/server.hs index 5ae56f05f..7e41c6426 100644 --- a/hstream/app/server.hs +++ b/hstream/app/server.hs @@ -57,6 +57,7 @@ import HStream.MetaStore.Types as M (MetaHandle (..), RHandle (..)) import HStream.Server.Config (AdvertisedListeners, CliOptions (..), + ExperimentalFeature (..), ListenersSecurityProtocolMap, MetaStoreAddr (..), SecurityProtocolMap, @@ -64,8 +65,9 @@ import HStream.Server.Config (AdvertisedListeners, advertisedListenersToPB, cliOptionsParser, getConfig) import qualified HStream.Server.Core.Cluster as Cluster +import qualified HStream.Server.Experimental as Exp import HStream.Server.Handler (handlers) -import qualified HStream.Server.HsGrpcHandler as HsGrpc +import qualified HStream.Server.HsGrpcHandler as HsGrpcHandler import HStream.Server.HStreamApi (NodeState (..), ServerNode (ServerNode), hstreamApiServer) @@ -182,21 +184,31 @@ app config@ServerOpts{..} = do void . forkIO $ updateHashRing gossipContext (loadBalanceHashRing serverContext) Async.withAsync - (serve _serverHost _serverPort _securityProtocolMap serverContext - _serverAdvertisedListeners _listenersSecurityProtocolMap) $ \a -> do - a1 <- startGossip _serverHost gossipContext - Async.link2Only (const True) a a1 - waitGossipBoot gossipContext - Async.wait a + (serve _serverHost _serverPort + serverContext + _securityProtocolMap + _serverAdvertisedListeners + _listenersSecurityProtocolMap + (ExperimentalStreamV2 `elem` experimentalFeatures) + ) $ \a -> do + a1 <- startGossip _serverHost gossipContext + Async.link2Only (const True) a a1 + waitGossipBoot gossipContext + Async.wait a serve :: ByteString -> Word16 - -> SecurityProtocolMap -> ServerContext + -> SecurityProtocolMap -> AdvertisedListeners -> ListenersSecurityProtocolMap + -> Bool + -- ^ Experimental features -> IO () -serve host port securityMap sc@ServerContext{..} listeners listenerSecurityMap = do +serve host port + sc@ServerContext{..} + securityMap listeners listenerSecurityMap + enableExpStreamV2 = do Log.i "************************" hPutStrLn stderr $ [r| _ _ __ _____ ___ ___ __ __ __ @@ -270,7 +282,11 @@ serve host port securityMap sc@ServerContext{..} listeners listenerSecurityMap = , HsGrpc.serverOnStarted = Just listenerOnStarted , HsGrpc.serverSslOptions = newSslOpts } - HsGrpc.runServer grpcOpts' (HsGrpc.handlers sc') + if enableExpStreamV2 + then do Log.info "Enable experimental feature: stream-v2" + slotConfig <- Exp.doStreamV2Init sc' + HsGrpc.runServer grpcOpts' (Exp.streamV2Handlers sc' slotConfig) + else HsGrpc.runServer grpcOpts' (HsGrpcHandler.handlers sc') #endif #ifdef HStreamUseGrpcHaskell @@ -279,7 +295,11 @@ serve host port securityMap sc@ServerContext{..} listeners listenerSecurityMap = hstreamApiServer api grpcOpts #else Log.info "Starting server with hs-grpc-server..." - HsGrpc.runServer grpcOpts (HsGrpc.handlers sc) + if enableExpStreamV2 + then do Log.info "Enable experimental feature: stream-v2" + slotConfig <- Exp.doStreamV2Init sc + HsGrpc.runServer grpcOpts (Exp.streamV2Handlers sc slotConfig) + else HsGrpc.runServer grpcOpts (HsGrpcHandler.handlers sc) #endif -------------------------------------------------------------------------------- diff --git a/hstream/hstream.cabal b/hstream/hstream.cabal index 8c9515468..74a99a628 100644 --- a/hstream/hstream.cabal +++ b/hstream/hstream.cabal @@ -64,6 +64,7 @@ library HStream.Server.Core.Cluster HStream.Server.Core.Common HStream.Server.Exception + HStream.Server.Experimental HStream.Server.Handler HStream.Server.Handler.Common HStream.Server.HsGrpcHandler @@ -81,6 +82,7 @@ library HStream.Server.Core.Stream HStream.Server.Core.Subscription HStream.Server.Core.View + HStream.Server.Experimental.StreamV2 HStream.Server.Handler.Admin HStream.Server.Handler.Cluster HStream.Server.Handler.Connector diff --git a/hstream/src/HStream/Server/Config.hs b/hstream/src/HStream/Server/Config.hs index f652d60b5..f43ad0be8 100644 --- a/hstream/src/HStream/Server/Config.hs +++ b/hstream/src/HStream/Server/Config.hs @@ -6,6 +6,7 @@ module HStream.Server.Config ( ServerOpts (..) + , ExperimentalFeature (..) , CliOptions (..) , cliOptionsParser , TlsConfig (..) @@ -50,6 +51,7 @@ import Options.Applicative as O (Alternative (many, (<|>)), long, maybeReader, metavar, option, optional, short, strOption, value) +import qualified Options.Applicative as O import System.Directory (makeAbsolute) import Text.Read (readEither) import qualified Z.Data.CBytes as CB @@ -92,6 +94,11 @@ data MetaStoreAddr | FileAddr FilePath deriving (Eq) +instance Show MetaStoreAddr where + show (ZkAddr addr) = "zk://" <> CB.unpack addr + show (RqAddr addr) = "rq://" <> T.unpack addr + show (FileAddr addr) = "file://" <> addr + data ServerOpts = ServerOpts { _serverHost :: !ByteString , _serverPort :: !Word16 @@ -126,6 +133,7 @@ data ServerOpts = ServerOpts , _ioOptions :: !IO.IOOptions , _querySnapshotPath :: !FilePath + , experimentalFeatures :: ![ExperimentalFeature] } deriving (Show, Eq) getConfig :: CliOptions -> IO ServerOpts @@ -170,6 +178,8 @@ data CliOptions = CliOptions , _ioConnectorImages_ :: ![Text] , _querySnapshotPath_ :: !(Maybe FilePath) + + , cliExperimentalFeatures :: ![ExperimentalFeature] } deriving Show cliOptionsParser :: O.Parser CliOptions @@ -201,10 +211,11 @@ cliOptionsParser = do _ioTasksNetwork_ <- optional ioTasksNetwork _ioConnectorImages_ <- ioConnectorImage _querySnapshotPath_ <- optional querySnapshotPath - return CliOptions {..} + cliExperimentalFeatures <- many experimentalFeatureParser + return CliOptions{..} parseJSONToOptions :: CliOptions -> Y.Object -> Y.Parser ServerOpts -parseJSONToOptions CliOptions {..} obj = do +parseJSONToOptions CliOptions{..} obj = do nodeCfgObj <- obj .: "hserver" nodeId <- nodeCfgObj .: "id" nodeHost <- fromString <$> nodeCfgObj .:? "bind-address" .!= "0.0.0.0" @@ -318,10 +329,27 @@ parseJSONToOptions CliOptions {..} obj = do processingCfg <- nodeCfgObj .:? "hstream-processing" .!= mempty snapshotPath <- processingCfg .:? "query-snapshot-path" .!= "/data/query_snapshots" let !_querySnapshotPath = fromMaybe snapshotPath _querySnapshotPath_ + + let experimentalFeatures = cliExperimentalFeatures + return ServerOpts {..} ------------------------------------------------------------------------------- +data ExperimentalFeature = ExperimentalStreamV2 + deriving (Show, Eq) + +parseExperimentalFeature :: O.ReadM ExperimentalFeature +parseExperimentalFeature = O.eitherReader $ \case + "stream-v2" -> Right ExperimentalStreamV2 + x -> Left $ "cannot parse experimental feature: " <> x + +experimentalFeatureParser :: O.Parser ExperimentalFeature +experimentalFeatureParser = option parseExperimentalFeature $ + long "experimental" <> metavar "ExperimentalFeature" + +------------------------------------------------------------------------------- + configPath :: O.Parser String configPath = strOption $ long "config-path" @@ -551,10 +579,6 @@ listenerP = do -- Nothing -> errorWithoutStackTrace $ "Invalid meta store address, no Auth: " <> str -- Nothing -> errorWithoutStackTrace $ "Invalid meta store address, no parse: " <> str -instance Show MetaStoreAddr where - show (ZkAddr addr) = "zk://" <> CB.unpack addr - show (RqAddr addr) = "rq://" <> T.unpack addr - readWithErrLog :: Read a => String -> String -> a readWithErrLog opt v = case readEither v of Right x -> x diff --git a/hstream/src/HStream/Server/Core/Stream.hs b/hstream/src/HStream/Server/Core/Stream.hs index 69ad3eda0..c05074d5f 100644 --- a/hstream/src/HStream/Server/Core/Stream.hs +++ b/hstream/src/HStream/Server/Core/Stream.hs @@ -15,33 +15,47 @@ module HStream.Server.Core.Stream , getTailRecordId , trimShard , trimStream + , createStreamV2 + , deleteStreamV2 + , listShardsV2 ) where -import Control.Exception (catch, throwIO) -import Control.Monad (forM, forM_, unless, when) -import qualified Data.ByteString as BS -import qualified Data.ByteString.Lazy as BSL -import qualified Data.Map.Strict as M -import Data.Maybe (fromMaybe) -import qualified Data.Text as T -import qualified Data.Vector as V -import GHC.Stack (HasCallStack) -import Google.Protobuf.Timestamp (Timestamp) -import qualified Proto3.Suite as PT -import qualified Z.Data.CBytes as CB +import Control.Concurrent (modifyMVar_) +import Control.Exception (catch, throwIO) +import Control.Monad (forM, forM_, unless, when) +import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as BSL +import Data.Foldable (foldl') +import Data.Functor ((<&>)) +import qualified Data.HashMap.Strict as HM +import qualified Data.Map.Strict as M +import Data.Maybe (fromMaybe) +import qualified Data.Text as T +import qualified Data.Vector as V +import Data.Word (Word64) +import GHC.Stack (HasCallStack) +import Google.Protobuf.Timestamp (Timestamp) +import qualified Proto3.Suite as PT +import qualified Z.Data.CBytes as CB +import qualified ZooKeeper.Exception as ZK -import Data.Word (Word64) -import qualified HStream.Exception as HE -import qualified HStream.Logger as Log -import qualified HStream.Server.HStreamApi as API -import qualified HStream.Server.MetaData as P -import HStream.Server.Shard (createShard, devideKeySpace, - mkShardWithDefaultId) -import HStream.Server.Types (ServerContext (..), - ServerInternalOffset (..), - ToOffset (..), transToStreamName) -import qualified HStream.Stats as Stats -import qualified HStream.Store as S +import HStream.Base.Time (getSystemNsTimestamp) +import qualified HStream.Common.ZookeeperSlotAlloc as Slot +import qualified HStream.Exception as HE +import qualified HStream.Logger as Log +import qualified HStream.Server.HStreamApi as API +import qualified HStream.Server.MetaData as P +import HStream.Server.Shard (Shard (..), createShard, + devideKeySpace, + mkShardAttrs, + mkShardWithDefaultId, + mkSharedShardMapWithShards) +import HStream.Server.Types (ServerContext (..), + ServerInternalOffset (..), + ToOffset (..), + transToStreamName) +import qualified HStream.Stats as Stats +import qualified HStream.Store as S import HStream.Utils ------------------------------------------------------------------------------- @@ -67,6 +81,26 @@ createStream ServerContext{..} stream@API.Stream{ Log.debug $ "create shards for stream " <> Log.build streamStreamName <> ": " <> Log.buildString' (show shards) return stream{API.streamCreationTime = Just timeStamp} +-- NOTE: +-- 1. We will ignore streamReplicationFactor,streamBacklogDuration setting in the request +createStreamV2 + :: HasCallStack + => ServerContext -> Slot.SlotConfig + -> API.Stream -> IO API.Stream +createStreamV2 ServerContext{..} slotConfig stream@API.Stream{..} = do + -- NOTE: the bytestring get from getProtoTimestamp is not a valid utf8 + timeStamp <- getSystemNsTimestamp + let !extraAttr = M.fromList [("createTime", T.pack $ show timeStamp)] + partitions = devideKeySpace (fromIntegral streamShardCount) + shardAttrs = partitions <&> (\(startKey, endKey) -> Slot.SlotValueAttrs $ + mkShardAttrs startKey endKey (fromIntegral streamShardCount)) + + shards <- catch (Slot.allocateSlot slotConfig (textToCBytes streamStreamName) extraAttr shardAttrs) $ + \(_ :: ZK.ZNODEEXISTS) -> throwIO $ HE.StreamExists streamStreamName + + Log.debug $ "create shards for stream " <> Log.build streamStreamName <> ": " <> Log.buildString' (show shards) + return stream{API.streamCreationTime = Just $ nsTimestampToProto timeStamp} + deleteStream :: ServerContext -> API.DeleteStreamRequest -> IO () @@ -93,6 +127,34 @@ deleteStream ServerContext{..} API.DeleteStreamRequest{deleteStreamRequestForce else throwIO HE.FoundSubscription +-- NOTE: +-- 1. do not support archive stream +deleteStreamV2 + :: ServerContext -> Slot.SlotConfig + -> API.DeleteStreamRequest -> IO () +deleteStreamV2 ServerContext{..} slotConfig + API.DeleteStreamRequest{ deleteStreamRequestForce = force + , deleteStreamRequestStreamName = sName + , .. + } = do + storeExists <- Slot.doesSlotExist slotConfig streamName + if storeExists + then doDelete + else unless deleteStreamRequestIgnoreNonExist $ throwIO $ HE.StreamNotFound sName + where + streamName = textToCBytes sName + -- TODO: archive stream + doDelete = do + subs <- P.getSubscriptionWithStream metaHandle sName + if null subs + then deallocate + else if force then deallocate else throwIO HE.FoundSubscription + deallocate = do + logids <- Slot.deallocateSlot slotConfig streamName + -- delete all data in the logid + forM_ logids $ S.trimLast scLDClient + Stats.stream_stat_erase scStatsHolder (textToCBytes sName) + getStream :: ServerContext -> API.GetStreamRequest -> IO API.GetStreamResponse getStream ServerContext{..} API.GetStreamRequest{ getStreamRequestName = sName} = do let streamId = transToStreamName sName @@ -225,8 +287,6 @@ appendStream ServerContext{..} streamName shardId record = do where cStreamName = textToCBytes streamName --------------------------------------------------------------------------------- - listShards :: HasCallStack => ServerContext @@ -262,6 +322,42 @@ listShards ServerContext{..} API.ListShardsRequest{..} = do shardEpoch <- read . CB.unpack <$> M.lookup epoch mp return (startHashRangeKey, endHashRangeKey, shardEpoch) +listShardsV2 + :: HasCallStack + => ServerContext + -> Slot.SlotConfig + -> API.ListShardsRequest + -> IO (V.Vector API.Shard) +listShardsV2 ServerContext{..} slotConfig API.ListShardsRequest{..} = do + let streamName = textToCBytes listShardsRequestStreamName + Slot.Slot{..} <- Slot.getSlotByName slotConfig streamName + V.foldM' getShardInfo V.empty (V.fromList $ M.toList slotVals) + where + startKey = "startKey" + endKey = "endKey" + epoch = "epoch" + + getShardInfo shards (logId, m_attr) = do + case getInfo m_attr of + -- FIXME: should raise an exception when get Nothing + Nothing -> return shards + Just (sKey, eKey, ep) -> return . V.snoc shards $ + API.Shard{ API.shardStreamName = listShardsRequestStreamName + , API.shardShardId = logId + , API.shardStartHashRangeKey = sKey + , API.shardEndHashRangeKey = eKey + , API.shardEpoch = ep + -- FIXME: neet a way to find if this shard is active + , API.shardIsActive = True + } + + getInfo m_mp = do + Slot.SlotValueAttrs mp <- m_mp + startHashRangeKey <- M.lookup startKey mp + endHashRangeKey <- M.lookup endKey mp + shardEpoch <- read . T.unpack <$> M.lookup epoch mp + return (startHashRangeKey, endHashRangeKey, shardEpoch) + trimShard :: HasCallStack => ServerContext @@ -295,4 +391,3 @@ getTrimLSN client shardId trimPoint = do OffsetTimestamp API.TimestampOffset{..} -> do let accuracy = if timestampOffsetStrictAccuracy then S.FindKeyStrict else S.FindKeyApproximate S.findTime scLDClient logId timestampOffsetTimestampInMs accuracy - diff --git a/hstream/src/HStream/Server/Core/Subscription.hs b/hstream/src/HStream/Server/Core/Subscription.hs index 1811454d6..f42c10382 100644 --- a/hstream/src/HStream/Server/Core/Subscription.hs +++ b/hstream/src/HStream/Server/Core/Subscription.hs @@ -32,6 +32,7 @@ import qualified Data.Text as T import qualified Data.Vector as V import Data.Word (Word32, Word64) import GHC.Stack (HasCallStack) +import HStream.Base.Time (getSystemMsTimestamp) import Network.GRPC.HighLevel (StreamRecv, StreamSend) import Proto3.Suite (Enumerated (Enumerated), def) @@ -54,7 +55,6 @@ import qualified HStream.Stats as Stats import qualified HStream.Store as S import HStream.Utils (ResourceType (..), decompressBatchedRecord, - getCurrentMsTimestamp, getProtoTimestamp, mkBatchedRecord, textToCBytes) @@ -606,7 +606,7 @@ sendRecords ServerContext{..} subState subCtx@SubscribeContext {..} = do updateClockAndDoResend = do -- Note: non-strict behaviour in STM! -- Please refer to https://github.com/haskell/stm/issues/30 - newTime <- getCurrentMsTimestamp <&> fromIntegral + newTime <- getSystemMsTimestamp <&> fromIntegral (timeoutList, leftList) <- atomically $ do checkList <- readTVar subWaitingCheckedRecordIds let (!timeoutList, !leftList) = Heap.span (\CheckedRecordIds {..} -> crDeadline <= newTime) checkList @@ -1252,4 +1252,3 @@ throwToAllConsumers SubscribeContext{..} e = do return $ HM.elems consumerCtxs forM_ consumerCtxs (\ConsumerContext{..} -> when (ccThreadId /= subMasterTid) $ throwTo ccThreadId e ) throwIO e - diff --git a/hstream/src/HStream/Server/Experimental.hs b/hstream/src/HStream/Server/Experimental.hs new file mode 100644 index 000000000..8c7ff78fc --- /dev/null +++ b/hstream/src/HStream/Server/Experimental.hs @@ -0,0 +1,5 @@ +module HStream.Server.Experimental + ( module HStream.Server.Experimental.StreamV2 + ) where + +import HStream.Server.Experimental.StreamV2 diff --git a/hstream/src/HStream/Server/Experimental/StreamV2.hs b/hstream/src/HStream/Server/Experimental/StreamV2.hs new file mode 100644 index 000000000..31c758fb2 --- /dev/null +++ b/hstream/src/HStream/Server/Experimental/StreamV2.hs @@ -0,0 +1,153 @@ +{-# LANGUAGE DataKinds #-} + +module HStream.Server.Experimental.StreamV2 + ( doStreamV2Init + , streamV2Handlers + ) where + +import Control.Exception (catch) +import Control.Monad +import HsGrpc.Server +import Z.Data.CBytes (CBytes) +import qualified ZooKeeper as ZK +import qualified ZooKeeper.Exception as ZK +import qualified ZooKeeper.Types as ZK + +import qualified HStream.Common.ZookeeperSlotAlloc as Slot +import qualified HStream.MetaStore.Types as Meta +import qualified HStream.Server.Handler.Admin as H +import qualified HStream.Server.Handler.Cluster as H +import qualified HStream.Server.Handler.Connector as H +import qualified HStream.Server.Handler.Extra as H +import qualified HStream.Server.Handler.Query as H +import qualified HStream.Server.Handler.ShardReader as H +import qualified HStream.Server.Handler.Stats as H +import qualified HStream.Server.Handler.Stream as H +import qualified HStream.Server.Handler.Subscription as H +import qualified HStream.Server.Handler.View as H +import qualified HStream.Server.HStreamApi as A +import HStream.Server.Types (ServerContext (..)) +import qualified HStream.Store as S +import qualified HStream.Store.Internal.LogDevice as LD +import qualified Proto.HStream.Server.HStreamApi as P + +------------------------------------------------------------------------------- + +defLogGroupStart :: S.C_LogID +defLogGroupStart = 1000 + +defLogGroupEnd :: S.C_LogID +defLogGroupEnd = 200000 + +defLogGroupPath :: CBytes +defLogGroupPath = "/hstream/streamgroup" + +defZkRoot :: CBytes +defZkRoot = "/hstream/streamgroup" + +doStreamV2Init :: ServerContext -> IO Slot.SlotConfig +doStreamV2Init sc = do + initDefLogGoup sc + initZkSlot sc + +streamV2Handlers :: ServerContext -> Slot.SlotConfig -> [ServiceHandler] +streamV2Handlers sc slotConfig = + [ unary (GRPC :: GRPC P.HStreamApi "echo") handleEcho + -- Cluster + , unary (GRPC :: GRPC P.HStreamApi "describeCluster") (H.handleDescribeCluster sc) + , unary (GRPC :: GRPC P.HStreamApi "lookupResource") (H.handleLookupResource sc) + , unary (GRPC :: GRPC P.HStreamApi "lookupShard") (H.handleLookupShard sc) + , unary (GRPC :: GRPC P.HStreamApi "lookupSubscription") (H.handleLookupSubscription sc) + , unary (GRPC :: GRPC P.HStreamApi "lookupShardReader") (H.handleLookupShardReader sc) + -- Stream + , unary (GRPC :: GRPC P.HStreamApi "createStream") (H.handleCreateStreamV2 sc slotConfig) + , unary (GRPC :: GRPC P.HStreamApi "deleteStream") (H.handleDeleteStreamV2 sc slotConfig) + --, unary (GRPC :: GRPC P.HStreamApi "getStream") (H.handleGetStream sc) + --, unary (GRPC :: GRPC P.HStreamApi "listStreams") (H.handleListStreams sc) + --, unary (GRPC :: GRPC P.HStreamApi "listStreamsWithPrefix") (H.handleListStreamsWithPrefix sc) + , unary (GRPC :: GRPC P.HStreamApi "listShards") (H.handleListShardV2 sc slotConfig) + --, unary (GRPC :: GRPC P.HStreamApi "trimStream") (H.handleTrimStream sc) + --, unary (GRPC :: GRPC P.HStreamApi "trimShard") (H.handleTrimShard sc) + --, unary (GRPC :: GRPC P.HStreamApi "getTailRecordId") (H.handleGetTailRecordId sc) + -- Reader + --, unary (GRPC :: GRPC P.HStreamApi "listShardReaders") (H.handleListShardReaders sc) + --, unary (GRPC :: GRPC P.HStreamApi "createShardReader") (H.handleCreateShardReader sc) + --, unary (GRPC :: GRPC P.HStreamApi "deleteShardReader") (H.handleDeleteShardReader sc) + -- Subscription + --, unary (GRPC :: GRPC P.HStreamApi "createSubscription") (H.handleCreateSubscription sc) + --, unary (GRPC :: GRPC P.HStreamApi "getSubscription") (H.handleGetSubscription sc) + --, handlerUseThreadPool $ unary (GRPC :: GRPC P.HStreamApi "deleteSubscription") (H.handleDeleteSubscription sc) + --, unary (GRPC :: GRPC P.HStreamApi "listSubscriptions") (H.handleListSubscriptions sc) + --, unary (GRPC :: GRPC P.HStreamApi "listSubscriptionsWithPrefix") (H.handleListSubscriptionsWithPrefix sc) + --, unary (GRPC :: GRPC P.HStreamApi "listConsumers") (H.handleListConsumers sc) + --, unary (GRPC :: GRPC P.HStreamApi "checkSubscriptionExist") (H.handleCheckSubscriptionExist sc) + -- Append + , unary (GRPC :: GRPC P.HStreamApi "append") (H.handleAppend sc) + -- Read + , unary (GRPC :: GRPC P.HStreamApi "readShard") (H.handleReadShard sc) + , serverStream (GRPC :: GRPC P.HStreamApi "readShardStream") (H.handleReadShardStream sc) + , serverStream (GRPC :: GRPC P.HStreamApi "readSingleShardStream") (H.handleReadSingleShardStream sc) + --, serverStream (GRPC :: GRPC P.HStreamApi "readStream") (H.handleReadStream sc) + -- Subscribe + --, bidiStream (GRPC :: GRPC P.HStreamApi "streamingFetch") (H.handleStreamingFetch sc) + -- Stats + , unary (GRPC :: GRPC P.HStreamApi "perStreamTimeSeriesStats") (H.handlePerStreamTimeSeriesStats $ scStatsHolder sc) + , unary (GRPC :: GRPC P.HStreamApi "perStreamTimeSeriesStatsAll") (H.handlePerStreamTimeSeriesStatsAll $ scStatsHolder sc) + , unary (GRPC :: GRPC P.HStreamApi "getStats") (H.handleGetStats $ scStatsHolder sc) + -- Admin + , unary (GRPC :: GRPC P.HStreamApi "sendAdminCommand") (H.handleAdminCommand sc) + -- Connector + --, unary (GRPC :: GRPC P.HStreamApi "createConnector") (H.handleCreateConnector sc) + --, unary (GRPC :: GRPC P.HStreamApi "listConnectors") (H.handleListConnectors sc) + --, unary (GRPC :: GRPC P.HStreamApi "getConnector") (H.handleGetConnector sc) + --, unary (GRPC :: GRPC P.HStreamApi "getConnectorSpec") (H.handleGetConnectorSpec sc) + --, unary (GRPC :: GRPC P.HStreamApi "getConnectorLogs") (H.handleGetConnectorLogs sc) + --, unary (GRPC :: GRPC P.HStreamApi "deleteConnector") (H.handleDeleteConnector sc) + --, unary (GRPC :: GRPC P.HStreamApi "resumeConnector") (H.handleResumeConnector sc) + --, unary (GRPC :: GRPC P.HStreamApi "pauseConnector") (H.handlePauseConnector sc) + -- View + --, unary (GRPC :: GRPC P.HStreamApi "getView") (H.handleGetView sc) + --, unary (GRPC :: GRPC P.HStreamApi "listViews") (H.handleListView sc) + --, unary (GRPC :: GRPC P.HStreamApi "deleteView") (H.handleDeleteView sc) + --, unary (GRPC :: GRPC P.HStreamApi "executeViewQuery") (H.handleExecuteViewQuery sc) + --, unary (GRPC :: GRPC P.HStreamApi "executeViewQueryWithNamespace") (H.handleExecuteViewQueryWithNamespace sc) + -- Query + --, unary (GRPC :: GRPC P.HStreamApi "terminateQuery") (H.handleTerminateQuery sc) + --, unary (GRPC :: GRPC P.HStreamApi "executeQuery") (H.handleExecuteQuery sc) + --, unary (GRPC :: GRPC P.HStreamApi "getQuery") (H.handleGetQuery sc) + --, unary (GRPC :: GRPC P.HStreamApi "createQuery") (H.handleCreateQuery sc) + --, unary (GRPC :: GRPC P.HStreamApi "createQueryWithNamespace") (H.handleCreateQueryWithNamespace sc) + --, unary (GRPC :: GRPC P.HStreamApi "listQueries") (H.handleListQueries sc) + --, unary (GRPC :: GRPC P.HStreamApi "deleteQuery") (H.handleDeleteQuery sc) + --, unary (GRPC :: GRPC P.HStreamApi "resumeQuery") (H.handleResumeQuery sc) + --, unary (GRPC :: GRPC P.HStreamApi "parseSql") (H.handleParseSql sc) + ] + +------------------------------------------------------------------------------- + +initDefLogGoup :: ServerContext -> IO () +initDefLogGoup ServerContext{..} = do + let attrs = S.def{ S.logReplicationFactor = S.defAttr1 1 + , S.logBacklogDuration = S.defAttr1 (Just 600) + } + catch (do group <- LD.makeLogGroup scLDClient defLogGroupPath defLogGroupStart defLogGroupEnd attrs True + LD.syncLogsConfigVersion scLDClient =<< LD.logGroupGetVersion group + ) $ \(_ :: S.EXISTS) -> pure () + +initZkSlot :: ServerContext -> IO Slot.SlotConfig +initZkSlot sc@ServerContext{..} = do + case metaHandle of + Meta.ZkHandle zh -> do + catch (void $ ZK.zooCreate zh "/hstream" Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent) $ + \(_ :: ZK.ZNODEEXISTS) -> pure () + let slotConfig = Slot.SlotConfig{ slotRoot = defZkRoot + , slotZkHandler = zh + , slotOffset = defLogGroupStart + , slotMaxCapbility = defLogGroupEnd - defLogGroupStart + 1 + } + Slot.initSlot slotConfig + pure slotConfig + _ -> errorWithoutStackTrace "This stream-v2 feature should be used with zookeeper meta store!" + +handleEcho :: UnaryHandler A.EchoRequest A.EchoResponse +handleEcho _grpcCtx A.EchoRequest{..} = return $ A.EchoResponse echoRequestMsg diff --git a/hstream/src/HStream/Server/Handler/Stream.hs b/hstream/src/HStream/Server/Handler/Stream.hs index 351a4ab0b..f6a89b076 100644 --- a/hstream/src/HStream/Server/Handler/Stream.hs +++ b/hstream/src/HStream/Server/Handler/Stream.hs @@ -29,25 +29,31 @@ module HStream.Server.Handler.Stream , handleListShard , handleTrimShard , handleGetTailRecordId + -- ** Experimental feature + , handleCreateStreamV2 + , handleDeleteStreamV2 + , handleListShardV2 ) where import Control.Exception -import Data.Maybe (fromJust, isNothing) -import qualified HsGrpc.Server as G -import qualified HsGrpc.Server.Types as G +import Data.Maybe (fromJust, isNothing) +import qualified HsGrpc.Server as G +import qualified HsGrpc.Server.Types as G import Network.GRPC.HighLevel.Generated +import qualified ZooKeeper.Exception as ZK -import Control.Monad (when) -import qualified HStream.Exception as HE -import qualified HStream.Logger as Log -import qualified HStream.Server.Core.Stream as C +import Control.Monad (when) +import qualified HStream.Common.ZookeeperSlotAlloc as Slot +import qualified HStream.Exception as HE +import qualified HStream.Logger as Log +import qualified HStream.Server.Core.Stream as C import HStream.Server.Exception import HStream.Server.HStreamApi -import HStream.Server.Types (ServerContext (..)) +import HStream.Server.Types (ServerContext (..)) import HStream.Server.Validation -import qualified HStream.Stats as Stats -import qualified HStream.Store as Store -import HStream.ThirdParty.Protobuf as PB +import qualified HStream.Stats as Stats +import qualified HStream.Store as Store +import HStream.ThirdParty.Protobuf as PB import HStream.Utils -------------------------------------------------------------------------------- @@ -67,6 +73,12 @@ handleCreateStream sc _ stream = catchDefaultEx $ do validateStream stream C.createStream sc stream +handleCreateStreamV2 :: ServerContext -> Slot.SlotConfig -> G.UnaryHandler Stream Stream +handleCreateStreamV2 sc slotConfig _ stream = catchDefaultEx $ do + Log.debug $ "Receive Create Stream Request: " <> Log.buildString' stream + validateStream stream + C.createStreamV2 sc slotConfig stream + -- DeleteStream have two mod: force delete or normal delete -- For normal delete, if current stream have active subscription, the delete request will return error. -- For force delete, if current stream have active subscription, the stream will be archived. After that, @@ -91,6 +103,12 @@ handleDeleteStream sc _ req = catchDefaultEx $ do validateNameAndThrow ResStream $ deleteStreamRequestStreamName req C.deleteStream sc req >> pure Empty +handleDeleteStreamV2 :: ServerContext -> Slot.SlotConfig -> G.UnaryHandler DeleteStreamRequest Empty +handleDeleteStreamV2 sc slotConfig _ req = catchDefaultEx $ do + Log.debug $ "Receive Delete Stream Request: " <> Log.buildString' req + validateNameAndThrow ResStream $ deleteStreamRequestStreamName req + C.deleteStreamV2 sc slotConfig req >> pure Empty + getStreamHandler :: ServerContext -> ServerRequest 'Normal GetStreamRequest GetStreamResponse @@ -209,6 +227,12 @@ handleListShard sc _ req = listShardsExHandle $ do validateNameAndThrow ResStream $ listShardsRequestStreamName req ListShardsResponse <$> C.listShards sc req +handleListShardV2 :: ServerContext -> Slot.SlotConfig -> G.UnaryHandler ListShardsRequest ListShardsResponse +handleListShardV2 sc slotConfig _ req = listShardsExHandleV2 $ do + Log.debug "Receive List Shards Request" + validateNameAndThrow ResStream $ listShardsRequestStreamName req + ListShardsResponse <$> C.listShardsV2 sc slotConfig req + trimShardHandler :: ServerContext -> ServerRequest 'Normal TrimShardRequest Empty @@ -276,4 +300,12 @@ listShardsExHandle = HE.mkExceptionHandle handlers G.throwGrpcError $ HE.mkGrpcStatus err G.StatusUnavailable ] ++ defaultExHandlers +listShardsExHandleV2 :: IO a -> IO a +listShardsExHandleV2 = HE.mkExceptionHandle handlers + where + handlers = + [ Handler $ \(err :: ZK.ZNONODE) -> do + G.throwGrpcError $ HE.mkGrpcStatus err G.StatusUnavailable + ] ++ defaultExHandlers + #undef MkUnavailable diff --git a/hstream/src/HStream/Server/Shard.hs b/hstream/src/HStream/Server/Shard.hs index 7c5c55c79..af4de0faa 100644 --- a/hstream/src/HStream/Server/Shard.hs +++ b/hstream/src/HStream/Server/Shard.hs @@ -2,42 +2,43 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} -module HStream.Server.Shard( - Shard (..), - ShardKey (..), - mkShard, - mkShardWithDefaultId, - splitShardByKey, - halfSplit, - mergeShard, - devideKeySpace, - createShard, - - ShardMap, - mkShardMap, - getShard, - insertShard, - deleteShard, - - SharedShardMap, - -- mkSharedShardMap, - mkSharedShardMapWithShards, - getShardMap, - putShardMap, - readShardMap, - getShardByKey, - getShardMapIdx, - splitByKey, - splitHalf, - mergeTwoShard, - - hashShardKey, - keyToCBytes, - cBytesToKey, - shardStartKey, - shardEndKey, - shardEpoch -) where +module HStream.Server.Shard + ( Shard (..) + , ShardKey (..) + , mkShard + , mkShardWithDefaultId + , splitShardByKey + , halfSplit + , mergeShard + , devideKeySpace + , createShard + , mkShardAttrs + + , ShardMap + , mkShardMap + , getShard + , insertShard + , deleteShard + + , SharedShardMap + -- , mkSharedShardMap + , mkSharedShardMapWithShards + , getShardMap + , putShardMap + , readShardMap + , getShardByKey + , getShardMapIdx + , splitByKey + , splitHalf + , mergeTwoShard + + , hashShardKey + , keyToCBytes + , cBytesToKey + , shardStartKey + , shardEndKey + , shardEpoch + ) where import Control.Concurrent.STM (STM, TMVar, atomically, newTMVarIO, putTMVar, readTMVar, swapTMVar, @@ -52,6 +53,7 @@ import Data.Hashable (Hashable (hash)) import Data.List (iterate') import Data.Map.Strict (Map) import qualified Data.Map.Strict as M +import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) import Data.Typeable (cast) @@ -346,6 +348,13 @@ createShard client shard@Shard{..} = do newShardId <- S.createStreamPartition client streamId (Just $ getShardName startKey endKey) attr return $ shard {shardId = newShardId} +mkShardAttrs :: ShardKey -> ShardKey -> Word64 -> Map Text Text +mkShardAttrs (ShardKey start) (ShardKey end) epoch = M.fromList + [ ("startKey", T.pack $ show start) + , ("endKey", T.pack $ show end) + , ("epoch", T.pack $ show epoch) + ] + getShardName :: ShardKey -> ShardKey -> CB.CBytes getShardName startKey endKey = "shard-" <> keyToCBytes startKey <> "-" <> keyToCBytes endKey diff --git a/hstream/test/HStream/ConfigSpec.hs b/hstream/test/HStream/ConfigSpec.hs index 397358542..6a02bc54d 100644 --- a/hstream/test/HStream/ConfigSpec.hs +++ b/hstream/test/HStream/ConfigSpec.hs @@ -107,6 +107,7 @@ defaultConfig = ServerOpts , _gossipOpts = defaultGossipOpts , _ioOptions = defaultIOOptions , _querySnapshotPath = "/data/query_snapshots" + , experimentalFeatures = [] } defaultIOOptions :: IOOptions @@ -223,6 +224,8 @@ emptyCliOptions = CliOptions { , _ioConnectorImages_ = [] , _querySnapshotPath_ = Nothing + + , cliExperimentalFeatures = [] } @@ -263,6 +266,7 @@ instance Arbitrary ServerOpts where let _querySnapshotPath = "/data/query_snapshots" _listenersSecurityProtocolMap <- M.fromList . zip listenersKeys . repeat <$> elements ["plaintext", "tls"] let _securityProtocolMap = M.fromList [("plaintext", Nothing), ("tls", _tlsConfig)] + let experimentalFeatures = [] pure ServerOpts{..} instance Arbitrary CliOptions where @@ -294,6 +298,7 @@ instance Arbitrary CliOptions where _ioTasksNetwork_ <- genMaybe $ T.pack <$> nameGen _ioConnectorImages_ <- listOf5' $ T.pack <$> connectorImageCliOptGen let _querySnapshotPath_ = Just "/data/query_snapshots" + let cliExperimentalFeatures = [] pure CliOptions{..} instance Arbitrary Listener where