Skip to content

Commit

Permalink
refactor scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
yohamta committed Aug 31, 2023
1 parent 94d477d commit 24501fa
Show file tree
Hide file tree
Showing 10 changed files with 11 additions and 11 deletions.
8 changes: 4 additions & 4 deletions service/scheduler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"github.com/dagu-dev/dagu/internal/config"
"github.com/dagu-dev/dagu/internal/logger"
"github.com/dagu-dev/dagu/service/scheduler/entry"
"github.com/dagu-dev/dagu/service/scheduler/entry_reader"
"github.com/dagu-dev/dagu/service/scheduler/scheduler"
"go.uber.org/fx"
)
Expand All @@ -23,11 +23,11 @@ type Params struct {
EntryReader scheduler.EntryReader
}

func EntryReaderProvider(cfg *config.Config, jf entry.JobFactory, logger logger.Logger) scheduler.EntryReader {
return entry.NewEntryReader(cfg.DAGs, jf, logger)
func EntryReaderProvider(cfg *config.Config, jf entry_reader.JobFactory, logger logger.Logger) scheduler.EntryReader {
return entry_reader.NewEntryReader(cfg.DAGs, jf, logger)
}

func JobFactoryProvider(cfg *config.Config) entry.JobFactory {
func JobFactoryProvider(cfg *config.Config) entry_reader.JobFactory {
return &jobFactory{
Command: cfg.Command,
WorkDir: cfg.WorkDir,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package entry
package entry_reader

import (
"github.com/dagu-dev/dagu/internal/logger"
Expand Down Expand Up @@ -35,7 +35,7 @@ func NewEntryReader(dagsDir string, jf JobFactory, logger logger.Logger) *EntryR
logger: logger,
}
if err := er.initDags(); err != nil {
er.logger.Error("failed to init entry dags", tag.Error(err))
er.logger.Error("failed to init entry_reader dags", tag.Error(err))
}
go er.watchDags()
return er
Expand Down Expand Up @@ -131,19 +131,19 @@ func (er *EntryReader) watchDags() {
er.logger.Error("failed to read workflow cfg", tag.Error(err))
} else {
er.dags[filepath.Base(event.Name)] = workflow
er.logger.Info("reload workflow entry", "file", event.Name)
er.logger.Info("reload workflow entry_reader", "file", event.Name)
}
}
if event.Op == fsnotify.Rename || event.Op == fsnotify.Remove {
delete(er.dags, filepath.Base(event.Name))
er.logger.Info("remove workflow entry", "file", event.Name)
er.logger.Info("remove workflow entry_reader", "file", event.Name)
}
er.dagsLock.Unlock()
case err, ok := <-watcher.Errors():
if !ok {
return
}
er.logger.Error("watch entry dags error", tag.Error(err))
er.logger.Error("watch entry_reader dags error", tag.Error(err))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package entry
package entry_reader

import (
"github.com/dagu-dev/dagu/internal/dag"
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion service/scheduler/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (s *Scheduler) run(now time.Time) {
go func(e *Entry) {
err := e.Invoke()
if err != nil {
s.logger.Error("failed to invoke entry", "entry", e.Job, "error", err)
s.logger.Error("failed to invoke entry_reader", "entry_reader", e.Job, "error", err)
}
}(e)
}
Expand Down

0 comments on commit 24501fa

Please sign in to comment.