diff --git a/common/server/HStream/Common/Server/HashRing.hs b/common/server/HStream/Common/Server/HashRing.hs
index aa5ed0880..bbc955284 100644
--- a/common/server/HStream/Common/Server/HashRing.hs
+++ b/common/server/HStream/Common/Server/HashRing.hs
@@ -1,33 +1,68 @@
 module HStream.Common.Server.HashRing
   ( LoadBalanceHashRing
+  , readLoadBalanceHashRing
   , initializeHashRing
   , updateHashRing
   ) where
 
+import Control.Concurrent
 import Control.Concurrent.STM
 import Control.Monad
 import Data.List (sort)
+import Data.Maybe (fromMaybe)
+import System.Environment (lookupEnv)
+import Text.Read (readMaybe)
 
 import HStream.Common.ConsistentHashing (HashRing, constructServerMap)
 import HStream.Gossip.Types (Epoch, GossipContext)
 import HStream.Gossip.Utils (getMemberListWithEpochSTM)
+import qualified HStream.Logger as Log
 
-type LoadBalanceHashRing = TVar (Epoch, HashRing)
+-- FIXME: The 'Bool' flag tells whether we think the HashRing can be used
+-- for resource allocation now. A server node can see only part of the
+-- cluster during the early stage of its startup, so the ring starts out
+-- not ready.
+-- FIXME: This is just a mitigation for the consistency problem.
+type LoadBalanceHashRing = TVar (Epoch, HashRing, Bool)
+
+readLoadBalanceHashRing :: LoadBalanceHashRing -> STM (Epoch, HashRing)
+readLoadBalanceHashRing hashRing = do
+  (epoch, hashRing, isReady) <- readTVar hashRing
+  if isReady
+    then return (epoch, hashRing)
+    else retry
 
 initializeHashRing :: GossipContext -> IO LoadBalanceHashRing
 initializeHashRing gc = atomically $ do
   (epoch, serverNodes) <- getMemberListWithEpochSTM gc
-  newTVar (epoch, constructServerMap . sort $ serverNodes)
+  newTVar (epoch, constructServerMap . sort $ serverNodes, False)
 
 -- However, reconstruct hashRing every time can be expensive
 -- when we have a large number of nodes in the cluster.
+-- FIXME: We delay for several seconds to make sure the node has seen the
+-- whole cluster before the ring is marked ready. This is only a
+-- mitigation; see the comment above.
+-- FIXME: Hard-coded default delay.
+-- WARNING: This should be called exactly once on startup!
updateHashRing :: GossipContext -> LoadBalanceHashRing -> IO ()
-updateHashRing gc hashRing = loop (0,[])
+updateHashRing gc hashRing = do
+  let defaultMs = 5000
+  delayMs <- lookupEnv "HSTREAM_INTERNAL_STARTUP_EXTRA_DELAY_MS" >>= \case
+    Nothing -> return defaultMs
+    Just ms -> return (fromMaybe defaultMs (readMaybe ms))
+  void $ forkIO (earlyStageDelay delayMs)
+  loop (0,[])
   where
+    earlyStageDelay timeoutMs = do
+      Log.info $ "Delaying for " <> Log.buildString' timeoutMs <> "ms before I can make resource allocation decisions..."
+      threadDelay (timeoutMs * 1000)
+      atomically $ modifyTVar' hashRing (\(epoch, hashRing, _) -> (epoch, hashRing, True))
+      Log.info "Cluster is ready!"
+
     loop (epoch, list) = loop =<< atomically
       ( do (epoch', list') <- getMemberListWithEpochSTM gc
           when (epoch == epoch' && list == list') retry
-           writeTVar hashRing (epoch', constructServerMap list')
+           modifyTVar' hashRing
+             (\(_,_,isReady) -> (epoch', constructServerMap list', isReady))
           return (epoch', list')
      )
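
The core mechanism in readLoadBalanceHashRing above is STM's retry: a transaction that sees the readiness flag still False is suspended and automatically re-run once the TVar changes, so callers simply block until earlyStageDelay flips the flag. A minimal, self-contained sketch of that gate, with illustrative names and a fixed two-second delay standing in for the patch's configurable one:

    import Control.Concurrent (forkIO, threadDelay)
    import Control.Concurrent.STM
    import Control.Monad (void)

    main :: IO ()
    main = do
      ring <- newTVarIO (0 :: Int, "ring-v0", False)
      -- Updater: flips the readiness flag after a startup grace period.
      void $ forkIO $ do
        threadDelay (2 * 1000 * 1000)
        atomically $ modifyTVar' ring (\(e, r, _) -> (e, r, True))
      -- Reader: 'retry' suspends the transaction until the TVar changes,
      -- so this blocks until the flag is True.
      (epoch, r) <- atomically $ do
        (e, r, isReady) <- readTVar ring
        if isReady then return (e, r) else retry
      putStrLn ("ready at epoch " ++ show epoch ++ " with " ++ r)
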
diff --git a/common/server/HStream/Common/Server/Lookup.hs b/common/server/HStream/Common/Server/Lookup.hs
index 54036da7a..d17487f04 100644
--- a/common/server/HStream/Common/Server/Lookup.hs
+++ b/common/server/HStream/Common/Server/Lookup.hs
@@ -10,7 +10,6 @@ module HStream.Common.Server.Lookup
   , kafkaResourceMetaId
   ) where
 
-import Control.Concurrent (threadDelay)
 import Control.Concurrent.STM
 import Control.Exception (SomeException (..), throwIO, try)
 
@@ -19,7 +18,8 @@
 import Data.Text (Text)
 import qualified Data.Vector as V
 
 import HStream.Common.ConsistentHashing (getResNode)
-import HStream.Common.Server.HashRing (LoadBalanceHashRing)
+import HStream.Common.Server.HashRing (LoadBalanceHashRing,
+                                       readLoadBalanceHashRing)
 import HStream.Common.Server.MetaData (TaskAllocation (..))
 import HStream.Common.Types (fromInternalServerNodeWithKey)
 import qualified HStream.Exception as HE
 
@@ -30,7 +30,7 @@ import qualified HStream.Server.HStreamApi as A
 
 lookupNode :: LoadBalanceHashRing -> Text -> Maybe Text -> IO A.ServerNode
 lookupNode loadBalanceHashRing key advertisedListenersKey = do
-  (_, hashRing) <- readTVarIO loadBalanceHashRing
+  (_, hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
   theNode <- getResNode hashRing key advertisedListenersKey
   return theNode
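
Note the behavioral change in this hunk: lookupNode used to return immediately via readTVarIO, even when the ring was built from a partial member list; it now blocks inside STM until the delay thread in updateHashRing marks the ring ready. A caller that must not wait indefinitely could bound the wait with System.Timeout.timeout. A hypothetical wrapper, not part of this patch (and assuming lookupNode is exported from this module):

    import Data.Text (Text)
    import System.Timeout (timeout)

    import HStream.Common.Server.HashRing (LoadBalanceHashRing)
    import HStream.Common.Server.Lookup (lookupNode)
    import qualified HStream.Server.HStreamApi as A

    -- Gives up with Nothing if the ring is not ready within 10 seconds.
    lookupNodeWithTimeout :: LoadBalanceHashRing -> Text -> Maybe Text
                          -> IO (Maybe A.ServerNode)
    lookupNodeWithTimeout ring key listenersKey =
      timeout (10 * 1000 * 1000) (lookupNode ring key listenersKey)
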
@@ -42,62 +42,43 @@ lookupNodePersist
   -> Text
   -> Maybe Text
   -> IO A.ServerNode
-lookupNodePersist metaHandle_ gossipContext_ loadBalanceHashRing_
-  key_ metaId_ advertisedListenersKey_ =
-  -- FIXME: This is only a mitigation for the case that the node has not
-  --        known the full cluster info. Reinvestigate it!!!
-  --        And as you see, a hard-coded constant...
-  go metaHandle_ gossipContext_ loadBalanceHashRing_ key_ metaId_ advertisedListenersKey_ 5
-  where
-    -- TODO: Currerntly, 'leftRetries' only works before a re-allocation. It can be also
-    --       used on other cases such as encountering an exception.
-    go metaHandle gossipContext loadBalanceHashRing
-       key metaId advertisedListenersKey leftRetries = do
-      -- FIXME: it will insert the results of lookup no matter the resource exists
-      -- or not
-      M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
+lookupNodePersist metaHandle gossipContext loadBalanceHashRing
+  key metaId advertisedListenersKey = do
+  -- FIXME: it will insert the results of lookup no matter the resource exists
+  -- or not
+  M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
+    Nothing -> do
+      (epoch, hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
+      theNode <- getResNode hashRing key advertisedListenersKey
+      try (M.insertMeta @TaskAllocation
+             metaId
+             (TaskAllocation epoch (A.serverNodeId theNode))
+             metaHandle) >>= \case
+        Left (e :: SomeException) -> do
+          -- TODO: add a retry limit here
+          Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
+                     <> ", retry..."
+          lookupNodePersist metaHandle gossipContext loadBalanceHashRing
+            key metaId advertisedListenersKey
+        Right () -> return theNode
+    Just (TaskAllocation epoch nodeId, version) -> do
+      serverList <- getMemberList gossipContext >>=
+        fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
+      case find ((nodeId == ) . A.serverNodeId) serverList of
+        Just theNode -> return theNode
         Nothing -> do
-          (epoch, hashRing) <- readTVarIO loadBalanceHashRing
-          theNode <- getResNode hashRing key advertisedListenersKey
-          try (M.insertMeta @TaskAllocation
-                 metaId
-                 (TaskAllocation epoch (A.serverNodeId theNode))
-                 metaHandle) >>= \case
+          (epoch', hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
+          theNode' <- getResNode hashRing key advertisedListenersKey
+          try (M.updateMeta @TaskAllocation metaId
+                 (TaskAllocation epoch' (A.serverNodeId theNode'))
+                 (Just version) metaHandle) >>= \case
             Left (e :: SomeException) -> do
               -- TODO: add a retry limit here
               Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
                          <> ", retry..."
               lookupNodePersist metaHandle gossipContext loadBalanceHashRing
                 key metaId advertisedListenersKey
-            Right () -> return theNode
-      Just (TaskAllocation epoch nodeId, version) -> do
-        serverList <- getMemberList gossipContext >>=
-          fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
-        case find ((nodeId == ) . A.serverNodeId) serverList of
-          Just theNode -> return theNode
-          Nothing -> do
-            if leftRetries > 0
-              then do
-                Log.info $ "<lookupNodePersist> on <key=" <> Log.buildString' key <> ", metaId=" <>
-                  Log.buildString' metaId <> ">: found on Node=" <> Log.buildString' nodeId <>
-                  ", but not sure if it's really dead. Left " <> Log.buildString' leftRetries <>
-                  " retries before re-allocate it..."
-                threadDelay (1 * 1000 * 1000)
-                go metaHandle gossipContext loadBalanceHashRing
-                  key metaId advertisedListenersKey (leftRetries - 1)
-              else do
-                (epoch', hashRing) <- readTVarIO loadBalanceHashRing
-                theNode' <- getResNode hashRing key advertisedListenersKey
-                try (M.updateMeta @TaskAllocation metaId
-                      (TaskAllocation epoch' (A.serverNodeId theNode'))
-                      (Just version) metaHandle) >>= \case
-                  Left (e :: SomeException) -> do
-                    -- TODO: add a retry limit here
-                    Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
-                      <> ", retry..."
-                    lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-                      key metaId advertisedListenersKey
-                  Right () -> return theNode'
+            Right () -> return theNode'
 
 data KafkaResource
   = KafkaResTopic Text
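
The rewrite above drops the leftRetries grace loop entirely: a fresh key is allocated a node and persisted with insertMeta; an existing allocation is reused unless the recorded node has disappeared from the member list, in which case the key is re-allocated immediately through updateMeta, guarded by the version obtained from getMetaWithVer. That version check is what keeps two racing servers from both winning the re-allocation. Reduced to an in-memory sketch (illustrative names, with a TVar standing in for the real metastore):

    import Control.Concurrent.STM

    -- A value paired with a version, mirroring what M.getMetaWithVer returns.
    data Versioned a = Versioned a Int

    -- Succeeds only if the stored version still matches the one the caller
    -- read, like the '(Just version)' argument of M.updateMeta. A lost race
    -- returns False; lookupNodePersist handles that case by calling itself
    -- again, which re-reads the winner's allocation.
    casUpdate :: TVar (Versioned a) -> Int -> a -> STM Bool
    casUpdate ref expected new = do
      Versioned _ v <- readTVar ref
      if v == expected
        then writeTVar ref (Versioned new (v + 1)) >> return True
        else return False
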
diff --git a/common/server/HStream/Common/Server/TaskManager.hs b/common/server/HStream/Common/Server/TaskManager.hs
index 8e3ec6070..4dc148ee6 100644
--- a/common/server/HStream/Common/Server/TaskManager.hs
+++ b/common/server/HStream/Common/Server/TaskManager.hs
@@ -2,23 +2,22 @@ module HStream.Common.Server.TaskManager where
 
-import Control.Concurrent (forkIO)
-import qualified Control.Concurrent as C
-import qualified Control.Concurrent.STM as C
-import qualified Control.Exception as E
-import qualified Control.Monad as M
+import           Control.Concurrent             (forkIO)
+import qualified Control.Concurrent             as C
+import qualified Control.Exception              as E
+import qualified Control.Monad                  as M
 import Data.Int
-import qualified Data.Set as Set
-import qualified Data.Text as T
-import qualified Data.Vector as V
-import Data.Word (Word32)
-import HStream.Common.ConsistentHashing (HashRing)
-import HStream.Common.Server.Lookup (lookupNodePersist)
-import qualified HStream.Exception as HE
-import HStream.Gossip.Types (Epoch, GossipContext)
-import qualified HStream.Logger as Log
-import HStream.MetaStore.Types (MetaHandle)
-import HStream.Server.HStreamApi (ServerNode (serverNodeId))
+import qualified Data.Set                       as Set
+import qualified Data.Text                      as T
+import qualified Data.Vector                    as V
+import           Data.Word                      (Word32)
+import           HStream.Common.Server.HashRing (LoadBalanceHashRing)
+import           HStream.Common.Server.Lookup   (lookupNodePersist)
+import qualified HStream.Exception              as HE
+import           HStream.Gossip.Types           (GossipContext)
+import qualified HStream.Logger                 as Log
+import           HStream.MetaStore.Types        (MetaHandle)
+import           HStream.Server.HStreamApi      (ServerNode (serverNodeId))
 
 -------------------------------------------------------------------------------
 
@@ -51,7 +50,7 @@ data TaskDetector
 
   , metaHandle :: MetaHandle
   , gossipContext :: GossipContext
-  , loadBalanceHashRing :: C.TVar (Epoch, HashRing)
+  , loadBalanceHashRing :: LoadBalanceHashRing
  , advertisedListenersKey :: Maybe T.Text
  , serverID :: Word32
  }
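
The TaskManager change is plumbing: the TaskDetector field now uses the opaque LoadBalanceHashRing alias, so task detection goes through the same readiness gate as every other lookup while its call sites stay unchanged. A sketch of such an ownership check in the style of the TaskDetector fields above (amIOwner and the "task_" metaId prefix are invented for illustration, not code from this patch):

    import qualified Data.Text as T
    import Data.Word (Word32)

    import HStream.Common.Server.HashRing (LoadBalanceHashRing)
    import HStream.Common.Server.Lookup (lookupNodePersist)
    import HStream.Gossip.Types (GossipContext)
    import HStream.MetaStore.Types (MetaHandle)
    import HStream.Server.HStreamApi (ServerNode (serverNodeId))

    -- Decide whether this server owns a task: blocks until the ring is
    -- ready, then compares the allocated node against our own server id.
    amIOwner :: MetaHandle -> GossipContext -> LoadBalanceHashRing
             -> Maybe T.Text -> Word32 -> T.Text -> IO Bool
    amIOwner h gc ring listenersKey myId taskId = do
      node <- lookupNodePersist h gc ring taskId (T.pack "task_" <> taskId) listenersKey
      return (serverNodeId node == myId)
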
diff --git a/hstream/app/server.hs b/hstream/app/server.hs
index ebdec4d4d..1dfe62724 100644
--- a/hstream/app/server.hs
+++ b/hstream/app/server.hs
@@ -200,7 +200,7 @@ serve sc@ServerContext{..} rpcOpts enableStreamV2 = do
 #endif
     <> ", waiting for cluster to get ready"
   void $ forkIO $ do
-    void (readMVar (clusterReady gossipContext)) >> Log.info "Cluster is ready!"
+    void (readMVar (clusterReady gossipContext))
     readMVar (clusterInited gossipContext) >>= \case
       Gossip -> return ()
       _ -> do
diff --git a/hstream/src/HStream/Server/Types.hs b/hstream/src/HStream/Server/Types.hs
index a05f197c8..f67f6c3fa 100644
--- a/hstream/src/HStream/Server/Types.hs
+++ b/hstream/src/HStream/Server/Types.hs
@@ -36,7 +36,7 @@ import Control.Monad (when)
 import Data.IORef (IORef)
 import Data.Maybe (fromJust)
 import HStream.Base.Timer (CompactedWorker)
-import HStream.Common.ConsistentHashing (HashRing)
+import HStream.Common.Server.HashRing (LoadBalanceHashRing)
 import HStream.Common.Types (ShardKey)
 import qualified HStream.Exception as HE
 import HStream.Gossip.Types (Epoch, GossipContext)
@@ -100,7 +100,7 @@ data ServerContext = ServerContext
   , headerConfig :: AA.HeaderConfig AA.AdminAPI
 #endif
   , scStatsHolder :: Stats.StatsHolder
-  , loadBalanceHashRing :: TVar (Epoch, HashRing)
+  , loadBalanceHashRing :: LoadBalanceHashRing
   , scIOWorker :: IO.Worker
   , gossipContext :: GossipContext
   , serverOpts :: ServerOpts
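
With ServerContext and TaskDetector both switched to the LoadBalanceHashRing alias, no consumer can bypass the readiness flag with a bare readTVarIO any more; every read funnels through readLoadBalanceHashRing, and the "Cluster is ready!" log moves from server.hs to the thread that actually flips the flag. A wiring sketch of the intended lifecycle, following the "call updateHashRing exactly once" warning in HashRing.hs (bootstrap and runServer are invented glue, not HStream APIs):

    import Control.Concurrent (forkIO)
    import Control.Monad (void)

    import HStream.Common.Server.HashRing (LoadBalanceHashRing,
                                           initializeHashRing, updateHashRing)
    import HStream.Gossip.Types (GossipContext)

    bootstrap :: GossipContext -> (LoadBalanceHashRing -> IO ()) -> IO ()
    bootstrap gc runServer = do
      ring <- initializeHashRing gc           -- seeded from a possibly partial view
      void $ forkIO (updateHashRing gc ring)  -- single updater; also arms the ready flag
      runServer ring                          -- lookups block until the ring is ready
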