Skip to content

Commit

Permalink
cli: use the interactive shell to execute hstream append (#1573)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Aug 23, 2023
1 parent d544f4a commit f7e6d30
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 59 deletions.
45 changes: 19 additions & 26 deletions hstream/app/client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Werror=incomplete-patterns #-}
{-# LANGUAGE NamedFieldPuns #-}

module Main where

import Control.Concurrent (threadDelay)
import Control.Monad (when)
import Data.Aeson as Aeson
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Char as Char
import qualified Data.List as L
import Data.Maybe (mapMaybe, maybeToList)
Expand All @@ -28,7 +27,6 @@ import Network.GRPC.HighLevel.Generated (ClientError (..),
withGRPCClient)
import qualified Options.Applicative as O
import Proto3.Suite (def)
import qualified Proto3.Suite as PB
import System.Exit (exitFailure)
import System.Timeout (timeout)
import Text.RawString.QQ (r)
Expand All @@ -38,21 +36,22 @@ import HStream.Client.Action (createSubscription',
deleteStream,
deleteSubscription,
getStream, getSubscription,
insertIntoStream',
listShards, listStreams,
listSubscriptions, readShard,
readStream)
import HStream.Client.Execute (executeWithLookupResource_,
initCliContext,
simpleExecute)
import HStream.Client.Internal (interactiveAppend)
#ifdef HStreamEnableSchema
import HStream.Client.SQLNew (commandExec,
interactiveSQLApp)
#else
import HStream.Client.SQL (commandExec,
interactiveSQLApp)
#endif
import HStream.Client.Types (AppendArgs (..), CliCmd (..),
import HStream.Client.Types (AppendContext (..),
AppendOpts (..), CliCmd (..),
Command (..),
HStreamCommand (..),
HStreamInitOpts (..),
Expand All @@ -65,13 +64,11 @@ import HStream.Client.Types (AppendArgs (..), CliCmd (..),
Resource (..),
StreamCommand (..),
SubscriptionCommand (..),
cliCmdParser,
cliCmdParser, mkShardMap,
refineCliConnOpts)
import HStream.Client.Utils (calculateShardId,
mkClientNormalRequest',
import HStream.Client.Utils (mkClientNormalRequest',
printResult)
import HStream.Common.Types (getHStreamVersion,
hashShardKey)
import HStream.Common.Types (getHStreamVersion)
import HStream.Server.HStreamApi (DescribeClusterResponse (..),
HStreamApi (..),
ServerNode (..),
Expand All @@ -82,7 +79,6 @@ import HStream.Utils (ResourceType (..),
fillWithJsonString',
formatResult, getServerResp,
handleGRPCIOError,
jsonObjectToStruct,
newRandomText,
pattern EnumPB)
import qualified HStream.Utils.Aeson as AesonComp
Expand Down Expand Up @@ -208,22 +204,19 @@ hstreamStream connOpts@RefinedCliConnOpts{..} cmd = do
}
ctx <- initCliContext connOpts
executeWithLookupResource_ ctx (Resource ResShardReader (API.readStreamRequestReaderId req)) (readStream req)
StreamCmdAppend AppendArgs{..} -> do
StreamCmdAppend AppendOpts{..} -> do
ctx <- initCliContext connOpts
shards <- fmap API.listShardsResponseShards . getServerResp =<< simpleExecute clientConfig (listShards appendStream)
let shardKey = hashShardKey appendRecordKey
case calculateShardId shardKey (V.toList shards) of
Just sid -> do
let payload = if isHRecord then map toHRecord appendRecord else appendRecord
executeWithLookupResource_ ctx (Resource ResShard (T.pack $ show sid))
(insertIntoStream' appendStream sid isHRecord (V.fromList payload) appendCompressionType appendRecordKey)
Nothing -> errorWithoutStackTrace $ "Failed to calculate shardId with stream: "
<> show appendStream <> ", parition key: " <> show appendRecordKey
where
toHRecord payload = case Aeson.eitherDecode . BS.fromStrict $ payload of
Left e -> errorWithoutStackTrace $ "invalied HRecord: " <> show e
Right p -> BSL.toStrict . PB.toLazyByteString . jsonObjectToStruct $ p

shards <- fmap API.listShardsResponseShards . getServerResp =<< simpleExecute clientConfig (listShards _appStream)
let shardMap = mkShardMap shards
let appendCtx = AppendContext
{ cliCtx = ctx
, appStream = _appStream
, appKeySeparator = _appKeySeparator
, appRetryLimit = _appRetryLimit
, appRetryInterval = _appRetryInterval
, appShardMap = shardMap
}
interactiveAppend appendCtx

hstreamSubscription :: RefinedCliConnOpts -> SubscriptionCommand -> IO ()
hstreamSubscription connOpts@RefinedCliConnOpts{..} = \case
Expand Down
56 changes: 53 additions & 3 deletions hstream/src/HStream/Client/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,44 @@ module HStream.Client.Internal
( streamingFetch
, cliFetch
, cliFetch'
, interactiveAppend
) where

import Control.Concurrent (threadDelay)
import Control.Monad (void, when)
import Control.Monad.IO.Class
import Data.Aeson as Aeson
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BSL
import Data.IORef (IORef, newIORef, readIORef,
writeIORef)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Vector as V
import Network.GRPC.HighLevel.Generated (ClientRequest (..))
import qualified Proto3.Suite as PB
import qualified System.Console.Haskeline as RL
import Text.StringRandom (stringRandomIO)

import HStream.Client.Action
import HStream.Client.Execute
import HStream.Client.Types (HStreamCliContext,
Resource (..))
import HStream.Client.Types (AppendContext (..),
HStreamCliContext (..),
Resource (..),
getShardIdByKey)
import HStream.Client.Utils
import HStream.Common.Types (hashShardKey)
import qualified HStream.Server.HStreamApi as API
import HStream.SQL (DropObject (..))
import qualified HStream.ThirdParty.Protobuf as PB
import HStream.Utils (ResourceType (..),
clientDefaultKey,
decompressBatchedRecord,
formatResult, getServerResp,
jsonObjectToStruct,
newRandomText)


streamingFetch :: HStreamCliContext -> T.Text -> API.HStreamApi ClientRequest response -> IO ()
streamingFetch = streamingFetch' (putStr . formatResult @PB.Struct) False

Expand Down Expand Up @@ -102,3 +114,41 @@ genRandomSinkStreamSQL sql = do
randomName <- stringRandomIO "[a-zA-Z]{20}"
let streamName = "cli_generated_stream_" <> randomName
return (streamName, "CREATE STREAM " <> streamName <> " AS " <> sql)

interactiveAppend :: AppendContext -> IO ()
interactiveAppend AppendContext{..} = do
RL.runInputT settings loop
where
settings = RL.Settings RL.noCompletion Nothing False

loop = RL.withInterrupt . RL.handleInterrupt loop $ do
RL.getInputLine "> " >>= \case
Nothing -> pure ()
Just str -> do
let items = splitOn appKeySeparator (BC.pack str)
when (null items || length items > 2) $
errorWithoutStackTrace "invalid input: specific multiple keys"

let partitionKey = if length items == 1 then clientDefaultKey else TE.decodeUtf8 . head $ items
let record = if length items == 1 then head items else last items
let shardKey = hashShardKey partitionKey
case getShardIdByKey shardKey appShardMap of
Just sid -> do
let (isHRecord, payload) = toHRecord record
liftIO $ executeWithLookupResource_ cliCtx (Resource ResShard (T.pack $ show sid))
(retry appRetryLimit appRetryInterval $ insertIntoStream' appStream sid isHRecord (V.fromList [payload]) API.CompressionTypeNone partitionKey)
loop
Nothing -> errorWithoutStackTrace $ "Failed to calculate shardId with stream: "
<> show appStream <> ", parition key: " <> show (head items)

toHRecord payload = case Aeson.eitherDecode . BS.fromStrict $ payload of
Left _ -> (False, payload)
Right p -> (True, BSL.toStrict . PB.toLazyByteString . jsonObjectToStruct $ p)

-- Break a ByteString into pieces separated by the first ByteString argument, consuming the delimiter
splitOn :: BS.ByteString -> BS.ByteString -> [BS.ByteString]
splitOn "" = error "delimiter shouldn't be empty."
splitOn delimiter = go
where
go s = let (pre, post) = BS.breakSubstring delimiter s
in pre : if BS.null post then [] else go (BS.drop (BS.length delimiter) post)
73 changes: 43 additions & 30 deletions hstream/src/HStream/Client/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,25 @@ import Control.Concurrent (MVar)
import qualified Data.Attoparsec.Text as AP
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Foldable (foldl')
import Data.Functor (($>))
import Data.Int (Int64)
import qualified Data.Map as M
import Data.Maybe (isNothing)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Vector (Vector)
import Data.Word (Word32, Word64)
import HStream.Common.CliParsers (streamParser,
subscriptionParser)
import HStream.Common.Types
import qualified HStream.Server.HStreamApi as API
import HStream.Server.Types (ServerID)
import qualified HStream.Store as S
import HStream.Utils (ResourceType, SocketAddr (..),
clientDefaultKey,
mkGRPCClientConfWithSSL)
mkGRPCClientConfWithSSL,
textToCBytes)
import Network.GRPC.HighLevel.Client (ClientConfig (..),
ClientSSLConfig (..),
ClientSSLKeyCertPair (..))
Expand Down Expand Up @@ -72,7 +77,7 @@ data StreamCommand
| StreamCmdListShard Text
| StreamCmdReadShard ReadShardArgs
| StreamCmdReadStream ReadStreamArgs
| StreamCmdAppend AppendArgs
| StreamCmdAppend AppendOpts
deriving (Show)

streamCmdParser :: O.Parser StreamCommand
Expand All @@ -88,7 +93,7 @@ streamCmdParser = O.hsubparser
<> O.short 'f'
<> O.help "Whether to enable force deletion" ))
(O.progDesc "Delete a stream"))
<> O.command "append" (O.info (StreamCmdAppend <$> appendArgsParser)
<> O.command "append" (O.info (StreamCmdAppend <$> appendOptsParser)
(O.progDesc "Append record into stream"))
<> O.command "list-shard" (O.info (StreamCmdListShard <$> O.strArgument
( O.metavar "STREAM_NAME"
Expand All @@ -100,14 +105,42 @@ streamCmdParser = O.hsubparser
(O.progDesc "Read records from specific stream"))
)

data AppendArgs = AppendArgs
{ appendStream :: T.Text
, appendRecordKey :: T.Text
, appendRecord :: [ByteString]
, appendCompressionType :: API.CompressionType
, isHRecord :: Bool
data AppendOpts = AppendOpts
{ _appStream :: T.Text
, _appKeySeparator :: BS.ByteString
, _appRetryInterval :: Word32
, _appRetryLimit :: Word32
} deriving (Show)

appendOptsParser :: O.Parser AppendOpts
appendOptsParser = AppendOpts
<$> O.strArgument ( O.metavar "STREAM_NAME" <> O.help "The stream you want to write to")
<*> O.option O.str (O.long "separator" <> O.metavar "String" <> O.showDefault <> O.value "@" <> O.help "Separator of key. e.g. key1@value")
<*> O.option O.auto (O.long "retry-interval" <> O.metavar "INT" <> O.showDefault <> O.value 5 <> O.help "Interval to retry request to server")
<*> O.option O.auto (O.long "retry-limit" <> O.metavar "INT" <> O.showDefault <> O.value 3 <> O.help "Maximum number of retries allowed")

type ShardMap = M.Map ShardKey S.C_LogID

mkShardMap :: Vector API.Shard -> ShardMap
mkShardMap =
foldl'
(
\acc API.Shard{shardShardId=sId, shardStartHashRangeKey=startKey} ->
M.insert (cBytesToKey . textToCBytes $ startKey) sId acc
) M.empty

getShardIdByKey :: ShardKey -> ShardMap ->Maybe S.C_LogID
getShardIdByKey key mp = snd <$> M.lookupLE key mp

data AppendContext = AppendContext
{ cliCtx :: HStreamCliContext
, appStream :: T.Text
, appKeySeparator :: BS.ByteString
, appRetryInterval :: Word32
, appRetryLimit :: Word32
, appShardMap :: M.Map ShardKey S.C_LogID
}

instance Read API.CompressionType where
readPrec = do
l <- Read.lexP
Expand All @@ -117,26 +150,6 @@ instance Read API.CompressionType where
Read.Ident "zstd" -> return API.CompressionTypeZstd
x -> errorWithoutStackTrace $ "cannot parse compression type: " <> show x

appendArgsParser :: O.Parser AppendArgs
appendArgsParser = AppendArgs
<$> O.strArgument ( O.metavar "STREAM_NAME" <> O.help "The stream you want to write to")
<*> O.strOption ( O.long "partition-key"
<> O.short 'k'
<> O.metavar "TEXT"
<> O.value clientDefaultKey
<> O.showDefault
<> O.help "Partition key of append record"
)
<*> O.many (O.strOption ( O.long "payload" <> O.short 'p' <> O.help "Records you want to append"))
<*> O.option O.auto ( O.long "compression"
<> O.short 'o'
<> O.metavar "[none|gzip|zstd]"
<> O.value API.CompressionTypeNone
<> O.showDefault
<> O.help "Compresstion type"
)
<*> O.switch ( O.long "json" <> O.short 'j' <> O.help "Is json record")

data ReadStreamArgs = ReadStreamArgs
{ readStreamStreamNameArgs :: T.Text
, readStreamStartOffset :: Maybe API.StreamOffset
Expand Down

0 comments on commit f7e6d30

Please sign in to comment.