diff --git a/hstream/app/server.hs b/hstream/app/server.hs index 1dfe62724..2de90993b 100644 --- a/hstream/app/server.hs +++ b/hstream/app/server.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} @@ -35,7 +36,8 @@ import HStream.Common.Server.HashRing (updateHashRing) import HStream.Common.Server.MetaData (TaskAllocation (..), clusterStartTimeId) import HStream.Common.Types (getHStreamVersion) -import HStream.Common.ZookeeperClient (withZookeeperClient) +import HStream.Common.ZookeeperClient (unsafeGetZHandle, + withZookeeperClient) import HStream.Exception import HStream.Gossip (GossipContext (..), defaultGossipOpts, @@ -61,6 +63,8 @@ import HStream.Server.Config (AdvertisedListeners, getConfig, runServerCli) import qualified HStream.Server.Core.Cluster as Cluster import qualified HStream.Server.Experimental as Exp +import HStream.Server.HealthMonitor (mkHealthMonitor, + startMonitor) import qualified HStream.Server.HsGrpcHandler as HsGrpcHandler import qualified HStream.Server.HStreamApi as API import qualified HStream.Server.HStreamInternal as I @@ -174,6 +178,13 @@ app config@ServerOpts{..} = do forM_ as (Async.link2Only (const True) a) -- wati the default server waitGossipBoot gossipContext + + let ServerContext{scLDClient, metaHandle} = serverContext + healthMonitor <- mkHealthMonitor scLDClient metaHandle 1 + aMonitor <- Async.async $ startMonitor serverContext healthMonitor 3 + Log.info $ "Start healthy monitor" + Async.link2Only (const True) a aMonitor + Async.wait a serve diff --git a/hstream/src/HStream/Server/Core/Stream.hs b/hstream/src/HStream/Server/Core/Stream.hs index 95d97c851..ce197bdd5 100644 --- a/hstream/src/HStream/Server/Core/Stream.hs +++ b/hstream/src/HStream/Server/Core/Stream.hs @@ -36,6 +36,7 @@ import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import Data.Either (partitionEithers) import Data.Functor ((<&>)) +import Data.IORef (readIORef) import qualified Data.List as L import qualified Data.Map.Strict as M import Data.Maybe (fromJust, fromMaybe) @@ -52,10 +53,12 @@ import HStream.Common.Types import qualified HStream.Common.ZookeeperSlotAlloc as Slot import qualified HStream.Exception as HE import qualified HStream.Logger as Log +import qualified HStream.Server.CacheStore as DB import qualified HStream.Server.HStreamApi as API import qualified HStream.Server.MetaData as P import HStream.Server.Types (ServerContext (..), ServerInternalOffset (..), + ServerMode (..), ToOffset (..)) import qualified HStream.Stats as Stats import qualified HStream.Store as S @@ -385,7 +388,11 @@ appendStream ServerContext{..} streamName shardId record = do recordSize = API.batchedRecordBatchSize record payloadSize = BS.length payload when (payloadSize > scMaxRecordSize) $ throwIO $ HE.InvalidRecordSize payloadSize - S.AppendCompletion {..} <- S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing + -- S.AppendCompletion {..} <- S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing + state <- readIORef serverState + S.AppendCompletion {..} <- case state of + ServerNormal -> S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing + ServerBackup -> DB.writeRecord cachedStore streamName shardId payload Stats.stream_stat_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize) Stats.stream_stat_add_append_in_records scStatsHolder cStreamName (fromIntegral recordSize) Stats.stream_time_series_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize) diff --git a/hstream/src/HStream/Server/Initialization.hs b/hstream/src/HStream/Server/Initialization.hs index 37c21ab1e..48ac5ef27 100644 --- a/hstream/src/HStream/Server/Initialization.hs +++ b/hstream/src/HStream/Server/Initialization.hs @@ -40,6 +40,7 @@ import qualified Z.Data.CBytes as CB #if __GLASGOW_HASKELL__ < 902 import qualified HStream.Admin.Store.API as AA #endif +import Data.IORef (newIORef) import HStream.Common.ConsistentHashing (HashRing, constructServerMap, getAllocatedNodeId) import HStream.Common.Server.HashRing (initializeHashRing) @@ -49,6 +50,7 @@ import qualified HStream.IO.Types as IO import qualified HStream.IO.Worker as IO import qualified HStream.Logger as Log import HStream.MetaStore.Types (MetaHandle (..)) +import HStream.Server.CacheStore (mkCacheStore) import HStream.Server.Config (ServerOpts (..), TlsConfig (..)) import HStream.Server.Types @@ -88,6 +90,11 @@ initializeServer opts@ServerOpts{..} gossipContext hh db_m = do shardReaderMap <- newMVar HM.empty + serverMode <- newIORef ServerNormal + let dbOption = def { RocksDB.createIfMissing = True } + let path = _cacheStorePath <> show _serverID + cachedStore <- mkCacheStore path dbOption def def + -- recovery tasks return @@ -112,6 +119,8 @@ initializeServer opts@ServerOpts{..} gossipContext hh db_m = do , shardReaderMap = shardReaderMap , querySnapshotPath = _querySnapshotPath , querySnapshotter = db_m + , serverState = serverMode + , cachedStore = cachedStore } --------------------------------------------------------------------------------