Skip to content

Commit

Permalink
Server: add a stream-v2 experimental feature (#1518)
Browse files Browse the repository at this point in the history
* Support basic writing & reading
  • Loading branch information
4eUeP authored Jul 19, 2023
1 parent bdec836 commit ab545f3
Show file tree
Hide file tree
Showing 14 changed files with 481 additions and 123 deletions.
21 changes: 20 additions & 1 deletion common/base/HStream/Base/Time.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NumericUnderscores #-}

module HStream.Base.Time
( UT.UnixTime (..)
, UT.Format
Expand All @@ -13,12 +16,15 @@ module HStream.Base.Time
, formatSystemTimeGMT
, parseSystemTime
, parseSystemTimeGMT
, CTime (CTime)
, getSystemMsTimestamp
, getSystemNsTimestamp
-- Re-export
, CTime (CTime)
, module Data.Time.Clock.System
) where

import Data.ByteString (ByteString)
import Data.Int
import Data.Time.Clock.System
import qualified Data.UnixTime as UT
import Foreign.C.Types (CTime (CTime))
Expand Down Expand Up @@ -91,6 +97,19 @@ parseSystemTimeGMT fmt str = unixtime2sys $ UT.parseUnixTimeGMT fmt str

-------------------------------------------------------------------------------

getSystemMsTimestamp :: IO Int64
getSystemMsTimestamp = do
MkSystemTime sec nano <- getSystemTime
let !ts = floor @Double $ (fromIntegral sec * 1e3) + (fromIntegral nano / 1e6)
return ts

getSystemNsTimestamp :: IO Int64
getSystemNsTimestamp = do
MkSystemTime sec nano <- getSystemTime
return $ sec * 1_000_000_000 + (fromIntegral nano)

-------------------------------------------------------------------------------

systime2unix :: SystemTime -> UT.UnixTime
systime2unix (MkSystemTime sec _nano) = UT.UnixTime (CTime sec) 0
{-# INLINABLE systime2unix #-}
Expand Down
19 changes: 13 additions & 6 deletions common/hstream/HStream/Utils/RPC.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{-# OPTIONS_GHC -Wno-orphans #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE PatternSynonyms #-}

module HStream.Utils.RPC
( HStreamClientApi
Expand All @@ -25,6 +26,7 @@ module HStream.Utils.RPC
, getServerResp
, getServerRespPure
, getProtoTimestamp
, nsTimestampToProto
, msTimestampToProto
, timestampToMsTimestamp
, isSuccessful
Expand Down Expand Up @@ -171,6 +173,11 @@ getProtoTimestamp = do
MkSystemTime sec nano <- getSystemTime
return $ Timestamp sec (fromIntegral nano)

nsTimestampToProto :: Int64 -> Timestamp
nsTimestampToProto ns =
let (sec, nano) = ns `divMod` 1_000_000_000
in Timestamp sec (fromIntegral nano)

msTimestampToProto :: Int64 -> Timestamp
msTimestampToProto millis =
let (sec, remain) = millis `divMod` 1000
Expand Down
27 changes: 8 additions & 19 deletions common/hstream/HStream/Utils/Time.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{-# LANGUAGE BangPatterns #-}

module HStream.Utils.Time
( Interval (..)
, parserInterval
Expand All @@ -12,20 +10,17 @@ module HStream.Utils.Time

-- * Re-export
, getPOSIXTime
, getCurrentMsTimestamp
) where

import Control.Applicative ((<|>))
import Data.Attoparsec.Text (Parser, choice, endOfInput, parseOnly,
rational, string)
import Data.Int (Int64)
import qualified Data.Text as T
import Data.Time.Clock (NominalDiffTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Control.Applicative ((<|>))
import Data.Attoparsec.Text (Parser, choice, endOfInput, parseOnly,
rational, string)
import Data.Int (Int64)
import qualified Data.Text as T
import Data.Time.Clock (NominalDiffTime)
import Data.Time.Clock.POSIX (getPOSIXTime)

import Data.Time.Clock.System (SystemTime (MkSystemTime),
getSystemTime)
import HStream.Base (rmTrailingZeros)
import HStream.Base (rmTrailingZeros)

data Interval
= Milliseconds Double
Expand Down Expand Up @@ -80,9 +75,3 @@ msecSince start = floor . (* 1e3) <$> diffTimeSince start
secSince :: NominalDiffTime -> IO Int64
secSince start = floor <$> diffTimeSince start
{-# INLINE secSince #-}

getCurrentMsTimestamp :: IO Int64
getCurrentMsTimestamp = do
MkSystemTime sec nano <- getSystemTime
let !ts = floor @Double $ (fromIntegral sec * 1e3) + (fromIntegral nano / 1e6)
return ts
5 changes: 2 additions & 3 deletions hstream-sql/hstream-sql.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ flag hstream_use_v2_engine

common shared-properties
ghc-options:
-Wall -Wextra -Wcompat
-Widentities -Wincomplete-record-updates -Wincomplete-uni-patterns
-Wpartial-fields -Wredundant-constraints
-Wall -Wextra -Wcompat -Widentities -Wincomplete-record-updates
-Wincomplete-uni-patterns -Wpartial-fields -Wredundant-constraints

if flag(releasebuild)
ghc-options:
Expand Down
42 changes: 31 additions & 11 deletions hstream/app/server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,17 @@ import HStream.MetaStore.Types as M (MetaHandle (..),
RHandle (..))
import HStream.Server.Config (AdvertisedListeners,
CliOptions (..),
ExperimentalFeature (..),
ListenersSecurityProtocolMap,
MetaStoreAddr (..),
SecurityProtocolMap,
ServerOpts (..), TlsConfig,
advertisedListenersToPB,
cliOptionsParser, getConfig)
import qualified HStream.Server.Core.Cluster as Cluster
import qualified HStream.Server.Experimental as Exp
import HStream.Server.Handler (handlers)
import qualified HStream.Server.HsGrpcHandler as HsGrpc
import qualified HStream.Server.HsGrpcHandler as HsGrpcHandler
import HStream.Server.HStreamApi (NodeState (..),
ServerNode (ServerNode),
hstreamApiServer)
Expand Down Expand Up @@ -182,21 +184,31 @@ app config@ServerOpts{..} = do
void . forkIO $ updateHashRing gossipContext (loadBalanceHashRing serverContext)

Async.withAsync
(serve _serverHost _serverPort _securityProtocolMap serverContext
_serverAdvertisedListeners _listenersSecurityProtocolMap) $ \a -> do
a1 <- startGossip _serverHost gossipContext
Async.link2Only (const True) a a1
waitGossipBoot gossipContext
Async.wait a
(serve _serverHost _serverPort
serverContext
_securityProtocolMap
_serverAdvertisedListeners
_listenersSecurityProtocolMap
(ExperimentalStreamV2 `elem` experimentalFeatures)
) $ \a -> do
a1 <- startGossip _serverHost gossipContext
Async.link2Only (const True) a a1
waitGossipBoot gossipContext
Async.wait a

serve :: ByteString
-> Word16
-> SecurityProtocolMap
-> ServerContext
-> SecurityProtocolMap
-> AdvertisedListeners
-> ListenersSecurityProtocolMap
-> Bool
-- ^ Experimental features
-> IO ()
serve host port securityMap sc@ServerContext{..} listeners listenerSecurityMap = do
serve host port
sc@ServerContext{..}
securityMap listeners listenerSecurityMap
enableExpStreamV2 = do
Log.i "************************"
hPutStrLn stderr $ [r|
_ _ __ _____ ___ ___ __ __ __
Expand Down Expand Up @@ -270,7 +282,11 @@ serve host port securityMap sc@ServerContext{..} listeners listenerSecurityMap =
, HsGrpc.serverOnStarted = Just listenerOnStarted
, HsGrpc.serverSslOptions = newSslOpts
}
HsGrpc.runServer grpcOpts' (HsGrpc.handlers sc')
if enableExpStreamV2
then do Log.info "Enable experimental feature: stream-v2"
slotConfig <- Exp.doStreamV2Init sc'
HsGrpc.runServer grpcOpts' (Exp.streamV2Handlers sc' slotConfig)
else HsGrpc.runServer grpcOpts' (HsGrpcHandler.handlers sc')
#endif

#ifdef HStreamUseGrpcHaskell
Expand All @@ -279,7 +295,11 @@ serve host port securityMap sc@ServerContext{..} listeners listenerSecurityMap =
hstreamApiServer api grpcOpts
#else
Log.info "Starting server with hs-grpc-server..."
HsGrpc.runServer grpcOpts (HsGrpc.handlers sc)
if enableExpStreamV2
then do Log.info "Enable experimental feature: stream-v2"
slotConfig <- Exp.doStreamV2Init sc
HsGrpc.runServer grpcOpts (Exp.streamV2Handlers sc slotConfig)
else HsGrpc.runServer grpcOpts (HsGrpcHandler.handlers sc)
#endif

--------------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions hstream/hstream.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ library
HStream.Server.Core.Cluster
HStream.Server.Core.Common
HStream.Server.Exception
HStream.Server.Experimental
HStream.Server.Handler
HStream.Server.Handler.Common
HStream.Server.HsGrpcHandler
Expand All @@ -81,6 +82,7 @@ library
HStream.Server.Core.Stream
HStream.Server.Core.Subscription
HStream.Server.Core.View
HStream.Server.Experimental.StreamV2
HStream.Server.Handler.Admin
HStream.Server.Handler.Cluster
HStream.Server.Handler.Connector
Expand Down
36 changes: 30 additions & 6 deletions hstream/src/HStream/Server/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

module HStream.Server.Config
( ServerOpts (..)
, ExperimentalFeature (..)
, CliOptions (..)
, cliOptionsParser
, TlsConfig (..)
Expand Down Expand Up @@ -50,6 +51,7 @@ import Options.Applicative as O (Alternative (many, (<|>)),
long, maybeReader,
metavar, option, optional,
short, strOption, value)
import qualified Options.Applicative as O
import System.Directory (makeAbsolute)
import Text.Read (readEither)
import qualified Z.Data.CBytes as CB
Expand Down Expand Up @@ -92,6 +94,11 @@ data MetaStoreAddr
| FileAddr FilePath
deriving (Eq)

instance Show MetaStoreAddr where
show (ZkAddr addr) = "zk://" <> CB.unpack addr
show (RqAddr addr) = "rq://" <> T.unpack addr
show (FileAddr addr) = "file://" <> addr

data ServerOpts = ServerOpts
{ _serverHost :: !ByteString
, _serverPort :: !Word16
Expand Down Expand Up @@ -126,6 +133,7 @@ data ServerOpts = ServerOpts
, _ioOptions :: !IO.IOOptions

, _querySnapshotPath :: !FilePath
, experimentalFeatures :: ![ExperimentalFeature]
} deriving (Show, Eq)

getConfig :: CliOptions -> IO ServerOpts
Expand Down Expand Up @@ -170,6 +178,8 @@ data CliOptions = CliOptions
, _ioConnectorImages_ :: ![Text]

, _querySnapshotPath_ :: !(Maybe FilePath)

, cliExperimentalFeatures :: ![ExperimentalFeature]
} deriving Show

cliOptionsParser :: O.Parser CliOptions
Expand Down Expand Up @@ -201,10 +211,11 @@ cliOptionsParser = do
_ioTasksNetwork_ <- optional ioTasksNetwork
_ioConnectorImages_ <- ioConnectorImage
_querySnapshotPath_ <- optional querySnapshotPath
return CliOptions {..}
cliExperimentalFeatures <- many experimentalFeatureParser
return CliOptions{..}

parseJSONToOptions :: CliOptions -> Y.Object -> Y.Parser ServerOpts
parseJSONToOptions CliOptions {..} obj = do
parseJSONToOptions CliOptions{..} obj = do
nodeCfgObj <- obj .: "hserver"
nodeId <- nodeCfgObj .: "id"
nodeHost <- fromString <$> nodeCfgObj .:? "bind-address" .!= "0.0.0.0"
Expand Down Expand Up @@ -318,10 +329,27 @@ parseJSONToOptions CliOptions {..} obj = do
processingCfg <- nodeCfgObj .:? "hstream-processing" .!= mempty
snapshotPath <- processingCfg .:? "query-snapshot-path" .!= "/data/query_snapshots"
let !_querySnapshotPath = fromMaybe snapshotPath _querySnapshotPath_

let experimentalFeatures = cliExperimentalFeatures

return ServerOpts {..}

-------------------------------------------------------------------------------

data ExperimentalFeature = ExperimentalStreamV2
deriving (Show, Eq)

parseExperimentalFeature :: O.ReadM ExperimentalFeature
parseExperimentalFeature = O.eitherReader $ \case
"stream-v2" -> Right ExperimentalStreamV2
x -> Left $ "cannot parse experimental feature: " <> x

experimentalFeatureParser :: O.Parser ExperimentalFeature
experimentalFeatureParser = option parseExperimentalFeature $
long "experimental" <> metavar "ExperimentalFeature"

-------------------------------------------------------------------------------

configPath :: O.Parser String
configPath = strOption
$ long "config-path"
Expand Down Expand Up @@ -551,10 +579,6 @@ listenerP = do
-- Nothing -> errorWithoutStackTrace $ "Invalid meta store address, no Auth: " <> str
-- Nothing -> errorWithoutStackTrace $ "Invalid meta store address, no parse: " <> str

instance Show MetaStoreAddr where
show (ZkAddr addr) = "zk://" <> CB.unpack addr
show (RqAddr addr) = "rq://" <> T.unpack addr

readWithErrLog :: Read a => String -> String -> a
readWithErrLog opt v = case readEither v of
Right x -> x
Expand Down
Loading

0 comments on commit ab545f3

Please sign in to comment.