From 24501fac06f9a0ed0ce9ffe8afd771c7af34b04c Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Thu, 31 Aug 2023 21:57:50 +0900 Subject: [PATCH] refactor scheduler --- service/scheduler/app.go | 8 ++++---- .../{entry/entry.go => entry_reader/entry_reader.go} | 10 +++++----- .../entry_reader_test.go} | 2 +- .../testdata/invalid_schedule.yaml | 0 .../testdata/non_scheduled_job.yaml | 0 .../testdata/scheduled_job.yaml | 0 .../{entry => entry_reader}/testdata/start.yaml | 0 .../{entry => entry_reader}/testdata/start_stop.yaml | 0 .../{entry => entry_reader}/testdata/stop.yaml | 0 service/scheduler/scheduler/scheduler.go | 2 +- 10 files changed, 11 insertions(+), 11 deletions(-) rename service/scheduler/{entry/entry.go => entry_reader/entry_reader.go} (92%) rename service/scheduler/{entry/entry_test.go => entry_reader/entry_reader_test.go} (99%) rename service/scheduler/{entry => entry_reader}/testdata/invalid_schedule.yaml (100%) rename service/scheduler/{entry => entry_reader}/testdata/non_scheduled_job.yaml (100%) rename service/scheduler/{entry => entry_reader}/testdata/scheduled_job.yaml (100%) rename service/scheduler/{entry => entry_reader}/testdata/start.yaml (100%) rename service/scheduler/{entry => entry_reader}/testdata/start_stop.yaml (100%) rename service/scheduler/{entry => entry_reader}/testdata/stop.yaml (100%) diff --git a/service/scheduler/app.go b/service/scheduler/app.go index 4fda6d6b..4e95a360 100644 --- a/service/scheduler/app.go +++ b/service/scheduler/app.go @@ -4,7 +4,7 @@ import ( "context" "github.com/dagu-dev/dagu/internal/config" "github.com/dagu-dev/dagu/internal/logger" - "github.com/dagu-dev/dagu/service/scheduler/entry" + "github.com/dagu-dev/dagu/service/scheduler/entry_reader" "github.com/dagu-dev/dagu/service/scheduler/scheduler" "go.uber.org/fx" ) @@ -23,11 +23,11 @@ type Params struct { EntryReader scheduler.EntryReader } -func EntryReaderProvider(cfg *config.Config, jf entry.JobFactory, logger logger.Logger) scheduler.EntryReader { - return entry.NewEntryReader(cfg.DAGs, jf, logger) +func EntryReaderProvider(cfg *config.Config, jf entry_reader.JobFactory, logger logger.Logger) scheduler.EntryReader { + return entry_reader.NewEntryReader(cfg.DAGs, jf, logger) } -func JobFactoryProvider(cfg *config.Config) entry.JobFactory { +func JobFactoryProvider(cfg *config.Config) entry_reader.JobFactory { return &jobFactory{ Command: cfg.Command, WorkDir: cfg.WorkDir, diff --git a/service/scheduler/entry/entry.go b/service/scheduler/entry_reader/entry_reader.go similarity index 92% rename from service/scheduler/entry/entry.go rename to service/scheduler/entry_reader/entry_reader.go index bf22abd8..07595d7d 100644 --- a/service/scheduler/entry/entry.go +++ b/service/scheduler/entry_reader/entry_reader.go @@ -1,4 +1,4 @@ -package entry +package entry_reader import ( "github.com/dagu-dev/dagu/internal/logger" @@ -35,7 +35,7 @@ func NewEntryReader(dagsDir string, jf JobFactory, logger logger.Logger) *EntryR logger: logger, } if err := er.initDags(); err != nil { - er.logger.Error("failed to init entry dags", tag.Error(err)) + er.logger.Error("failed to init entry_reader dags", tag.Error(err)) } go er.watchDags() return er @@ -131,19 +131,19 @@ func (er *EntryReader) watchDags() { er.logger.Error("failed to read workflow cfg", tag.Error(err)) } else { er.dags[filepath.Base(event.Name)] = workflow - er.logger.Info("reload workflow entry", "file", event.Name) + er.logger.Info("reload workflow entry_reader", "file", event.Name) } } if event.Op == fsnotify.Rename || event.Op == fsnotify.Remove { delete(er.dags, filepath.Base(event.Name)) - er.logger.Info("remove workflow entry", "file", event.Name) + er.logger.Info("remove workflow entry_reader", "file", event.Name) } er.dagsLock.Unlock() case err, ok := <-watcher.Errors(): if !ok { return } - er.logger.Error("watch entry dags error", tag.Error(err)) + er.logger.Error("watch entry_reader dags error", tag.Error(err)) } } diff --git a/service/scheduler/entry/entry_test.go b/service/scheduler/entry_reader/entry_reader_test.go similarity index 99% rename from service/scheduler/entry/entry_test.go rename to service/scheduler/entry_reader/entry_reader_test.go index ffe7be94..b453e977 100644 --- a/service/scheduler/entry/entry_test.go +++ b/service/scheduler/entry_reader/entry_reader_test.go @@ -1,4 +1,4 @@ -package entry +package entry_reader import ( "github.com/dagu-dev/dagu/internal/dag" diff --git a/service/scheduler/entry/testdata/invalid_schedule.yaml b/service/scheduler/entry_reader/testdata/invalid_schedule.yaml similarity index 100% rename from service/scheduler/entry/testdata/invalid_schedule.yaml rename to service/scheduler/entry_reader/testdata/invalid_schedule.yaml diff --git a/service/scheduler/entry/testdata/non_scheduled_job.yaml b/service/scheduler/entry_reader/testdata/non_scheduled_job.yaml similarity index 100% rename from service/scheduler/entry/testdata/non_scheduled_job.yaml rename to service/scheduler/entry_reader/testdata/non_scheduled_job.yaml diff --git a/service/scheduler/entry/testdata/scheduled_job.yaml b/service/scheduler/entry_reader/testdata/scheduled_job.yaml similarity index 100% rename from service/scheduler/entry/testdata/scheduled_job.yaml rename to service/scheduler/entry_reader/testdata/scheduled_job.yaml diff --git a/service/scheduler/entry/testdata/start.yaml b/service/scheduler/entry_reader/testdata/start.yaml similarity index 100% rename from service/scheduler/entry/testdata/start.yaml rename to service/scheduler/entry_reader/testdata/start.yaml diff --git a/service/scheduler/entry/testdata/start_stop.yaml b/service/scheduler/entry_reader/testdata/start_stop.yaml similarity index 100% rename from service/scheduler/entry/testdata/start_stop.yaml rename to service/scheduler/entry_reader/testdata/start_stop.yaml diff --git a/service/scheduler/entry/testdata/stop.yaml b/service/scheduler/entry_reader/testdata/stop.yaml similarity index 100% rename from service/scheduler/entry/testdata/stop.yaml rename to service/scheduler/entry_reader/testdata/stop.yaml diff --git a/service/scheduler/scheduler/scheduler.go b/service/scheduler/scheduler/scheduler.go index 134790b4..b4a97dd2 100644 --- a/service/scheduler/scheduler/scheduler.go +++ b/service/scheduler/scheduler/scheduler.go @@ -141,7 +141,7 @@ func (s *Scheduler) run(now time.Time) { go func(e *Entry) { err := e.Invoke() if err != nil { - s.logger.Error("failed to invoke entry", "entry", e.Job, "error", err) + s.logger.Error("failed to invoke entry_reader", "entry_reader", e.Job, "error", err) } }(e) }