Skip to content

Commit

Permalink
Support file based log (#1708)
Browse files Browse the repository at this point in the history
* logger: support file rotation based

* hstream-server: add cli options to support file based log
  • Loading branch information
4eUeP authored Dec 8, 2023
1 parent 6a3f6ad commit fcc2396
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 14 deletions.
67 changes: 63 additions & 4 deletions common/base/HStream/Logger.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{- Mostly Copy from https://hackage.haskell.org/package/fast-logger and modified.
The original license is BSD-3-Clause.
-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE GADTs #-}
Expand Down Expand Up @@ -42,6 +45,7 @@ module HStream.Logger
-- ** LogType
, LogType
, LogType' (..)
, Log.FileLogSpec (..)
-- ** Formatter
, LogFormatter
-- ** Log Level
Expand All @@ -57,14 +61,15 @@ module HStream.Logger

import Control.Concurrent (threadDelay)
import qualified Control.Concurrent.Async as Async
import Control.Exception (finally)
import Control.Monad (forever, when)
import Data.IORef (IORef, atomicWriteIORef,
newIORef, readIORef)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Data.IORef
import Foreign.C.Types
import GHC.Conc.Sync (ThreadId (..), myThreadId)
import GHC.Exts (ThreadId#)
import GHC.Stack
import System.EasyFile (getFileSize)
import System.IO.Unsafe (unsafePerformIO)
import qualified System.Log.FastLogger as Log
import qualified System.Log.FastLogger.Internal as Log
Expand Down Expand Up @@ -265,6 +270,7 @@ data LogType' a where
LogStdout :: LogType' Log.LogStr
LogStderr :: LogType' Log.LogStr
LogFile :: FilePath -> LogType' Log.LogStr
LogFileRotate :: Log.FileLogSpec -> LogType' Log.LogStr

-- | Logger config type used in this module.
data LoggerConfig = LoggerConfig
Expand Down Expand Up @@ -306,6 +312,7 @@ newLogger !LoggerConfig{..} =
LogStdout -> Log.newStdoutLoggerSet loggerBufSize >>= loggerInit
LogStderr -> Log.newStderrLoggerSet loggerBufSize >>= loggerInit
LogFile fp -> Log.newFileLoggerSet loggerBufSize fp >>= loggerInit
LogFileRotate fspec -> rotateLoggerInit fspec loggerBufSize
where
loggerInit lgrset = return $ Logger
(\level shouldFlush cstack s ->
Expand All @@ -318,6 +325,26 @@ newLogger !LoggerConfig{..} =
)
(Log.rmLoggerSet lgrset)
(loggerFlush lgrset)

-- Rotate file logger
rotateLoggerInit fspec bsize = do
lgrset <- Log.newFileLoggerSet bsize $ Log.log_file fspec
ref <- newIORef (0 :: Int)
mvar <- newMVar ()
return $ Logger
(\level shouldFlush cstack s ->
when (level >= loggerLevel) $ do
cnt <- decrease ref
tid <- myThreadId
time <- defaultTimeCache
Log.pushLogStr lgrset $
loggerFormatter (Log.toLogStr time) level s cstack tid
when (loggerFlushImmediately || shouldFlush) $ loggerFlush lgrset
when (cnt <= 0) $ tryRotate lgrset fspec ref mvar
)
(Log.rmLoggerSet lgrset)
(loggerFlush lgrset)

loggerFlush lgrset = Log.flushLogStr lgrset

globalLogger :: IORef Logger
Expand Down Expand Up @@ -418,3 +445,35 @@ foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CLong
#else
foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt
#endif

decrease :: IORef Int -> IO Int
decrease ref = atomicModifyIORef' ref (\x -> (x - 1, x - 1))

tryRotate :: Log.LoggerSet -> Log.FileLogSpec -> IORef Int -> MVar () -> IO ()
tryRotate lgrset spec ref mvar = bracket lock unlock rotateFiles
where
lock = tryTakeMVar mvar
unlock Nothing = return ()
unlock _ = putMVar mvar ()
rotateFiles Nothing = return ()
rotateFiles _ = do
msiz <- getSize
case msiz of
-- A file is not available.
-- So, let's set a big value to the counter so that
-- this function is not called frequently.
Nothing -> writeIORef ref 1000000
Just siz
| siz > limit -> do
Log.rotate spec
Log.renewLoggerSet lgrset
writeIORef ref $ estimate limit
| otherwise -> writeIORef ref $ estimate (limit - siz)
file = Log.log_file spec
limit = Log.log_file_size spec
getSize = handle (\(SomeException _) -> return Nothing) $
-- The log file is locked by GHC.
-- We need to get its file size by the way not using locks.
Just . fromIntegral <$> getFileSize file
-- 200 is an ad-hoc value for the length of log line.
estimate x = fromInteger (x `div` 200)
8 changes: 7 additions & 1 deletion common/base/hstream-common-base.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ library
, bytestring
, containers
, data-default
, easy-file
, fast-logger
, ghc-prim >=0.5 && <1.0
, primitive ^>=0.7.2
Expand Down Expand Up @@ -126,18 +127,23 @@ test-suite hstream-common-base-test
type: exitcode-stdio-1.0
main-is: Spec.hs
hs-source-dirs: test
other-modules: HStream.BaseSpec
other-modules:
HStream.BaseSpec
HStream.LoggerSpec

build-depends:
, aeson
, base >=4.11 && <5
, bytestring
, containers ^>=0.6
, directory
, filepath
, hspec
, hstream-common-base
, QuickCheck
, quickcheck-instances
, random ^>=1.2
, temporary
, text
, unordered-containers
, vector
Expand Down
30 changes: 30 additions & 0 deletions common/base/test/HStream/LoggerSpec.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module HStream.LoggerSpec (spec) where

import System.FilePath ((</>))
import System.IO.Temp (withSystemTempDirectory)
import Test.Hspec

import qualified HStream.Logger as Log

spec :: Spec
spec = describe "HStream.Logger" $ do
it "LogFileRotate" $ do
withSystemTempDirectory "HStreamLoggerTest" $ \tmpDir -> do
let logfile = tmpDir </> "test.log"
let logfile0 = tmpDir </> "test.log.0"

Log.withDefaultLogger $ do
let logType = Log.LogFileRotate (Log.FileLogSpec logfile 100 100)
Log.setDefaultLogger Log.INFO False logType True
Log.info "3333333333"
Log.debug "should not be printed"
Log.info "2222222222"
Log.info "1111111111"

-- >>> extract "[INFO][2023-12-06T10:19:53+0000][test/HStream/LoggerSpec.hs:16:7][thread#71]3333333333"
-- 3333333333
let extract line = takeWhile (/= ']') (reverse line)

(map extract . lines) <$> readFile logfile0 `shouldReturn`
["3333333333", "2222222222"]
(map extract . lines) <$> readFile logfile `shouldReturn` ["1111111111"]
1 change: 1 addition & 0 deletions hstream-kafka/HStream/Kafka/Server/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module HStream.Kafka.Server.Config
, runServerConfig
, runServerFromCliOpts

, FileLoggerSettings (..)
, MetaStoreAddr (..)
, AdvertisedListeners
, ListenersSecurityProtocolMap
Expand Down
17 changes: 17 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ cliOptionsParser = do
cliServerLogLevel <- optional logLevelParser
cliServerLogWithColor <- logWithColorParser
cliServerLogFlushImmediately <- logFlushImmediatelyParser
cliServerFileLog <- optional fileLoggerSettingsParser

cliServerGossipAddress <- optional serverGossipAddressParser
cliServerGossipPort <- optional serverGossipPortParser
Expand Down Expand Up @@ -145,6 +146,22 @@ listenersSecurityProtocolMapParser = parserOpt parseListenersSecurityProtocolMap
<> "e.g. public:tls,private:plaintext"
)

fileLoggerSettingsParser :: O.Parser FileLoggerSettings
fileLoggerSettingsParser = do
logpath <- strOption
$ long "file-log-path"
<> metavar "PATH"
<> help "File logger path"
logsize <- option auto
$ long "file-log-size"
<> metavar "INT" <> value 20971520
<> help "The maximum size of the log before it's rolled, in bytes. Default is 20MB"
lognum <- option auto
$ long "file-log-num"
<> metavar "INT" <> value 10
<> help "The maximum number of rolled log files to keep. Default is 10"
return FileLoggerSettings{..}

metaStoreAddrParser :: O.Parser MetaStoreAddr
metaStoreAddrParser = option (O.maybeReader (Just . parseMetaStoreAddr . T.pack))
$ long "metastore-uri"
Expand Down
1 change: 1 addition & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ parseJSONToOptions CliOptions{..} obj = do
let !_serverLogLevel = fromMaybe (readWithErrLog "log-level" nodeLogLevel) cliServerLogLevel
let !_serverLogWithColor = nodeLogWithColor || cliServerLogWithColor
let !_serverLogFlushImmediately = cliServerLogFlushImmediately
let !serverFileLog = cliServerFileLog

-- Cluster Option
seeds <- flip fromMaybe cliSeedNodes <$> (nodeCfgObj .: "seed-nodes")
Expand Down
9 changes: 9 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module HStream.Kafka.Server.Config.Types
( ServerOpts (..)
, ServerCli (..), CliOptions (..)

, FileLoggerSettings (..)
, MetaStoreAddr (..)
, AdvertisedListeners
, ListenersSecurityProtocolMap
Expand Down Expand Up @@ -63,6 +64,7 @@ data ServerOpts = ServerOpts
, _serverLogLevel :: !Log.Level
, _serverLogWithColor :: !Bool
, _serverLogFlushImmediately :: !Bool
, serverFileLog :: !(Maybe FileLoggerSettings)

, _serverGossipAddress :: !String
, _serverGossipPort :: !Word16
Expand Down Expand Up @@ -105,6 +107,7 @@ data CliOptions = CliOptions
, cliServerLogLevel :: !(Maybe Log.Level)
, cliServerLogWithColor :: !Bool
, cliServerLogFlushImmediately :: !Bool
, cliServerFileLog :: !(Maybe FileLoggerSettings)

-- Gossip
, cliServerGossipAddress :: !(Maybe String)
Expand Down Expand Up @@ -138,6 +141,12 @@ data CliOptions = CliOptions

-------------------------------------------------------------------------------

data FileLoggerSettings = FileLoggerSettings
{ logpath :: !FilePath
, logsize :: !Integer
, lognum :: !Int
} deriving (Show, Eq)

data MetaStoreAddr
= ZkAddr CBytes
| RqAddr Text
Expand Down
10 changes: 8 additions & 2 deletions hstream/app/lib/KafkaServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}
Expand Down Expand Up @@ -47,6 +48,7 @@ import HStream.Gossip (GossipContext (..),
import qualified HStream.Gossip.Types as Gossip
import qualified HStream.Kafka.Network as K
import HStream.Kafka.Server.Config (AdvertisedListeners,
FileLoggerSettings (..),
ListenersSecurityProtocolMap,
MetaStoreAddr (..),
SecurityProtocolMap,
Expand Down Expand Up @@ -81,9 +83,13 @@ runApp = do
app :: ServerOpts -> IO ()
app config@ServerOpts{..} = do
setupFatalSignalHandler
Log.setDefaultLogger _serverLogLevel _serverLogWithColor
Log.LogStderr _serverLogFlushImmediately
S.setLogDeviceDbgLevel' _ldLogLevel
let logType = case config.serverFileLog of
Nothing -> Log.LogStderr
Just FileLoggerSettings{..} -> Log.LogFileRotate $
Log.FileLogSpec logpath logsize lognum
Log.setDefaultLogger _serverLogLevel _serverLogWithColor
logType _serverLogFlushImmediately
case _metaStore of
ZkAddr addr -> do
let zkRes = zookeeperResInit addr Nothing 5000 Nothing 0
Expand Down
16 changes: 9 additions & 7 deletions hstream/app/server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}
Expand All @@ -13,8 +14,6 @@
import Control.Concurrent (forkIO, newEmptyMVar,
putMVar, readMVar)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM (TVar, atomically, retry,
writeTVar)
import Control.Exception (bracket, handle)
import Control.Monad (forM, forM_, join, void,
when)
Expand All @@ -36,7 +35,6 @@ import ZooKeeper (withResource,
zookeeperResInit)

import HStream.Base (setupFatalSignalHandler)
import HStream.Common.ConsistentHashing (HashRing, constructServerMap)
import HStream.Common.Server.HashRing (updateHashRing)
import HStream.Common.Server.MetaData (TaskAllocation (..),
clusterStartTimeId)
Expand All @@ -46,15 +44,15 @@ import HStream.Gossip (GossipContext (..),
defaultGossipOpts,
initGossipContext,
startGossip, waitGossipBoot)
import HStream.Gossip.Types (Epoch, InitType (Gossip))
import HStream.Gossip.Utils (getMemberListWithEpochSTM)
import HStream.Gossip.Types (InitType (Gossip))
import qualified HStream.Kafka.Server.Config as Ka
import qualified HStream.Logger as Log
import HStream.MetaStore.Types as M (MetaHandle (..),
MetaStore (..),
RHandle (..))
import HStream.Server.Config (AdvertisedListeners,
ExperimentalFeature (..),
FileLoggerSettings (..),
ListenersSecurityProtocolMap,
MetaStoreAddr (..),
SecurityProtocolMap,
Expand Down Expand Up @@ -110,9 +108,13 @@ main = do
app :: ServerOpts -> IO ()
app config@ServerOpts{..} = do
setupFatalSignalHandler
Log.setDefaultLogger _serverLogLevel _serverLogWithColor
Log.LogStderr _serverLogFlushImmediately
Log.setLogDeviceDbgLevel' _ldLogLevel
let logType = case config.serverFileLog of
Nothing -> Log.LogStderr
Just FileLoggerSettings{..} -> Log.LogFileRotate $
Log.FileLogSpec logpath logsize lognum
Log.setDefaultLogger _serverLogLevel _serverLogWithColor
logType _serverLogFlushImmediately

bracket (openRocksDBHandle _querySnapshotPath) closeRocksDBHandle $ \db_m ->
case _metaStore of
Expand Down
4 changes: 4 additions & 0 deletions hstream/src/HStream/Server/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module HStream.Server.Config

-- *
, TlsConfig (..)
, FileLoggerSettings (..)
, AdvertisedListeners
, advertisedListenersToPB
, ListenersSecurityProtocolMap
Expand Down Expand Up @@ -107,6 +108,7 @@ data ServerOpts = ServerOpts
, _serverLogLevel :: !Log.Level
, _serverLogWithColor :: !Bool
, _serverLogFlushImmediately :: !Bool
, serverFileLog :: !(Maybe FileLoggerSettings)
, _seedNodes :: ![(ByteString, Int)]
, _ldAdminHost :: !ByteString
, _ldAdminPort :: !Int
Expand Down Expand Up @@ -170,9 +172,11 @@ parseJSONToOptions CliOptions{..} obj = do

let !_metaStore = fromMaybe nodeMetaStore cliMetaStore
let !_compression = fromMaybe CompressionNone cliStoreCompression

let !_serverLogLevel = fromMaybe (readWithErrLog "log-level" nodeLogLevel) cliServerLogLevel
let !_serverLogWithColor = nodeLogWithColor || cliServerLogWithColor
let !_serverLogFlushImmediately = cliServerLogFlushImmediately
let !serverFileLog = cliServerFileLog

-- Cluster Option
seeds <- flip fromMaybe cliSeedNodes <$> (nodeCfgObj .: "seed-nodes")
Expand Down
Loading

0 comments on commit fcc2396

Please sign in to comment.