From d0938f1198e0c10b7c2ddc7187c6fd43448a542f Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Sat, 11 May 2024 21:59:58 +0900 Subject: [PATCH] Implement file cache (#564) * fix: minor goroutine leak * fix: minor goroutine leak * implement file cache for history data * implement DAG file cache * fix cache stale criteria * update go version to 1.20 * chore: update go version for gh action --- .github/workflows/release.yaml | 2 +- .github/workflows/test.yaml | 2 +- cmd/common_test.go | 1 - cmd/status_test.go | 7 +- cmd/stop_test.go | 2 +- cmd/testdata/status.yaml | 2 +- go.mod | 3 +- go.sum | 2 + internal/persistence/client/store_factory.go | 22 ++++-- internal/persistence/filecache/filecache.go | 76 +++++++++++++++++++ .../jsondb/{database.go => store.go} | 32 +++++--- .../{database_test.go => store_test.go} | 0 internal/persistence/local/dag_store.go | 21 +++-- .../scheduler/entry_reader/entry_reader.go | 9 ++- .../entry_reader/entry_reader_test.go | 12 +++ service/scheduler/scheduler/scheduler.go | 15 +++- service/scheduler/scheduler/scheduler_test.go | 5 ++ 17 files changed, 173 insertions(+), 40 deletions(-) create mode 100644 internal/persistence/filecache/filecache.go rename internal/persistence/jsondb/{database.go => store.go} (93%) rename internal/persistence/jsondb/{database_test.go => store_test.go} (100%) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index fd0b25d7..6ea60a13 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -11,7 +11,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: 1.19.x + go-version: 1.20.x - name: Installing swagger run: | diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index bdb47fd4..f0be5181 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -8,7 +8,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: 1.19.x + go-version: 1.20.x - name: Installing swagger run: | diff --git a/cmd/common_test.go b/cmd/common_test.go index 28068de8..7c05c60a 100644 --- a/cmd/common_test.go +++ b/cmd/common_test.go @@ -117,7 +117,6 @@ func testStatusEventual(t *testing.T, e engine.Engine, dagFile string, expected func testLastStatusEventual(t *testing.T, hs persistence.HistoryStore, dag string, expected scheduler.Status) { t.Helper() require.Eventually(t, func() bool { - // TODO: do not use history store directly. status := hs.ReadStatusRecent(dag, 1) if len(status) < 1 { return false diff --git a/cmd/status_test.go b/cmd/status_test.go index a5df2f1c..45edee94 100644 --- a/cmd/status_test.go +++ b/cmd/status_test.go @@ -1,11 +1,9 @@ package cmd import ( + "github.com/dagu-dev/dagu/internal/scheduler" "os" "testing" - "time" - - "github.com/dagu-dev/dagu/internal/scheduler" ) func TestStatusCommand(t *testing.T) { @@ -23,9 +21,6 @@ func TestStatusCommand(t *testing.T) { close(done) }() - time.Sleep(time.Millisecond * 50) - - // TODO: do not use history store directly. testLastStatusEventual(t, df.NewHistoryStore(), dagFile, scheduler.StatusRunning) // Check the current status. diff --git a/cmd/stop_test.go b/cmd/stop_test.go index 12bd4fe6..1568cee9 100644 --- a/cmd/stop_test.go +++ b/cmd/stop_test.go @@ -23,7 +23,7 @@ func TestStopCommand(t *testing.T) { close(done) }() - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Millisecond * 100) // Wait for the DAG running. // TODO: Do not use history store. diff --git a/cmd/testdata/status.yaml b/cmd/testdata/status.yaml index 90c67a1e..1f868815 100644 --- a/cmd/testdata/status.yaml +++ b/cmd/testdata/status.yaml @@ -1,3 +1,3 @@ steps: - name: "1" - command: "sleep 1" \ No newline at end of file + command: "sleep 1000" diff --git a/go.mod b/go.mod index bac7f168..a924808d 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/dagu-dev/dagu -go 1.19 +go 1.20 require ( github.com/docker/docker v20.10.21+incompatible @@ -63,6 +63,7 @@ require ( go.mongodb.org/mongo-driver v1.11.3 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/dig v1.17.0 // indirect + go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.23.0 // indirect golang.org/x/mod v0.12.0 // indirect diff --git a/go.sum b/go.sum index 1c2dbb2a..684876fd 100644 --- a/go.sum +++ b/go.sum @@ -367,6 +367,8 @@ go.uber.org/dig v1.17.0/go.mod h1:rTxpf7l5I0eBTlE6/9RL+lDybC7WFwY2QH55ZSjy1mU= go.uber.org/fx v1.20.0 h1:ZMC/pnRvhsthOZh9MZjMq5U8Or3mA9zBSPaLnzs3ihQ= go.uber.org/fx v1.20.0/go.mod h1:qCUj0btiR3/JnanEr1TYEePfSw6o/4qYJscgvzQ5Ub0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= diff --git a/internal/persistence/client/store_factory.go b/internal/persistence/client/store_factory.go index 9f7d6931..dcbcea0a 100644 --- a/internal/persistence/client/store_factory.go +++ b/internal/persistence/client/store_factory.go @@ -11,7 +11,9 @@ import ( ) type dataStoreFactoryImpl struct { - cfg *config.Config + cfg *config.Config + historyStore persistence.HistoryStore + dagStore persistence.DAGStore } var _ persistence.DataStoreFactory = (*dataStoreFactoryImpl)(nil) @@ -24,7 +26,7 @@ func NewDataStoreFactory(cfg *config.Config) persistence.DataStoreFactory { return ds } -func (f dataStoreFactoryImpl) InitDagDir() error { +func (f *dataStoreFactoryImpl) InitDagDir() error { _, err := os.Stat(f.cfg.DAGs) if os.IsNotExist(err) { if err := os.MkdirAll(f.cfg.DAGs, 0755); err != nil { @@ -35,16 +37,22 @@ func (f dataStoreFactoryImpl) InitDagDir() error { return nil } -func (f dataStoreFactoryImpl) NewHistoryStore() persistence.HistoryStore { +func (f *dataStoreFactoryImpl) NewHistoryStore() persistence.HistoryStore { // TODO: Add support for other data stores (e.g. sqlite, postgres, etc.) - return jsondb.New(f.cfg.DataDir, f.cfg.DAGs) + if f.historyStore == nil { + f.historyStore = jsondb.New(f.cfg.DataDir, f.cfg.DAGs) + } + return f.historyStore } -func (f dataStoreFactoryImpl) NewDAGStore() persistence.DAGStore { - return local.NewDAGStore(f.cfg.DAGs) +func (f *dataStoreFactoryImpl) NewDAGStore() persistence.DAGStore { + if f.dagStore == nil { + f.dagStore = local.NewDAGStore(f.cfg.DAGs) + } + return f.dagStore } -func (f dataStoreFactoryImpl) NewFlagStore() persistence.FlagStore { +func (f *dataStoreFactoryImpl) NewFlagStore() persistence.FlagStore { s := storage.NewStorage(f.cfg.SuspendFlagsDir) return local.NewFlagStore(s) } diff --git a/internal/persistence/filecache/filecache.go b/internal/persistence/filecache/filecache.go new file mode 100644 index 00000000..a7f46802 --- /dev/null +++ b/internal/persistence/filecache/filecache.go @@ -0,0 +1,76 @@ +package filecache + +import ( + "fmt" + "os" + "sync" +) + +type Cache[T any] struct { + entries sync.Map +} + +type Entry[T any] struct { + Data T + Size int64 + LastModified int64 +} + +func New[T any]() *Cache[T] { + return &Cache[T]{} +} + +func (c *Cache[T]) Store(fileName string, data T, fi os.FileInfo) { + c.entries.Store(fileName, Entry[T]{Data: data, Size: fi.Size(), LastModified: fi.ModTime().Unix()}) +} + +func (c *Cache[T]) Invalidate(fileName string) { + c.entries.Delete(fileName) +} + +func (c *Cache[T]) LoadLatest(fileName string, loader func() (T, error)) (T, error) { + stale, lastModified, err := c.IsStale(fileName, c.Entry(fileName)) + if err != nil { + var zero T + return zero, err + } + if stale { + data, err := loader() + if err != nil { + var zero T + return zero, err + } + c.Store(fileName, data, lastModified) + return data, nil + } + item, _ := c.entries.Load(fileName) + entry := item.(Entry[T]) + return entry.Data, nil +} + +func (c *Cache[T]) Entry(fileName string) Entry[T] { + item, ok := c.entries.Load(fileName) + if !ok { + return Entry[T]{} + } + return item.(Entry[T]) +} + +func (c *Cache[T]) Load(fileName string) (T, bool) { + item, ok := c.entries.Load(fileName) + if !ok { + var zero T + return zero, false + } + entry := item.(Entry[T]) + return entry.Data, true +} + +func (c *Cache[T]) IsStale(fileName string, entry Entry[T]) (bool, os.FileInfo, error) { + fi, err := os.Stat(fileName) + if err != nil { + return true, fi, fmt.Errorf("failed to stat file %s: %w", fileName, err) + } + t := fi.ModTime().Unix() + return entry.LastModified < t || entry.Size != fi.Size(), fi, nil +} diff --git a/internal/persistence/jsondb/database.go b/internal/persistence/jsondb/store.go similarity index 93% rename from internal/persistence/jsondb/database.go rename to internal/persistence/jsondb/store.go index 967db13c..ad90c856 100644 --- a/internal/persistence/jsondb/database.go +++ b/internal/persistence/jsondb/store.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/dagu-dev/dagu/internal/persistence/filecache" "io" "log" "os" @@ -35,6 +36,7 @@ type Store struct { dir string dagsDir string writer *writer + cache *filecache.Cache[*model.Status] } var ( @@ -46,7 +48,11 @@ var ( // New creates a new Store with default configuration. func New(dir, dagsDir string) *Store { // dagsDir is used to calculate the directory that is compatible with the old version. - return &Store{dir: dir, dagsDir: dagsDir} + return &Store{ + dir: dir, + dagsDir: dagsDir, + cache: filecache.New[*model.Status](), + } } func (store *Store) Update(dagFile, requestId string, s *model.Status) error { @@ -59,6 +65,7 @@ func (store *Store) Update(dagFile, requestId string, s *model.Status) error { return err } defer func() { + store.cache.Invalidate(f.File) _ = w.close() }() return w.write(s) @@ -91,6 +98,7 @@ func (store *Store) Close() error { if err := store.Compact(store.writer.dagFile, store.writer.target); err != nil { return err } + store.cache.Invalidate(store.writer.target) return store.writer.close() } @@ -142,25 +150,31 @@ func (store *Store) ReadStatusRecent(dagFile string, n int) []*model.StatusFile var ret []*model.StatusFile files := store.latest(store.pattern(dagFile)+"*.dat", n) for _, file := range files { - status, err := ParseFile(file) - if err == nil { - ret = append(ret, &model.StatusFile{ - File: file, - Status: status, - }) + status, err := store.cache.LoadLatest(file, func() (*model.Status, error) { + return ParseFile(file) + }) + if err != nil { + continue } + ret = append(ret, &model.StatusFile{ + File: file, + Status: status, + }) } return ret } // ReadStatusToday returns a list of status files. func (store *Store) ReadStatusToday(dagFile string) (*model.Status, error) { + // TODO: let's fix below not to use config here readLatestStatus := config.Get().LatestStatusToday file, err := store.latestToday(dagFile, time.Now(), readLatestStatus) if err != nil { return nil, err } - return ParseFile(file) + return store.cache.LoadLatest(file, func() (*model.Status, error) { + return ParseFile(file) + }) } // FindByRequestId finds a status file by requestId. @@ -365,7 +379,7 @@ func readLineFrom(f *os.File, offset int64) ([]byte, error) { return nil, err } r := bufio.NewReader(f) - ret := []byte{} + var ret []byte for { b, isPrefix, err := r.ReadLine() if err == io.EOF { diff --git a/internal/persistence/jsondb/database_test.go b/internal/persistence/jsondb/store_test.go similarity index 100% rename from internal/persistence/jsondb/database_test.go rename to internal/persistence/jsondb/store_test.go diff --git a/internal/persistence/local/dag_store.go b/internal/persistence/local/dag_store.go index b5ba02c1..d0107d9a 100644 --- a/internal/persistence/local/dag_store.go +++ b/internal/persistence/local/dag_store.go @@ -3,6 +3,7 @@ package local import ( "errors" "fmt" + "github.com/dagu-dev/dagu/internal/persistence/filecache" "os" "path" "path/filepath" @@ -15,11 +16,15 @@ import ( ) type dagStoreImpl struct { - dir string + dir string + metaCache *filecache.Cache[*dag.DAG] } func NewDAGStore(dir string) persistence.DAGStore { - return &dagStoreImpl{dir: dir} + return &dagStoreImpl{ + dir: dir, + metaCache: filecache.New[*dag.DAG](), + } } var ( @@ -40,12 +45,10 @@ func (d *dagStoreImpl) GetMetadata(name string) (*dag.DAG, error) { if err != nil { return nil, fmt.Errorf("%w: %s", errInvalidName, name) } - cl := dag.Loader{} - dat, err := cl.LoadMetadata(loc) - if err != nil { - return nil, err - } - return dat, nil + return d.metaCache.LoadLatest(loc, func() (*dag.DAG, error) { + cl := dag.Loader{} + return cl.LoadMetadata(loc) + }) } func (d *dagStoreImpl) GetDetails(name string) (*dag.DAG, error) { @@ -91,6 +94,7 @@ func (d *dagStoreImpl) UpdateSpec(name string, spec []byte) error { if err != nil { return fmt.Errorf("%w: %s", errFailedToUpdateDAGFile, err) } + d.metaCache.Invalidate(loc) return nil } @@ -117,6 +121,7 @@ func (d *dagStoreImpl) Delete(name string) error { if err != nil { return fmt.Errorf("%w: %s", errFailedToDeleteDAGFile, err) } + d.metaCache.Invalidate(loc) return nil } diff --git a/service/scheduler/entry_reader/entry_reader.go b/service/scheduler/entry_reader/entry_reader.go index 48477575..6db084af 100644 --- a/service/scheduler/entry_reader/entry_reader.go +++ b/service/scheduler/entry_reader/entry_reader.go @@ -50,10 +50,13 @@ func New(params Params) *EntryReader { if err := er.initDags(); err != nil { er.logger.Error("failed to init entry_reader dags", tag.Error(err)) } - go er.watchDags() return er } +func (er *EntryReader) Start(done chan any) { + go er.watchDags(done) +} + func (er *EntryReader) Read(now time.Time) ([]*scheduler.Entry, error) { var entries []*scheduler.Entry er.dagsLock.Lock() @@ -109,7 +112,7 @@ func (er *EntryReader) initDags() error { return nil } -func (er *EntryReader) watchDags() { +func (er *EntryReader) watchDags(done chan any) { cl := dag.Loader{} watcher, err := filenotify.New(time.Minute) if err != nil { @@ -122,6 +125,8 @@ func (er *EntryReader) watchDags() { _ = watcher.Add(er.dagsDir) for { select { + case <-done: + return case event, ok := <-watcher.Events(): if !ok { return diff --git a/service/scheduler/entry_reader/entry_reader_test.go b/service/scheduler/entry_reader/entry_reader_test.go index fc8d9b4e..2dc4976e 100644 --- a/service/scheduler/entry_reader/entry_reader_test.go +++ b/service/scheduler/entry_reader/entry_reader_test.go @@ -1,6 +1,7 @@ package entry_reader import ( + "go.uber.org/goleak" "os" "path" "testing" @@ -22,6 +23,12 @@ var ( testdataDir = path.Join(utils.MustGetwd(), "testdata") ) +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) + code := m.Run() + os.Exit(code) +} + // TODO: fix this tests to use mock func setupTest(t *testing.T) (string, engine.Factory) { t.Helper() @@ -67,6 +74,11 @@ func TestReadEntries(t *testing.T) { Logger: logger.NewSlogLogger(), EngineFactory: ef, }) + + done := make(chan any) + defer close(done) + er.Start(done) + entries, err = er.Read(now) require.NoError(t, err) require.GreaterOrEqual(t, len(entries), 1) diff --git a/service/scheduler/scheduler/scheduler.go b/service/scheduler/scheduler/scheduler.go index 3e874d6f..43b30324 100644 --- a/service/scheduler/scheduler/scheduler.go +++ b/service/scheduler/scheduler/scheduler.go @@ -24,6 +24,7 @@ type Scheduler struct { } type EntryReader interface { + Start(done chan any) Read(now time.Time) ([]*Entry, error) } @@ -89,10 +90,20 @@ func (s *Scheduler) Start() error { } sig := make(chan os.Signal, 1) + done := make(chan any) + defer close(done) + + s.entryReader.Start(done) + signal.Notify(sig, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + go func() { - <-sig - s.Stop() + select { + case <-done: + return + case <-sig: + s.Stop() + } }() s.logger.Info("starting scheduler") diff --git a/service/scheduler/scheduler/scheduler_test.go b/service/scheduler/scheduler/scheduler_test.go index fc046c57..fad340d8 100644 --- a/service/scheduler/scheduler/scheduler_test.go +++ b/service/scheduler/scheduler/scheduler_test.go @@ -1,6 +1,7 @@ package scheduler import ( + "go.uber.org/goleak" "os" "sync/atomic" "testing" @@ -20,6 +21,7 @@ var ( ) func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) tempDir := utils.MustTempDir("runner_test") changeHomeDir(tempDir) testHomeDir = tempDir @@ -93,6 +95,7 @@ func TestRestart(t *testing.T) { go func() { _ = r.Start() }() + defer r.Stop() time.Sleep(time.Second + time.Millisecond*100) require.Equal(t, int32(1), er.Entries[0].Job.(*mockJob).RestartCount.Load()) @@ -120,6 +123,8 @@ func (er *mockEntryReader) Read(_ time.Time) ([]*Entry, error) { return er.Entries, nil } +func (er *mockEntryReader) Start(chan any) {} + // TODO: fix to use mock library type mockJob struct { Name string