Skip to content

Commit

Permalink
Implement file cache (#564)
Browse files Browse the repository at this point in the history
* fix: minor goroutine leak

* fix: minor goroutine leak

* implement file cache for history data

* implement DAG file cache

* fix cache stale criteria

* update go version to 1.20

* chore: update go version for gh action
  • Loading branch information
yohamta authored May 11, 2024
1 parent cc7f6a2 commit d0938f1
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19.x
go-version: 1.20.x

- name: Installing swagger
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19.x
go-version: 1.20.x

- name: Installing swagger
run: |
Expand Down
1 change: 0 additions & 1 deletion cmd/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ func testStatusEventual(t *testing.T, e engine.Engine, dagFile string, expected
func testLastStatusEventual(t *testing.T, hs persistence.HistoryStore, dag string, expected scheduler.Status) {
t.Helper()
require.Eventually(t, func() bool {
// TODO: do not use history store directly.
status := hs.ReadStatusRecent(dag, 1)
if len(status) < 1 {
return false
Expand Down
7 changes: 1 addition & 6 deletions cmd/status_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package cmd

import (
"github.com/dagu-dev/dagu/internal/scheduler"
"os"
"testing"
"time"

"github.com/dagu-dev/dagu/internal/scheduler"
)

func TestStatusCommand(t *testing.T) {
Expand All @@ -23,9 +21,6 @@ func TestStatusCommand(t *testing.T) {
close(done)
}()

time.Sleep(time.Millisecond * 50)

// TODO: do not use history store directly.
testLastStatusEventual(t, df.NewHistoryStore(), dagFile, scheduler.StatusRunning)

// Check the current status.
Expand Down
2 changes: 1 addition & 1 deletion cmd/stop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestStopCommand(t *testing.T) {
close(done)
}()

time.Sleep(time.Millisecond * 50)
time.Sleep(time.Millisecond * 100)

// Wait for the DAG running.
// TODO: Do not use history store.
Expand Down
2 changes: 1 addition & 1 deletion cmd/testdata/status.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
steps:
- name: "1"
command: "sleep 1"
command: "sleep 1000"
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/dagu-dev/dagu

go 1.19
go 1.20

require (
github.com/docker/docker v20.10.21+incompatible
Expand Down Expand Up @@ -63,6 +63,7 @@ require (
go.mongodb.org/mongo-driver v1.11.3 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/mod v0.12.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ go.uber.org/dig v1.17.0/go.mod h1:rTxpf7l5I0eBTlE6/9RL+lDybC7WFwY2QH55ZSjy1mU=
go.uber.org/fx v1.20.0 h1:ZMC/pnRvhsthOZh9MZjMq5U8Or3mA9zBSPaLnzs3ihQ=
go.uber.org/fx v1.20.0/go.mod h1:qCUj0btiR3/JnanEr1TYEePfSw6o/4qYJscgvzQ5Ub0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
Expand Down
22 changes: 15 additions & 7 deletions internal/persistence/client/store_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
)

type dataStoreFactoryImpl struct {
cfg *config.Config
cfg *config.Config
historyStore persistence.HistoryStore
dagStore persistence.DAGStore
}

var _ persistence.DataStoreFactory = (*dataStoreFactoryImpl)(nil)
Expand All @@ -24,7 +26,7 @@ func NewDataStoreFactory(cfg *config.Config) persistence.DataStoreFactory {
return ds
}

func (f dataStoreFactoryImpl) InitDagDir() error {
func (f *dataStoreFactoryImpl) InitDagDir() error {
_, err := os.Stat(f.cfg.DAGs)
if os.IsNotExist(err) {
if err := os.MkdirAll(f.cfg.DAGs, 0755); err != nil {
Expand All @@ -35,16 +37,22 @@ func (f dataStoreFactoryImpl) InitDagDir() error {
return nil
}

func (f dataStoreFactoryImpl) NewHistoryStore() persistence.HistoryStore {
func (f *dataStoreFactoryImpl) NewHistoryStore() persistence.HistoryStore {
// TODO: Add support for other data stores (e.g. sqlite, postgres, etc.)
return jsondb.New(f.cfg.DataDir, f.cfg.DAGs)
if f.historyStore == nil {
f.historyStore = jsondb.New(f.cfg.DataDir, f.cfg.DAGs)
}
return f.historyStore
}

func (f dataStoreFactoryImpl) NewDAGStore() persistence.DAGStore {
return local.NewDAGStore(f.cfg.DAGs)
func (f *dataStoreFactoryImpl) NewDAGStore() persistence.DAGStore {
if f.dagStore == nil {
f.dagStore = local.NewDAGStore(f.cfg.DAGs)
}
return f.dagStore
}

func (f dataStoreFactoryImpl) NewFlagStore() persistence.FlagStore {
func (f *dataStoreFactoryImpl) NewFlagStore() persistence.FlagStore {
s := storage.NewStorage(f.cfg.SuspendFlagsDir)
return local.NewFlagStore(s)
}
76 changes: 76 additions & 0 deletions internal/persistence/filecache/filecache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package filecache

import (
"fmt"
"os"
"sync"
)

// Cache memoizes values derived from files, keyed by file name.
//
// A cached value is served only while the file's size and modification time
// (truncated to whole seconds) still equal what was recorded when the value
// was stored. Changes that keep the size and land in the same second are not
// detected, so writers that need immediate visibility should call Invalidate.
//
// Entries live in a sync.Map, so individual operations may be called from
// multiple goroutines. LoadLatest does not serialize concurrent loaders for
// the same file; the last Store wins.
type Cache[T any] struct {
	// entries maps a file name (string) to an Entry[T].
	entries sync.Map
}

// Entry is a cached value together with the file attributes observed when it
// was stored.
type Entry[T any] struct {
	// Data is the cached value.
	Data T
	// Size is the file size in bytes at store time.
	Size int64
	// LastModified is the file modification time at store time, in Unix seconds.
	LastModified int64
}

// New returns an empty cache.
func New[T any]() *Cache[T] {
	return &Cache[T]{}
}

// Store records data for fileName along with the size and modification time
// reported by fi. fi must be non-nil.
func (c *Cache[T]) Store(fileName string, data T, fi os.FileInfo) {
	c.entries.Store(fileName, Entry[T]{
		Data:         data,
		Size:         fi.Size(),
		LastModified: fi.ModTime().Unix(),
	})
}

// Invalidate drops the entry for fileName, if any.
func (c *Cache[T]) Invalidate(fileName string) {
	c.entries.Delete(fileName)
}

// LoadLatest returns the value for fileName, calling loader only when there is
// no cached entry or the cached entry no longer matches the file on disk.
//
// It returns the zero value of T and an error if the file cannot be stat'ed or
// if loader fails; in both cases the cache is left untouched.
func (c *Cache[T]) LoadLatest(fileName string, loader func() (T, error)) (T, error) {
	var zero T

	// Take a single snapshot of the entry and use it for both the staleness
	// check and the returned value. Re-reading the map after the check would
	// race with Invalidate and could observe a missing entry.
	entry, cached := c.lookup(fileName)

	stale, fi, err := c.IsStale(fileName, entry)
	if err != nil {
		return zero, err
	}
	// A missing entry is always a miss, even when the zero Entry happens to
	// match the file (empty file with an epoch modification time).
	if cached && !stale {
		return entry.Data, nil
	}

	data, err := loader()
	if err != nil {
		return zero, err
	}
	// Record the attributes observed before loading: if the file changed while
	// loader ran, the next call sees a mismatch and reloads.
	c.Store(fileName, data, fi)
	return data, nil
}

// Entry returns the cached entry for fileName, or the zero Entry if there is
// none.
func (c *Cache[T]) Entry(fileName string) Entry[T] {
	entry, _ := c.lookup(fileName)
	return entry
}

// Load returns the cached value for fileName without checking the file on
// disk. The boolean reports whether an entry was present.
func (c *Cache[T]) Load(fileName string) (T, bool) {
	entry, ok := c.lookup(fileName)
	return entry.Data, ok
}

// IsStale stats fileName and reports whether entry no longer matches it, i.e.
// whether the size or the modification time (in Unix seconds) differs. The
// os.FileInfo obtained from the stat is returned so callers can pass it to
// Store.
//
// If the file cannot be stat'ed, IsStale returns true, a nil os.FileInfo and a
// wrapped error.
func (c *Cache[T]) IsStale(fileName string, entry Entry[T]) (bool, os.FileInfo, error) {
	fi, err := os.Stat(fileName)
	if err != nil {
		return true, nil, fmt.Errorf("failed to stat file %s: %w", fileName, err)
	}
	// Compare for inequality rather than "older than": a file replaced by a
	// copy with an earlier modification time has changed just the same.
	changed := entry.LastModified != fi.ModTime().Unix() || entry.Size != fi.Size()
	return changed, fi, nil
}

// lookup returns the entry stored for fileName and whether one was present.
func (c *Cache[T]) lookup(fileName string) (Entry[T], bool) {
	item, ok := c.entries.Load(fileName)
	if !ok {
		return Entry[T]{}, false
	}
	entry, ok := item.(Entry[T])
	return entry, ok
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/dagu-dev/dagu/internal/persistence/filecache"
"io"
"log"
"os"
Expand Down Expand Up @@ -35,6 +36,7 @@ type Store struct {
dir string
dagsDir string
writer *writer
cache *filecache.Cache[*model.Status]
}

var (
Expand All @@ -46,7 +48,11 @@ var (
// New creates a new Store with default configuration.
func New(dir, dagsDir string) *Store {
// dagsDir is used to calculate the directory that is compatible with the old version.
return &Store{dir: dir, dagsDir: dagsDir}
return &Store{
dir: dir,
dagsDir: dagsDir,
cache: filecache.New[*model.Status](),
}
}

func (store *Store) Update(dagFile, requestId string, s *model.Status) error {
Expand All @@ -59,6 +65,7 @@ func (store *Store) Update(dagFile, requestId string, s *model.Status) error {
return err
}
defer func() {
store.cache.Invalidate(f.File)
_ = w.close()
}()
return w.write(s)
Expand Down Expand Up @@ -91,6 +98,7 @@ func (store *Store) Close() error {
if err := store.Compact(store.writer.dagFile, store.writer.target); err != nil {
return err
}
store.cache.Invalidate(store.writer.target)
return store.writer.close()
}

Expand Down Expand Up @@ -142,25 +150,31 @@ func (store *Store) ReadStatusRecent(dagFile string, n int) []*model.StatusFile
var ret []*model.StatusFile
files := store.latest(store.pattern(dagFile)+"*.dat", n)
for _, file := range files {
status, err := ParseFile(file)
if err == nil {
ret = append(ret, &model.StatusFile{
File: file,
Status: status,
})
status, err := store.cache.LoadLatest(file, func() (*model.Status, error) {
return ParseFile(file)
})
if err != nil {
continue
}
ret = append(ret, &model.StatusFile{
File: file,
Status: status,
})
}
return ret
}

// ReadStatusToday returns the status parsed from the file that latestToday selects for dagFile.
func (store *Store) ReadStatusToday(dagFile string) (*model.Status, error) {
// TODO: let's fix below not to use config here
readLatestStatus := config.Get().LatestStatusToday
file, err := store.latestToday(dagFile, time.Now(), readLatestStatus)
if err != nil {
return nil, err
}
return ParseFile(file)
return store.cache.LoadLatest(file, func() (*model.Status, error) {
return ParseFile(file)
})
}

// FindByRequestId finds a status file by requestId.
Expand Down Expand Up @@ -365,7 +379,7 @@ func readLineFrom(f *os.File, offset int64) ([]byte, error) {
return nil, err
}
r := bufio.NewReader(f)
ret := []byte{}
var ret []byte
for {
b, isPrefix, err := r.ReadLine()
if err == io.EOF {
Expand Down
File renamed without changes.
21 changes: 13 additions & 8 deletions internal/persistence/local/dag_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"errors"
"fmt"
"github.com/dagu-dev/dagu/internal/persistence/filecache"
"os"
"path"
"path/filepath"
Expand All @@ -15,11 +16,15 @@ import (
)

type dagStoreImpl struct {
dir string
dir string
metaCache *filecache.Cache[*dag.DAG]
}

func NewDAGStore(dir string) persistence.DAGStore {
return &dagStoreImpl{dir: dir}
return &dagStoreImpl{
dir: dir,
metaCache: filecache.New[*dag.DAG](),
}
}

var (
Expand All @@ -40,12 +45,10 @@ func (d *dagStoreImpl) GetMetadata(name string) (*dag.DAG, error) {
if err != nil {
return nil, fmt.Errorf("%w: %s", errInvalidName, name)
}
cl := dag.Loader{}
dat, err := cl.LoadMetadata(loc)
if err != nil {
return nil, err
}
return dat, nil
return d.metaCache.LoadLatest(loc, func() (*dag.DAG, error) {
cl := dag.Loader{}
return cl.LoadMetadata(loc)
})
}

func (d *dagStoreImpl) GetDetails(name string) (*dag.DAG, error) {
Expand Down Expand Up @@ -91,6 +94,7 @@ func (d *dagStoreImpl) UpdateSpec(name string, spec []byte) error {
if err != nil {
return fmt.Errorf("%w: %s", errFailedToUpdateDAGFile, err)
}
d.metaCache.Invalidate(loc)
return nil
}

Expand All @@ -117,6 +121,7 @@ func (d *dagStoreImpl) Delete(name string) error {
if err != nil {
return fmt.Errorf("%w: %s", errFailedToDeleteDAGFile, err)
}
d.metaCache.Invalidate(loc)
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions service/scheduler/entry_reader/entry_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@ func New(params Params) *EntryReader {
if err := er.initDags(); err != nil {
er.logger.Error("failed to init entry_reader dags", tag.Error(err))
}
go er.watchDags()
return er
}

// Start runs the DAG directory watcher in a background goroutine and returns
// immediately. The watch loop returns once done is closed or a value is
// received from it.
func (er *EntryReader) Start(done chan any) {
	go func() {
		er.watchDags(done)
	}()
}

func (er *EntryReader) Read(now time.Time) ([]*scheduler.Entry, error) {
var entries []*scheduler.Entry
er.dagsLock.Lock()
Expand Down Expand Up @@ -109,7 +112,7 @@ func (er *EntryReader) initDags() error {
return nil
}

func (er *EntryReader) watchDags() {
func (er *EntryReader) watchDags(done chan any) {
cl := dag.Loader{}
watcher, err := filenotify.New(time.Minute)
if err != nil {
Expand All @@ -122,6 +125,8 @@ func (er *EntryReader) watchDags() {
_ = watcher.Add(er.dagsDir)
for {
select {
case <-done:
return
case event, ok := <-watcher.Events():
if !ok {
return
Expand Down
Loading

0 comments on commit d0938f1

Please sign in to comment.