From 94d477d8a987580d5e5b77846c24e67ce9c34d4f Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Thu, 31 Aug 2023 21:56:14 +0900 Subject: [PATCH] refactor scheduler --- service/scheduler/app.go | 8 ++++--- service/scheduler/entry/entry.go | 27 ++++++++++++++---------- service/scheduler/scheduler/scheduler.go | 23 ++++++++++---------- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/service/scheduler/app.go b/service/scheduler/app.go index 0a18923f..4fda6d6b 100644 --- a/service/scheduler/app.go +++ b/service/scheduler/app.go @@ -23,8 +23,8 @@ type Params struct { EntryReader scheduler.EntryReader } -func EntryReaderProvider(cfg *config.Config, jf entry.JobFactory) scheduler.EntryReader { - return entry.NewEntryReader(cfg.DAGs, jf) +func EntryReaderProvider(cfg *config.Config, jf entry.JobFactory, logger logger.Logger) scheduler.EntryReader { + return entry.NewEntryReader(cfg.DAGs, jf, logger) } func JobFactoryProvider(cfg *config.Config) entry.JobFactory { @@ -37,7 +37,9 @@ func JobFactoryProvider(cfg *config.Config) entry.JobFactory { func New(params Params) *scheduler.Scheduler { return scheduler.New(scheduler.Params{ EntryReader: params.EntryReader, - LogDir: params.Config.LogDir, + Logger: params.Logger, + // TODO: check this is used + LogDir: params.Config.LogDir, }) } diff --git a/service/scheduler/entry/entry.go b/service/scheduler/entry/entry.go index 699a4896..bf22abd8 100644 --- a/service/scheduler/entry/entry.go +++ b/service/scheduler/entry/entry.go @@ -1,9 +1,10 @@ package entry import ( + "github.com/dagu-dev/dagu/internal/logger" + "github.com/dagu-dev/dagu/internal/logger/tag" "github.com/dagu-dev/dagu/service/scheduler/filenotify" "github.com/dagu-dev/dagu/service/scheduler/scheduler" - "log" "os" "path/filepath" "strings" @@ -22,7 +23,7 @@ type JobFactory interface { NewJob(dag *dag.DAG, next time.Time) scheduler.Job } -func NewEntryReader(dagsDir string, jf JobFactory) *EntryReader { +func NewEntryReader(dagsDir string, jf JobFactory, logger logger.Logger) *EntryReader { er := &EntryReader{ dagsDir: dagsDir, suspendChecker: suspend.NewSuspendChecker( @@ -31,9 +32,10 @@ func NewEntryReader(dagsDir string, jf JobFactory) *EntryReader { dagsLock: sync.Mutex{}, dags: map[string]*dag.DAG{}, jf: jf, + logger: logger, } if err := er.initDags(); err != nil { - log.Printf("failed to init entry dags %v", err) + er.logger.Error("failed to init entry dags", tag.Error(err)) } go er.watchDags() return er @@ -45,6 +47,7 @@ type EntryReader struct { dagsLock sync.Mutex dags map[string]*dag.DAG jf JobFactory + logger logger.Logger } func (er *EntryReader) Read(now time.Time) ([]*scheduler.Entry, error) { @@ -60,6 +63,7 @@ func (er *EntryReader) Read(now time.Time) ([]*scheduler.Entry, error) { // TODO: fix this Job: er.jf.NewJob(d, next), EntryType: e, + Logger: er.logger, }) } } @@ -84,19 +88,19 @@ func (er *EntryReader) initDags() error { if err != nil { return err } - fileNames := []string{} + var fileNames []string for _, fi := range fis { if utils.MatchExtension(fi.Name(), dag.EXTENSIONS) { workflow, err := cl.LoadMetadataOnly(filepath.Join(er.dagsDir, fi.Name())) if err != nil { - log.Printf("init dags failed to read workflow cfg: %s", err) + er.logger.Error("failed to read workflow cfg", tag.Error(err)) continue } er.dags[fi.Name()] = workflow fileNames = append(fileNames, fi.Name()) } } - log.Printf("init backend dags: %s", strings.Join(fileNames, ",")) + er.logger.Info("init backend dags", "files", strings.Join(fileNames, ",")) return nil } @@ -104,7 +108,8 @@ func (er *EntryReader) watchDags() { cl := dag.Loader{} watcher, err := filenotify.New(time.Minute) if err != nil { - log.Fatal(err) + er.logger.Error("failed to init file watcher", tag.Error(err)) + return } defer func() { _ = watcher.Close() @@ -123,22 +128,22 @@ func (er *EntryReader) watchDags() { if event.Op == fsnotify.Create || event.Op == fsnotify.Write { workflow, err := cl.LoadMetadataOnly(filepath.Join(er.dagsDir, filepath.Base(event.Name))) if err != nil { - log.Printf("failed to read workflow cfg: %s", err) + er.logger.Error("failed to read workflow cfg", tag.Error(err)) } else { er.dags[filepath.Base(event.Name)] = workflow - log.Printf("reload workflow entry %s", event.Name) + er.logger.Info("reload workflow entry", "file", event.Name) } } if event.Op == fsnotify.Rename || event.Op == fsnotify.Remove { delete(er.dags, filepath.Base(event.Name)) - log.Printf("remove dag entry %s", event.Name) + er.logger.Info("remove workflow entry", "file", event.Name) } er.dagsLock.Unlock() case err, ok := <-watcher.Errors(): if !ok { return } - log.Println("watch entry dags error:", err) + er.logger.Error("watch entry dags error", tag.Error(err)) } } diff --git a/service/scheduler/scheduler/scheduler.go b/service/scheduler/scheduler/scheduler.go index 2674d056..134790b4 100644 --- a/service/scheduler/scheduler/scheduler.go +++ b/service/scheduler/scheduler/scheduler.go @@ -3,15 +3,14 @@ package scheduler import ( "fmt" "github.com/dagu-dev/dagu/internal/dag" - "log" + "github.com/dagu-dev/dagu/internal/logger" + "github.com/dagu-dev/dagu/internal/utils" "os" "os/signal" "path" "sort" "syscall" "time" - - "github.com/dagu-dev/dagu/internal/utils" ) type Scheduler struct { @@ -19,6 +18,7 @@ type Scheduler struct { logDir string stop chan struct{} running bool + logger logger.Logger } type EntryReader interface { @@ -29,6 +29,7 @@ type Entry struct { Next time.Time Job Job EntryType Type + Logger logger.Logger } type Job interface { @@ -53,13 +54,13 @@ func (e *Entry) Invoke() error { } switch e.EntryType { case Start: - log.Printf("[%s] start %s", e.Next.Format("2006-01-02 15:04:05"), e.Job.String()) + e.Logger.Info("start job", "job", e.Job.String(), "time", e.Next.Format("2006-01-02 15:04:05")) return e.Job.Start() case Stop: - log.Printf("[%s] stop %s", e.Next.Format("2006-01-02 15:04:05"), e.Job.String()) + e.Logger.Info("stop job", "job", e.Job.String(), "time", e.Next.Format("2006-01-02 15:04:05")) return e.Job.Stop() case Restart: - log.Printf("[%s] restart %s", e.Next.Format("2006-01-02 15:04:05"), e.Job.String()) + e.Logger.Info("restart job", "job", e.Job.String(), "time", e.Next.Format("2006-01-02 15:04:05")) return e.Job.Restart() } return nil @@ -67,6 +68,7 @@ func (e *Entry) Invoke() error { type Params struct { EntryReader EntryReader + Logger logger.Logger LogDir string } @@ -76,6 +78,7 @@ func New(params Params) *Scheduler { logDir: params.LogDir, stop: make(chan struct{}), running: false, + logger: params.Logger, } } @@ -91,7 +94,7 @@ func (s *Scheduler) Start() error { s.Stop() }() - log.Printf("starting dagu scheduler") + s.logger.Info("starting scheduler") s.start() return nil @@ -103,9 +106,7 @@ func (s *Scheduler) setupLogFile() (err error) { if err := os.MkdirAll(dir, 0755); err != nil { return err } - // TODO: fix this to use logger - log.Printf("setup log file: %s", filename) - log.Print("log file is ready") + s.logger.Info("setup log", "filename", filename) return } @@ -140,7 +141,7 @@ func (s *Scheduler) run(now time.Time) { go func(e *Entry) { err := e.Invoke() if err != nil { - log.Printf("backend: entry failed %s: %v", e.Job, err) + s.logger.Error("failed to invoke entry", "entry", e.Job, "error", err) } }(e) }