Skip to content

Commit

Permalink
refactor scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta committed Aug 31, 2023
1 parent 0878358 commit 94d477d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 25 deletions.
8 changes: 5 additions & 3 deletions service/scheduler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type Params struct {
EntryReader scheduler.EntryReader
}

func EntryReaderProvider(cfg *config.Config, jf entry.JobFactory) scheduler.EntryReader {
return entry.NewEntryReader(cfg.DAGs, jf)
func EntryReaderProvider(cfg *config.Config, jf entry.JobFactory, logger logger.Logger) scheduler.EntryReader {
return entry.NewEntryReader(cfg.DAGs, jf, logger)
}

func JobFactoryProvider(cfg *config.Config) entry.JobFactory {
Expand All @@ -37,7 +37,9 @@ func JobFactoryProvider(cfg *config.Config) entry.JobFactory {
func New(params Params) *scheduler.Scheduler {
return scheduler.New(scheduler.Params{
EntryReader: params.EntryReader,
LogDir: params.Config.LogDir,
Logger: params.Logger,
// TODO: check this is used
LogDir: params.Config.LogDir,
})
}

Expand Down
27 changes: 16 additions & 11 deletions service/scheduler/entry/entry.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package entry

import (
"github.com/dagu-dev/dagu/internal/logger"
"github.com/dagu-dev/dagu/internal/logger/tag"
"github.com/dagu-dev/dagu/service/scheduler/filenotify"
"github.com/dagu-dev/dagu/service/scheduler/scheduler"
"log"
"os"
"path/filepath"
"strings"
Expand All @@ -22,7 +23,7 @@ type JobFactory interface {
NewJob(dag *dag.DAG, next time.Time) scheduler.Job
}

func NewEntryReader(dagsDir string, jf JobFactory) *EntryReader {
func NewEntryReader(dagsDir string, jf JobFactory, logger logger.Logger) *EntryReader {
er := &EntryReader{
dagsDir: dagsDir,
suspendChecker: suspend.NewSuspendChecker(
Expand All @@ -31,9 +32,10 @@ func NewEntryReader(dagsDir string, jf JobFactory) *EntryReader {
dagsLock: sync.Mutex{},
dags: map[string]*dag.DAG{},
jf: jf,
logger: logger,
}
if err := er.initDags(); err != nil {
log.Printf("failed to init entry dags %v", err)
er.logger.Error("failed to init entry dags", tag.Error(err))
}
go er.watchDags()
return er
Expand All @@ -45,6 +47,7 @@ type EntryReader struct {
dagsLock sync.Mutex
dags map[string]*dag.DAG
jf JobFactory
logger logger.Logger
}

func (er *EntryReader) Read(now time.Time) ([]*scheduler.Entry, error) {
Expand All @@ -60,6 +63,7 @@ func (er *EntryReader) Read(now time.Time) ([]*scheduler.Entry, error) {
// TODO: fix this
Job: er.jf.NewJob(d, next),
EntryType: e,
Logger: er.logger,
})
}
}
Expand All @@ -84,27 +88,28 @@ func (er *EntryReader) initDags() error {
if err != nil {
return err
}
fileNames := []string{}
var fileNames []string
for _, fi := range fis {
if utils.MatchExtension(fi.Name(), dag.EXTENSIONS) {
workflow, err := cl.LoadMetadataOnly(filepath.Join(er.dagsDir, fi.Name()))
if err != nil {
log.Printf("init dags failed to read workflow cfg: %s", err)
er.logger.Error("failed to read workflow cfg", tag.Error(err))
continue
}
er.dags[fi.Name()] = workflow
fileNames = append(fileNames, fi.Name())
}
}
log.Printf("init backend dags: %s", strings.Join(fileNames, ","))
er.logger.Info("init backend dags", "files", strings.Join(fileNames, ","))
return nil
}

func (er *EntryReader) watchDags() {
cl := dag.Loader{}
watcher, err := filenotify.New(time.Minute)
if err != nil {
log.Fatal(err)
er.logger.Error("failed to init file watcher", tag.Error(err))
return
}
defer func() {
_ = watcher.Close()
Expand All @@ -123,22 +128,22 @@ func (er *EntryReader) watchDags() {
if event.Op == fsnotify.Create || event.Op == fsnotify.Write {
workflow, err := cl.LoadMetadataOnly(filepath.Join(er.dagsDir, filepath.Base(event.Name)))
if err != nil {
log.Printf("failed to read workflow cfg: %s", err)
er.logger.Error("failed to read workflow cfg", tag.Error(err))
} else {
er.dags[filepath.Base(event.Name)] = workflow
log.Printf("reload workflow entry %s", event.Name)
er.logger.Info("reload workflow entry", "file", event.Name)
}
}
if event.Op == fsnotify.Rename || event.Op == fsnotify.Remove {
delete(er.dags, filepath.Base(event.Name))
log.Printf("remove dag entry %s", event.Name)
er.logger.Info("remove workflow entry", "file", event.Name)
}
er.dagsLock.Unlock()
case err, ok := <-watcher.Errors():
if !ok {
return
}
log.Println("watch entry dags error:", err)
er.logger.Error("watch entry dags error", tag.Error(err))
}
}

Expand Down
23 changes: 12 additions & 11 deletions service/scheduler/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ package scheduler
import (
"fmt"
"github.com/dagu-dev/dagu/internal/dag"
"log"
"github.com/dagu-dev/dagu/internal/logger"
"github.com/dagu-dev/dagu/internal/utils"
"os"
"os/signal"
"path"
"sort"
"syscall"
"time"

"github.com/dagu-dev/dagu/internal/utils"
)

type Scheduler struct {
entryReader EntryReader
logDir string
stop chan struct{}
running bool
logger logger.Logger
}

type EntryReader interface {
Expand All @@ -29,6 +29,7 @@ type Entry struct {
Next time.Time
Job Job
EntryType Type
Logger logger.Logger
}

type Job interface {
Expand All @@ -53,20 +54,21 @@ func (e *Entry) Invoke() error {
}
switch e.EntryType {
case Start:
log.Printf("[%s] start %s", e.Next.Format("2006-01-02 15:04:05"), e.Job.String())
e.Logger.Info("start job", "job", e.Job.String(), "time", e.Next.Format("2006-01-02 15:04:05"))
return e.Job.Start()
case Stop:
log.Printf("[%s] stop %s", e.Next.Format("2006-01-02 15:04:05"), e.Job.String())
e.Logger.Info("stop job", "job", e.Job.String(), "time", e.Next.Format("2006-01-02 15:04:05"))
return e.Job.Stop()
case Restart:
log.Printf("[%s] restart %s", e.Next.Format("2006-01-02 15:04:05"), e.Job.String())
e.Logger.Info("restart job", "job", e.Job.String(), "time", e.Next.Format("2006-01-02 15:04:05"))
return e.Job.Restart()
}
return nil
}

type Params struct {
EntryReader EntryReader
Logger logger.Logger
LogDir string
}

Expand All @@ -76,6 +78,7 @@ func New(params Params) *Scheduler {
logDir: params.LogDir,
stop: make(chan struct{}),
running: false,
logger: params.Logger,
}
}

Expand All @@ -91,7 +94,7 @@ func (s *Scheduler) Start() error {
s.Stop()
}()

log.Printf("starting dagu scheduler")
s.logger.Info("starting scheduler")
s.start()

return nil
Expand All @@ -103,9 +106,7 @@ func (s *Scheduler) setupLogFile() (err error) {
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
// TODO: fix this to use logger
log.Printf("setup log file: %s", filename)
log.Print("log file is ready")
s.logger.Info("setup log", "filename", filename)
return
}

Expand Down Expand Up @@ -140,7 +141,7 @@ func (s *Scheduler) run(now time.Time) {
go func(e *Entry) {
err := e.Invoke()
if err != nil {
log.Printf("backend: entry failed %s: %v", e.Job, err)
s.logger.Error("failed to invoke entry", "entry", e.Job, "error", err)
}
}(e)
}
Expand Down

0 comments on commit 94d477d

Please sign in to comment.