Skip to content

Commit

Permalink
Experimental feature stream-v2 (#1523)
Browse files Browse the repository at this point in the history
* Support getTailRecordId
  • Loading branch information
4eUeP authored Jul 21, 2023
1 parent e70633a commit 3460d22
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 4 deletions.
14 changes: 14 additions & 0 deletions common/hstream/HStream/Common/ZookeeperSlotAlloc.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ module HStream.Common.ZookeeperSlotAlloc
, allocateSlot
, deallocateSlot
, doesSlotExist
, doesSlotValueExist
, getSlotByName
, getSlotValueByName
) where

import Control.Exception (throwIO, try)
Expand Down Expand Up @@ -143,12 +145,24 @@ doesSlotExist SlotConfig{..} name = do
let path = slotRoot <> "/name/" <> name
isJust <$> ZK.zooExists slotZkHandler path

doesSlotValueExist :: SlotConfig -> CBytes -> SlotValue -> IO Bool
doesSlotValueExist c name value = do
let path = slotRoot c <> "/name/" <> name
errmsg = "Impossible empty value: " <> show path
e <- try $ slotGet c path errmsg
case e of
Left (_ :: ZK.ZNONODE) -> pure False
Right HT.Slot{..} -> pure $ Map.member value slotVals

getSlotByName :: HasCallStack => SlotConfig -> CBytes -> IO HT.Slot
getSlotByName c name = do
let path = slotRoot c <> "/name/" <> name
errmsg = "Get slot failed, name: " <> show name
slotGet c path errmsg

getSlotValueByName :: HasCallStack => SlotConfig -> CBytes -> IO [SlotValue]
getSlotValueByName c name = Map.keys . HT.slotVals <$> getSlotByName c name

-------------------------------------------------------------------------------

getFree :: HasCallStack => SlotConfig -> IO SlotValue
Expand Down
28 changes: 25 additions & 3 deletions hstream/src/HStream/Server/Core/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module HStream.Server.Core.Stream
, createStreamV2
, deleteStreamV2
, listShardsV2
, getTailRecordIdV2
) where

import Control.Concurrent (modifyMVar_)
Expand Down Expand Up @@ -228,6 +229,9 @@ getStreamInfo ServerContext{..} stream = do

getTailRecordId :: ServerContext -> API.GetTailRecordIdRequest -> IO API.GetTailRecordIdResponse
getTailRecordId ServerContext{..} API.GetTailRecordIdRequest{getTailRecordIdRequestShardId=sId} = do
-- FIXME: this should be 'S.doesStreamPartitionValExist', however, at most
-- time S.logIdHasGroup should also work, and is faster than
-- 'S.doesStreamPartitionValExist'
shardExists <- S.logIdHasGroup scLDClient sId
unless shardExists $ throwIO $ HE.ShardNotFound $ "Shard with id " <> T.pack (show sId) <> " is not found."
lsn <- S.getTailLSN scLDClient sId
Expand All @@ -237,6 +241,24 @@ getTailRecordId ServerContext{..} API.GetTailRecordIdRequest{getTailRecordIdRequ
}
return $ API.GetTailRecordIdResponse { getTailRecordIdResponseTailRecordId = Just recordId}

getTailRecordIdV2
:: ServerContext
-> Slot.SlotConfig
-> API.GetTailRecordIdRequest -> IO API.GetTailRecordIdResponse
getTailRecordIdV2 ServerContext{..} slotConfig API.GetTailRecordIdRequest{..} = do
let streamName = textToCBytes getTailRecordIdRequestStreamName
sId = getTailRecordIdRequestShardId
shardExists <- Slot.doesSlotValueExist slotConfig streamName sId
unless shardExists $ throwIO $ HE.ShardNotFound $
"Stream " <> getTailRecordIdRequestStreamName
<> " with shard id " <> T.pack (show sId) <> " is not found."
lsn <- S.getTailLSN scLDClient sId
let recordId = API.RecordId { recordIdShardId = sId
, recordIdBatchId = lsn
, recordIdBatchIndex = 0
}
return $ API.GetTailRecordIdResponse { getTailRecordIdResponseTailRecordId = Just recordId}

append :: HasCallStack
=> ServerContext
-> T.Text -- streamName
Expand Down Expand Up @@ -297,9 +319,9 @@ listShards ServerContext{..} API.ListShardsRequest{..} = do
V.foldM' getShardInfo V.empty $ V.fromList shards
where
streamId = transToStreamName listShardsRequestStreamName
startKey = CB.pack "startKey"
endKey = CB.pack "endKey"
epoch = CB.pack "epoch"
startKey = "startKey"
endKey = "endKey"
epoch = "epoch"

getShardInfo shards logId = do
attr <- S.getStreamPartitionExtraAttrs scLDClient logId
Expand Down
2 changes: 1 addition & 1 deletion hstream/src/HStream/Server/Experimental/StreamV2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ streamV2Handlers sc slotConfig =
, unary (GRPC :: GRPC P.HStreamApi "listShards") (H.handleListShardV2 sc slotConfig)
--, unary (GRPC :: GRPC P.HStreamApi "trimStream") (H.handleTrimStream sc)
--, unary (GRPC :: GRPC P.HStreamApi "trimShard") (H.handleTrimShard sc)
--, unary (GRPC :: GRPC P.HStreamApi "getTailRecordId") (H.handleGetTailRecordId sc)
, unary (GRPC :: GRPC P.HStreamApi "getTailRecordId") (H.handleGetTailRecordIdV2 sc slotConfig)
-- Reader
--, unary (GRPC :: GRPC P.HStreamApi "listShardReaders") (H.handleListShardReaders sc)
--, unary (GRPC :: GRPC P.HStreamApi "createShardReader") (H.handleCreateShardReader sc)
Expand Down
8 changes: 8 additions & 0 deletions hstream/src/HStream/Server/Handler/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ module HStream.Server.Handler.Stream
, handleCreateStreamV2
, handleDeleteStreamV2
, handleListShardV2
, handleGetTailRecordIdV2
) where

import Control.Exception
Expand Down Expand Up @@ -177,6 +178,13 @@ handleGetTailRecordId sc _ req = catchDefaultEx $ do
Log.debug $ "Receive Get TailRecordId Request: " <> Log.buildString' req
C.getTailRecordId sc req

handleGetTailRecordIdV2
:: ServerContext -> Slot.SlotConfig
-> G.UnaryHandler GetTailRecordIdRequest GetTailRecordIdResponse
handleGetTailRecordIdV2 sc slotConfig _ req = catchDefaultEx $ do
Log.debug $ "Receive Get TailRecordId Request: " <> Log.buildString' req
C.getTailRecordIdV2 sc slotConfig req

getTailRecordIdHandler
:: ServerContext
-> ServerRequest 'Normal GetTailRecordIdRequest GetTailRecordIdResponse
Expand Down

0 comments on commit 3460d22

Please sign in to comment.