Skip to content

Commit

Permalink
add trimStream && trimShard rpc (#1516)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Jul 19, 2023
1 parent 047ef70 commit bdec836
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 78 deletions.
4 changes: 3 additions & 1 deletion common/hstream/HStream/Exception.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ module HStream.Exception
, SQLNotSupportedByParseSQL(SQLNotSupportedByParseSQL)
, ConflictShardReaderOffset (ConflictShardReaderOffset)
, TooManyShardCount (TooManyShardCount)
, InvalidTrimPoint (InvalidTrimPoint)

-- * Exception: SomeDeadlineExceeded
--
Expand Down Expand Up @@ -417,7 +418,8 @@ MAKE_EX_0(SomeInvalidArgument, EmptyBatchedRecord, API.ErrorCodeStreamEmptyBatch
MAKE_EX_1(SomeInvalidArgument, InvalidRecordSize, Int, API.ErrorCodeStreamInvalidRecordSize,
"Record size exceeds the maximum size limit")
MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidResourceType, String, API.ErrorCodeInternalError)
MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidShardOffset, String, API.ErrorCodeInternalError)
MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidShardOffset, String, API.ErrorCodeStreamInvalidOffset)
MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidTrimPoint, String, API.ErrorCodeStreamInvalidOffset)
MAKE_EX_0_DEFMSG(SomeInvalidArgument, InvalidSubscriptionOffset, API.ErrorCodeSubscriptionInvalidOffset)
MAKE_EX_1_DEFMSG(SomeInvalidArgument, DecodeHStreamRecordErr, String, API.ErrorCodeInternalError)
MAKE_EX_0(SomeInvalidArgument, NoRecordHeader, API.ErrorCodeInternalError,
Expand Down
2 changes: 1 addition & 1 deletion external/protocol
Submodule protocol updated 1 files
+16 −0 hstream.proto
2 changes: 2 additions & 0 deletions hstream-store/test/HStream/StoreSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ base = describe "Base" $ do
S.trim client logid sn0
readPayload' logid (Just sn0) `shouldReturn` []
readPayload logid (Just sn1) `shouldReturn` "world"
-- trim lsn beyond tailLSN should fail
S.trim client logid (sn1 + 1) `shouldThrow` anyException

loggroupAround' $ do
it "trim last" $ \(_lgname, ranlogid) -> do
Expand Down
8 changes: 2 additions & 6 deletions hstream/src/HStream/Server/ConnectorTypes.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE StrictData #-}

module HStream.Server.ConnectorTypes
( SourceConnector (..),
Expand All @@ -25,8 +23,6 @@ import Data.Time.Clock.POSIX
import Data.Word (Word64)
import qualified HStream.Server.HStreamApi as API

import qualified HStream.Server.HStreamApi as API

type Timestamp = Int64 -- ms

type StreamName = T.Text
Expand Down Expand Up @@ -90,7 +86,7 @@ data SourceConnectorWithoutCkp = SourceConnectorWithoutCkp
-> IO ()
}

data SinkConnector = SinkConnector
newtype SinkConnector = SinkConnector
{ writeRecord :: (BL.ByteString -> Maybe BL.ByteString)
-> (BL.ByteString -> Maybe BL.ByteString)
-> SinkRecord
Expand Down
55 changes: 8 additions & 47 deletions hstream/src/HStream/Server/Core/ShardReader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ module HStream.Server.Core.ShardReader
where

import Data.Functor ((<&>))
import Proto3.Suite (Enumerated (Enumerated))
import ZooKeeper.Exception (ZNONODE (..), throwIO)

import Control.Concurrent (modifyMVar_, newEmptyMVar, putMVar,
readMVar, takeMVar, withMVar)
import Control.Exception (bracket, catch, throw)
import Control.Exception (bracket, catch)
import Control.Monad (forM, forM_, unless, when)
import Data.ByteString (ByteString)
import Data.Either (isRight)
Expand All @@ -28,7 +27,7 @@ import Data.Int (Int64)
import Data.IORef (IORef, newIORef, readIORef,
writeIORef)
import qualified Data.Map.Strict as M
import Data.Maybe (fromJust, isJust)
import Data.Maybe (isJust)
import qualified Data.Text as T
import Data.Vector (Vector)
import qualified Data.Vector as V
Expand All @@ -43,8 +42,10 @@ import HStream.Server.HStreamApi (CreateShardReaderRequest (..))
import qualified HStream.Server.HStreamApi as API
import qualified HStream.Server.MetaData as P
import HStream.Server.Types (ServerContext (..),
ServerInternalOffset,
ShardReader (..),
StreamReader (..), mkShardReader,
StreamReader (..), ToOffset (..),
getLogLSN, mkShardReader,
mkStreamReader, transToStreamName)
import qualified HStream.Store as S

Expand Down Expand Up @@ -321,46 +322,6 @@ instance StreamSend API.ReadStreamResponse where
instance StreamSend API.ReadSingleShardStreamResponse where
convert = API.ReadSingleShardStreamResponse

data Offset = OffsetEarliest
| OffsetLatest
| OffsetRecordId API.RecordId
| OffsetTimestamp API.TimestampOffset
deriving (Show)

class ToOffset g where
toOffset :: g -> Offset

instance ToOffset API.ShardOffset where
toOffset offset = case fromJust . API.shardOffsetOffset $ offset of
API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetEARLIEST)) -> OffsetEarliest
API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetLATEST)) -> OffsetLatest
API.ShardOffsetOffsetRecordOffset rid -> OffsetRecordId rid
API.ShardOffsetOffsetTimestampOffset timestamp -> OffsetTimestamp timestamp
_ -> throw $ HE.InvalidShardOffset "UnKnownShardOffset"

instance ToOffset API.StreamOffset where
toOffset offset = case fromJust . API.streamOffsetOffset $ offset of
API.StreamOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetEARLIEST)) -> OffsetEarliest
API.StreamOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetLATEST)) -> OffsetLatest
API.StreamOffsetOffsetTimestampOffset timestamp -> OffsetTimestamp timestamp
_ -> throw $ HE.InvalidShardOffset "UnKnownShardOffset"

-- if the offset is timestampOffset, then return (LSN, Just timestamp)
-- , otherwise return (LSN, Nothing)
getLogLSN :: S.LDClient -> S.C_LogID -> Offset -> IO (S.LSN, Maybe Int64)
getLogLSN scLDClient logId offset =
case offset of
OffsetEarliest -> return (S.LSN_MIN, Nothing)
OffsetLatest -> do
startLSN <- (+ 1) <$> S.getTailLSN scLDClient logId
return (startLSN, Nothing)
OffsetRecordId API.RecordId{..} ->
return (recordIdBatchId, Nothing)
OffsetTimestamp API.TimestampOffset{..} -> do
let accuracy = if timestampOffsetStrictAccuracy then S.FindKeyStrict else S.FindKeyApproximate
startLSN <- S.findTime scLDClient logId timestampOffsetTimestampInMs accuracy
return (startLSN, Just timestampOffsetTimestampInMs)

-- Removes data that is not in the specified timestamp range.
-- If endTimestamp is set, also check if endTimestamp has been reached
filterRecords :: Maybe Int64 -> Maybe Int64 -> [S.DataRecord ByteString] -> ([S.DataRecord ByteString], Bool)
Expand Down Expand Up @@ -396,10 +357,10 @@ filterRecords startTs endTs records =
startReadingShard
:: S.LDClient
-> S.LDReader
-> T.Text -- readerId, use for logging
-> T.Text -- readerId, use for logging
-> S.C_LogID
-> Maybe Offset -- startOffset
-> Maybe Offset -- endOffset
-> Maybe ServerInternalOffset -- startOffset
-> Maybe ServerInternalOffset -- endOffset
-> IO (Maybe Int64, Maybe Int64)
startReadingShard scLDClient reader readerId rShardId rStart rEnd = do
(startLSN, sTimestamp) <- maybe (return (S.LSN_MIN, Nothing)) (getLogLSN scLDClient rShardId) rStart
Expand Down
68 changes: 59 additions & 9 deletions hstream/src/HStream/Server/Core/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ module HStream.Server.Core.Stream
, appendStream
, listShards
, getTailRecordId
, trimShard
, trimStream
) where

import Control.Concurrent (modifyMVar_)
import Control.Exception (catch, throwIO)
import Control.Monad (forM, unless, when)
import Control.Monad (forM, forM_, unless, when)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Foldable (foldl')
import qualified Data.HashMap.Strict as HM
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
Expand All @@ -36,12 +35,11 @@ import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamApi as API
import qualified HStream.Server.MetaData as P
import HStream.Server.Shard (Shard (..), createShard,
devideKeySpace,
mkShardWithDefaultId,
mkSharedShardMapWithShards)
import HStream.Server.Shard (createShard, devideKeySpace,
mkShardWithDefaultId)
import HStream.Server.Types (ServerContext (..),
transToStreamName)
ServerInternalOffset (..),
ToOffset (..), transToStreamName)
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import HStream.Utils
Expand Down Expand Up @@ -133,6 +131,23 @@ listStreamsWithPrefix sc@ServerContext{..} API.ListStreamsWithPrefixRequest{..}
streams <- filter (T.isPrefixOf listStreamsWithPrefixRequestPrefix . T.pack . S.showStreamName) <$> S.findStreams scLDClient S.StreamTypeStream
V.forM (V.fromList streams) (getStreamInfo sc)

trimStream
:: HasCallStack
=> ServerContext
-> T.Text
-> API.StreamOffset
-> IO ()
trimStream ServerContext{..} streamName trimPoint = do
streamExists <- S.doesStreamExist scLDClient streamId
unless streamExists $ do
Log.info $ "trimStream failed because stream " <> Log.build streamName <> " is not found."
throwIO $ HE.StreamNotFound $ "stream " <> T.pack (show streamName) <> " is not found."
shards <- M.elems <$> S.listStreamPartitions scLDClient streamId
forM_ shards $ \shardId -> do
getTrimLSN scLDClient shardId trimPoint >>= S.trim scLDClient shardId
where
streamId = transToStreamName streamName

getStreamInfo :: ServerContext -> S.StreamId -> IO API.Stream
getStreamInfo ServerContext{..} stream = do
attrs <- S.getStreamLogAttrs scLDClient stream
Expand Down Expand Up @@ -246,3 +261,38 @@ listShards ServerContext{..} API.ListShardsRequest{..} = do
endHashRangeKey <- cBytesToText <$> M.lookup endKey mp
shardEpoch <- read . CB.unpack <$> M.lookup epoch mp
return (startHashRangeKey, endHashRangeKey, shardEpoch)

trimShard
:: HasCallStack
=> ServerContext
-> Word64
-> API.ShardOffset
-> IO ()
trimShard ServerContext{..} shardId trimPoint = do
shardExists <- S.logIdHasGroup scLDClient shardId
unless shardExists $ do
Log.info $ "trimShard failed because shard " <> Log.build shardId <> " is not exist."
throwIO $ HE.ShardNotFound $ "Shard with id " <> T.pack (show shardId) <> " is not found."
getTrimLSN scLDClient shardId trimPoint >>= S.trim scLDClient shardId

--------------------------------------------------------------------------------
-- helper

getTrimLSN :: (ToOffset g, Show g) => S.LDClient -> Word64 -> g -> IO S.LSN
getTrimLSN client shardId trimPoint = do
lsn <- getLSN client shardId (toOffset trimPoint)
Log.info $ "getTrimLSN for shard " <> Log.build (show shardId)
<> ", trimPoint: " <> Log.build (show trimPoint)
<> ", lsn: " <> Log.build (show lsn)
return lsn
where
getLSN :: S.LDClient -> S.C_LogID -> ServerInternalOffset -> IO S.LSN
getLSN scLDClient logId offset =
case offset of
OffsetEarliest -> return S.LSN_MIN
OffsetLatest -> S.getTailLSN scLDClient logId
OffsetRecordId API.RecordId{..} -> return recordIdBatchId
OffsetTimestamp API.TimestampOffset{..} -> do
let accuracy = if timestampOffsetStrictAccuracy then S.FindKeyStrict else S.FindKeyApproximate
S.findTime scLDClient logId timestampOffsetTimestampInMs accuracy

2 changes: 2 additions & 0 deletions hstream/src/HStream/Server/Handler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ handlers serverContext@ServerContext{..} =
hstreamApiListStreamsWithPrefix = listStreamsWithPrefixHandler serverContext,
hstreamApiAppend = appendHandler serverContext,
hstreamApiGetTailRecordId = getTailRecordIdHandler serverContext,
hstreamApiTrimStream = trimStreamHandler serverContext,

-- Subscribe
hstreamApiCreateSubscription = createSubscriptionHandler serverContext,
Expand All @@ -58,6 +59,7 @@ handlers serverContext@ServerContext{..} =

-- Shards
hstreamApiListShards = listShardsHandler serverContext,
hstreamApiTrimShard = trimShardHandler serverContext,
-- Reader
hstreamApiListShardReaders = listShardReadersHandler serverContext,
hstreamApiCreateShardReader = createShardReaderHandler serverContext,
Expand Down
45 changes: 44 additions & 1 deletion hstream/src/HStream/Server/Handler/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,34 @@ module HStream.Server.Handler.Stream
( -- * For grpc-haskell
createStreamHandler
, deleteStreamHandler
, trimStreamHandler
, getStreamHandler
, listStreamsHandler
, listStreamsWithPrefixHandler
, listShardsHandler
, trimShardHandler
, appendHandler
, getTailRecordIdHandler
-- * For hs-grpc-server
, handleCreateStream
, handleDeleteStream
, handleTrimStream
, handleGetStream
, handleListStreams
, handleListStreamsWithPrefix
, handleAppend
, handleListShard
, handleTrimShard
, handleGetTailRecordId
) where

import Control.Exception
import Data.Maybe (fromJust)
import Data.Maybe (fromJust, isNothing)
import qualified HsGrpc.Server as G
import qualified HsGrpc.Server.Types as G
import Network.GRPC.HighLevel.Generated

import Control.Monad (when)
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.Server.Core.Stream as C
Expand Down Expand Up @@ -129,6 +134,26 @@ handleListStreamsWithPrefix sc _ req = catchDefaultEx $ do
validateNameAndThrow ResStream $ listStreamsWithPrefixRequestPrefix req
ListStreamsResponse <$> C.listStreamsWithPrefix sc req

trimStreamHandler
:: ServerContext
-> ServerRequest 'Normal TrimStreamRequest Empty
-> IO (ServerResponse 'Normal Empty)
trimStreamHandler sc (ServerNormalRequest _metadata request@TrimStreamRequest{..}) = defaultExceptionHandle $ do
Log.info $ "Receive trim stream Request: " <> Log.buildString' request
validateNameAndThrow ResStream trimStreamRequestStreamName
when (isNothing trimStreamRequestTrimPoint) $
throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimStreamRequestTrimPoint
C.trimStream sc trimStreamRequestStreamName (fromJust trimStreamRequestTrimPoint)
returnResp Empty

handleTrimStream :: ServerContext -> G.UnaryHandler TrimStreamRequest Empty
handleTrimStream sc _ request@TrimStreamRequest{..} = catchDefaultEx $ do
Log.info $ "Receive trim stram Request: " <> Log.buildString' request
validateNameAndThrow ResStream trimStreamRequestStreamName
when (isNothing trimStreamRequestTrimPoint) $
throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimStreamRequestTrimPoint
C.trimStream sc trimStreamRequestStreamName (fromJust trimStreamRequestTrimPoint) >> pure Empty

handleGetTailRecordId :: ServerContext -> G.UnaryHandler GetTailRecordIdRequest GetTailRecordIdResponse
handleGetTailRecordId sc _ req = catchDefaultEx $ do
Log.debug $ "Receive Get TailRecordId Request: " <> Log.buildString' req
Expand Down Expand Up @@ -184,6 +209,24 @@ handleListShard sc _ req = listShardsExHandle $ do
validateNameAndThrow ResStream $ listShardsRequestStreamName req
ListShardsResponse <$> C.listShards sc req

trimShardHandler
:: ServerContext
-> ServerRequest 'Normal TrimShardRequest Empty
-> IO (ServerResponse 'Normal Empty)
trimShardHandler sc (ServerNormalRequest _metadata request@TrimShardRequest{..}) = defaultExceptionHandle $ do
Log.info $ "Receive trim shard Request: " <> Log.buildString' request
when (isNothing trimShardRequestTrimPoint) $
throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimShardRequestTrimPoint
C.trimShard sc trimShardRequestShardId (fromJust trimShardRequestTrimPoint)
returnResp Empty

handleTrimShard :: ServerContext -> G.UnaryHandler TrimShardRequest Empty
handleTrimShard sc _ request@TrimShardRequest{..} = catchDefaultEx $ do
Log.info $ "Receive trim shard Request: " <> Log.buildString' request
when (isNothing trimShardRequestTrimPoint) $
throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimShardRequestTrimPoint
C.trimShard sc trimShardRequestShardId (fromJust trimShardRequestTrimPoint) >> pure Empty

--------------------------------------------------------------------------------
-- Exception Handlers

Expand Down
2 changes: 2 additions & 0 deletions hstream/src/HStream/Server/HsGrpcHandler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ handlers sc =
, unary (GRPC :: GRPC P.HStreamApi "getStream") (H.handleGetStream sc)
, unary (GRPC :: GRPC P.HStreamApi "listStreams") (H.handleListStreams sc)
, unary (GRPC :: GRPC P.HStreamApi "listStreamsWithPrefix") (H.handleListStreamsWithPrefix sc)
, unary (GRPC :: GRPC P.HStreamApi "trimStream") (H.handleTrimStream sc)
, unary (GRPC :: GRPC P.HStreamApi "listShards") (H.handleListShard sc)
, unary (GRPC :: GRPC P.HStreamApi "trimShard") (H.handleTrimShard sc)
, unary (GRPC :: GRPC P.HStreamApi "getTailRecordId") (H.handleGetTailRecordId sc)
-- Reader
, unary (GRPC :: GRPC P.HStreamApi "listShardReaders") (H.handleListShardReaders sc)
Expand Down
Loading

0 comments on commit bdec836

Please sign in to comment.