From bdec836008296760b905d9d183cda9f57f721223 Mon Sep 17 00:00:00 2001 From: YangKian <45479280+YangKian@users.noreply.github.com> Date: Wed, 19 Jul 2023 14:25:28 +0800 Subject: [PATCH] add trimStream && trimShard rpc (#1516) --- common/hstream/HStream/Exception.hs | 4 +- external/protocol | 2 +- hstream-store/test/HStream/StoreSpec.hs | 2 + hstream/src/HStream/Server/ConnectorTypes.hs | 8 +-- .../src/HStream/Server/Core/ShardReader.hs | 55 +++------------ hstream/src/HStream/Server/Core/Stream.hs | 68 ++++++++++++++++--- hstream/src/HStream/Server/Handler.hs | 2 + hstream/src/HStream/Server/Handler/Stream.hs | 45 +++++++++++- hstream/src/HStream/Server/HsGrpcHandler.hs | 2 + hstream/src/HStream/Server/Types.hs | 66 ++++++++++++++---- 10 files changed, 176 insertions(+), 78 deletions(-) diff --git a/common/hstream/HStream/Exception.hs b/common/hstream/HStream/Exception.hs index 070909c57..e24461661 100644 --- a/common/hstream/HStream/Exception.hs +++ b/common/hstream/HStream/Exception.hs @@ -51,6 +51,7 @@ module HStream.Exception , SQLNotSupportedByParseSQL(SQLNotSupportedByParseSQL) , ConflictShardReaderOffset (ConflictShardReaderOffset) , TooManyShardCount (TooManyShardCount) + , InvalidTrimPoint (InvalidTrimPoint) -- * Exception: SomeDeadlineExceeded -- @@ -417,7 +418,8 @@ MAKE_EX_0(SomeInvalidArgument, EmptyBatchedRecord, API.ErrorCodeStreamEmptyBatch MAKE_EX_1(SomeInvalidArgument, InvalidRecordSize, Int, API.ErrorCodeStreamInvalidRecordSize, "Record size exceeds the maximum size limit") MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidResourceType, String, API.ErrorCodeInternalError) -MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidShardOffset, String, API.ErrorCodeInternalError) +MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidShardOffset, String, API.ErrorCodeStreamInvalidOffset) +MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidTrimPoint, String, API.ErrorCodeStreamInvalidOffset) MAKE_EX_0_DEFMSG(SomeInvalidArgument, InvalidSubscriptionOffset, API.ErrorCodeSubscriptionInvalidOffset) MAKE_EX_1_DEFMSG(SomeInvalidArgument, DecodeHStreamRecordErr, String, API.ErrorCodeInternalError) MAKE_EX_0(SomeInvalidArgument, NoRecordHeader, API.ErrorCodeInternalError, diff --git a/external/protocol b/external/protocol index dcc6de1a1..e43618076 160000 --- a/external/protocol +++ b/external/protocol @@ -1 +1 @@ -Subproject commit dcc6de1a1ebfc68edb3c7c0fd905ba3b32ebe801 +Subproject commit e436180760fa37d5ce691027cebc2f3ad4726e4d diff --git a/hstream-store/test/HStream/StoreSpec.hs b/hstream-store/test/HStream/StoreSpec.hs index 144a90806..0772bd9af 100644 --- a/hstream-store/test/HStream/StoreSpec.hs +++ b/hstream-store/test/HStream/StoreSpec.hs @@ -33,6 +33,8 @@ base = describe "Base" $ do S.trim client logid sn0 readPayload' logid (Just sn0) `shouldReturn` [] readPayload logid (Just sn1) `shouldReturn` "world" + -- trim lsn beyond tailLSN should fail + S.trim client logid (sn1 + 1) `shouldThrow` anyException loggroupAround' $ do it "trim last" $ \(_lgname, ranlogid) -> do diff --git a/hstream/src/HStream/Server/ConnectorTypes.hs b/hstream/src/HStream/Server/ConnectorTypes.hs index a802c72c3..0516d7e41 100644 --- a/hstream/src/HStream/Server/ConnectorTypes.hs +++ b/hstream/src/HStream/Server/ConnectorTypes.hs @@ -1,6 +1,4 @@ -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE StrictData #-} +{-# LANGUAGE StrictData #-} module HStream.Server.ConnectorTypes ( SourceConnector (..), @@ -25,8 +23,6 @@ import Data.Time.Clock.POSIX import Data.Word (Word64) import qualified HStream.Server.HStreamApi as API -import qualified HStream.Server.HStreamApi as API - type Timestamp = Int64 -- ms type StreamName = T.Text @@ -90,7 +86,7 @@ data SourceConnectorWithoutCkp = SourceConnectorWithoutCkp -> IO () } -data SinkConnector = SinkConnector +newtype SinkConnector = SinkConnector { writeRecord :: (BL.ByteString -> Maybe BL.ByteString) -> (BL.ByteString -> Maybe BL.ByteString) -> SinkRecord diff --git a/hstream/src/HStream/Server/Core/ShardReader.hs b/hstream/src/HStream/Server/Core/ShardReader.hs index 484c4e9ec..fbe75f257 100644 --- a/hstream/src/HStream/Server/Core/ShardReader.hs +++ b/hstream/src/HStream/Server/Core/ShardReader.hs @@ -13,12 +13,11 @@ module HStream.Server.Core.ShardReader where import Data.Functor ((<&>)) -import Proto3.Suite (Enumerated (Enumerated)) import ZooKeeper.Exception (ZNONODE (..), throwIO) import Control.Concurrent (modifyMVar_, newEmptyMVar, putMVar, readMVar, takeMVar, withMVar) -import Control.Exception (bracket, catch, throw) +import Control.Exception (bracket, catch) import Control.Monad (forM, forM_, unless, when) import Data.ByteString (ByteString) import Data.Either (isRight) @@ -28,7 +27,7 @@ import Data.Int (Int64) import Data.IORef (IORef, newIORef, readIORef, writeIORef) import qualified Data.Map.Strict as M -import Data.Maybe (fromJust, isJust) +import Data.Maybe (isJust) import qualified Data.Text as T import Data.Vector (Vector) import qualified Data.Vector as V @@ -43,8 +42,10 @@ import HStream.Server.HStreamApi (CreateShardReaderRequest (..)) import qualified HStream.Server.HStreamApi as API import qualified HStream.Server.MetaData as P import HStream.Server.Types (ServerContext (..), + ServerInternalOffset, ShardReader (..), - StreamReader (..), mkShardReader, + StreamReader (..), ToOffset (..), + getLogLSN, mkShardReader, mkStreamReader, transToStreamName) import qualified HStream.Store as S @@ -321,46 +322,6 @@ instance StreamSend API.ReadStreamResponse where instance StreamSend API.ReadSingleShardStreamResponse where convert = API.ReadSingleShardStreamResponse -data Offset = OffsetEarliest - | OffsetLatest - | OffsetRecordId API.RecordId - | OffsetTimestamp API.TimestampOffset - deriving (Show) - -class ToOffset g where - toOffset :: g -> Offset - -instance ToOffset API.ShardOffset where - toOffset offset = case fromJust . API.shardOffsetOffset $ offset of - API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetEARLIEST)) -> OffsetEarliest - API.ShardOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetLATEST)) -> OffsetLatest - API.ShardOffsetOffsetRecordOffset rid -> OffsetRecordId rid - API.ShardOffsetOffsetTimestampOffset timestamp -> OffsetTimestamp timestamp - _ -> throw $ HE.InvalidShardOffset "UnKnownShardOffset" - -instance ToOffset API.StreamOffset where - toOffset offset = case fromJust . API.streamOffsetOffset $ offset of - API.StreamOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetEARLIEST)) -> OffsetEarliest - API.StreamOffsetOffsetSpecialOffset (Enumerated (Right API.SpecialOffsetLATEST)) -> OffsetLatest - API.StreamOffsetOffsetTimestampOffset timestamp -> OffsetTimestamp timestamp - _ -> throw $ HE.InvalidShardOffset "UnKnownShardOffset" - --- if the offset is timestampOffset, then return (LSN, Just timestamp) --- , otherwise return (LSN, Nothing) -getLogLSN :: S.LDClient -> S.C_LogID -> Offset -> IO (S.LSN, Maybe Int64) -getLogLSN scLDClient logId offset = - case offset of - OffsetEarliest -> return (S.LSN_MIN, Nothing) - OffsetLatest -> do - startLSN <- (+ 1) <$> S.getTailLSN scLDClient logId - return (startLSN, Nothing) - OffsetRecordId API.RecordId{..} -> - return (recordIdBatchId, Nothing) - OffsetTimestamp API.TimestampOffset{..} -> do - let accuracy = if timestampOffsetStrictAccuracy then S.FindKeyStrict else S.FindKeyApproximate - startLSN <- S.findTime scLDClient logId timestampOffsetTimestampInMs accuracy - return (startLSN, Just timestampOffsetTimestampInMs) - -- Removes data that is not in the specified timestamp range. -- If endTimestamp is set, also check if endTimestamp has been reached filterRecords :: Maybe Int64 -> Maybe Int64 -> [S.DataRecord ByteString] -> ([S.DataRecord ByteString], Bool) @@ -396,10 +357,10 @@ filterRecords startTs endTs records = startReadingShard :: S.LDClient -> S.LDReader - -> T.Text -- readerId, use for logging + -> T.Text -- readerId, use for logging -> S.C_LogID - -> Maybe Offset -- startOffset - -> Maybe Offset -- endOffset + -> Maybe ServerInternalOffset -- startOffset + -> Maybe ServerInternalOffset -- endOffset -> IO (Maybe Int64, Maybe Int64) startReadingShard scLDClient reader readerId rShardId rStart rEnd = do (startLSN, sTimestamp) <- maybe (return (S.LSN_MIN, Nothing)) (getLogLSN scLDClient rShardId) rStart diff --git a/hstream/src/HStream/Server/Core/Stream.hs b/hstream/src/HStream/Server/Core/Stream.hs index 8de3f4687..69ad3eda0 100644 --- a/hstream/src/HStream/Server/Core/Stream.hs +++ b/hstream/src/HStream/Server/Core/Stream.hs @@ -13,15 +13,14 @@ module HStream.Server.Core.Stream , appendStream , listShards , getTailRecordId + , trimShard + , trimStream ) where -import Control.Concurrent (modifyMVar_) import Control.Exception (catch, throwIO) -import Control.Monad (forM, unless, when) +import Control.Monad (forM, forM_, unless, when) import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL -import Data.Foldable (foldl') -import qualified Data.HashMap.Strict as HM import qualified Data.Map.Strict as M import Data.Maybe (fromMaybe) import qualified Data.Text as T @@ -36,12 +35,11 @@ import qualified HStream.Exception as HE import qualified HStream.Logger as Log import qualified HStream.Server.HStreamApi as API import qualified HStream.Server.MetaData as P -import HStream.Server.Shard (Shard (..), createShard, - devideKeySpace, - mkShardWithDefaultId, - mkSharedShardMapWithShards) +import HStream.Server.Shard (createShard, devideKeySpace, + mkShardWithDefaultId) import HStream.Server.Types (ServerContext (..), - transToStreamName) + ServerInternalOffset (..), + ToOffset (..), transToStreamName) import qualified HStream.Stats as Stats import qualified HStream.Store as S import HStream.Utils @@ -133,6 +131,23 @@ listStreamsWithPrefix sc@ServerContext{..} API.ListStreamsWithPrefixRequest{..} streams <- filter (T.isPrefixOf listStreamsWithPrefixRequestPrefix . T.pack . S.showStreamName) <$> S.findStreams scLDClient S.StreamTypeStream V.forM (V.fromList streams) (getStreamInfo sc) +trimStream + :: HasCallStack + => ServerContext + -> T.Text + -> API.StreamOffset + -> IO () +trimStream ServerContext{..} streamName trimPoint = do + streamExists <- S.doesStreamExist scLDClient streamId + unless streamExists $ do + Log.info $ "trimStream failed because stream " <> Log.build streamName <> " is not found." + throwIO $ HE.StreamNotFound $ "stream " <> T.pack (show streamName) <> " is not found." + shards <- M.elems <$> S.listStreamPartitions scLDClient streamId + forM_ shards $ \shardId -> do + getTrimLSN scLDClient shardId trimPoint >>= S.trim scLDClient shardId + where + streamId = transToStreamName streamName + getStreamInfo :: ServerContext -> S.StreamId -> IO API.Stream getStreamInfo ServerContext{..} stream = do attrs <- S.getStreamLogAttrs scLDClient stream @@ -246,3 +261,38 @@ listShards ServerContext{..} API.ListShardsRequest{..} = do endHashRangeKey <- cBytesToText <$> M.lookup endKey mp shardEpoch <- read . CB.unpack <$> M.lookup epoch mp return (startHashRangeKey, endHashRangeKey, shardEpoch) + +trimShard + :: HasCallStack + => ServerContext + -> Word64 + -> API.ShardOffset + -> IO () +trimShard ServerContext{..} shardId trimPoint = do + shardExists <- S.logIdHasGroup scLDClient shardId + unless shardExists $ do + Log.info $ "trimShard failed because shard " <> Log.build shardId <> " is not exist." + throwIO $ HE.ShardNotFound $ "Shard with id " <> T.pack (show shardId) <> " is not found." + getTrimLSN scLDClient shardId trimPoint >>= S.trim scLDClient shardId + +-------------------------------------------------------------------------------- +-- helper + +getTrimLSN :: (ToOffset g, Show g) => S.LDClient -> Word64 -> g -> IO S.LSN +getTrimLSN client shardId trimPoint = do + lsn <- getLSN client shardId (toOffset trimPoint) + Log.info $ "getTrimLSN for shard " <> Log.build (show shardId) + <> ", trimPoint: " <> Log.build (show trimPoint) + <> ", lsn: " <> Log.build (show lsn) + return lsn + where + getLSN :: S.LDClient -> S.C_LogID -> ServerInternalOffset -> IO S.LSN + getLSN scLDClient logId offset = + case offset of + OffsetEarliest -> return S.LSN_MIN + OffsetLatest -> S.getTailLSN scLDClient logId + OffsetRecordId API.RecordId{..} -> return recordIdBatchId + OffsetTimestamp API.TimestampOffset{..} -> do + let accuracy = if timestampOffsetStrictAccuracy then S.FindKeyStrict else S.FindKeyApproximate + S.findTime scLDClient logId timestampOffsetTimestampInMs accuracy + diff --git a/hstream/src/HStream/Server/Handler.hs b/hstream/src/HStream/Server/Handler.hs index 513153b71..3a77b5da4 100644 --- a/hstream/src/HStream/Server/Handler.hs +++ b/hstream/src/HStream/Server/Handler.hs @@ -44,6 +44,7 @@ handlers serverContext@ServerContext{..} = hstreamApiListStreamsWithPrefix = listStreamsWithPrefixHandler serverContext, hstreamApiAppend = appendHandler serverContext, hstreamApiGetTailRecordId = getTailRecordIdHandler serverContext, + hstreamApiTrimStream = trimStreamHandler serverContext, -- Subscribe hstreamApiCreateSubscription = createSubscriptionHandler serverContext, @@ -58,6 +59,7 @@ handlers serverContext@ServerContext{..} = -- Shards hstreamApiListShards = listShardsHandler serverContext, + hstreamApiTrimShard = trimShardHandler serverContext, -- Reader hstreamApiListShardReaders = listShardReadersHandler serverContext, hstreamApiCreateShardReader = createShardReaderHandler serverContext, diff --git a/hstream/src/HStream/Server/Handler/Stream.hs b/hstream/src/HStream/Server/Handler/Stream.hs index 41b9ad77d..351a4ab0b 100644 --- a/hstream/src/HStream/Server/Handler/Stream.hs +++ b/hstream/src/HStream/Server/Handler/Stream.hs @@ -10,29 +10,34 @@ module HStream.Server.Handler.Stream ( -- * For grpc-haskell createStreamHandler , deleteStreamHandler + , trimStreamHandler , getStreamHandler , listStreamsHandler , listStreamsWithPrefixHandler , listShardsHandler + , trimShardHandler , appendHandler , getTailRecordIdHandler -- * For hs-grpc-server , handleCreateStream , handleDeleteStream + , handleTrimStream , handleGetStream , handleListStreams , handleListStreamsWithPrefix , handleAppend , handleListShard + , handleTrimShard , handleGetTailRecordId ) where import Control.Exception -import Data.Maybe (fromJust) +import Data.Maybe (fromJust, isNothing) import qualified HsGrpc.Server as G import qualified HsGrpc.Server.Types as G import Network.GRPC.HighLevel.Generated +import Control.Monad (when) import qualified HStream.Exception as HE import qualified HStream.Logger as Log import qualified HStream.Server.Core.Stream as C @@ -129,6 +134,26 @@ handleListStreamsWithPrefix sc _ req = catchDefaultEx $ do validateNameAndThrow ResStream $ listStreamsWithPrefixRequestPrefix req ListStreamsResponse <$> C.listStreamsWithPrefix sc req +trimStreamHandler + :: ServerContext + -> ServerRequest 'Normal TrimStreamRequest Empty + -> IO (ServerResponse 'Normal Empty) +trimStreamHandler sc (ServerNormalRequest _metadata request@TrimStreamRequest{..}) = defaultExceptionHandle $ do + Log.info $ "Receive trim stream Request: " <> Log.buildString' request + validateNameAndThrow ResStream trimStreamRequestStreamName + when (isNothing trimStreamRequestTrimPoint) $ + throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimStreamRequestTrimPoint + C.trimStream sc trimStreamRequestStreamName (fromJust trimStreamRequestTrimPoint) + returnResp Empty + +handleTrimStream :: ServerContext -> G.UnaryHandler TrimStreamRequest Empty +handleTrimStream sc _ request@TrimStreamRequest{..} = catchDefaultEx $ do + Log.info $ "Receive trim stram Request: " <> Log.buildString' request + validateNameAndThrow ResStream trimStreamRequestStreamName + when (isNothing trimStreamRequestTrimPoint) $ + throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimStreamRequestTrimPoint + C.trimStream sc trimStreamRequestStreamName (fromJust trimStreamRequestTrimPoint) >> pure Empty + handleGetTailRecordId :: ServerContext -> G.UnaryHandler GetTailRecordIdRequest GetTailRecordIdResponse handleGetTailRecordId sc _ req = catchDefaultEx $ do Log.debug $ "Receive Get TailRecordId Request: " <> Log.buildString' req @@ -184,6 +209,24 @@ handleListShard sc _ req = listShardsExHandle $ do validateNameAndThrow ResStream $ listShardsRequestStreamName req ListShardsResponse <$> C.listShards sc req +trimShardHandler + :: ServerContext + -> ServerRequest 'Normal TrimShardRequest Empty + -> IO (ServerResponse 'Normal Empty) +trimShardHandler sc (ServerNormalRequest _metadata request@TrimShardRequest{..}) = defaultExceptionHandle $ do + Log.info $ "Receive trim shard Request: " <> Log.buildString' request + when (isNothing trimShardRequestTrimPoint) $ + throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimShardRequestTrimPoint + C.trimShard sc trimShardRequestShardId (fromJust trimShardRequestTrimPoint) + returnResp Empty + +handleTrimShard :: ServerContext -> G.UnaryHandler TrimShardRequest Empty +handleTrimShard sc _ request@TrimShardRequest{..} = catchDefaultEx $ do + Log.info $ "Receive trim shard Request: " <> Log.buildString' request + when (isNothing trimShardRequestTrimPoint) $ + throwIO . HE.InvalidTrimPoint $ "invalid trim point: " <> show trimShardRequestTrimPoint + C.trimShard sc trimShardRequestShardId (fromJust trimShardRequestTrimPoint) >> pure Empty + -------------------------------------------------------------------------------- -- Exception Handlers diff --git a/hstream/src/HStream/Server/HsGrpcHandler.hs b/hstream/src/HStream/Server/HsGrpcHandler.hs index 1f0721220..d62a1016c 100644 --- a/hstream/src/HStream/Server/HsGrpcHandler.hs +++ b/hstream/src/HStream/Server/HsGrpcHandler.hs @@ -36,7 +36,9 @@ handlers sc = , unary (GRPC :: GRPC P.HStreamApi "getStream") (H.handleGetStream sc) , unary (GRPC :: GRPC P.HStreamApi "listStreams") (H.handleListStreams sc) , unary (GRPC :: GRPC P.HStreamApi "listStreamsWithPrefix") (H.handleListStreamsWithPrefix sc) + , unary (GRPC :: GRPC P.HStreamApi "trimStream") (H.handleTrimStream sc) , unary (GRPC :: GRPC P.HStreamApi "listShards") (H.handleListShard sc) + , unary (GRPC :: GRPC P.HStreamApi "trimShard") (H.handleTrimShard sc) , unary (GRPC :: GRPC P.HStreamApi "getTailRecordId") (H.handleGetTailRecordId sc) -- Reader , unary (GRPC :: GRPC P.HStreamApi "listShardReaders") (H.handleListShardReaders sc) diff --git a/hstream/src/HStream/Server/Types.hs b/hstream/src/HStream/Server/Types.hs index 14e25a90c..8d2e181bf 100644 --- a/hstream/src/HStream/Server/Types.hs +++ b/hstream/src/HStream/Server/Types.hs @@ -30,19 +30,19 @@ import qualified Proto3.Suite as PB #if __GLASGOW_HASKELL__ < 902 import qualified HStream.Admin.Store.API as AA #endif +import Control.Exception (throw) import Data.IORef (IORef) +import Data.Maybe (fromJust) import HStream.Base.Timer (CompactedWorker) import HStream.Common.ConsistentHashing (HashRing) +import qualified HStream.Exception as HE import HStream.Gossip.Types (Epoch, GossipContext) import qualified HStream.IO.Types as IO import qualified HStream.IO.Worker as IO import HStream.MetaStore.Types (MetaHandle) import HStream.Server.Config -import HStream.Server.ConnectorTypes as HCT -import HStream.Server.HStreamApi (NodeState, ResourceType, - SpecialOffset (..), - StreamingFetchResponse, - Subscription (..)) +import qualified HStream.Server.ConnectorTypes as HCT +import qualified HStream.Server.HStreamApi as API import HStream.Server.Shard (ShardKey, SharedShardMap) import qualified HStream.Stats as Stats import qualified HStream.Store as HS @@ -58,7 +58,7 @@ serverVersion :: Text serverVersion = "0.9.1" data SubscriptionWrap = SubscriptionWrap - { originSub :: Subscription + { originSub :: API.Subscription , subOffsets :: HM.HashMap S.C_LogID S.LSN } deriving (Generic, Show, FromJSON, ToJSON) @@ -68,11 +68,11 @@ renderSubscriptionWrapToTable subs = rows = map formatSubscriptionWrap subs in Aeson.object ["headers" Aeson..= headers, "rows" Aeson..= rows] where - formatSubscriptionWrap SubscriptionWrap{originSub=Subscription{..}, ..} = + formatSubscriptionWrap SubscriptionWrap{originSub=API.Subscription{..}, ..} = let offset = case subscriptionOffset of - (PB.Enumerated (Right SpecialOffsetEARLIEST)) -> "EARLIEST" - (PB.Enumerated (Right SpecialOffsetLATEST)) -> "LATEST" - _ -> "UNKNOWN" + (PB.Enumerated (Right API.SpecialOffsetEARLIEST)) -> "EARLIEST" + (PB.Enumerated (Right API.SpecialOffsetLATEST)) -> "LATEST" + _ -> "UNKNOWN" in [ subscriptionSubscriptionId , subscriptionStreamName , T.pack . show $ subscriptionAckTimeoutSeconds @@ -84,7 +84,7 @@ renderSubscriptionWrapToTable subs = type Timestamp = Int64 type ServerID = Word32 -type ServerState = PB.Enumerated NodeState +type ServerState = PB.Enumerated API.NodeState type ShardDict = M.Map ShardKey HS.C_LogID data ServerContext = ServerContext @@ -133,7 +133,7 @@ data SubscribeContext = SubscribeContext , subStreamName :: !T.Text , subAckTimeoutSeconds :: !Int32 , subMaxUnackedRecords :: !Word32 - , subStartOffset :: !(PB.Enumerated SpecialOffset) + , subStartOffset :: !(PB.Enumerated API.SpecialOffset) , subLdCkpReader :: !HS.LDSyncCkpReader , subLdTrimCkpWorker :: !CompactedWorker , subLdReader :: !(MVar HS.LDReader) @@ -180,7 +180,7 @@ data ConsumerContext = ConsumerContext ccIsValid :: TVar Bool, -- use MVar for streamSend because only on thread can use streamSend at the -- same time - ccStreamSend :: MVar (StreamSend StreamingFetchResponse), + ccStreamSend :: MVar (StreamSend API.StreamingFetchResponse), -- threadId of the thread handling streamingFetchRequest for this consumer ccThreadId :: ThreadId } @@ -274,6 +274,46 @@ data StreamReader = StreamReader mkStreamReader :: S.LDReader -> Maybe (IORef Word64) -> HashMap S.C_LogID (Maybe Int64, Maybe Int64) -> StreamReader mkStreamReader streamReader streamReaderTotalBatches streamReaderTsLimits = StreamReader {..} +data ServerInternalOffset = OffsetEarliest + | OffsetLatest + | OffsetRecordId API.RecordId + | OffsetTimestamp API.TimestampOffset + deriving (Show) + +class ToOffset g where + toOffset :: g -> ServerInternalOffset + +instance ToOffset API.ShardOffset where + toOffset offset = case fromJust . API.shardOffsetOffset $ offset of + API.ShardOffsetOffsetSpecialOffset (PB.Enumerated (Right API.SpecialOffsetEARLIEST)) -> OffsetEarliest + API.ShardOffsetOffsetSpecialOffset (PB.Enumerated (Right API.SpecialOffsetLATEST)) -> OffsetLatest + API.ShardOffsetOffsetRecordOffset rid -> OffsetRecordId rid + API.ShardOffsetOffsetTimestampOffset timestamp -> OffsetTimestamp timestamp + _ -> throw $ HE.InvalidShardOffset "UnKnownShardOffset" + +instance ToOffset API.StreamOffset where + toOffset offset = case fromJust . API.streamOffsetOffset $ offset of + API.StreamOffsetOffsetSpecialOffset (PB.Enumerated (Right API.SpecialOffsetEARLIEST)) -> OffsetEarliest + API.StreamOffsetOffsetSpecialOffset (PB.Enumerated (Right API.SpecialOffsetLATEST)) -> OffsetLatest + API.StreamOffsetOffsetTimestampOffset timestamp -> OffsetTimestamp timestamp + _ -> throw $ HE.InvalidShardOffset "UnKnownShardOffset" + +-- if the offset is timestampOffset, then return (LSN, Just timestamp) +-- , otherwise return (LSN, Nothing) +getLogLSN :: S.LDClient -> S.C_LogID -> ServerInternalOffset -> IO (S.LSN, Maybe Int64) +getLogLSN scLDClient logId offset = + case offset of + OffsetEarliest -> return (S.LSN_MIN, Nothing) + OffsetLatest -> do + startLSN <- (+ 1) <$> S.getTailLSN scLDClient logId + return (startLSN, Nothing) + OffsetRecordId API.RecordId{..} -> + return (recordIdBatchId, Nothing) + OffsetTimestamp API.TimestampOffset{..} -> do + let accuracy = if timestampOffsetStrictAccuracy then S.FindKeyStrict else S.FindKeyApproximate + startLSN <- S.findTime scLDClient logId timestampOffsetTimestampInMs accuracy + return (startLSN, Just timestampOffsetTimestampInMs) + -------------------------------------------------------------------------------- transToStreamName :: HCT.StreamName -> S.StreamId