Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async logging #226

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/config/test.erigon.seq.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ zkevm.rpc-ratelimit: 0
zkevm.datastream-version: 2

log.console.verbosity: info
log.async: true

#zkevm.executor-urls: xlayer-executor:50071
zkevm.executor-strict: false
Expand Down
119 changes: 119 additions & 0 deletions turbo/logging/async_buffered_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package logging

import (
"bufio"
"github.com/ledgerwatch/log/v3"
"io"
"sync"
"time"
)

const (
_BufferSize = 1024 * 1024
_FlushInterval = time.Second * 10
)

type AsyncBufferedWriter struct {
Size int
FlushInterval time.Duration

mu sync.Mutex

bufferedWriter *bufio.Writer
ticker *time.Ticker
done chan struct{}
stop chan struct{}

initialized bool
stopped bool
}

func AsyncHandler(wr io.Writer, format log.Format) log.Handler {
asyncBufferedWriter := &AsyncBufferedWriter{}
asyncBufferedWriter.initialize(wr)

h := log.FuncHandler(func(r *log.Record) error {
_, err := asyncBufferedWriter.write(format.Format(r))
return err
})
return h
}

func (s *AsyncBufferedWriter) initialize(wr io.Writer) {
if s.initialized {
return
}

s.Size = _BufferSize
s.FlushInterval = _FlushInterval

s.ticker = time.NewTicker(s.FlushInterval)
s.bufferedWriter = bufio.NewWriterSize(wr, s.Size)

s.done = make(chan struct{})
s.stop = make(chan struct{})

s.initialized = true

go s.flushLoop()
}

func (s *AsyncBufferedWriter) write(b []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()

if len(b) >= s.bufferedWriter.Available() && s.bufferedWriter.Buffered() > 0 {
if err := s.bufferedWriter.Flush(); err != nil {
return 0, err
}
}

return s.bufferedWriter.Write(b)
}

func (s *AsyncBufferedWriter) flush() error {
s.mu.Lock()
defer s.mu.Unlock()

return s.bufferedWriter.Flush()
}

func (s *AsyncBufferedWriter) flushLoop() {
defer close(s.done)
for {
select {
case <-s.ticker.C:
_ = s.flush()
case <-s.stop:
return
}
}
}

func (s *AsyncBufferedWriter) Stop() (err error) {
var stopped bool
func() {
s.mu.Lock()
defer s.mu.Unlock()

if !s.initialized {
return
}
stopped = s.stopped

if stopped {
return
}

s.stopped = true

s.ticker.Stop()
close(s.stop)
<-s.done
}()

if !stopped {
err = s.bufferedWriter.Flush()
}
return err
}
6 changes: 6 additions & 0 deletions turbo/logging/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ var (
Name: "log.delays",
Usage: "Enable block delay logging",
}

LogAsyncFlag = cli.BoolFlag{
Name: "log.async",
Usage: "Enable async logging",
}
)

var Flags = []cli.Flag{
Expand All @@ -69,4 +74,5 @@ var Flags = []cli.Flag{
&LogDirPrefixFlag,
&LogDirVerbosityFlag,
&LogBlockDelayFlag,
&LogAsyncFlag,
}
32 changes: 25 additions & 7 deletions turbo/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logging

import (
"flag"
"fmt"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -41,6 +42,7 @@ func SetupLoggerCtx(filePrefix string, ctx *cli.Context,
consoleDefaultLevel log.Lvl, dirDefaultLevel log.Lvl, rootHandler bool) log.Logger {
var consoleJson = ctx.Bool(LogJsonFlag.Name) || ctx.Bool(LogConsoleJsonFlag.Name)
var dirJson = ctx.Bool(LogDirJsonFlag.Name)
var asyncLogging = ctx.Bool(LogAsyncFlag.Name)

metrics.DelayLoggingEnabled = ctx.Bool(LogBlockDelayFlag.Name)

Expand Down Expand Up @@ -79,7 +81,7 @@ func SetupLoggerCtx(filePrefix string, ctx *cli.Context,
logger = log.New()
}

initSeparatedLogging(logger, filePrefix, dirPath, consoleLevel, dirLevel, consoleJson, dirJson)
initSeparatedLogging(logger, filePrefix, dirPath, consoleLevel, dirLevel, consoleJson, dirJson, asyncLogging)
return logger
}

Expand Down Expand Up @@ -141,7 +143,7 @@ func SetupLoggerCmd(filePrefix string, cmd *cobra.Command) log.Logger {
}
}

initSeparatedLogging(log.Root(), filePrefix, dirPath, consoleLevel, dirLevel, consoleJson, dirJson)
initSeparatedLogging(log.Root(), filePrefix, dirPath, consoleLevel, dirLevel, consoleJson, dirJson, false)
return log.Root()
}

Expand Down Expand Up @@ -179,7 +181,7 @@ func SetupLogger(filePrefix string) log.Logger {
filePrefix = *logDirPrefix
}

initSeparatedLogging(log.Root(), filePrefix, *logDirPath, consoleLevel, dirLevel, consoleJson, *dirJson)
initSeparatedLogging(log.Root(), filePrefix, *logDirPath, consoleLevel, dirLevel, consoleJson, *dirJson, false)
return log.Root()
}

Expand All @@ -193,15 +195,25 @@ func initSeparatedLogging(
consoleLevel log.Lvl,
dirLevel log.Lvl,
consoleJson bool,
dirJson bool) {
dirJson bool,
asyncLogging bool) {

var consoleHandler log.Handler

var format log.Format

if consoleJson {
consoleHandler = log.LvlFilterHandler(consoleLevel, log.StreamHandler(os.Stderr, log.JsonFormat()))
format = log.JsonFormat()
} else {
format = log.TerminalFormatNoColor()
}

if asyncLogging {
consoleHandler = log.LvlFilterHandler(consoleLevel, AsyncHandler(os.Stderr, format))
} else {
consoleHandler = log.LvlFilterHandler(consoleLevel, log.StderrHandler)
consoleHandler = log.LvlFilterHandler(consoleLevel, log.StreamHandler(os.Stderr, format))
}

logger.SetHandler(consoleHandler)

if len(dirPath) == 0 {
Expand All @@ -226,10 +238,16 @@ func initSeparatedLogging(
MaxBackups: 3,
MaxAge: 28, //days
}
userLog := log.StreamHandler(lumberjack, dirFormat)
var userLog log.Handler
if asyncLogging {
userLog = AsyncHandler(lumberjack, dirFormat)
} else {
userLog = log.StreamHandler(lumberjack, dirFormat)
}

mux := log.MultiHandler(consoleHandler, log.LvlFilterHandler(dirLevel, userLog))
logger.SetHandler(mux)
logger.Info(fmt.Sprintf("Async logging enabled: %v", asyncLogging))
logger.Info("logging to file system", "log dir", dirPath, "file prefix", filePrefix, "log level", dirLevel, "json", dirJson)
}

Expand Down
Loading