diff --git a/common/hstream/HStream/Common/ZookeeperSlotAlloc.hs b/common/hstream/HStream/Common/ZookeeperSlotAlloc.hs index e817c257c..dfe771a62 100644 --- a/common/hstream/HStream/Common/ZookeeperSlotAlloc.hs +++ b/common/hstream/HStream/Common/ZookeeperSlotAlloc.hs @@ -9,7 +9,9 @@ module HStream.Common.ZookeeperSlotAlloc , allocateSlot , deallocateSlot , doesSlotExist + , doesSlotValueExist , getSlotByName + , getSlotValueByName ) where import Control.Exception (throwIO, try) @@ -143,12 +145,24 @@ doesSlotExist SlotConfig{..} name = do let path = slotRoot <> "/name/" <> name isJust <$> ZK.zooExists slotZkHandler path +doesSlotValueExist :: SlotConfig -> CBytes -> SlotValue -> IO Bool +doesSlotValueExist c name value = do + let path = slotRoot c <> "/name/" <> name + errmsg = "Impossible empty value: " <> show path + e <- try $ slotGet c path errmsg + case e of + Left (_ :: ZK.ZNONODE) -> pure False + Right HT.Slot{..} -> pure $ Map.member value slotVals + getSlotByName :: HasCallStack => SlotConfig -> CBytes -> IO HT.Slot getSlotByName c name = do let path = slotRoot c <> "/name/" <> name errmsg = "Get slot failed, name: " <> show name slotGet c path errmsg +getSlotValueByName :: HasCallStack => SlotConfig -> CBytes -> IO [SlotValue] +getSlotValueByName c name = Map.keys . HT.slotVals <$> getSlotByName c name + ------------------------------------------------------------------------------- getFree :: HasCallStack => SlotConfig -> IO SlotValue diff --git a/hstream/src/HStream/Server/Core/Stream.hs b/hstream/src/HStream/Server/Core/Stream.hs index c05074d5f..a36455109 100644 --- a/hstream/src/HStream/Server/Core/Stream.hs +++ b/hstream/src/HStream/Server/Core/Stream.hs @@ -18,6 +18,7 @@ module HStream.Server.Core.Stream , createStreamV2 , deleteStreamV2 , listShardsV2 + , getTailRecordIdV2 ) where import Control.Concurrent (modifyMVar_) @@ -228,6 +229,9 @@ getStreamInfo ServerContext{..} stream = do getTailRecordId :: ServerContext -> API.GetTailRecordIdRequest -> IO API.GetTailRecordIdResponse getTailRecordId ServerContext{..} API.GetTailRecordIdRequest{getTailRecordIdRequestShardId=sId} = do + -- FIXME: this should be 'S.doesStreamPartitionValExist', however, at most + -- time S.logIdHasGroup should also work, and is faster than + -- 'S.doesStreamPartitionValExist' shardExists <- S.logIdHasGroup scLDClient sId unless shardExists $ throwIO $ HE.ShardNotFound $ "Shard with id " <> T.pack (show sId) <> " is not found." lsn <- S.getTailLSN scLDClient sId @@ -237,6 +241,24 @@ getTailRecordId ServerContext{..} API.GetTailRecordIdRequest{getTailRecordIdRequ } return $ API.GetTailRecordIdResponse { getTailRecordIdResponseTailRecordId = Just recordId} +getTailRecordIdV2 + :: ServerContext + -> Slot.SlotConfig + -> API.GetTailRecordIdRequest -> IO API.GetTailRecordIdResponse +getTailRecordIdV2 ServerContext{..} slotConfig API.GetTailRecordIdRequest{..} = do + let streamName = textToCBytes getTailRecordIdRequestStreamName + sId = getTailRecordIdRequestShardId + shardExists <- Slot.doesSlotValueExist slotConfig streamName sId + unless shardExists $ throwIO $ HE.ShardNotFound $ + "Stream " <> getTailRecordIdRequestStreamName + <> " with shard id " <> T.pack (show sId) <> " is not found." + lsn <- S.getTailLSN scLDClient sId + let recordId = API.RecordId { recordIdShardId = sId + , recordIdBatchId = lsn + , recordIdBatchIndex = 0 + } + return $ API.GetTailRecordIdResponse { getTailRecordIdResponseTailRecordId = Just recordId} + append :: HasCallStack => ServerContext -> T.Text -- streamName @@ -297,9 +319,9 @@ listShards ServerContext{..} API.ListShardsRequest{..} = do V.foldM' getShardInfo V.empty $ V.fromList shards where streamId = transToStreamName listShardsRequestStreamName - startKey = CB.pack "startKey" - endKey = CB.pack "endKey" - epoch = CB.pack "epoch" + startKey = "startKey" + endKey = "endKey" + epoch = "epoch" getShardInfo shards logId = do attr <- S.getStreamPartitionExtraAttrs scLDClient logId diff --git a/hstream/src/HStream/Server/Experimental/StreamV2.hs b/hstream/src/HStream/Server/Experimental/StreamV2.hs index 31c758fb2..96e9f1b40 100644 --- a/hstream/src/HStream/Server/Experimental/StreamV2.hs +++ b/hstream/src/HStream/Server/Experimental/StreamV2.hs @@ -68,7 +68,7 @@ streamV2Handlers sc slotConfig = , unary (GRPC :: GRPC P.HStreamApi "listShards") (H.handleListShardV2 sc slotConfig) --, unary (GRPC :: GRPC P.HStreamApi "trimStream") (H.handleTrimStream sc) --, unary (GRPC :: GRPC P.HStreamApi "trimShard") (H.handleTrimShard sc) - --, unary (GRPC :: GRPC P.HStreamApi "getTailRecordId") (H.handleGetTailRecordId sc) + , unary (GRPC :: GRPC P.HStreamApi "getTailRecordId") (H.handleGetTailRecordIdV2 sc slotConfig) -- Reader --, unary (GRPC :: GRPC P.HStreamApi "listShardReaders") (H.handleListShardReaders sc) --, unary (GRPC :: GRPC P.HStreamApi "createShardReader") (H.handleCreateShardReader sc) diff --git a/hstream/src/HStream/Server/Handler/Stream.hs b/hstream/src/HStream/Server/Handler/Stream.hs index f6a89b076..77082115f 100644 --- a/hstream/src/HStream/Server/Handler/Stream.hs +++ b/hstream/src/HStream/Server/Handler/Stream.hs @@ -33,6 +33,7 @@ module HStream.Server.Handler.Stream , handleCreateStreamV2 , handleDeleteStreamV2 , handleListShardV2 + , handleGetTailRecordIdV2 ) where import Control.Exception @@ -177,6 +178,13 @@ handleGetTailRecordId sc _ req = catchDefaultEx $ do Log.debug $ "Receive Get TailRecordId Request: " <> Log.buildString' req C.getTailRecordId sc req +handleGetTailRecordIdV2 + :: ServerContext -> Slot.SlotConfig + -> G.UnaryHandler GetTailRecordIdRequest GetTailRecordIdResponse +handleGetTailRecordIdV2 sc slotConfig _ req = catchDefaultEx $ do + Log.debug $ "Receive Get TailRecordId Request: " <> Log.buildString' req + C.getTailRecordIdV2 sc slotConfig req + getTailRecordIdHandler :: ServerContext -> ServerRequest 'Normal GetTailRecordIdRequest GetTailRecordIdResponse