Skip to content

Commit

Permalink
fix delete sub validation (#1160)
Browse files Browse the repository at this point in the history
  • Loading branch information
Time-Hu authored Nov 24, 2022
1 parent afef576 commit 9bcb101
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 140 deletions.
88 changes: 22 additions & 66 deletions hstream/src/HStream/Server/Core/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,30 @@ module HStream.Server.Core.Cluster
, lookupConnector
) where

import Control.Concurrent (tryReadMVar)
import Control.Concurrent.STM (readTVarIO)
import Control.Exception (SomeException, throwIO, try)
import qualified Data.Map.Strict as Map
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Vector as V
import Proto3.Suite (Enumerated (..))
import Control.Concurrent (tryReadMVar)
import Control.Concurrent.STM (readTVarIO)
import Control.Exception (throwIO)
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Data.Vector as V
import Proto3.Suite (Enumerated (..))

import HStream.Common.ConsistentHashing (HashRing, getAllocatedNode)
import HStream.Common.Types (fromInternalServerNodeWithKey)
import qualified HStream.Exception as HE
import HStream.Gossip (GossipContext (..), getEpoch,
getFailedNodes,
getMemberList)
import HStream.Gossip.Types (ServerStatus (..))
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaStore (..))
import HStream.Server.Core.Common (mkAllocationKey)
import HStream.Common.Types (fromInternalServerNodeWithKey)
import qualified HStream.Exception as HE
import HStream.Gossip (GossipContext (..),
getFailedNodes)
import HStream.Gossip.Types (ServerStatus (..))
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaStore (..))
import HStream.Server.Core.Common (getResNode, lookupResource')
import HStream.Server.HStreamApi
import HStream.Server.MetaData (TaskAllocation (..))
import HStream.Server.MetaData.Value (clusterStartTimeId)
import HStream.Server.Types (ServerContext (..))
import qualified HStream.Server.Types as Types
import qualified HStream.ThirdParty.Protobuf as Proto
import HStream.Utils (ResourceType (..),
getProtoTimestamp,
pattern EnumPB)
import HStream.Server.MetaData.Value (clusterStartTimeId)
import HStream.Server.Types (ServerContext (..))
import qualified HStream.Server.Types as Types
import qualified HStream.ThirdParty.Protobuf as Proto
import HStream.Utils (ResourceType (..),
getProtoTimestamp,
pattern EnumPB)

describeCluster :: ServerContext -> IO DescribeClusterResponse
describeCluster ServerContext{gossipContext = gc@GossipContext{..}, ..} = do
Expand Down Expand Up @@ -121,43 +117,3 @@ lookupShardReader sc req@LookupShardReaderRequest{lookupShardReaderRequestReader
{ lookupShardReaderResponseReaderId = readerId
, lookupShardReaderResponseServerNode = Just theNode
}

-------------------------------------------------------------------------------

lookupResource' :: ServerContext -> ResourceType -> Text -> IO ServerNode
lookupResource' sc@ServerContext{..} rtype rid = do
let metaId = mkAllocationKey rtype rid
-- FIXME: it will insert the results of lookup no matter the resource exists or not
getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
Nothing -> do
hashRing <- readTVarIO loadBalanceHashRing
epoch <- getEpoch gossipContext
theNode <- getResNode hashRing rid scAdvertisedListenersKey
try (insertMeta @TaskAllocation metaId (TaskAllocation epoch theNode) metaHandle) >>=
\case
Left (_e :: SomeException) -> lookupResource' sc rtype rid
Right () -> return theNode
Just (TaskAllocation epoch theNode, version) -> do
serverList <- getMemberList gossipContext >>= fmap V.concat . mapM (fromInternalServerNodeWithKey scAdvertisedListenersKey)
epoch' <- getEpoch gossipContext
if theNode `V.elem` serverList
then return theNode
else do
if epoch' > epoch
then do
hashRing <- readTVarIO loadBalanceHashRing
theNode' <- getResNode hashRing rid scAdvertisedListenersKey
try (updateMeta @TaskAllocation metaId (TaskAllocation epoch' theNode') (Just version) metaHandle) >>=
\case
Left (_e :: SomeException) -> lookupResource' sc rtype rid
Right () -> return theNode'
else do
Log.warning "LookupResource: the server has not yet synced with the latest member list "
throwIO $ HE.ResourceAllocationException "the server has not yet synced with the latest member list"

getResNode :: HashRing -> Text -> Maybe Text -> IO ServerNode
getResNode hashRing hashKey listenerKey = do
let serverNode = getAllocatedNode hashRing hashKey
theNodes <- fromInternalServerNodeWithKey listenerKey serverNode
if V.null theNodes then throwIO $ HE.NodesNotFound "Got empty nodes"
else pure $ V.head theNodes
77 changes: 59 additions & 18 deletions hstream/src/HStream/Server/Core/Common.hs
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
module HStream.Server.Core.Common where

import Control.Concurrent
import Control.Exception (SomeException (..), throwIO, try)
import Control.Concurrent.STM (readTVarIO)
import Control.Exception (SomeException (..), throwIO,
try)
import Control.Monad
import qualified Data.ByteString as BS
import Data.Foldable (foldrM)
import qualified Data.HashMap.Strict as HM
import qualified Data.List as L
import qualified Data.Map.Strict as Map
import qualified Data.Vector as V
import Data.Word (Word32, Word64)

import qualified Data.Text as T
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as M
import qualified Data.ByteString as BS
import Data.Foldable (foldrM)
import qualified Data.HashMap.Strict as HM
import qualified Data.Map.Strict as Map
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word32, Word64)

import HStream.Common.ConsistentHashing
import HStream.Common.Types (fromInternalServerNodeWithKey)
import qualified HStream.Exception as HE
import HStream.Gossip
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as M
import HStream.Server.HStreamApi
import qualified HStream.Server.MetaData as P
import qualified HStream.Server.MetaData as P
import HStream.Server.Types
import HStream.SQL.Codegen
import qualified HStream.Store as HS
import HStream.ThirdParty.Protobuf (Empty (Empty))
import HStream.Utils (TaskStatus (..), cBytesToText,
decodeByteStringBatch)
import qualified HStream.Store as HS
import HStream.Utils (decodeByteStringBatch)

-- deleteStoreStream
-- :: ServerContext
Expand Down Expand Up @@ -222,3 +225,41 @@ handleQueryTerminate ServerContext{..} (ManyQueries qids) = do

mkAllocationKey :: ResourceType -> T.Text -> T.Text
mkAllocationKey rtype rid = T.pack (show rtype) <> "_" <> rid

lookupResource' :: ServerContext -> ResourceType -> Text -> IO ServerNode
lookupResource' sc@ServerContext{..} rtype rid = do
let metaId = mkAllocationKey rtype rid
-- FIXME: it will insert the results of lookup no matter the resource exists or not
M.getMetaWithVer @P.TaskAllocation metaId metaHandle >>= \case
Nothing -> do
hashRing <- readTVarIO loadBalanceHashRing
epoch <- getEpoch gossipContext
theNode <- getResNode hashRing rid scAdvertisedListenersKey
try (M.insertMeta @P.TaskAllocation metaId (P.TaskAllocation epoch theNode) metaHandle) >>=
\case
Left (_e :: SomeException) -> lookupResource' sc rtype rid
Right () -> return theNode
Just (P.TaskAllocation epoch theNode, version) -> do
serverList <- getMemberList gossipContext >>= fmap V.concat . mapM (fromInternalServerNodeWithKey scAdvertisedListenersKey)
epoch' <- getEpoch gossipContext
if theNode `V.elem` serverList
then return theNode
else do
if epoch' > epoch
then do
hashRing <- readTVarIO loadBalanceHashRing
theNode' <- getResNode hashRing rid scAdvertisedListenersKey
try (M.updateMeta @P.TaskAllocation metaId (P.TaskAllocation epoch' theNode') (Just version) metaHandle) >>=
\case
Left (_e :: SomeException) -> lookupResource' sc rtype rid
Right () -> return theNode'
else do
Log.warning "LookupResource: the server has not yet synced with the latest member list "
throwIO $ HE.ResourceAllocationException "the server has not yet synced with the latest member list"

getResNode :: HashRing -> Text -> Maybe Text -> IO ServerNode
getResNode hashRing hashKey listenerKey = do
let serverNode = getAllocatedNode hashRing hashKey
theNodes <- fromInternalServerNodeWithKey listenerKey serverNode
if V.null theNodes then throwIO $ HE.NodesNotFound "Got empty nodes"
else pure $ V.head theNodes
78 changes: 34 additions & 44 deletions hstream/src/HStream/Server/Core/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,40 @@ module HStream.Server.Core.Stream
, readShard
) where

import Control.Concurrent (modifyMVar_, newEmptyMVar,
putMVar, readMVar, takeMVar,
withMVar)
import Control.Concurrent.STM (readTVarIO)
import Control.Exception (bracket, catch, throw,
throwIO)
import Control.Monad (forM, unless, when)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Foldable (foldl')
import qualified Data.HashMap.Strict as HM
import qualified Data.Map.Strict as M
import Data.Maybe (fromJust, fromMaybe)
import qualified Data.Text as T
import qualified Data.Vector as V
import GHC.Stack (HasCallStack)
import Proto3.Suite (Enumerated (Enumerated))
import qualified Proto3.Suite as PT
import qualified Z.Data.CBytes as CB
import ZooKeeper.Exception (ZNONODE (..))
import Control.Concurrent (modifyMVar_, newEmptyMVar, putMVar,
readMVar, takeMVar, withMVar)
import Control.Exception (bracket, catch, throw, throwIO)
import Control.Monad (forM, unless, when)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Foldable (foldl')
import qualified Data.HashMap.Strict as HM
import qualified Data.Map.Strict as M
import Data.Maybe (fromJust, fromMaybe)
import qualified Data.Text as T
import qualified Data.Vector as V
import GHC.Stack (HasCallStack)
import Google.Protobuf.Timestamp (Timestamp)
import Proto3.Suite (Enumerated (Enumerated))
import qualified Proto3.Suite as PT
import qualified Z.Data.CBytes as CB
import ZooKeeper.Exception (ZNONODE (..))

import Google.Protobuf.Timestamp (Timestamp)
import HStream.Common.ConsistentHashing (getAllocatedNodeId)
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as M
import HStream.Server.Core.Common (decodeRecordBatch)
import HStream.Server.HStreamApi (CreateShardReaderRequest (createShardReaderRequestShardId))
import qualified HStream.Server.HStreamApi as API
import qualified HStream.Server.MetaData as P
import HStream.Server.Shard (Shard (..), createShard,
devideKeySpace,
mkShardWithDefaultId,
mkSharedShardMapWithShards)
import HStream.Server.Types (ServerContext (..),
transToStreamName)
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as M
import HStream.Server.Core.Common (decodeRecordBatch)
import HStream.Server.HStreamApi (CreateShardReaderRequest (..))
import qualified HStream.Server.HStreamApi as API
import qualified HStream.Server.MetaData as P
import HStream.Server.Shard (Shard (..), createShard,
devideKeySpace,
mkShardWithDefaultId,
mkSharedShardMapWithShards)
import HStream.Server.Types (ServerContext (..),
transToStreamName)
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import HStream.Utils

-------------------------------------------------------------------------------
Expand Down Expand Up @@ -225,10 +221,7 @@ deleteShardReader
=> ServerContext
-> API.DeleteShardReaderRequest
-> IO ()
deleteShardReader ServerContext{..} API.DeleteShardReaderRequest{..} = do
hashRing <- readTVarIO loadBalanceHashRing
unless (getAllocatedNodeId hashRing deleteShardReaderRequestReaderId == serverID) $
throwIO $ HE.WrongServer "Send deleteShard request to wrong server."
deleteShardReader ctx@ServerContext{..} API.DeleteShardReaderRequest{..} = do
isSuccess <- catch (M.deleteMeta @P.ShardReader deleteShardReaderRequestReaderId Nothing metaHandle >> return True) $
\ (_ :: ZNONODE) -> return False
modifyMVar_ shardReaderMap $ \mp -> do
Expand Down Expand Up @@ -278,9 +271,6 @@ readShard
-> API.ReadShardRequest
-> IO (V.Vector API.ReceivedRecord)
readShard ServerContext{..} API.ReadShardRequest{..} = do
hashRing <- readTVarIO loadBalanceHashRing
unless (getAllocatedNodeId hashRing readShardRequestReaderId == serverID) $
throwIO $ HE.WrongServer "Send readShard request to wrong server."
bracket getReader putReader readRecords
where
ldReaderBufferSize = 10
Expand Down
20 changes: 16 additions & 4 deletions hstream/src/HStream/Server/Handler/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import qualified HsGrpc.Server as G
import qualified HsGrpc.Server.Types as G
import Network.GRPC.HighLevel.Generated

import Control.Monad (unless)
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import HStream.Server.Core.Common (lookupResource')
import qualified HStream.Server.Core.Stream as C
import HStream.Server.Exception
import HStream.Server.HStreamApi
Expand Down Expand Up @@ -136,27 +138,37 @@ createShardReaderHandler
:: ServerContext
-> ServerRequest 'Normal CreateShardReaderRequest CreateShardReaderResponse
-> IO (ServerResponse 'Normal CreateShardReaderResponse)
createShardReaderHandler sc (ServerNormalRequest _metadata request) = defaultExceptionHandle $ do
createShardReaderHandler sc@ServerContext{..} (ServerNormalRequest _metadata request) = defaultExceptionHandle $ do
Log.debug $ "Receive Create ShardReader Request" <> Log.buildString' (show request)
validateNameAndThrow $ createShardReaderRequestReaderId request
C.createShardReader sc request >>= returnResp

handleCreateShardReader
:: ServerContext
-> G.UnaryHandler CreateShardReaderRequest CreateShardReaderResponse
handleCreateShardReader sc _ req = catchDefaultEx $ C.createShardReader sc req
handleCreateShardReader sc@ServerContext{..} _ req = catchDefaultEx $ do
validateNameAndThrow $ createShardReaderRequestReaderId req
C.createShardReader sc req

deleteShardReaderHandler
:: ServerContext
-> ServerRequest 'Normal DeleteShardReaderRequest Empty
-> IO (ServerResponse 'Normal Empty)
deleteShardReaderHandler sc (ServerNormalRequest _metadata request) = defaultExceptionHandle $ do
deleteShardReaderHandler sc@ServerContext{..} (ServerNormalRequest _metadata request) = defaultExceptionHandle $ do
Log.debug $ "Receive Delete ShardReader Request" <> Log.buildString' (show request)
validateNameAndThrow $ deleteShardReaderRequestReaderId request
ServerNode{..} <- lookupResource' sc ResShardReader (deleteShardReaderRequestReaderId request)
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "ShardReader is bound to a different node"
C.deleteShardReader sc request >> returnResp Empty

handleDeleteShardReader
:: ServerContext
-> G.UnaryHandler DeleteShardReaderRequest Empty
handleDeleteShardReader sc _ req = catchDefaultEx $
handleDeleteShardReader sc@ServerContext{..} _ req = catchDefaultEx $ do
ServerNode{..} <- lookupResource' sc ResShardReader (deleteShardReaderRequestReaderId req)
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "ShardReader is bound to a different node"
C.deleteShardReader sc req >> pure Empty

readShardHandler
Expand Down
Loading

0 comments on commit 9bcb101

Please sign in to comment.