From 2160fa31bd71a188806ee53430f6439282fbb6e3 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Sun, 27 Aug 2023 17:41:06 +0900 Subject: [PATCH 1/4] refactor cmd package --- cmd/commands.go | 306 ------------------------------------------ cmd/commands_test.go | 243 --------------------------------- cmd/common.go | 66 +++++++++ cmd/common_test.go | 14 ++ cmd/dry.go | 17 +++ cmd/dry_test.go | 16 +++ cmd/restart.go | 78 +++++++++++ cmd/restart_test.go | 53 ++++++++ cmd/retry.go | 37 +++++ cmd/retry_test.go | 30 +++++ cmd/root.go | 12 ++ cmd/scheduler.go | 26 ++++ cmd/scheduler_test.go | 25 ++++ cmd/server.go | 46 +++++++ cmd/server_test.go | 38 ++++++ cmd/start.go | 17 +++ cmd/start_test.go | 24 ++++ cmd/status.go | 27 ++++ cmd/status_test.go | 33 +++++ cmd/stop.go | 23 ++++ cmd/stop_test.go | 32 +++++ cmd/version.go | 18 +++ cmd/version_test.go | 13 ++ 23 files changed, 645 insertions(+), 549 deletions(-) delete mode 100644 cmd/commands.go delete mode 100644 cmd/commands_test.go create mode 100644 cmd/common.go create mode 100644 cmd/dry.go create mode 100644 cmd/dry_test.go create mode 100644 cmd/restart.go create mode 100644 cmd/restart_test.go create mode 100644 cmd/retry.go create mode 100644 cmd/retry_test.go create mode 100644 cmd/scheduler.go create mode 100644 cmd/scheduler_test.go create mode 100644 cmd/server.go create mode 100644 cmd/server_test.go create mode 100644 cmd/start.go create mode 100644 cmd/start_test.go create mode 100644 cmd/status.go create mode 100644 cmd/status_test.go create mode 100644 cmd/stop.go create mode 100644 cmd/stop_test.go create mode 100644 cmd/version.go create mode 100644 cmd/version_test.go diff --git a/cmd/commands.go b/cmd/commands.go deleted file mode 100644 index 6fc48ccc..00000000 --- a/cmd/commands.go +++ /dev/null @@ -1,306 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "log" - "os" - "os/signal" - "path/filepath" - "syscall" - "time" - - "github.com/spf13/cobra" - "github.com/spf13/viper" - "github.com/yohamta/dagu/internal/agent" - "github.com/yohamta/dagu/internal/config" - "github.com/yohamta/dagu/internal/constants" - "github.com/yohamta/dagu/internal/controller" - "github.com/yohamta/dagu/internal/dag" - "github.com/yohamta/dagu/internal/database" - "github.com/yohamta/dagu/internal/models" - "github.com/yohamta/dagu/internal/runner" - "github.com/yohamta/dagu/internal/scheduler" - "github.com/yohamta/dagu/internal/web" -) - -func registerCommands(root *cobra.Command) { - rootCmd.AddCommand(createStartCommand()) - rootCmd.AddCommand(createStopCommand()) - rootCmd.AddCommand(createRestartCommand()) - rootCmd.AddCommand(createDryCommand()) - rootCmd.AddCommand(createStatusCommand()) - rootCmd.AddCommand(createVersionCommand()) - rootCmd.AddCommand(createServerCommand()) - rootCmd.AddCommand(createSchedulerCommand()) - rootCmd.AddCommand(createRetryCommand()) -} - -func createStartCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "start [flags] ", - Short: "Runs the DAG", - Long: `dagu start [--params="param1 param2"] `, - Args: cobra.ExactArgs(1), - Run: func(cmd *cobra.Command, args []string) { - executeDAGCommand(cmd.Context(), cmd, args, false) - }, - } - cmd.Flags().StringP("params", "p", "", "parameters") - return cmd -} - -func createDryCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "dry [flags] ", - Short: "Dry-runs specified DAG", - Long: `dagu dry [--params="param1 param2"] `, - Args: cobra.ExactArgs(1), - Run: func(cmd *cobra.Command, args []string) { - executeDAGCommand(cmd.Context(), cmd, args, true) - }, - } - cmd.Flags().StringP("params", "p", "", "parameters") - return cmd -} - -func createRestartCommand() *cobra.Command { - return &cobra.Command{ - Use: "restart ", - Short: "Restart the DAG", - Long: `dagu restart `, - Args: cobra.ExactArgs(1), - Run: func(cmd *cobra.Command, args []string) { - dagFile := args[0] - loadedDAG, err := loadDAG(dagFile, "") - checkError(err) - - ctrl := controller.NewDAGController(loadedDAG) - - // Check the current status and stop the DAG if it is running. - stopDAGIfRunning(ctrl) - - // Wait for the specified amount of time before restarting. - waitForRestart(loadedDAG.RestartWait) - - // Retrieve the parameter of the previous execution. - log.Printf("Restarting %s...", loadedDAG.Name) - params := getPreviousExecutionParams(ctrl) - - // Start the DAG with the same parameter. - loadedDAG, err = loadDAG(dagFile, params) - checkError(err) - cobra.CheckErr(start(cmd.Context(), loadedDAG, false)) - }, - } -} - -func stopDAGIfRunning(ctrl *controller.DAGController) { - st, err := ctrl.GetStatus() - checkError(err) - - // Stop the DAG if it is running. - if st.Status == scheduler.SchedulerStatus_Running { - log.Printf("Stopping %s for restart...", ctrl.DAG.Name) - cobra.CheckErr(stopRunningDAG(ctrl)) - } -} - -func waitForRestart(restartWait time.Duration) { - if restartWait > 0 { - log.Printf("Waiting for %s...", restartWait) - time.Sleep(restartWait) - } -} - -func getPreviousExecutionParams(ctrl *controller.DAGController) string { - st, err := ctrl.GetLastStatus() - checkError(err) - - return st.Params -} - -func createRetryCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "retry --req= ", - Short: "Retry the DAG execution", - Long: `dagu retry --req= `, - Args: cobra.ExactArgs(1), - Run: func(cmd *cobra.Command, args []string) { - f, _ := filepath.Abs(args[0]) - reqID, err := cmd.Flags().GetString("req") - checkError(err) - - status, err := database.New().FindByRequestId(f, reqID) - checkError(err) - - loadedDAG, err := loadDAG(args[0], status.Status.Params) - checkError(err) - - a := &agent.Agent{AgentConfig: &agent.AgentConfig{DAG: loadedDAG}, - RetryConfig: &agent.RetryConfig{Status: status.Status}} - ctx := cmd.Context() - go listenSignals(ctx, a) - checkError(a.Run(ctx)) - }, - } - cmd.Flags().StringP("req", "r", "", "request-id") - cmd.MarkFlagRequired("req") - return cmd -} - -func createSchedulerCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "scheduler", - Short: "Start the scheduler", - Long: `dagu scheduler [--dags=]`, - Run: func(cmd *cobra.Command, args []string) { - config.Get().DAGs = getFlagString(cmd, "dags", config.Get().DAGs) - agent := runner.NewAgent(config.Get()) - go listenSignals(cmd.Context(), agent) - checkError(agent.Start()) - }, - } - cmd.Flags().StringP("dags", "d", "", "location of DAG files (default is $HOME/.dagu/dags)") - viper.BindPFlag("dags", cmd.Flags().Lookup("dags")) - - return cmd -} - -func createServerCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "server", - Short: "Start the server", - Long: `dagu server [--dags=] [--host=] [--port=]`, - Run: func(cmd *cobra.Command, args []string) { - server := web.NewServer(config.Get()) - go listenSignals(cmd.Context(), server) - checkError(server.Serve()) - }, - } - bindServerCommandFlags(cmd) - return cmd -} - -func bindServerCommandFlags(cmd *cobra.Command) { - cmd.Flags().StringP("dags", "d", "", "location of DAG files (default is $HOME/.dagu/dags)") - cmd.Flags().StringP("host", "s", "", "server host (default is localhost)") - cmd.Flags().StringP("port", "p", "", "server port (default is 8080)") - - viper.BindPFlag("port", cmd.Flags().Lookup("port")) - viper.BindPFlag("host", cmd.Flags().Lookup("host")) - viper.BindPFlag("dags", cmd.Flags().Lookup("dags")) -} - -func createStatusCommand() *cobra.Command { - return &cobra.Command{ - Use: "status ", - Short: "Display current status of the DAG", - Long: `dagu status `, - Args: cobra.ExactArgs(1), - Run: func(cmd *cobra.Command, args []string) { - loadedDAG, err := loadDAG(args[0], "") - checkError(err) - - status, err := controller.NewDAGController(loadedDAG).GetStatus() - checkError(err) - - res := &models.StatusResponse{Status: status} - log.Printf("Pid=%d Status=%s", res.Status.Pid, res.Status.Status) - }, - } -} - -func createStopCommand() *cobra.Command { - return &cobra.Command{ - Use: "stop ", - Short: "Stop the running DAG", - Long: `dagu stop `, - Args: cobra.ExactArgs(1), - Run: func(cmd *cobra.Command, args []string) { - loadedDAG, err := loadDAG(args[0], "") - checkError(err) - - log.Printf("Stopping...") - checkError(controller.NewDAGController(loadedDAG).Stop()) - }, - } -} - -func createVersionCommand() *cobra.Command { - return &cobra.Command{ - Use: "version", - Short: "Display the binary version", - Long: `dagu version`, - Run: func(cmd *cobra.Command, args []string) { - fmt.Println(constants.Version) - }, - } -} - -func stopRunningDAG(ctrl *controller.DAGController) error { - for { - st, err := ctrl.GetStatus() - checkError(err) - - if st.Status != scheduler.SchedulerStatus_Running { - return nil - } - checkError(ctrl.Stop()) - time.Sleep(time.Millisecond * 100) - } -} - -func executeDAGCommand(ctx context.Context, cmd *cobra.Command, args []string, dry bool) { - params, err := cmd.Flags().GetString("params") - checkError(err) - - loadedDAG, err := loadDAG(args[0], removeQuotes(params)) - checkError(err) - - err = start(ctx, loadedDAG, dry) - if err != nil { - log.Fatalf("Failed to start DAG: %v", err) - } -} - -func start(ctx context.Context, d *dag.DAG, dry bool) error { - a := &agent.Agent{AgentConfig: &agent.AgentConfig{DAG: d, Dry: dry}} - go listenSignals(ctx, a) - return a.Run(ctx) -} - -type signalListener interface { - Signal(os.Signal) -} - -var ( - signalChan chan os.Signal -) - -func listenSignals(ctx context.Context, a signalListener) error { - signalChan = make(chan os.Signal, 100) - signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) - - select { - case <-ctx.Done(): - a.Signal(os.Interrupt) - return ctx.Err() - case sig := <-signalChan: - a.Signal(sig) - return fmt.Errorf("received signal: %v", sig) - } -} - -func checkError(err error) { - if err != nil { - log.Fatal(err) - } -} - -func removeQuotes(s string) string { - if len(s) > 1 && s[0] == '"' && s[len(s)-1] == '"' { - return s[1 : len(s)-1] - } - return s -} diff --git a/cmd/commands_test.go b/cmd/commands_test.go deleted file mode 100644 index 2e8e45ab..00000000 --- a/cmd/commands_test.go +++ /dev/null @@ -1,243 +0,0 @@ -package cmd - -import ( - "fmt" - "net" - "net/http" - "os" - "syscall" - "testing" - "time" - - "github.com/stretchr/testify/require" - "github.com/yohamta/dagu/internal/config" - "github.com/yohamta/dagu/internal/constants" - "github.com/yohamta/dagu/internal/controller" - "github.com/yohamta/dagu/internal/scheduler" - "github.com/yohamta/dagu/internal/utils" -) - -func TestMain(m *testing.M) { - tmpDir := utils.MustTempDir("dagu_test") - changeHomeDir(tmpDir) - code := m.Run() - os.RemoveAll(tmpDir) - os.Exit(code) -} - -func changeHomeDir(homeDir string) { - os.Setenv("HOME", homeDir) - _ = config.LoadConfig(homeDir) -} - -func TestStartCommand(t *testing.T) { - tests := []cmdTest{ - { - args: []string{"start", testDAGFile("start.yaml")}, - expectedOut: []string{"1 finished"}, - }, - { - args: []string{"start", testDAGFile("start_with_params.yaml")}, - expectedOut: []string{"params is p1 and p2"}, - }, - { - args: []string{"start", `--params="p3 p4"`, testDAGFile("start_with_params.yaml")}, - expectedOut: []string{"params is p3 and p4"}, - }, - } - - for _, tc := range tests { - testRunCommand(t, createStartCommand(), tc) - } -} - -func TestDryCommand(t *testing.T) { - tests := []cmdTest{ - { - args: []string{"dry", testDAGFile("dry.yaml")}, - expectedOut: []string{"Starting DRY-RUN"}, - }, - } - - for _, tc := range tests { - testRunCommand(t, createDryCommand(), tc) - } -} - -func TestRestartCommand(t *testing.T) { - dagFile := testDAGFile("restart.yaml") - - // Start the DAG. - go func() { - testRunCommand(t, createStartCommand(), cmdTest{args: []string{"start", `--params="foo"`, dagFile}}) - }() - - time.Sleep(time.Millisecond * 100) - - // Wait for the DAG running. - testStatusEventual(t, dagFile, scheduler.SchedulerStatus_Running) - - // Restart the DAG. - done := make(chan struct{}) - go func() { - testRunCommand(t, createRestartCommand(), cmdTest{args: []string{"restart", dagFile}}) - close(done) - }() - - time.Sleep(time.Millisecond * 100) - - // Wait for the DAG running again. - testStatusEventual(t, dagFile, scheduler.SchedulerStatus_Running) - - // Stop the restarted DAG. - testRunCommand(t, createStopCommand(), cmdTest{args: []string{"stop", dagFile}}) - - time.Sleep(time.Millisecond * 100) - - // Wait for the DAG is stopped. - testStatusEventual(t, dagFile, scheduler.SchedulerStatus_None) - - // Check parameter was the same as the first execution - d, err := loadDAG(dagFile, "") - require.NoError(t, err) - ctrl := controller.NewDAGController(d) - sts := ctrl.GetRecentStatuses(2) - require.Len(t, sts, 2) - require.Equal(t, sts[0].Status.Params, sts[1].Status.Params) - - <-done -} - -func TestRetryCommand(t *testing.T) { - dagFile := testDAGFile("retry.yaml") - - // Run a DAG. - testRunCommand(t, createStartCommand(), cmdTest{args: []string{"start", `--params="foo"`, dagFile}}) - - // Find the request ID. - dsts, err := controller.NewDAGStatusReader().ReadStatus(dagFile, false) - require.NoError(t, err) - require.Equal(t, dsts.Status.Status, scheduler.SchedulerStatus_Success) - require.NotNil(t, dsts.Status) - - reqID := dsts.Status.RequestId - - // Retry with the request ID. - testRunCommand(t, createRetryCommand(), cmdTest{ - args: []string{"retry", fmt.Sprintf("--req=%s", reqID), dagFile}, - expectedOut: []string{"param is foo"}, - }) -} - -func TestSchedulerCommand(t *testing.T) { - // Start the scheduler. - done := make(chan struct{}) - go func() { - testRunCommand(t, createSchedulerCommand(), cmdTest{ - args: []string{"scheduler"}, - expectedOut: []string{"starting dagu scheduler"}, - }) - close(done) - }() - - time.Sleep(time.Millisecond * 300) - - // Stop the scheduler. - signalChan <- syscall.SIGTERM - <-done -} - -func TestServerCommand(t *testing.T) { - port := findPort(t) - - // Start the server. - done := make(chan struct{}) - go func() { - testRunCommand(t, createServerCommand(), cmdTest{ - args: []string{"server", fmt.Sprintf("--port=%s", port)}, - expectedOut: []string{"server is running"}, - }) - close(done) - }() - - time.Sleep(time.Millisecond * 300) - - // Stop the server. - res, err := http.Post( - fmt.Sprintf("http://%s:%s/shutdown", "localhost", port), - "application/json", - nil, - ) - - require.NoError(t, err) - require.Equal(t, http.StatusOK, res.StatusCode) - - <-done -} - -func TestStatusCommand(t *testing.T) { - dagFile := testDAGFile("status.yaml") - - // Start the DAG. - done := make(chan struct{}) - go func() { - testRunCommand(t, createStartCommand(), cmdTest{args: []string{"start", dagFile}}) - close(done) - }() - - time.Sleep(time.Millisecond * 50) - - // Wait for the DAG running. - testLastStatusEventual(t, dagFile, scheduler.SchedulerStatus_Running) - - // Check the current status. - testRunCommand(t, createStatusCommand(), cmdTest{ - args: []string{"status", dagFile}, - expectedOut: []string{"Status=running"}, - }) - - // Stop the DAG. - testRunCommand(t, createStopCommand(), cmdTest{args: []string{"stop", dagFile}}) - <-done -} - -func TestStopCommand(t *testing.T) { - dagFile := testDAGFile("stop.yaml") - - // Start the DAG. - done := make(chan struct{}) - go func() { - testRunCommand(t, createStartCommand(), cmdTest{args: []string{"start", dagFile}}) - close(done) - }() - - time.Sleep(time.Millisecond * 50) - - // Wait for the DAG running. - testLastStatusEventual(t, dagFile, scheduler.SchedulerStatus_Running) - - // Stop the DAG. - testRunCommand(t, createStopCommand(), cmdTest{ - args: []string{"stop", dagFile}, - expectedOut: []string{"Stopping..."}}) - - // Check the last execution is cancelled. - testLastStatusEventual(t, dagFile, scheduler.SchedulerStatus_Cancel) - <-done -} - -func TestVersionCommand(t *testing.T) { - constants.Version = "1.2.3" - testRunCommand(t, createVersionCommand(), cmdTest{ - args: []string{"version"}, - expectedOut: []string{"1.2.3"}}) -} - -func findPort(t *testing.T) string { - t.Helper() - ln, err := net.Listen("tcp", ":0") - require.NoError(t, err) - port := ln.Addr().(*net.TCPAddr).Port - _ = ln.Close() - return fmt.Sprintf("%d", port) -} diff --git a/cmd/common.go b/cmd/common.go new file mode 100644 index 00000000..6032e183 --- /dev/null +++ b/cmd/common.go @@ -0,0 +1,66 @@ +package cmd + +import ( + "context" + "github.com/spf13/cobra" + "github.com/yohamta/dagu/internal/agent" + "github.com/yohamta/dagu/internal/dag" + "log" + "os" + "os/signal" + "syscall" +) + +func execDAG(ctx context.Context, cmd *cobra.Command, args []string, dry bool) { + params, err := cmd.Flags().GetString("params") + checkError(err) + + loadedDAG, err := loadDAG(args[0], removeQuotes(params)) + checkError(err) + + err = start(ctx, loadedDAG, dry) + if err != nil { + log.Fatalf("Failed to start DAG: %v", err) + } +} + +func start(ctx context.Context, d *dag.DAG, dry bool) error { + a := &agent.Agent{AgentConfig: &agent.AgentConfig{DAG: d, Dry: dry}} + listenSignals(ctx, a) + return a.Run(ctx) +} + +type signalListener interface { + Signal(os.Signal) +} + +var ( + signalChan chan os.Signal +) + +func listenSignals(ctx context.Context, a signalListener) { + go func() { + signalChan = make(chan os.Signal, 100) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + select { + case <-ctx.Done(): + a.Signal(os.Interrupt) + case sig := <-signalChan: + a.Signal(sig) + } + }() +} + +func checkError(err error) { + if err != nil { + log.Fatal(err) + } +} + +func removeQuotes(s string) string { + if len(s) > 1 && s[0] == '"' && s[len(s)-1] == '"' { + return s[1 : len(s)-1] + } + return s +} diff --git a/cmd/common_test.go b/cmd/common_test.go index 6edb5430..645fd107 100644 --- a/cmd/common_test.go +++ b/cmd/common_test.go @@ -2,6 +2,7 @@ package cmd import ( "bytes" + "github.com/yohamta/dagu/internal/config" "io" "log" "os" @@ -17,6 +18,19 @@ import ( "github.com/yohamta/dagu/internal/utils" ) +func TestMain(m *testing.M) { + tmpDir := utils.MustTempDir("dagu_test") + changeHomeDir(tmpDir) + code := m.Run() + os.RemoveAll(tmpDir) + os.Exit(code) +} + +func changeHomeDir(homeDir string) { + os.Setenv("HOME", homeDir) + _ = config.LoadConfig(homeDir) +} + type cmdTest struct { args []string expectedOut []string diff --git a/cmd/dry.go b/cmd/dry.go new file mode 100644 index 00000000..2b4242d4 --- /dev/null +++ b/cmd/dry.go @@ -0,0 +1,17 @@ +package cmd + +import "github.com/spf13/cobra" + +func dryCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "dry [flags] ", + Short: "Dry-runs specified DAG", + Long: `dagu dry [--params="param1 param2"] `, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + execDAG(cmd.Context(), cmd, args, true) + }, + } + cmd.Flags().StringP("params", "p", "", "parameters") + return cmd +} diff --git a/cmd/dry_test.go b/cmd/dry_test.go new file mode 100644 index 00000000..b96664ac --- /dev/null +++ b/cmd/dry_test.go @@ -0,0 +1,16 @@ +package cmd + +import "testing" + +func TestDryCommand(t *testing.T) { + tests := []cmdTest{ + { + args: []string{"dry", testDAGFile("dry.yaml")}, + expectedOut: []string{"Starting DRY-RUN"}, + }, + } + + for _, tc := range tests { + testRunCommand(t, dryCmd(), tc) + } +} diff --git a/cmd/restart.go b/cmd/restart.go new file mode 100644 index 00000000..07b30e84 --- /dev/null +++ b/cmd/restart.go @@ -0,0 +1,78 @@ +package cmd + +import ( + "github.com/spf13/cobra" + "github.com/yohamta/dagu/internal/controller" + "github.com/yohamta/dagu/internal/scheduler" + "log" + "time" +) + +func restartCmd() *cobra.Command { + return &cobra.Command{ + Use: "restart ", + Short: "Restart the DAG", + Long: `dagu restart `, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + dagFile := args[0] + loadedDAG, err := loadDAG(dagFile, "") + checkError(err) + + ctrl := controller.NewDAGController(loadedDAG) + + // Check the current status and stop the DAG if it is running. + stopDAGIfRunning(ctrl) + + // Wait for the specified amount of time before restarting. + waitForRestart(loadedDAG.RestartWait) + + // Retrieve the parameter of the previous execution. + log.Printf("Restarting %s...", loadedDAG.Name) + params := getPreviousExecutionParams(ctrl) + + // Start the DAG with the same parameter. + loadedDAG, err = loadDAG(dagFile, params) + checkError(err) + cobra.CheckErr(start(cmd.Context(), loadedDAG, false)) + }, + } +} + +func stopDAGIfRunning(ctrl *controller.DAGController) { + st, err := ctrl.GetStatus() + checkError(err) + + // Stop the DAG if it is running. + if st.Status == scheduler.SchedulerStatus_Running { + log.Printf("Stopping %s for restart...", ctrl.DAG.Name) + cobra.CheckErr(stopRunningDAG(ctrl)) + } +} + +func stopRunningDAG(ctrl *controller.DAGController) error { + for { + st, err := ctrl.GetStatus() + checkError(err) + + if st.Status != scheduler.SchedulerStatus_Running { + return nil + } + checkError(ctrl.Stop()) + time.Sleep(time.Millisecond * 100) + } +} + +func waitForRestart(restartWait time.Duration) { + if restartWait > 0 { + log.Printf("Waiting for %s...", restartWait) + time.Sleep(restartWait) + } +} + +func getPreviousExecutionParams(ctrl *controller.DAGController) string { + st, err := ctrl.GetLastStatus() + checkError(err) + + return st.Params +} diff --git a/cmd/restart_test.go b/cmd/restart_test.go new file mode 100644 index 00000000..aa7b280d --- /dev/null +++ b/cmd/restart_test.go @@ -0,0 +1,53 @@ +package cmd + +import ( + "github.com/stretchr/testify/require" + "github.com/yohamta/dagu/internal/controller" + "github.com/yohamta/dagu/internal/scheduler" + "testing" + "time" +) + +func TestRestartCommand(t *testing.T) { + dagFile := testDAGFile("restart.yaml") + + // Start the DAG. + go func() { + testRunCommand(t, startCmd(), cmdTest{args: []string{"start", `--params="foo"`, dagFile}}) + }() + + time.Sleep(time.Millisecond * 100) + + // Wait for the DAG running. + testStatusEventual(t, dagFile, scheduler.SchedulerStatus_Running) + + // Restart the DAG. + done := make(chan struct{}) + go func() { + testRunCommand(t, restartCmd(), cmdTest{args: []string{"restart", dagFile}}) + close(done) + }() + + time.Sleep(time.Millisecond * 100) + + // Wait for the DAG running again. + testStatusEventual(t, dagFile, scheduler.SchedulerStatus_Running) + + // Stop the restarted DAG. + testRunCommand(t, stopCmd(), cmdTest{args: []string{"stop", dagFile}}) + + time.Sleep(time.Millisecond * 100) + + // Wait for the DAG is stopped. + testStatusEventual(t, dagFile, scheduler.SchedulerStatus_None) + + // Check parameter was the same as the first execution + d, err := loadDAG(dagFile, "") + require.NoError(t, err) + ctrl := controller.NewDAGController(d) + sts := ctrl.GetRecentStatuses(2) + require.Len(t, sts, 2) + require.Equal(t, sts[0].Status.Params, sts[1].Status.Params) + + <-done +} diff --git a/cmd/retry.go b/cmd/retry.go new file mode 100644 index 00000000..bbd0e560 --- /dev/null +++ b/cmd/retry.go @@ -0,0 +1,37 @@ +package cmd + +import ( + "github.com/spf13/cobra" + "github.com/yohamta/dagu/internal/agent" + "github.com/yohamta/dagu/internal/database" + "path/filepath" +) + +func retryCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "retry --req= ", + Short: "Retry the DAG execution", + Long: `dagu retry --req= `, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + f, _ := filepath.Abs(args[0]) + reqID, err := cmd.Flags().GetString("req") + checkError(err) + + status, err := database.New().FindByRequestId(f, reqID) + checkError(err) + + loadedDAG, err := loadDAG(args[0], status.Status.Params) + checkError(err) + + a := &agent.Agent{AgentConfig: &agent.AgentConfig{DAG: loadedDAG}, + RetryConfig: &agent.RetryConfig{Status: status.Status}} + ctx := cmd.Context() + listenSignals(ctx, a) + checkError(a.Run(ctx)) + }, + } + cmd.Flags().StringP("req", "r", "", "request-id") + _ = cmd.MarkFlagRequired("req") + return cmd +} diff --git a/cmd/retry_test.go b/cmd/retry_test.go new file mode 100644 index 00000000..aac3222a --- /dev/null +++ b/cmd/retry_test.go @@ -0,0 +1,30 @@ +package cmd + +import ( + "fmt" + "github.com/stretchr/testify/require" + "github.com/yohamta/dagu/internal/controller" + "github.com/yohamta/dagu/internal/scheduler" + "testing" +) + +func TestRetryCommand(t *testing.T) { + dagFile := testDAGFile("retry.yaml") + + // Run a DAG. + testRunCommand(t, startCmd(), cmdTest{args: []string{"start", `--params="foo"`, dagFile}}) + + // Find the request ID. + s, err := controller.NewDAGStatusReader().ReadStatus(dagFile, false) + require.NoError(t, err) + require.Equal(t, s.Status.Status, scheduler.SchedulerStatus_Success) + require.NotNil(t, s.Status) + + reqID := s.Status.RequestId + + // Retry with the request ID. + testRunCommand(t, retryCmd(), cmdTest{ + args: []string{"retry", fmt.Sprintf("--req=%s", reqID), dagFile}, + expectedOut: []string{"param is foo"}, + }) +} diff --git a/cmd/root.go b/cmd/root.go index 5f562977..db50c1c9 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -75,3 +75,15 @@ func getFlagString(cmd *cobra.Command, name, fallback string) string { } return fallback } + +func registerCommands(root *cobra.Command) { + rootCmd.AddCommand(startCmd()) + rootCmd.AddCommand(stopCmd()) + rootCmd.AddCommand(restartCmd()) + rootCmd.AddCommand(dryCmd()) + rootCmd.AddCommand(createStatusCommand()) + rootCmd.AddCommand(versionCmd()) + rootCmd.AddCommand(serverCmd()) + rootCmd.AddCommand(createSchedulerCommand()) + rootCmd.AddCommand(retryCmd()) +} diff --git a/cmd/scheduler.go b/cmd/scheduler.go new file mode 100644 index 00000000..e172f6fa --- /dev/null +++ b/cmd/scheduler.go @@ -0,0 +1,26 @@ +package cmd + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/yohamta/dagu/internal/config" + "github.com/yohamta/dagu/internal/runner" +) + +func createSchedulerCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "scheduler", + Short: "Start the scheduler", + Long: `dagu scheduler [--dags=]`, + Run: func(cmd *cobra.Command, args []string) { + config.Get().DAGs = getFlagString(cmd, "dags", config.Get().DAGs) + agent := runner.NewAgent(config.Get()) + listenSignals(cmd.Context(), agent) + checkError(agent.Start()) + }, + } + cmd.Flags().StringP("dags", "d", "", "location of DAG files (default is $HOME/.dagu/dags)") + _ = viper.BindPFlag("dags", cmd.Flags().Lookup("dags")) + + return cmd +} diff --git a/cmd/scheduler_test.go b/cmd/scheduler_test.go new file mode 100644 index 00000000..73694fe6 --- /dev/null +++ b/cmd/scheduler_test.go @@ -0,0 +1,25 @@ +package cmd + +import ( + "syscall" + "testing" + "time" +) + +func TestSchedulerCommand(t *testing.T) { + // Start the scheduler. + done := make(chan struct{}) + go func() { + testRunCommand(t, createSchedulerCommand(), cmdTest{ + args: []string{"scheduler"}, + expectedOut: []string{"starting dagu scheduler"}, + }) + close(done) + }() + + time.Sleep(time.Millisecond * 300) + + // Stop the scheduler. + signalChan <- syscall.SIGTERM + <-done +} diff --git a/cmd/server.go b/cmd/server.go new file mode 100644 index 00000000..d0523e77 --- /dev/null +++ b/cmd/server.go @@ -0,0 +1,46 @@ +package cmd + +import ( + "fmt" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/require" + "github.com/yohamta/dagu/internal/config" + "github.com/yohamta/dagu/internal/web" + "net" + "testing" +) + +func serverCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "server", + Short: "Start the server", + Long: `dagu server [--dags=] [--host=] [--port=]`, + Run: func(cmd *cobra.Command, args []string) { + server := web.NewServer(config.Get()) + listenSignals(cmd.Context(), server) + checkError(server.Serve()) + }, + } + bindServerCommandFlags(cmd) + return cmd +} + +func findPort(t *testing.T) string { + t.Helper() + ln, err := net.Listen("tcp", ":0") + require.NoError(t, err) + port := ln.Addr().(*net.TCPAddr).Port + _ = ln.Close() + return fmt.Sprintf("%d", port) +} + +func bindServerCommandFlags(cmd *cobra.Command) { + cmd.Flags().StringP("dags", "d", "", "location of DAG files (default is $HOME/.dagu/dags)") + cmd.Flags().StringP("host", "s", "", "server host (default is localhost)") + cmd.Flags().StringP("port", "p", "", "server port (default is 8080)") + + _ = viper.BindPFlag("port", cmd.Flags().Lookup("port")) + _ = viper.BindPFlag("host", cmd.Flags().Lookup("host")) + _ = viper.BindPFlag("dags", cmd.Flags().Lookup("dags")) +} diff --git a/cmd/server_test.go b/cmd/server_test.go new file mode 100644 index 00000000..d2cd55c4 --- /dev/null +++ b/cmd/server_test.go @@ -0,0 +1,38 @@ +package cmd + +import ( + "fmt" + "github.com/stretchr/testify/require" + "net" + "net/http" + "testing" + "time" +) + +func TestServerCommand(t *testing.T) { + port := findPort(t) + + // Start the server. + done := make(chan struct{}) + go func() { + testRunCommand(t, serverCmd(), cmdTest{ + args: []string{"server", fmt.Sprintf("--port=%s", port)}, + expectedOut: []string{"server is running"}, + }) + close(done) + }() + + time.Sleep(time.Millisecond * 300) + + // Stop the server. + res, err := http.Post( + fmt.Sprintf("http://%s/shutdown", net.JoinHostPort("localhost", port)), + "application/json", + nil, + ) + + require.NoError(t, err) + require.Equal(t, http.StatusOK, res.StatusCode) + + <-done +} diff --git a/cmd/start.go b/cmd/start.go new file mode 100644 index 00000000..0924dab0 --- /dev/null +++ b/cmd/start.go @@ -0,0 +1,17 @@ +package cmd + +import "github.com/spf13/cobra" + +func startCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "start [flags] ", + Short: "Runs the DAG", + Long: `dagu start [--params="param1 param2"] `, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + execDAG(cmd.Context(), cmd, args, false) + }, + } + cmd.Flags().StringP("params", "p", "", "parameters") + return cmd +} diff --git a/cmd/start_test.go b/cmd/start_test.go new file mode 100644 index 00000000..351329c2 --- /dev/null +++ b/cmd/start_test.go @@ -0,0 +1,24 @@ +package cmd + +import "testing" + +func TestStartCommand(t *testing.T) { + tests := []cmdTest{ + { + args: []string{"start", testDAGFile("start.yaml")}, + expectedOut: []string{"1 finished"}, + }, + { + args: []string{"start", testDAGFile("start_with_params.yaml")}, + expectedOut: []string{"params is p1 and p2"}, + }, + { + args: []string{"start", `--params="p3 p4"`, testDAGFile("start_with_params.yaml")}, + expectedOut: []string{"params is p3 and p4"}, + }, + } + + for _, tc := range tests { + testRunCommand(t, startCmd(), tc) + } +} diff --git a/cmd/status.go b/cmd/status.go new file mode 100644 index 00000000..61544641 --- /dev/null +++ b/cmd/status.go @@ -0,0 +1,27 @@ +package cmd + +import ( + "github.com/spf13/cobra" + "github.com/yohamta/dagu/internal/controller" + "github.com/yohamta/dagu/internal/models" + "log" +) + +func createStatusCommand() *cobra.Command { + return &cobra.Command{ + Use: "status ", + Short: "Display current status of the DAG", + Long: `dagu status `, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + loadedDAG, err := loadDAG(args[0], "") + checkError(err) + + status, err := controller.NewDAGController(loadedDAG).GetStatus() + checkError(err) + + res := &models.StatusResponse{Status: status} + log.Printf("Pid=%d Status=%s", res.Status.Pid, res.Status.Status) + }, + } +} diff --git a/cmd/status_test.go b/cmd/status_test.go new file mode 100644 index 00000000..9c15daf7 --- /dev/null +++ b/cmd/status_test.go @@ -0,0 +1,33 @@ +package cmd + +import ( + "github.com/yohamta/dagu/internal/scheduler" + "testing" + "time" +) + +func TestStatusCommand(t *testing.T) { + dagFile := testDAGFile("status.yaml") + + // Start the DAG. + done := make(chan struct{}) + go func() { + testRunCommand(t, startCmd(), cmdTest{args: []string{"start", dagFile}}) + close(done) + }() + + time.Sleep(time.Millisecond * 50) + + // Wait for the DAG running. + testLastStatusEventual(t, dagFile, scheduler.SchedulerStatus_Running) + + // Check the current status. + testRunCommand(t, createStatusCommand(), cmdTest{ + args: []string{"status", dagFile}, + expectedOut: []string{"Status=running"}, + }) + + // Stop the DAG. + testRunCommand(t, stopCmd(), cmdTest{args: []string{"stop", dagFile}}) + <-done +} diff --git a/cmd/stop.go b/cmd/stop.go new file mode 100644 index 00000000..6c38c93e --- /dev/null +++ b/cmd/stop.go @@ -0,0 +1,23 @@ +package cmd + +import ( + "github.com/spf13/cobra" + "github.com/yohamta/dagu/internal/controller" + "log" +) + +func stopCmd() *cobra.Command { + return &cobra.Command{ + Use: "stop ", + Short: "Stop the running DAG", + Long: `dagu stop `, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + loadedDAG, err := loadDAG(args[0], "") + checkError(err) + + log.Printf("Stopping...") + checkError(controller.NewDAGController(loadedDAG).Stop()) + }, + } +} diff --git a/cmd/stop_test.go b/cmd/stop_test.go new file mode 100644 index 00000000..3d356471 --- /dev/null +++ b/cmd/stop_test.go @@ -0,0 +1,32 @@ +package cmd + +import ( + "github.com/yohamta/dagu/internal/scheduler" + "testing" + "time" +) + +func TestStopCommand(t *testing.T) { + dagFile := testDAGFile("stop.yaml") + + // Start the DAG. + done := make(chan struct{}) + go func() { + testRunCommand(t, startCmd(), cmdTest{args: []string{"start", dagFile}}) + close(done) + }() + + time.Sleep(time.Millisecond * 50) + + // Wait for the DAG running. + testLastStatusEventual(t, dagFile, scheduler.SchedulerStatus_Running) + + // Stop the DAG. + testRunCommand(t, stopCmd(), cmdTest{ + args: []string{"stop", dagFile}, + expectedOut: []string{"Stopping..."}}) + + // Check the last execution is cancelled. + testLastStatusEventual(t, dagFile, scheduler.SchedulerStatus_Cancel) + <-done +} diff --git a/cmd/version.go b/cmd/version.go new file mode 100644 index 00000000..810c49b4 --- /dev/null +++ b/cmd/version.go @@ -0,0 +1,18 @@ +package cmd + +import ( + "fmt" + "github.com/spf13/cobra" + "github.com/yohamta/dagu/internal/constants" +) + +func versionCmd() *cobra.Command { + return &cobra.Command{ + Use: "version", + Short: "Display the binary version", + Long: `dagu version`, + Run: func(cmd *cobra.Command, args []string) { + fmt.Println(constants.Version) + }, + } +} diff --git a/cmd/version_test.go b/cmd/version_test.go new file mode 100644 index 00000000..618c8721 --- /dev/null +++ b/cmd/version_test.go @@ -0,0 +1,13 @@ +package cmd + +import ( + "github.com/yohamta/dagu/internal/constants" + "testing" +) + +func TestVersionCommand(t *testing.T) { + constants.Version = "1.2.3" + testRunCommand(t, versionCmd(), cmdTest{ + args: []string{"version"}, + expectedOut: []string{"1.2.3"}}) +} From f5647eae7058875cbca8b69bf36095971d7cbe6b Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Sun, 27 Aug 2023 19:11:08 +0900 Subject: [PATCH 2/4] refactor database --- cmd/common_test.go | 14 +- cmd/restart.go | 3 +- cmd/restart_test.go | 3 +- cmd/retry.go | 4 +- cmd/retry_test.go | 3 +- cmd/status.go | 3 +- cmd/stop.go | 3 +- internal/agent/agent.go | 46 ++--- internal/agent/agent_test.go | 18 +- internal/config/config.go | 2 +- internal/controller/controller.go | 83 ++++----- internal/controller/controller_test.go | 105 ++++++----- internal/controller/status_reader.go | 7 +- internal/controller/status_reader_test.go | 7 +- internal/dag/builder_test.go | 2 +- internal/dag/condition_test.go | 2 +- internal/dag/dag_test.go | 2 +- internal/executor/http.go | 10 +- internal/models/node.go | 18 +- internal/models/status.go | 30 ++-- internal/persistence/interface.go | 26 +++ .../jsondb}/database.go | 168 +++++++++++------- .../jsondb}/database_test.go | 76 ++++---- .../jsondb}/writer.go | 27 +-- .../jsondb}/writer_test.go | 34 ++-- internal/runner/agent_test.go | 17 +- internal/runner/job.go | 7 +- internal/runner/job_test.go | 9 +- internal/runner/runner_test.go | 4 +- internal/scheduler/node_test.go | 2 +- internal/scheduler/scheduler_test.go | 4 +- internal/sock/server_test.go | 4 +- internal/utils/utils_test.go | 2 +- internal/web/handlers/dag.go | 43 ++--- internal/web/handlers/list.go | 5 +- internal/web/handlers/routes.go | 2 +- internal/web/http_test.go | 4 +- internal/web/setup_test.go | 4 +- 38 files changed, 447 insertions(+), 356 deletions(-) create mode 100644 internal/persistence/interface.go rename internal/{database => persistence/jsondb}/database.go (58%) rename internal/{database => persistence/jsondb}/database_test.go (86%) rename internal/{database => persistence/jsondb}/writer.go (67%) rename internal/{database => persistence/jsondb}/writer_test.go (76%) diff --git a/cmd/common_test.go b/cmd/common_test.go index 645fd107..d92a4910 100644 --- a/cmd/common_test.go +++ b/cmd/common_test.go @@ -3,6 +3,7 @@ package cmd import ( "bytes" "github.com/yohamta/dagu/internal/config" + "github.com/yohamta/dagu/internal/persistence/jsondb" "io" "log" "os" @@ -13,7 +14,6 @@ import ( "github.com/spf13/cobra" "github.com/stretchr/testify/require" "github.com/yohamta/dagu/internal/controller" - "github.com/yohamta/dagu/internal/database" "github.com/yohamta/dagu/internal/scheduler" "github.com/yohamta/dagu/internal/utils" ) @@ -22,12 +22,12 @@ func TestMain(m *testing.M) { tmpDir := utils.MustTempDir("dagu_test") changeHomeDir(tmpDir) code := m.Run() - os.RemoveAll(tmpDir) + _ = os.RemoveAll(tmpDir) os.Exit(code) } func changeHomeDir(homeDir string) { - os.Setenv("HOME", homeDir) + _ = os.Setenv("HOME", homeDir) _ = config.LoadConfig(homeDir) } @@ -71,7 +71,7 @@ func withSpool(t *testing.T, f func()) string { defer func() { os.Stdout = origStdout log.SetOutput(origStdout) - w.Close() + _ = w.Close() }() f() @@ -96,7 +96,7 @@ func testStatusEventual(t *testing.T, dagFile string, expected scheduler.Schedul d, err := loadDAG(dagFile, "") require.NoError(t, err) - ctrl := controller.NewDAGController(d) + ctrl := controller.New(d, jsondb.New()) require.Eventually(t, func() bool { status, err := ctrl.GetStatus() @@ -108,8 +108,8 @@ func testStatusEventual(t *testing.T, dagFile string, expected scheduler.Schedul func testLastStatusEventual(t *testing.T, dagFile string, expected scheduler.SchedulerStatus) { t.Helper() require.Eventually(t, func() bool { - db := &database.Database{Config: database.DefaultConfig()} - status := db.ReadStatusHist(dagFile, 1) + hs := jsondb.New() + status := hs.ReadStatusHist(dagFile, 1) if len(status) < 1 { return false } diff --git a/cmd/restart.go b/cmd/restart.go index 07b30e84..4d8fda4e 100644 --- a/cmd/restart.go +++ b/cmd/restart.go @@ -3,6 +3,7 @@ package cmd import ( "github.com/spf13/cobra" "github.com/yohamta/dagu/internal/controller" + "github.com/yohamta/dagu/internal/persistence/jsondb" "github.com/yohamta/dagu/internal/scheduler" "log" "time" @@ -19,7 +20,7 @@ func restartCmd() *cobra.Command { loadedDAG, err := loadDAG(dagFile, "") checkError(err) - ctrl := controller.NewDAGController(loadedDAG) + ctrl := controller.New(loadedDAG, jsondb.New()) // Check the current status and stop the DAG if it is running. stopDAGIfRunning(ctrl) diff --git a/cmd/restart_test.go b/cmd/restart_test.go index aa7b280d..bdf85971 100644 --- a/cmd/restart_test.go +++ b/cmd/restart_test.go @@ -3,6 +3,7 @@ package cmd import ( "github.com/stretchr/testify/require" "github.com/yohamta/dagu/internal/controller" + "github.com/yohamta/dagu/internal/persistence/jsondb" "github.com/yohamta/dagu/internal/scheduler" "testing" "time" @@ -44,7 +45,7 @@ func TestRestartCommand(t *testing.T) { // Check parameter was the same as the first execution d, err := loadDAG(dagFile, "") require.NoError(t, err) - ctrl := controller.NewDAGController(d) + ctrl := controller.New(d, jsondb.New()) sts := ctrl.GetRecentStatuses(2) require.Len(t, sts, 2) require.Equal(t, sts[0].Status.Params, sts[1].Status.Params) diff --git a/cmd/retry.go b/cmd/retry.go index bbd0e560..bc388189 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -3,7 +3,7 @@ package cmd import ( "github.com/spf13/cobra" "github.com/yohamta/dagu/internal/agent" - "github.com/yohamta/dagu/internal/database" + "github.com/yohamta/dagu/internal/persistence/jsondb" "path/filepath" ) @@ -18,7 +18,7 @@ func retryCmd() *cobra.Command { reqID, err := cmd.Flags().GetString("req") checkError(err) - status, err := database.New().FindByRequestId(f, reqID) + status, err := jsondb.New().FindByRequestId(f, reqID) checkError(err) loadedDAG, err := loadDAG(args[0], status.Status.Params) diff --git a/cmd/retry_test.go b/cmd/retry_test.go index aac3222a..cb59f126 100644 --- a/cmd/retry_test.go +++ b/cmd/retry_test.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/stretchr/testify/require" "github.com/yohamta/dagu/internal/controller" + "github.com/yohamta/dagu/internal/persistence/jsondb" "github.com/yohamta/dagu/internal/scheduler" "testing" ) @@ -15,7 +16,7 @@ func TestRetryCommand(t *testing.T) { testRunCommand(t, startCmd(), cmdTest{args: []string{"start", `--params="foo"`, dagFile}}) // Find the request ID. - s, err := controller.NewDAGStatusReader().ReadStatus(dagFile, false) + s, err := controller.NewDAGStatusReader(jsondb.New()).ReadStatus(dagFile, false) require.NoError(t, err) require.Equal(t, s.Status.Status, scheduler.SchedulerStatus_Success) require.NotNil(t, s.Status) diff --git a/cmd/status.go b/cmd/status.go index 61544641..bb3c19a3 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -4,6 +4,7 @@ import ( "github.com/spf13/cobra" "github.com/yohamta/dagu/internal/controller" "github.com/yohamta/dagu/internal/models" + "github.com/yohamta/dagu/internal/persistence/jsondb" "log" ) @@ -17,7 +18,7 @@ func createStatusCommand() *cobra.Command { loadedDAG, err := loadDAG(args[0], "") checkError(err) - status, err := controller.NewDAGController(loadedDAG).GetStatus() + status, err := controller.New(loadedDAG, jsondb.New()).GetStatus() checkError(err) res := &models.StatusResponse{Status: status} diff --git a/cmd/stop.go b/cmd/stop.go index 6c38c93e..0b8a5367 100644 --- a/cmd/stop.go +++ b/cmd/stop.go @@ -3,6 +3,7 @@ package cmd import ( "github.com/spf13/cobra" "github.com/yohamta/dagu/internal/controller" + "github.com/yohamta/dagu/internal/persistence/jsondb" "log" ) @@ -17,7 +18,7 @@ func stopCmd() *cobra.Command { checkError(err) log.Printf("Stopping...") - checkError(controller.NewDAGController(loadedDAG).Stop()) + checkError(controller.New(loadedDAG, jsondb.New()).Stop()) }, } } diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 36b362b0..6bf506e4 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "github.com/yohamta/dagu/internal/persistence" + "github.com/yohamta/dagu/internal/persistence/jsondb" "log" "net/http" "os" @@ -17,7 +19,6 @@ import ( "github.com/yohamta/dagu/internal/constants" "github.com/yohamta/dagu/internal/controller" "github.com/yohamta/dagu/internal/dag" - "github.com/yohamta/dagu/internal/database" "github.com/yohamta/dagu/internal/logger" "github.com/yohamta/dagu/internal/mailer" "github.com/yohamta/dagu/internal/models" @@ -37,8 +38,7 @@ type Agent struct { graph *scheduler.ExecutionGraph logManager *logManager reporter *reporter.Reporter - database *database.Database - dbManager *dbManager + historyStore persistence.HistoryStore socketServer *sock.Server requestId string } @@ -54,11 +54,6 @@ type RetryConfig struct { Status *models.Status } -type dbManager struct { - dbFile string - dbWriter *database.Writer -} - // Run starts the workflow execution. func (a *Agent) Run(ctx context.Context) error { if err := a.setupRequestId(); err != nil { @@ -242,14 +237,13 @@ func (a *Agent) setupRequestId() error { } func (a *Agent) setupDatabase() error { - a.database = database.New() - utils.LogErr("clean old history data", a.database.RemoveOld(a.DAG.Location, a.DAG.HistRetentionDays)) - - dbWriter, dbFile, err := a.database.NewWriter(a.DAG.Location, time.Now(), a.requestId) - if err != nil { + a.historyStore = jsondb.New() + if err := a.historyStore.RemoveOld(a.DAG.Location, a.DAG.HistRetentionDays); err != nil { + utils.LogErr("clean old history data", err) + } + if err := a.historyStore.Open(a.DAG.Location, time.Now(), a.requestId); err != nil { return err } - a.dbManager = &dbManager{dbFile, dbWriter} return nil } @@ -283,17 +277,13 @@ func (a *Agent) run(ctx context.Context) error { tl.Close() }() - if err := a.dbManager.dbWriter.Open(); err != nil { - return err - } - defer func() { - if err := a.dbManager.dbWriter.Close(); err != nil { - log.Printf("failed to close db writer: %v", err) + if err := a.historyStore.Close(); err != nil { + log.Printf("failed to close history store: %v", err) } }() - utils.LogErr("write status", a.dbManager.dbWriter.Write(a.Status())) + utils.LogErr("write status", a.historyStore.Write(a.Status())) listen := make(chan error) go func() { @@ -317,14 +307,14 @@ func (a *Agent) run(ctx context.Context) error { go func() { for node := range done { status := a.Status() - utils.LogErr("write status", a.dbManager.dbWriter.Write(status)) + utils.LogErr("write status", a.historyStore.Write(status)) utils.LogErr("report step", a.reporter.ReportStep(a.DAG, status, node)) } }() go func() { time.Sleep(time.Millisecond * 100) - utils.LogErr("write status", a.dbManager.dbWriter.Write(a.Status())) + utils.LogErr("write status", a.historyStore.Write(a.Status())) }() ctx = dag.NewContext(ctx, a.DAG) @@ -333,13 +323,11 @@ func (a *Agent) run(ctx context.Context) error { status := a.Status() log.Println("schedule finished.") - utils.LogErr("write status", a.dbManager.dbWriter.Write(a.Status())) + utils.LogErr("write status", a.historyStore.Write(a.Status())) a.reporter.ReportSummary(status, lastErr) utils.LogErr("send email", a.reporter.SendMail(a.DAG, status, lastErr)) - - utils.LogErr("close data file", a.dbManager.dbWriter.Close()) - utils.LogErr("data compaction", a.database.Compact(a.DAG.Location, a.dbManager.dbFile)) + utils.LogErr("close data file", a.historyStore.Close()) return lastErr } @@ -371,7 +359,7 @@ func (a *Agent) dryRun() error { } func (a *Agent) checkIsRunning() error { - status, err := controller.NewDAGController(a.DAG).GetStatus() + status, err := controller.New(a.DAG, jsondb.New()).GetStatus() if err != nil { return err } @@ -395,7 +383,7 @@ var ( ) func (a *Agent) handleHTTP(w http.ResponseWriter, r *http.Request) { - w.Header().Set("content-type", "application/json") + w.Header().Set("content-type", "application/jsondb") switch { case r.Method == http.MethodGet && statusRe.MatchString(r.URL.Path): status := a.Status() diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index 478cebf3..00c907ba 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -2,6 +2,8 @@ package agent import ( "context" + "github.com/yohamta/dagu/internal/persistence" + "github.com/yohamta/dagu/internal/persistence/jsondb" "net/http" "net/url" "os" @@ -25,20 +27,24 @@ func TestMain(m *testing.M) { testHomeDir := utils.MustTempDir("agent_test") changeHomeDir(testHomeDir) code := m.Run() - os.RemoveAll(testHomeDir) + _ = os.RemoveAll(testHomeDir) os.Exit(code) } func changeHomeDir(homeDir string) { - os.Setenv("HOME", homeDir) + _ = os.Setenv("HOME", homeDir) _ = config.LoadConfig(homeDir) } +func defaultHistoryStore() persistence.HistoryStore { + return jsondb.New() +} + func TestRunDAG(t *testing.T) { d := testLoadDAG(t, "run.yaml") a := &Agent{AgentConfig: &AgentConfig{DAG: d}} - status, _ := controller.NewDAGController(d).GetLastStatus() + status, _ := controller.New(d, defaultHistoryStore()).GetLastStatus() require.Equal(t, scheduler.SchedulerStatus_None, status.Status) go func() { @@ -49,7 +55,7 @@ func TestRunDAG(t *testing.T) { time.Sleep(100 * time.Millisecond) require.Eventually(t, func() bool { - status, err := controller.NewDAGController(d).GetLastStatus() + status, err := controller.New(d, defaultHistoryStore()).GetLastStatus() require.NoError(t, err) return status.Status == scheduler.SchedulerStatus_Success }, time.Second*2, time.Millisecond*100) @@ -59,7 +65,7 @@ func TestRunDAG(t *testing.T) { a = &Agent{AgentConfig: &AgentConfig{DAG: d}} err := a.Run(context.Background()) require.NoError(t, err) - statusList := controller.NewDAGController(d).GetRecentStatuses(100) + statusList := controller.New(d, defaultHistoryStore()).GetRecentStatuses(100) require.Equal(t, 1, len(statusList)) } @@ -101,7 +107,7 @@ func TestCancelDAG(t *testing.T) { time.Sleep(time.Millisecond * 100) abort(a) time.Sleep(time.Millisecond * 500) - status, err := controller.NewDAGController(d).GetLastStatus() + status, err := controller.New(d, defaultHistoryStore()).GetLastStatus() require.NoError(t, err) require.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status) } diff --git a/internal/config/config.go b/internal/config/config.go index 7b8ed0b1..595785f4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -99,7 +99,7 @@ func LoadConfig(userHomeDir string) error { cfg := &Config{} err := viper.Unmarshal(cfg) if err != nil { - return fmt.Errorf("failed to unmarshal Config file: %w", err) + return fmt.Errorf("failed to unmarshal cfg file: %w", err) } instance = cfg loadLegacyEnvs() diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 7174e4fb..5486fbc0 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -3,6 +3,7 @@ package controller import ( "errors" "fmt" + "github.com/yohamta/dagu/internal/persistence" "os" "os/exec" "path" @@ -12,7 +13,6 @@ import ( "time" "github.com/yohamta/dagu/internal/dag" - "github.com/yohamta/dagu/internal/database" "github.com/yohamta/dagu/internal/models" "github.com/yohamta/dagu/internal/scheduler" "github.com/yohamta/dagu/internal/sock" @@ -54,14 +54,14 @@ func GrepDAG(dir string, pattern string) (ret []*GrepResult, errs []string, err errs = append(errs, fmt.Sprintf("grep %s failed: %s", fi.Name(), err)) continue } - dag, err := dl.LoadMetadataOnly(fn) + d, err := dl.LoadMetadataOnly(fn) if err != nil { errs = append(errs, fmt.Sprintf("check %s failed: %s", fi.Name(), err)) continue } ret = append(ret, &GrepResult{ Name: strings.TrimSuffix(fi.Name(), path.Ext(fi.Name())), - DAG: dag, + DAG: d, Matches: m, }) } @@ -87,31 +87,32 @@ func CreateDAG(file string) error { return os.WriteFile(file, []byte(_DAGTemplate), 0644) } +// DAGController is a object to interact with a DAG. +type DAGController struct { + DAG *dag.DAG + historyStore persistence.HistoryStore +} + +func New(d *dag.DAG, historyStore persistence.HistoryStore) *DAGController { + return &DAGController{ + DAG: d, + historyStore: historyStore, + } +} + // MoveDAG moves the DAG file. -func MoveDAG(oldDAGPath, newDAGPath string) error { +func (dc *DAGController) MoveDAG(oldDAGPath, newDAGPath string) error { if err := validateLocation(newDAGPath); err != nil { return err } if err := os.Rename(oldDAGPath, newDAGPath); err != nil { return err } - db := database.New() - return db.MoveData(oldDAGPath, newDAGPath) -} - -// DAGController is a object to interact with a DAG. -type DAGController struct { - *dag.DAG -} - -func NewDAGController(d *dag.DAG) *DAGController { - return &DAGController{ - DAG: d, - } + return dc.historyStore.Rename(oldDAGPath, newDAGPath) } func (dc *DAGController) Stop() error { - client := sock.Client{Addr: dc.SockAddr()} + client := sock.Client{Addr: dc.DAG.SockAddr()} _, err := client.Request("POST", "/stop") return err } @@ -129,7 +130,7 @@ func (dc *DAGController) Start(binPath string, workDir string, params string) er args = append(args, "-p") args = append(args, fmt.Sprintf(`"%s"`, utils.EscapeArg(params, false))) } - args = append(args, dc.Location) + args = append(args, dc.DAG.Location) cmd := exec.Command(binPath, args...) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0} cmd.Dir = workDir @@ -148,7 +149,7 @@ func (dc *DAGController) Retry(binPath string, workDir string, reqId string) (er go func() { args := []string{"retry"} args = append(args, fmt.Sprintf("--req=%s", reqId)) - args = append(args, dc.Location) + args = append(args, dc.DAG.Location) cmd := exec.Command(binPath, args...) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0} cmd.Dir = workDir @@ -164,7 +165,7 @@ func (dc *DAGController) Retry(binPath string, workDir string, reqId string) (er } func (dc *DAGController) Restart(bin string, workDir string) error { - args := []string{"restart", dc.Location} + args := []string{"restart", dc.DAG.Location} cmd := exec.Command(bin, args...) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0} cmd.Dir = workDir @@ -177,7 +178,7 @@ func (dc *DAGController) Restart(bin string, workDir string) error { } func (dc *DAGController) GetStatus() (*models.Status, error) { - client := sock.Client{Addr: dc.SockAddr()} + client := sock.Client{Addr: dc.DAG.SockAddr()} ret, err := client.Request("GET", "/status") if err != nil { if errors.Is(err, sock.ErrTimeout) { @@ -190,18 +191,17 @@ func (dc *DAGController) GetStatus() (*models.Status, error) { } func (dc *DAGController) GetLastStatus() (*models.Status, error) { - client := sock.Client{Addr: dc.SockAddr()} + client := sock.Client{Addr: dc.DAG.SockAddr()} ret, err := client.Request("GET", "/status") if err == nil { return models.StatusFromJson(ret) } if err == nil || !errors.Is(err, sock.ErrTimeout) { - db := database.New() - status, err := db.ReadStatusToday(dc.Location) + status, err := dc.historyStore.ReadStatusToday(dc.DAG.Location) if err != nil { var readErr error = nil - if err != database.ErrNoStatusDataToday && err != database.ErrNoStatusData { + if !errors.Is(err, persistence.ErrNoStatusDataToday) && !errors.Is(err, persistence.ErrNoStatusData) { fmt.Printf("read status failed : %s", err) readErr = err } @@ -215,8 +215,7 @@ func (dc *DAGController) GetLastStatus() (*models.Status, error) { } func (dc *DAGController) GetStatusByRequestId(requestId string) (*models.Status, error) { - db := database.New() - ret, err := db.FindByRequestId(dc.Location, requestId) + ret, err := dc.historyStore.FindByRequestId(dc.DAG.Location, requestId) if err != nil { return nil, err } @@ -229,13 +228,12 @@ func (dc *DAGController) GetStatusByRequestId(requestId string) (*models.Status, } func (dc *DAGController) GetRecentStatuses(n int) []*models.StatusFile { - db := database.New() - ret := db.ReadStatusHist(dc.Location, n) + ret := dc.historyStore.ReadStatusHist(dc.DAG.Location, n) return ret } func (dc *DAGController) UpdateStatus(status *models.Status) error { - client := sock.Client{Addr: dc.SockAddr()} + client := sock.Client{Addr: dc.DAG.SockAddr()} res, err := client.Request("GET", "/status") if err != nil { if errors.Is(err, sock.ErrTimeout) { @@ -248,17 +246,7 @@ func (dc *DAGController) UpdateStatus(status *models.Status) error { return fmt.Errorf("the DAG is running") } } - db := database.New() - toUpdate, err := db.FindByRequestId(dc.Location, status.RequestId) - if err != nil { - return err - } - w := &database.Writer{Target: toUpdate.File} - if err := w.Open(); err != nil { - return err - } - defer w.Close() - return w.Write(status) + return dc.historyStore.Update(dc.DAG.Location, status.RequestId, status) } func (dc *DAGController) UpdateDAGSpec(value string) error { @@ -268,20 +256,19 @@ func (dc *DAGController) UpdateDAGSpec(value string) error { if err != nil { return err } - if !utils.FileExists(dc.Location) { - return fmt.Errorf("the config file %s does not exist", dc.Location) + if !utils.FileExists(dc.DAG.Location) { + return fmt.Errorf("the config file %s does not exist", dc.DAG.Location) } - err = os.WriteFile(dc.Location, []byte(value), 0755) + err = os.WriteFile(dc.DAG.Location, []byte(value), 0755) return err } func (dc *DAGController) DeleteDAG() error { - db := database.New() - err := db.RemoveAll(dc.Location) + err := dc.historyStore.RemoveAll(dc.DAG.Location) if err != nil { return err } - return os.Remove(dc.Location) + return os.Remove(dc.DAG.Location) } func validateLocation(dagLocation string) error { diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 1ce0e9c3..e848cfea 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -1,6 +1,7 @@ package controller_test import ( + "github.com/yohamta/dagu/internal/persistence/jsondb" "io" "net/http" "os" @@ -12,7 +13,6 @@ import ( "github.com/yohamta/dagu/internal/config" "github.com/yohamta/dagu/internal/controller" "github.com/yohamta/dagu/internal/dag" - "github.com/yohamta/dagu/internal/database" "github.com/yohamta/dagu/internal/models" "github.com/yohamta/dagu/internal/scheduler" "github.com/yohamta/dagu/internal/sock" @@ -27,23 +27,23 @@ func TestMain(m *testing.M) { tempDir := utils.MustTempDir("controller_test") changeHomeDir(tempDir) code := m.Run() - os.RemoveAll(tempDir) + _ = os.RemoveAll(tempDir) os.Exit(code) } func changeHomeDir(homeDir string) { - os.Setenv("HOME", homeDir) + _ = os.Setenv("HOME", homeDir) _ = config.LoadConfig(homeDir) } func TestGetStatusRunningAndDone(t *testing.T) { file := testDAG("get_status.yaml") - dr := controller.NewDAGStatusReader() + dr := controller.NewDAGStatusReader(jsondb.New()) ds, err := dr.ReadStatus(file, false) require.NoError(t, err) - dc := controller.NewDAGController(ds.DAG) + dc := controller.New(ds.DAG, jsondb.New()) socketServer, _ := sock.NewServer( &sock.Config{ @@ -90,25 +90,24 @@ func TestUpdateStatus(t *testing.T) { file = testDAG("update_status.yaml") requestId = "test-update-status" now = time.Now() - dr = controller.NewDAGStatusReader() - db = &database.Database{Config: database.DefaultConfig()} + dr = controller.NewDAGStatusReader(jsondb.New()) ) - dag, err := dr.ReadStatus(file, false) + d, err := dr.ReadStatus(file, false) require.NoError(t, err) - dc := controller.NewDAGController(dag.DAG) + hs := jsondb.New() + dc := controller.New(d.DAG, hs) - w, _, _ := db.NewWriter(dag.DAG.Location, now, requestId) - err = w.Open() + err = hs.Open(d.DAG.Location, now, requestId) require.NoError(t, err) - st := testNewStatus(dag.DAG, requestId, + st := testNewStatus(d.DAG, requestId, scheduler.SchedulerStatus_Success, scheduler.NodeStatus_Success) - err = w.Write(st) + err = hs.Write(st) require.NoError(t, err) - w.Close() + _ = hs.Close() time.Sleep(time.Millisecond * 100) @@ -133,15 +132,15 @@ func TestUpdateStatusError(t *testing.T) { var ( file = testDAG("update_status_failed.yaml") requestId = "test-update-status-failure" - dr = controller.NewDAGStatusReader() + dr = controller.NewDAGStatusReader(jsondb.New()) ) - dag, err := dr.ReadStatus(file, false) + d, err := dr.ReadStatus(file, false) require.NoError(t, err) - dc := controller.NewDAGController(dag.DAG) + dc := controller.New(d.DAG, jsondb.New()) - status := testNewStatus(dag.DAG, requestId, + status := testNewStatus(d.DAG, requestId, scheduler.SchedulerStatus_Error, scheduler.NodeStatus_Error) err = dc.UpdateStatus(status) @@ -156,13 +155,13 @@ func TestUpdateStatusError(t *testing.T) { func TestStart(t *testing.T) { var ( file = testDAG("start.yaml") - dr = controller.NewDAGStatusReader() + dr = controller.NewDAGStatusReader(jsondb.New()) ) - dag, err := dr.ReadStatus(file, false) + d, err := dr.ReadStatus(file, false) require.NoError(t, err) - dc := controller.NewDAGController(dag.DAG) + dc := controller.New(d.DAG, jsondb.New()) err = dc.Start(path.Join(utils.MustGetwd(), "../../bin/dagu"), "", "") require.Error(t, err) @@ -174,13 +173,13 @@ func TestStart(t *testing.T) { func TestStop(t *testing.T) { var ( file = testDAG("stop.yaml") - dr = controller.NewDAGStatusReader() + dr = controller.NewDAGStatusReader(jsondb.New()) ) - dag, err := dr.ReadStatus(file, false) + d, err := dr.ReadStatus(file, false) require.NoError(t, err) - dc := controller.NewDAGController(dag.DAG) + dc := controller.New(d.DAG, jsondb.New()) dc.StartAsync(path.Join(utils.MustGetwd(), "../../bin/dagu"), "", "") require.Eventually(t, func() bool { @@ -199,13 +198,13 @@ func TestStop(t *testing.T) { func TestRestart(t *testing.T) { var ( file = testDAG("restart.yaml") - dr = controller.NewDAGStatusReader() + dr = controller.NewDAGStatusReader(jsondb.New()) ) - dag, err := dr.ReadStatus(file, false) + d, err := dr.ReadStatus(file, false) require.NoError(t, err) - dc := controller.NewDAGController(dag.DAG) + dc := controller.New(d.DAG, jsondb.New()) err = dc.Restart(path.Join(utils.MustGetwd(), "../../bin/dagu"), "") require.NoError(t, err) @@ -217,13 +216,13 @@ func TestRestart(t *testing.T) { func TestRetry(t *testing.T) { var ( file = testDAG("retry.yaml") - dr = controller.NewDAGStatusReader() + dr = controller.NewDAGStatusReader(jsondb.New()) ) - dag, err := dr.ReadStatus(file, false) + d, err := dr.ReadStatus(file, false) require.NoError(t, err) - dc := controller.NewDAGController(dag.DAG) + dc := controller.New(d.DAG, jsondb.New()) err = dc.Start(path.Join(utils.MustGetwd(), "../../bin/dagu"), "", "x y z") require.NoError(t, err) @@ -252,14 +251,16 @@ func TestRetry(t *testing.T) { func TestUpdate(t *testing.T) { tmpDir := utils.MustTempDir("controller-test-save") - defer os.RemoveAll(tmpDir) + defer func() { + _ = os.RemoveAll(tmpDir) + }() loc := path.Join(tmpDir, "test.yaml") d := &dag.DAG{ Name: "test", Location: loc, } - dc := controller.NewDAGController(d) + dc := controller.New(d, jsondb.New()) // invalid DAG invalidDAG := `name: test DAG` @@ -278,7 +279,9 @@ steps: // create a new DAG file newFile, _ := utils.CreateFile(loc) - defer newFile.Close() + defer func() { + _ = newFile.Close() + }() // Update the DAG err = dc.UpdateDAGSpec(validDAG) @@ -286,7 +289,9 @@ steps: // Check the content of the DAG file updatedFile, _ := os.Open(loc) - defer updatedFile.Close() + defer func() { + _ = updatedFile.Close() + }() b, err := io.ReadAll(updatedFile) require.NoError(t, err) require.Equal(t, validDAG, string(b)) @@ -294,7 +299,9 @@ steps: func TestRemove(t *testing.T) { tmpDir := utils.MustTempDir("controller-test-remove") - defer os.RemoveAll(tmpDir) + defer func() { + _ = os.RemoveAll(tmpDir) + }() loc := path.Join(tmpDir, "test.yaml") d := &dag.DAG{ @@ -302,7 +309,7 @@ func TestRemove(t *testing.T) { Location: loc, } - dc := controller.NewDAGController(d) + dc := controller.New(d, jsondb.New()) dagSpec := `name: test DAG steps: - name: "1" @@ -310,14 +317,18 @@ steps: ` // create file newFile, _ := utils.CreateFile(loc) - defer newFile.Close() + defer func() { + _ = newFile.Close() + }() err := dc.UpdateDAGSpec(dagSpec) require.NoError(t, err) // check file saved, _ := os.Open(loc) - defer saved.Close() + defer func() { + _ = saved.Close() + }() b, err := io.ReadAll(saved) require.NoError(t, err) require.Equal(t, dagSpec, string(b)) @@ -330,7 +341,9 @@ steps: func TestCreateNewDAG(t *testing.T) { tmpDir := utils.MustTempDir("controller-test-save") - defer os.RemoveAll(tmpDir) + defer func() { + _ = os.RemoveAll(tmpDir) + }() // invalid filename filename := path.Join(tmpDir, "test") @@ -357,7 +370,9 @@ func TestCreateNewDAG(t *testing.T) { func TestRenameDAG(t *testing.T) { tmpDir := utils.MustTempDir("controller-test-rename") - defer os.RemoveAll(tmpDir) + defer func() { + _ = os.RemoveAll(tmpDir) + }() oldName := path.Join(tmpDir, "rename_dag.yaml") newName := path.Join(tmpDir, "rename_dag_renamed.yaml") @@ -365,10 +380,16 @@ func TestRenameDAG(t *testing.T) { err := controller.CreateDAG(oldName) require.NoError(t, err) - err = controller.MoveDAG(oldName, "invalid-config-name") + dr := controller.NewDAGStatusReader(jsondb.New()) + d, err := dr.ReadStatus(oldName, false) + require.NoError(t, err) + + c := controller.New(d.DAG, jsondb.New()) + + err = c.MoveDAG(oldName, "invalid-config-name") require.Error(t, err) - err = controller.MoveDAG(oldName, newName) + err = c.MoveDAG(oldName, newName) require.NoError(t, err) require.FileExists(t, newName) } diff --git a/internal/controller/status_reader.go b/internal/controller/status_reader.go index 3b1b3fb3..1288aa79 100644 --- a/internal/controller/status_reader.go +++ b/internal/controller/status_reader.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "github.com/yohamta/dagu/internal/persistence" "os" "path/filepath" @@ -28,13 +29,15 @@ type DAGStatus struct { // DAGStatusReader is the struct to read DAGStatus. type DAGStatusReader struct { suspendChecker *suspend.SuspendChecker + historyStore persistence.HistoryStore } -func NewDAGStatusReader() *DAGStatusReader { +func NewDAGStatusReader(historyStore persistence.HistoryStore) *DAGStatusReader { return &DAGStatusReader{ suspendChecker: suspend.NewSuspendChecker( storage.NewStorage(config.Get().SuspendFlagsDir), ), + historyStore: historyStore, } } @@ -92,7 +95,7 @@ func (dr *DAGStatusReader) ReadStatus(dagLocation string, loadMetadataOnly bool) } } - dc := NewDAGController(d) + dc := New(d, dr.historyStore) status, err := dc.GetLastStatus() return dr.newDAGStatus(d, status, err), err } diff --git a/internal/controller/status_reader_test.go b/internal/controller/status_reader_test.go index 2d0f69d0..e5e475f3 100644 --- a/internal/controller/status_reader_test.go +++ b/internal/controller/status_reader_test.go @@ -1,6 +1,7 @@ package controller_test import ( + "github.com/yohamta/dagu/internal/persistence/jsondb" "path" "path/filepath" "testing" @@ -12,7 +13,7 @@ import ( func TestLoadConfig(t *testing.T) { var ( file = testDAG("invalid_dag.yaml") - dr = controller.NewDAGStatusReader() + dr = controller.NewDAGStatusReader(jsondb.New()) ) dag, err := dr.ReadStatus(file, false) @@ -24,7 +25,7 @@ func TestLoadConfig(t *testing.T) { } func TestReadAll(t *testing.T) { - dr := controller.NewDAGStatusReader() + dr := controller.NewDAGStatusReader(jsondb.New()) dags, _, err := dr.ReadAllStatus(testdataDir) require.NoError(t, err) require.Greater(t, len(dags), 0) @@ -40,7 +41,7 @@ func TestReadAll(t *testing.T) { func TestReadDAGStatus(t *testing.T) { var ( file = testDAG("read_status.yaml") - dr = controller.NewDAGStatusReader() + dr = controller.NewDAGStatusReader(jsondb.New()) ) _, err := dr.ReadStatus(file, false) diff --git a/internal/dag/builder_test.go b/internal/dag/builder_test.go index 5be75ed7..f8577fee 100644 --- a/internal/dag/builder_test.go +++ b/internal/dag/builder_test.go @@ -183,7 +183,7 @@ params: %s } func TestExpandingEnvs(t *testing.T) { - os.Setenv("FOO", "BAR") + _ = os.Setenv("FOO", "BAR") require.Equal(t, expandEnv("${FOO}", BuildDAGOptions{}), "BAR") require.Equal(t, expandEnv("${FOO}", BuildDAGOptions{skipEnvEval: true}), "${FOO}") } diff --git a/internal/dag/condition_test.go b/internal/dag/condition_test.go index e10c4997..a1205276 100644 --- a/internal/dag/condition_test.go +++ b/internal/dag/condition_test.go @@ -14,7 +14,7 @@ func TestCondition(t *testing.T) { require.NoError(t, err) } { - os.Setenv("TEST_CONDITION", "100") + _ = os.Setenv("TEST_CONDITION", "100") c := &Condition{Condition: "${TEST_CONDITION}", Expected: "100"} err := EvalCondition(c) require.NoError(t, err) diff --git a/internal/dag/dag_test.go b/internal/dag/dag_test.go index aa7fad56..d7314342 100644 --- a/internal/dag/dag_test.go +++ b/internal/dag/dag_test.go @@ -28,7 +28,7 @@ func TestMain(m *testing.M) { } func changeHomeDir(homeDir string) { - os.Setenv("HOME", homeDir) + _ = os.Setenv("HOME", homeDir) _ = config.LoadConfig(homeDir) } diff --git a/internal/executor/http.go b/internal/executor/http.go index 5cd333d7..55464352 100644 --- a/internal/executor/http.go +++ b/internal/executor/http.go @@ -24,11 +24,11 @@ type HTTPExecutor struct { } type HTTPConfig struct { - Timeout int `json:"timeout"` - Headers map[string]string `json:"headers"` - QueryParams map[string]string `json:"query"` - Body string `json:"body"` - Silent bool `json:"silent"` + Timeout int `jsondb:"timeout"` + Headers map[string]string `jsondb:"headers"` + QueryParams map[string]string `jsondb:"query"` + Body string `jsondb:"body"` + Silent bool `jsondb:"silent"` } func (e *HTTPExecutor) SetStdout(out io.Writer) { diff --git a/internal/models/node.go b/internal/models/node.go index 7c109e62..4ec316a6 100644 --- a/internal/models/node.go +++ b/internal/models/node.go @@ -9,15 +9,15 @@ import ( ) type Node struct { - *dag.Step `json:"Step"` - Log string `json:"Log"` - StartedAt string `json:"StartedAt"` - FinishedAt string `json:"FinishedAt"` - Status scheduler.NodeStatus `json:"Status"` - RetryCount int `json:"RetryCount"` - DoneCount int `json:"DoneCount"` - Error string `json:"Error"` - StatusText string `json:"StatusText"` + *dag.Step `jsondb:"Step"` + Log string `jsondb:"Log"` + StartedAt string `jsondb:"StartedAt"` + FinishedAt string `jsondb:"FinishedAt"` + Status scheduler.NodeStatus `jsondb:"Status"` + RetryCount int `jsondb:"RetryCount"` + DoneCount int `jsondb:"DoneCount"` + Error string `jsondb:"Error"` + StatusText string `jsondb:"StatusText"` } func (n *Node) ToNode() *scheduler.Node { diff --git a/internal/models/status.go b/internal/models/status.go index 476051d9..1f9cbdf8 100644 --- a/internal/models/status.go +++ b/internal/models/status.go @@ -12,7 +12,7 @@ import ( ) type StatusResponse struct { - Status *Status `json:"status"` + Status *Status `jsondb:"status"` } type Pid int @@ -31,20 +31,20 @@ func (p Pid) IsRunning() bool { } type Status struct { - RequestId string `json:"RequestId"` - Name string `json:"Name"` - Status scheduler.SchedulerStatus `json:"Status"` - StatusText string `json:"StatusText"` - Pid Pid `json:"Pid"` - Nodes []*Node `json:"Nodes"` - OnExit *Node `json:"OnExit"` - OnSuccess *Node `json:"OnSuccess"` - OnFailure *Node `json:"OnFailure"` - OnCancel *Node `json:"OnCancel"` - StartedAt string `json:"StartedAt"` - FinishedAt string `json:"FinishedAt"` - Log string `json:"Log"` - Params string `json:"Params"` + RequestId string `jsondb:"RequestId"` + Name string `jsondb:"Name"` + Status scheduler.SchedulerStatus `jsondb:"Status"` + StatusText string `jsondb:"StatusText"` + Pid Pid `jsondb:"Pid"` + Nodes []*Node `jsondb:"Nodes"` + OnExit *Node `jsondb:"OnExit"` + OnSuccess *Node `jsondb:"OnSuccess"` + OnFailure *Node `jsondb:"OnFailure"` + OnCancel *Node `jsondb:"OnCancel"` + StartedAt string `jsondb:"StartedAt"` + FinishedAt string `jsondb:"FinishedAt"` + Log string `jsondb:"Log"` + Params string `jsondb:"Params"` } type StatusFile struct { diff --git a/internal/persistence/interface.go b/internal/persistence/interface.go new file mode 100644 index 00000000..577b1853 --- /dev/null +++ b/internal/persistence/interface.go @@ -0,0 +1,26 @@ +package persistence + +import ( + "fmt" + "github.com/yohamta/dagu/internal/models" + "time" +) + +type HistoryStore interface { + Open(dagFile string, t time.Time, requestId string) error + Write(st *models.Status) error + Close() error + Update(dagFile, requestId string, st *models.Status) error + ReadStatusHist(dagFile string, n int) []*models.StatusFile + ReadStatusToday(dagFile string) (*models.Status, error) + FindByRequestId(dagFile string, requestId string) (*models.StatusFile, error) + RemoveAll(dagFile string) error + RemoveOld(dagFile string, retentionDays int) error + Rename(oldDAGFile, newDAGFile string) error +} + +var ( + ErrRequestIdNotFound = fmt.Errorf("request id not found") + ErrNoStatusDataToday = fmt.Errorf("no status data today") + ErrNoStatusData = fmt.Errorf("no status data") +) diff --git a/internal/database/database.go b/internal/persistence/jsondb/database.go similarity index 58% rename from internal/database/database.go rename to internal/persistence/jsondb/database.go index 4e2c678f..8c657e8c 100644 --- a/internal/database/database.go +++ b/internal/persistence/jsondb/database.go @@ -1,10 +1,11 @@ -package database +package jsondb import ( "bufio" "crypto/md5" "encoding/hex" "fmt" + "github.com/yohamta/dagu/internal/persistence" "io" "log" "os" @@ -20,30 +21,75 @@ import ( "github.com/yohamta/dagu/internal/utils" ) -// Database is the interfact to store workflow status in local. -// It stores status in JSON format in a directory as per each configPath. +// Store is the interfact to store workflow status in local. +// It stores status in JSON format in a directory as per each dagFile. // Multiple JSON data can be stored in a single file and each data // is separated by newline. // When a data is updated, it appends a new line to the file. // Only the latest data in a single file can be read. // When Compact is called, it removes old data. // Compact must be called only once per file. -type Database struct { +type Store struct { *Config + writer *writer } type Config struct { Dir string } -// DefaultConfig is the default configuration for Database. -func DefaultConfig() *Config { +// defaultConfig is the default configuration for Store. +func defaultConfig() *Config { return &Config{Dir: config.Get().DataDir} } -// New creates a new Database with default configuration. -func New() *Database { - return &Database{Config: DefaultConfig()} +// New creates a new Store with default configuration. +func New() *Store { + return &Store{Config: defaultConfig()} +} + +func (store *Store) Update(dagFile, requestId string, s *models.Status) error { + f, err := store.FindByRequestId(dagFile, requestId) + if err != nil { + return err + } + w := &writer{target: f.File} + if err := w.open(); err != nil { + return err + } + defer func() { + _ = w.close() + }() + return w.write(s) +} + +func (store *Store) Open(dagFile string, t time.Time, requestId string) error { + writer, _, err := store.newWriter(dagFile, t, requestId) + if err != nil { + return err + } + if err := writer.open(); err != nil { + return err + } + store.writer = writer + return nil +} + +func (store *Store) Write(s *models.Status) error { + return store.writer.write(s) +} + +func (store *Store) Close() error { + if store.writer == nil { + return nil + } + defer func() { + _ = store.writer.close() + }() + if err := store.Compact(store.writer.dagFile, store.writer.target); err != nil { + return err + } + return store.writer.close() } // ParseFile parses a status file. @@ -53,7 +99,9 @@ func ParseFile(file string) (*models.Status, error) { log.Printf("failed to open file. err: %v", err) return nil, err } - defer f.Close() + defer func() { + _ = f.Close() + }() var offset int64 = 0 var ret *models.Status for { @@ -79,19 +127,19 @@ func ParseFile(file string) (*models.Status, error) { } // NewWriter creates a new writer for a status. -func (db *Database) NewWriter(configPath string, t time.Time, requestId string) (*Writer, string, error) { - f, err := db.newFile(configPath, t, requestId) +func (store *Store) newWriter(dagFile string, t time.Time, requestId string) (*writer, string, error) { + f, err := store.newFile(dagFile, t, requestId) if err != nil { return nil, "", err } - w := &Writer{Target: f} + w := &writer{target: f, dagFile: dagFile} return w, f, nil } // ReadStatusHist returns a list of status files. -func (db *Database) ReadStatusHist(configPath string, n int) []*models.StatusFile { +func (store *Store) ReadStatusHist(dagFile string, n int) []*models.StatusFile { ret := make([]*models.StatusFile, 0) - files := db.latest(db.pattern(configPath)+"*.dat", n) + files := store.latest(store.pattern(dagFile)+"*.dat", n) for _, file := range files { status, err := ParseFile(file) if err == nil { @@ -105,8 +153,8 @@ func (db *Database) ReadStatusHist(configPath string, n int) []*models.StatusFil } // ReadStatusToday returns a list of status files. -func (db *Database) ReadStatusToday(configPath string) (*models.Status, error) { - file, err := db.latestToday(configPath, time.Now()) +func (store *Store) ReadStatusToday(dagFile string) (*models.Status, error) { + file, err := store.latestToday(dagFile, time.Now()) if err != nil { return nil, err } @@ -114,11 +162,11 @@ func (db *Database) ReadStatusToday(configPath string) (*models.Status, error) { } // FindByRequestId finds a status file by requestId. -func (db *Database) FindByRequestId(configPath string, requestId string) (*models.StatusFile, error) { +func (store *Store) FindByRequestId(dagFile string, requestId string) (*models.StatusFile, error) { if requestId == "" { return nil, fmt.Errorf("requestId is empty") } - pattern := db.pattern(configPath) + "*.dat" + pattern := store.pattern(dagFile) + "*.dat" matches, err := filepath.Glob(pattern) if len(matches) > 0 || err == nil { sort.Slice(matches, func(i, j int) bool { @@ -138,17 +186,17 @@ func (db *Database) FindByRequestId(configPath string, requestId string) (*model } } } - return nil, fmt.Errorf("%w : %s", ErrRequestIdNotFound, requestId) + return nil, fmt.Errorf("%w : %s", persistence.ErrRequestIdNotFound, requestId) } // RemoveAll removes all files in a directory. -func (db *Database) RemoveAll(configPath string) error { - return db.RemoveOld(configPath, 0) +func (store *Store) RemoveAll(dagFile string) error { + return store.RemoveOld(dagFile, 0) } // RemoveOld removes old files. -func (db *Database) RemoveOld(configPath string, retentionDays int) error { - pattern := db.pattern(configPath) + "*.dat" +func (store *Store) RemoveOld(dagFile string, retentionDays int) error { + pattern := store.pattern(dagFile) + "*.dat" var lastErr error = nil if retentionDays >= 0 { matches, _ := filepath.Glob(pattern) @@ -166,22 +214,24 @@ func (db *Database) RemoveOld(configPath string, retentionDays int) error { } // Compact creates a new file with only the latest data and removes old data. -func (db *Database) Compact(configPath, original string) error { +func (store *Store) Compact(_, original string) error { status, err := ParseFile(original) if err != nil { return err } - new := fmt.Sprintf("%s_c.dat", + newFile := fmt.Sprintf("%s_c.dat", strings.TrimSuffix(filepath.Base(original), path.Ext(original))) - f := path.Join(filepath.Dir(original), new) - w := &Writer{Target: f} - if err := w.Open(); err != nil { + f := path.Join(filepath.Dir(original), newFile) + w := &writer{target: f} + if err := w.open(); err != nil { return err } - defer w.Close() + defer func() { + _ = w.close() + }() - if err := w.Write(status); err != nil { + if err := w.write(status); err != nil { if err := os.Remove(f); err != nil { log.Printf("failed to remove %s : %s", f, err.Error()) } @@ -196,9 +246,9 @@ func (db *Database) Compact(configPath, original string) error { } // MoveData moves data from one directory to another. -func (db *Database) MoveData(oldPath, newPath string) error { - oldDir := db.dir(oldPath, prefix(oldPath)) - newDir := db.dir(newPath, prefix(newPath)) +func (store *Store) Rename(oldPath, newPath string) error { + oldDir := store.dir(oldPath, prefix(oldPath)) + newDir := store.dir(newPath, prefix(newPath)) if !utils.FileExists(oldDir) { // No need to move data return nil @@ -208,60 +258,60 @@ func (db *Database) MoveData(oldPath, newPath string) error { return err } } - matches, err := filepath.Glob(db.pattern(oldPath) + "*.dat") + matches, err := filepath.Glob(store.pattern(oldPath) + "*.dat") if err != nil { return err } - oldPattern := path.Base(db.pattern(oldPath)) - newPattern := path.Base(db.pattern(newPath)) + oldPattern := path.Base(store.pattern(oldPath)) + newPattern := path.Base(store.pattern(newPath)) for _, m := range matches { base := path.Base(m) f := strings.Replace(base, oldPattern, newPattern, 1) _ = os.Rename(m, path.Join(newDir, f)) } if files, _ := os.ReadDir(oldDir); len(files) == 0 { - os.Remove(oldDir) + _ = os.Remove(oldDir) } return nil } -func (db *Database) dir(configPath string, prefix string) string { +func (store *Store) dir(dagFile string, prefix string) string { h := md5.New() - h.Write([]byte(configPath)) + h.Write([]byte(dagFile)) v := hex.EncodeToString(h.Sum(nil)) - return filepath.Join(db.Dir, fmt.Sprintf("%s-%s", prefix, v)) + return filepath.Join(store.Dir, fmt.Sprintf("%s-%s", prefix, v)) } -func (db *Database) newFile(configPath string, t time.Time, requestId string) (string, error) { - if configPath == "" { - return "", fmt.Errorf("configPath is empty") +func (store *Store) newFile(dagFile string, t time.Time, requestId string) (string, error) { + if dagFile == "" { + return "", fmt.Errorf("dagFile is empty") } - fileName := fmt.Sprintf("%s.%s.%s.dat", db.pattern(configPath), t.Format("20060102.15:04:05.000"), utils.TruncString(requestId, 8)) + fileName := fmt.Sprintf("%s.%s.%s.dat", store.pattern(dagFile), t.Format("20060102.15:04:05.000"), utils.TruncString(requestId, 8)) return fileName, nil } -func (db *Database) pattern(configPath string) string { - p := prefix(configPath) - dir := db.dir(configPath, p) +func (store *Store) pattern(dagFile string) string { + p := prefix(dagFile) + dir := store.dir(dagFile, p) return filepath.Join(dir, p) } -func (db *Database) latestToday(configPath string, day time.Time) (string, error) { +func (store *Store) latestToday(dagFile string, day time.Time) (string, error) { var ret []string - pattern := fmt.Sprintf("%s.%s*.*.dat", db.pattern(configPath), day.Format("20060102")) + pattern := fmt.Sprintf("%s.%s*.*.dat", store.pattern(dagFile), day.Format("20060102")) matches, err := filepath.Glob(pattern) if err == nil || len(matches) > 0 { ret = filterLatest(matches, 1) } else { - return "", ErrNoStatusDataToday + return "", persistence.ErrNoStatusDataToday } if len(ret) == 0 { - return "", ErrNoStatusData + return "", persistence.ErrNoStatusData } return ret[0], err } -func (db *Database) latest(pattern string, n int) []string { +func (store *Store) latest(pattern string, n int) []string { matches, err := filepath.Glob(pattern) var ret = []string{} if err == nil || len(matches) >= 0 { @@ -270,12 +320,6 @@ func (db *Database) latest(pattern string, n int) []string { return ret } -var ( - ErrRequestIdNotFound = fmt.Errorf("request id not found") - ErrNoStatusDataToday = fmt.Errorf("no status data today") - ErrNoStatusData = fmt.Errorf("no status data") -) - var rTimestamp = regexp.MustCompile(`2\d{7}.\d{2}:\d{2}:\d{2}`) func filterLatest(files []string, n int) []string { @@ -322,9 +366,9 @@ func readLineFrom(f *os.File, offset int64) ([]byte, error) { return ret, nil } -func prefix(configPath string) string { +func prefix(dagFile string) string { return strings.TrimSuffix( - filepath.Base(configPath), - path.Ext(configPath), + filepath.Base(dagFile), + path.Ext(dagFile), ) } diff --git a/internal/database/database_test.go b/internal/persistence/jsondb/database_test.go similarity index 86% rename from internal/database/database_test.go rename to internal/persistence/jsondb/database_test.go index e515a2f1..bd05fbd1 100644 --- a/internal/database/database_test.go +++ b/internal/persistence/jsondb/database_test.go @@ -1,4 +1,4 @@ -package database +package jsondb import ( "fmt" @@ -11,7 +11,6 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/yohamta/dagu/internal/config" "github.com/yohamta/dagu/internal/dag" "github.com/yohamta/dagu/internal/models" "github.com/yohamta/dagu/internal/scheduler" @@ -20,7 +19,7 @@ import ( func TestDatabase(t *testing.T) { for scenario, fn := range map[string]func( - t *testing.T, db *Database, + t *testing.T, db *Store, ){ "create new datafile": testNewDataFile, "write status to file and rename": testWriteStatusToFile, @@ -35,20 +34,22 @@ func TestDatabase(t *testing.T) { "test error parse file": testErrorParseFile, } { t.Run(scenario, func(t *testing.T) { - dir, err := os.MkdirTemp("", "test-database") - db := &Database{ + dir, err := os.MkdirTemp("", "test-persistence") + db := &Store{ Config: &Config{ Dir: dir, }, } require.NoError(t, err) - defer os.RemoveAll(dir) + defer func() { + _ = os.RemoveAll(dir) + }() fn(t, db) }) } } -func testNewDataFile(t *testing.T, db *Database) { +func testNewDataFile(t *testing.T, db *Store) { d := &dag.DAG{ Location: "test_new_data_file.yaml", } @@ -64,7 +65,7 @@ func testNewDataFile(t *testing.T, db *Database) { require.Error(t, err) } -func testWriteAndFindFiles(t *testing.T, db *Database) { +func testWriteAndFindFiles(t *testing.T, db *Store) { d := &dag.DAG{ Name: "test_read_status_n", Location: "test_data_files_n.yaml", @@ -103,7 +104,7 @@ func testWriteAndFindFiles(t *testing.T, db *Database) { require.Equal(t, 2, len(files)) } -func testWriteAndFindByRequestId(t *testing.T, db *Database) { +func testWriteAndFindByRequestId(t *testing.T, db *Store) { d := &dag.DAG{ Name: "test_find_by_request_id", Location: "test_find_by_request_id.yaml", @@ -147,7 +148,7 @@ func testWriteAndFindByRequestId(t *testing.T, db *Database) { require.Nil(t, status) } -func testRemoveOldFiles(t *testing.T, db *Database) { +func testRemoveOldFiles(t *testing.T, db *Store) { d := &dag.DAG{ Location: "test_remove_old.yaml", } @@ -190,27 +191,27 @@ func testRemoveOldFiles(t *testing.T, db *Database) { require.Equal(t, 0, len(m)) } -func testReadLatestStatus(t *testing.T, db *Database) { +func testReadLatestStatus(t *testing.T, db *Store) { d := &dag.DAG{ Location: "test_config_status_reader.yaml", } requestId := "request-id-1" - dw, _, err := db.NewWriter(d.Location, time.Now(), requestId) + dw, _, err := db.newWriter(d.Location, time.Now(), requestId) require.NoError(t, err) - err = dw.Open() + err = dw.open() require.NoError(t, err) defer func() { - _ = dw.Close() + _ = dw.close() }() status := models.NewStatus(d, nil, scheduler.SchedulerStatus_None, 10000, nil, nil) - err = dw.Write(status) + err = dw.write(status) require.NoError(t, err) status.Status = scheduler.SchedulerStatus_Success status.Pid = 20000 - _ = dw.Write(status) + _ = dw.write(status) ret, err := db.ReadStatusToday(d.Location) @@ -221,7 +222,7 @@ func testReadLatestStatus(t *testing.T, db *Database) { } -func testReadStatusN(t *testing.T, db *Database) { +func testReadStatusN(t *testing.T, db *Store) { d := &dag.DAG{ Name: "test_read_status_n", Location: "test_config_status_reader_hist.yaml", @@ -262,16 +263,16 @@ func testReadStatusN(t *testing.T, db *Database) { require.Equal(t, d.Name, ret[1].Status.Name) } -func testCompactFile(t *testing.T, db *Database) { +func testCompactFile(t *testing.T, db *Store) { d := &dag.DAG{ Name: "test_compact_file", Location: "test_compact_file.yaml", } requestId := "request-id-1" - dw, _, err := db.NewWriter(d.Location, time.Now(), requestId) + dw, _, err := db.newWriter(d.Location, time.Now(), requestId) require.NoError(t, err) - require.NoError(t, dw.Open()) + require.NoError(t, dw.open()) for _, data := range []struct { Status *models.Status @@ -283,10 +284,10 @@ func testCompactFile(t *testing.T, db *Database) { {models.NewStatus( d, nil, scheduler.SchedulerStatus_Success, 10000, nil, nil)}, } { - require.NoError(t, dw.Write(data.Status)) + require.NoError(t, dw.write(data.Status)) } - dw.Close() + _ = dw.close() var s *models.StatusFile = nil if h := db.ReadStatusHist(d.Location, 1); len(h) > 0 { @@ -294,7 +295,7 @@ func testCompactFile(t *testing.T, db *Database) { } require.NotNil(t, s) - db2 := &Database{ + db2 := &Store{ Config: db.Config, } err = db2.Compact(d.Location, s.File) @@ -314,11 +315,11 @@ func testCompactFile(t *testing.T, db *Database) { require.Error(t, err) } -func testErrorReadFile(t *testing.T, db *Database) { +func testErrorReadFile(t *testing.T, db *Store) { _, err := ParseFile("invalid_file.dat") require.Error(t, err) - _, _, err = db.NewWriter("", time.Now(), "") + _, _, err = db.newWriter("", time.Now(), "") require.Error(t, err) _, err = db.ReadStatusToday("invalid_file.yaml") @@ -328,7 +329,7 @@ func testErrorReadFile(t *testing.T, db *Database) { require.Error(t, err) } -func testErrorParseFile(t *testing.T, db *Database) { +func testErrorParseFile(t *testing.T, _ *Store) { tmpDir := utils.MustTempDir("test_error_parse_file") tmpFile := filepath.Join(tmpDir, "test_error_parse_file.dat") @@ -341,7 +342,7 @@ func testErrorParseFile(t *testing.T, db *Database) { _, err = ParseFile(tmpFile) require.Error(t, err) - _, err = f.WriteString("invalid json") + _, err = f.WriteString("invalid jsondb") require.NoError(t, err) _, err = ParseFile(tmpFile) @@ -354,18 +355,15 @@ func testErrorParseFile(t *testing.T, db *Database) { require.NoError(t, err) } -func testWriteStatus(t *testing.T, db *Database, d *dag.DAG, status *models.Status, tm time.Time) { +func testWriteStatus(t *testing.T, db *Store, d *dag.DAG, status *models.Status, tm time.Time) { t.Helper() - dw, _, err := db.NewWriter(d.Location, tm, status.RequestId) + dw, _, err := db.newWriter(d.Location, tm, status.RequestId) require.NoError(t, err) - require.NoError(t, dw.Open()) - defer dw.Close() - require.NoError(t, dw.Write(status)) -} - -func TestDefaultConfig(t *testing.T) { - d := DefaultConfig() - require.Equal(t, d.Dir, config.Get().DataDir) + require.NoError(t, dw.open()) + defer func() { + _ = dw.close() + }() + require.NoError(t, dw.write(status)) } func TestTimestamp(t *testing.T) { @@ -382,7 +380,9 @@ func TestTimestamp(t *testing.T) { func TestReadLine(t *testing.T) { tmpDir := utils.MustTempDir("test_read_line") - defer os.RemoveAll(tmpDir) + defer func() { + _ = os.RemoveAll(tmpDir) + }() tmpFile := filepath.Join(tmpDir, "test_read_line.dat") f, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_WRONLY, 0644) diff --git a/internal/database/writer.go b/internal/persistence/jsondb/writer.go similarity index 67% rename from internal/database/writer.go rename to internal/persistence/jsondb/writer.go index 17500549..08754d0f 100644 --- a/internal/database/writer.go +++ b/internal/persistence/jsondb/writer.go @@ -1,4 +1,4 @@ -package database +package jsondb import ( "bufio" @@ -12,18 +12,19 @@ import ( ) // Writer is the interface to write status to local file. -type Writer struct { - Target string - writer *bufio.Writer - file *os.File - mu sync.Mutex - closed bool +type writer struct { + target string + dagFile string + writer *bufio.Writer + file *os.File + mu sync.Mutex + closed bool } // Open opens the writer. -func (w *Writer) Open() (err error) { - _ = os.MkdirAll(path.Dir(w.Target), 0755) - w.file, err = utils.OpenOrCreateFile(w.Target) +func (w *writer) open() (err error) { + _ = os.MkdirAll(path.Dir(w.target), 0755) + w.file, err = utils.OpenOrCreateFile(w.target) if err == nil { w.writer = bufio.NewWriter(w.file) } @@ -31,7 +32,7 @@ func (w *Writer) Open() (err error) { } // Writer appends the status to the local file. -func (w *Writer) Write(st *models.Status) error { +func (w *writer) write(st *models.Status) error { w.mu.Lock() defer w.mu.Unlock() jsonb, _ := st.ToJson() @@ -43,7 +44,9 @@ func (w *Writer) Write(st *models.Status) error { } // Close closes the writer. -func (w *Writer) Close() (err error) { +func (w *writer) close() (err error) { + w.mu.Lock() + defer w.mu.Unlock() if !w.closed { err = w.writer.Flush() utils.LogErr("flush file", err) diff --git a/internal/database/writer_test.go b/internal/persistence/jsondb/writer_test.go similarity index 76% rename from internal/database/writer_test.go rename to internal/persistence/jsondb/writer_test.go index 22e6ad9c..27621dd4 100644 --- a/internal/database/writer_test.go +++ b/internal/persistence/jsondb/writer_test.go @@ -1,4 +1,4 @@ -package database +package jsondb import ( "fmt" @@ -12,22 +12,22 @@ import ( "github.com/yohamta/dagu/internal/scheduler" ) -func testWriteStatusToFile(t *testing.T, db *Database) { +func testWriteStatusToFile(t *testing.T, db *Store) { d := &dag.DAG{ Name: "test_write_status", Location: "test_write_status.yaml", } - dw, file, err := db.NewWriter(d.Location, time.Now(), "request-id-1") + dw, file, err := db.newWriter(d.Location, time.Now(), "request-id-1") require.NoError(t, err) - require.NoError(t, dw.Open()) + require.NoError(t, dw.open()) defer func() { - _ = dw.Close() + _ = dw.close() _ = db.RemoveOld(d.Location, 0) }() status := models.NewStatus(d, nil, scheduler.SchedulerStatus_Running, 10000, nil, nil) status.RequestId = fmt.Sprintf("request-id-%d", time.Now().Unix()) - require.NoError(t, dw.Write(status)) + require.NoError(t, dw.write(status)) require.Regexp(t, ".*test_write_status.*", file) dat, err := os.ReadFile(file) @@ -38,7 +38,7 @@ func testWriteStatusToFile(t *testing.T, db *Database) { require.Equal(t, d.Name, r.Name) - err = dw.Close() + err = dw.close() require.NoError(t, err) old := d.Location @@ -49,7 +49,7 @@ func testWriteStatusToFile(t *testing.T, db *Database) { require.DirExists(t, oldDir) require.NoDirExists(t, newDir) - err = db.MoveData(old, new) + err = db.Rename(old, new) require.NoError(t, err) require.NoDirExists(t, oldDir) require.DirExists(t, newDir) @@ -59,30 +59,30 @@ func testWriteStatusToFile(t *testing.T, db *Database) { require.Equal(t, status.RequestId, ret[0].Status.RequestId) } -func testWriteStatusToExistingFile(t *testing.T, db *Database) { +func testWriteStatusToExistingFile(t *testing.T, db *Store) { d := &dag.DAG{ Name: "test_append_to_existing", Location: "test_append_to_existing.yaml", } - dw, file, err := db.NewWriter(d.Location, time.Now(), "request-id-1") + dw, file, err := db.newWriter(d.Location, time.Now(), "request-id-1") require.NoError(t, err) - require.NoError(t, dw.Open()) + require.NoError(t, dw.open()) status := models.NewStatus(d, nil, scheduler.SchedulerStatus_Cancel, 10000, nil, nil) status.RequestId = "request-id-test-write-status-to-existing-file" - require.NoError(t, dw.Write(status)) - dw.Close() + require.NoError(t, dw.write(status)) + dw.close() data, err := db.FindByRequestId(d.Location, status.RequestId) require.NoError(t, err) require.Equal(t, data.Status.Status, scheduler.SchedulerStatus_Cancel) require.Equal(t, file, data.File) - dw = &Writer{Target: file} - require.NoError(t, dw.Open()) + dw = &writer{target: file} + require.NoError(t, dw.open()) status.Status = scheduler.SchedulerStatus_Success - require.NoError(t, dw.Write(status)) - dw.Close() + require.NoError(t, dw.write(status)) + dw.close() data, err = db.FindByRequestId(d.Location, status.RequestId) require.NoError(t, err) diff --git a/internal/runner/agent_test.go b/internal/runner/agent_test.go index 692c0983..db70bbc2 100644 --- a/internal/runner/agent_test.go +++ b/internal/runner/agent_test.go @@ -1,6 +1,7 @@ package runner import ( + "github.com/yohamta/dagu/internal/persistence/jsondb" "os" "path" "testing" @@ -17,7 +18,7 @@ import ( func TestAgent(t *testing.T) { tmpDir := utils.MustTempDir("runner_agent_test") defer func() { - os.RemoveAll(tmpDir) + _ = os.RemoveAll(tmpDir) }() now := time.Date(2020, 1, 1, 1, 0, 0, 0, time.UTC) @@ -36,9 +37,9 @@ func TestAgent(t *testing.T) { pathToDAG := path.Join(testdataDir, "scheduled_job.yaml") loader := &dag.Loader{} - dag, err := loader.LoadMetadataOnly(pathToDAG) + d, err := loader.LoadMetadataOnly(pathToDAG) require.NoError(t, err) - c := controller.NewDAGController(dag) + c := controller.New(d, jsondb.New()) require.Eventually(t, func() bool { status, err := c.GetLastStatus() @@ -51,7 +52,7 @@ func TestAgent(t *testing.T) { func TestAgentForStop(t *testing.T) { tmpDir := utils.MustTempDir("runner_agent_test_for_stop") defer func() { - os.RemoveAll(tmpDir) + _ = os.RemoveAll(tmpDir) }() now := time.Date(2020, 1, 1, 1, 1, 0, 0, time.UTC) @@ -65,12 +66,12 @@ func TestAgentForStop(t *testing.T) { // read the test DAG file := path.Join(testdataDir, "start_stop.yaml") - dr := controller.NewDAGStatusReader() - dag, _ := dr.ReadStatus(file, false) - c := controller.NewDAGController(dag.DAG) + dr := controller.NewDAGStatusReader(jsondb.New()) + d, _ := dr.ReadStatus(file, false) + c := controller.New(d.DAG, jsondb.New()) j := &job{ - DAG: dag.DAG, + DAG: d.DAG, Config: testConfig, Next: time.Date(2020, 1, 1, 1, 0, 0, 0, time.UTC), } diff --git a/internal/runner/job.go b/internal/runner/job.go index af1b6df7..40936ba9 100644 --- a/internal/runner/job.go +++ b/internal/runner/job.go @@ -2,6 +2,7 @@ package runner import ( "errors" + "github.com/yohamta/dagu/internal/persistence/jsondb" "time" "github.com/yohamta/dagu/internal/config" @@ -33,7 +34,7 @@ var ( ) func (j *job) Start() error { - c := controller.NewDAGController(j.DAG) + c := controller.New(j.DAG, jsondb.New()) s, err := c.GetLastStatus() if err != nil { return err @@ -58,7 +59,7 @@ func (j *job) Start() error { } func (j *job) Stop() error { - c := controller.NewDAGController(j.DAG) + c := controller.New(j.DAG, jsondb.New()) s, err := c.GetLastStatus() if err != nil { return err @@ -70,7 +71,7 @@ func (j *job) Stop() error { } func (j *job) Restart() error { - c := controller.NewDAGController(j.DAG) + c := controller.New(j.DAG, jsondb.New()) return c.Restart(j.Config.Command, j.Config.WorkDir) } diff --git a/internal/runner/job_test.go b/internal/runner/job_test.go index ccb86556..d4028147 100644 --- a/internal/runner/job_test.go +++ b/internal/runner/job_test.go @@ -1,6 +1,7 @@ package runner import ( + "github.com/yohamta/dagu/internal/persistence/jsondb" "path" "testing" "time" @@ -13,9 +14,9 @@ import ( func TestJobStart(t *testing.T) { file := path.Join(testdataDir, "start.yaml") - dr := controller.NewDAGStatusReader() + dr := controller.NewDAGStatusReader(jsondb.New()) dag, _ := dr.ReadStatus(file, false) - c := controller.NewDAGController(dag.DAG) + c := controller.New(dag.DAG, jsondb.New()) j := &job{ DAG: dag.DAG, @@ -46,7 +47,7 @@ func TestJobStart(t *testing.T) { func TestJobSop(t *testing.T) { file := path.Join(testdataDir, "stop.yaml") - dr := controller.NewDAGStatusReader() + dr := controller.NewDAGStatusReader(jsondb.New()) dag, _ := dr.ReadStatus(file, false) j := &job{ @@ -59,7 +60,7 @@ func TestJobSop(t *testing.T) { _ = j.Start() }() - c := controller.NewDAGController(dag.DAG) + c := controller.New(dag.DAG, jsondb.New()) require.Eventually(t, func() bool { s, _ := c.GetLastStatus() diff --git a/internal/runner/runner_test.go b/internal/runner/runner_test.go index d7a2ad9a..2d992f1f 100644 --- a/internal/runner/runner_test.go +++ b/internal/runner/runner_test.go @@ -24,12 +24,12 @@ func TestMain(m *testing.M) { changeHomeDir(tempDir) testHomeDir = tempDir code := m.Run() - os.RemoveAll(tempDir) + _ = os.RemoveAll(tempDir) os.Exit(code) } func changeHomeDir(homeDir string) { - os.Setenv("HOME", homeDir) + _ = os.Setenv("HOME", homeDir) _ = config.LoadConfig(homeDir) } diff --git a/internal/scheduler/node_test.go b/internal/scheduler/node_test.go index 8305d446..40b229a1 100644 --- a/internal/scheduler/node_test.go +++ b/internal/scheduler/node_test.go @@ -231,7 +231,7 @@ func TestOutputJson(t *testing.T) { OutputVariables: &utils.SyncMap{}, }, } - err := n.setup(os.Getenv("HOME"), fmt.Sprintf("test-output-json-%d", i)) + err := n.setup(os.Getenv("HOME"), fmt.Sprintf("test-output-jsondb-%d", i)) require.NoError(t, err) defer func() { _ = n.teardown() diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index bbbb0391..a28eb9e5 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -26,12 +26,12 @@ func TestMain(m *testing.M) { testHomeDir = utils.MustTempDir("scheduler-test") changeHomeDir(testHomeDir) code := m.Run() - os.RemoveAll(testHomeDir) + _ = os.RemoveAll(testHomeDir) os.Exit(code) } func changeHomeDir(homeDir string) { - os.Setenv("HOME", homeDir) + _ = os.Setenv("HOME", homeDir) _ = config.LoadConfig(homeDir) } diff --git a/internal/sock/server_test.go b/internal/sock/server_test.go index fa758c81..4948f6fc 100644 --- a/internal/sock/server_test.go +++ b/internal/sock/server_test.go @@ -15,9 +15,9 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - os.Setenv("HOME", testHomeDir) + _ = os.Setenv("HOME", testHomeDir) code := m.Run() - os.RemoveAll(testHomeDir) + _ = os.RemoveAll(testHomeDir) os.Exit(code) } diff --git a/internal/utils/utils_test.go b/internal/utils/utils_test.go index 0b3130b1..ff3a1e93 100644 --- a/internal/utils/utils_test.go +++ b/internal/utils/utils_test.go @@ -122,7 +122,7 @@ func TestOpenOrCreateFile(t *testing.T) { } func TestParseVariable(t *testing.T) { - os.Setenv("TEST_VAR", "test") + _ = os.Setenv("TEST_VAR", "test") r, err := utils.ParseVariable("${TEST_VAR}") require.NoError(t, err) require.Equal(t, r, "test") diff --git a/internal/web/handlers/dag.go b/internal/web/handlers/dag.go index 7d9e6e10..13e0b676 100644 --- a/internal/web/handlers/dag.go +++ b/internal/web/handlers/dag.go @@ -2,6 +2,7 @@ package handlers import ( "fmt" + "github.com/yohamta/dagu/internal/persistence/jsondb" "io" "net/http" "os" @@ -15,7 +16,6 @@ import ( "github.com/yohamta/dagu/internal/constants" "github.com/yohamta/dagu/internal/controller" "github.com/yohamta/dagu/internal/dag" - "github.com/yohamta/dagu/internal/database" "github.com/yohamta/dagu/internal/models" "github.com/yohamta/dagu/internal/scheduler" "github.com/yohamta/dagu/internal/storage" @@ -86,13 +86,13 @@ func handleGetDAG() http.HandlerFunc { paramStep := getReqParam(r, "step") file := filepath.Join(cfg.DAGs, fmt.Sprintf("%s.yaml", dn)) - dr := controller.NewDAGStatusReader() + dr := controller.NewDAGStatusReader(jsondb.New()) d, err := dr.ReadStatus(file, false) if d == nil { encodeError(w, err) return } - c := controller.NewDAGController(d.DAG) + c := controller.New(d.DAG, jsondb.New()) data := newDAGResponse(d.DAG.Name, d, tab) if err != nil { data.Errors = append(data.Errors, err.Error()) @@ -104,7 +104,7 @@ func handleGetDAG() http.HandlerFunc { data.Definition, _ = dag.ReadFile(file) case dag_TabType_History: - logs := controller.NewDAGController(d.DAG).GetRecentStatuses(30) + logs := controller.New(d.DAG, jsondb.New()).GetRecentStatuses(30) data.LogData = buildLog(logs) case dag_TabType_StepLog: @@ -137,7 +137,7 @@ func handleGetDAG() http.HandlerFunc { } func isJsonRequest(r *http.Request) bool { - return r.Header.Get("Accept") == "application/json" + return r.Header.Get("Accept") == "application/jsondb" } func handlePostDAG() http.HandlerFunc { @@ -153,17 +153,17 @@ func handlePostDAG() http.HandlerFunc { dn := dagNameFromCtx(r.Context()) file := filepath.Join(cfg.DAGs, fmt.Sprintf("%s.yaml", dn)) - dr := controller.NewDAGStatusReader() - dag, err := dr.ReadStatus(file, false) + dr := controller.NewDAGStatusReader(jsondb.New()) + d, err := dr.ReadStatus(file, false) if err != nil && action != "save" { encodeError(w, err) return } - c := controller.NewDAGController(dag.DAG) + c := controller.New(d.DAG, jsondb.New()) switch action { case "start": - if dag.Status.Status == scheduler.SchedulerStatus_Running { + if d.Status.Status == scheduler.SchedulerStatus_Running { encodeError(w, fmt.Errorf("already running: %w", errInvalidArgs)) return } @@ -171,10 +171,10 @@ func handlePostDAG() http.HandlerFunc { case "suspend": sc := suspend.NewSuspendChecker(storage.NewStorage(config.Get().SuspendFlagsDir)) - _ = sc.ToggleSuspend(dag.DAG, value == "true") + _ = sc.ToggleSuspend(d.DAG, value == "true") case "stop": - if dag.Status.Status != scheduler.SchedulerStatus_Running { + if d.Status.Status != scheduler.SchedulerStatus_Running { encodeError(w, fmt.Errorf("the DAG is not running: %w", errInvalidArgs)) return } @@ -196,7 +196,7 @@ func handlePostDAG() http.HandlerFunc { } case "mark-success": - if dag.Status.Status == scheduler.SchedulerStatus_Running { + if d.Status.Status == scheduler.SchedulerStatus_Running { encodeError(w, fmt.Errorf("the DAG is still running: %w", errInvalidArgs)) return } @@ -216,7 +216,7 @@ func handlePostDAG() http.HandlerFunc { } case "mark-failed": - if dag.Status.Status == scheduler.SchedulerStatus_Running { + if d.Status.Status == scheduler.SchedulerStatus_Running { encodeError(w, fmt.Errorf("the DAG is still running: %w", errInvalidArgs)) return } @@ -247,7 +247,8 @@ func handlePostDAG() http.HandlerFunc { case "rename": newfile := nameWithExt(path.Join(cfg.DAGs, value)) - err := controller.MoveDAG(file, newfile) + c := controller.New(d.DAG, jsondb.New()) + err := c.MoveDAG(file, newfile) if err != nil { encodeError(w, err) return @@ -271,13 +272,13 @@ func handleDeleteDAG() http.HandlerFunc { dn := dagNameFromCtx(r.Context()) file := filepath.Join(cfg.DAGs, fmt.Sprintf("%s.yaml", dn)) - dr := controller.NewDAGStatusReader() - dag, err := dr.ReadStatus(file, false) + dr := controller.NewDAGStatusReader(jsondb.New()) + d, err := dr.ReadStatus(file, false) if err != nil { encodeError(w, err) } - c := controller.NewDAGController(dag.DAG) + c := controller.New(d.DAG, jsondb.New()) err = c.DeleteDAG() @@ -320,7 +321,7 @@ func readSchedulerLog(c *controller.DAGController, file string) (*logFile, error } f = s.Log } else { - s, err := database.ParseFile(file) + s, err := jsondb.ParseFile(file) if err != nil { return nil, fmt.Errorf("error parsing %s: %w", file, err) } @@ -355,7 +356,7 @@ func readStepLog(c *controller.DAGController, file, stepName, enc string) (*logF stepm[constants.OnCancel] = s.OnCancel stepm[constants.OnExit] = s.OnExit } else { - s, err := database.ParseFile(file) + s, err := jsondb.ParseFile(file) if err != nil { return nil, fmt.Errorf("error parsing %s: %w", file, err) } @@ -400,7 +401,9 @@ func readFile(f string, decorder *encoding.Decoder) ([]byte, error) { if err != nil { return nil, fmt.Errorf("error reading %s: %w", f, err) } - defer r.Close() + defer func() { + _ = r.Close() + }() tr := transform.NewReader(r, decorder) ret, err := io.ReadAll(tr) return ret, err diff --git a/internal/web/handlers/list.go b/internal/web/handlers/list.go index 43e5a903..0a70e65f 100644 --- a/internal/web/handlers/list.go +++ b/internal/web/handlers/list.go @@ -1,6 +1,7 @@ package handlers import ( + "github.com/yohamta/dagu/internal/persistence/jsondb" "net/http" "path" "path/filepath" @@ -22,7 +23,7 @@ func handleGetList() http.HandlerFunc { cfg := config.Get() return func(w http.ResponseWriter, r *http.Request) { dir := filepath.Join(cfg.DAGs) - dr := controller.NewDAGStatusReader() + dr := controller.NewDAGStatusReader(jsondb.New()) dags, errs, err := dr.ReadAllStatus(dir) if err != nil { encodeError(w, err) @@ -46,7 +47,7 @@ func handleGetList() http.HandlerFunc { Errors: errs, HasError: hasErr, } - if r.Header.Get("Accept") == "application/json" { + if r.Header.Get("Accept") == "application/jsondb" { renderJson(w, data) } else { renderFunc(w, data) diff --git a/internal/web/handlers/routes.go b/internal/web/handlers/routes.go index b66a6cad..26792dc3 100644 --- a/internal/web/handlers/routes.go +++ b/internal/web/handlers/routes.go @@ -44,7 +44,7 @@ func handleGetAssets() http.HandlerFunc { } func renderJson(w http.ResponseWriter, data interface{}) { - w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("Content-Type", "application/jsondb; charset=utf-8") w.WriteHeader(http.StatusOK) err := json.NewEncoder(w).Encode(data) if err != nil { diff --git a/internal/web/http_test.go b/internal/web/http_test.go index a9a35dc3..204ef018 100644 --- a/internal/web/http_test.go +++ b/internal/web/http_test.go @@ -15,7 +15,7 @@ import ( func TestHttpServerStartShutdown(t *testing.T) { dir, err := os.MkdirTemp("", "test_http_server") require.NoError(t, err) - os.RemoveAll(dir) + _ = os.RemoveAll(dir) host := "127.0.0.1" port := findPort(t) @@ -45,7 +45,7 @@ func TestHttpServerStartShutdown(t *testing.T) { func TestHttpServerShutdownWithAPI(t *testing.T) { dir, err := os.MkdirTemp("", "test_http_server") require.NoError(t, err) - os.RemoveAll(dir) + _ = os.RemoveAll(dir) host := "127.0.0.1" port := findPort(t) diff --git a/internal/web/setup_test.go b/internal/web/setup_test.go index 94954dcc..7469f3cd 100644 --- a/internal/web/setup_test.go +++ b/internal/web/setup_test.go @@ -14,7 +14,7 @@ var testHomeDir string func TestMain(m *testing.M) { testHomeDir = utils.MustTempDir("dagu-admin-test") - os.Setenv("HOST", "localhost") + _ = os.Setenv("HOST", "localhost") changeHomeDir(testdataDir) code := m.Run() _ = os.RemoveAll(testHomeDir) @@ -22,6 +22,6 @@ func TestMain(m *testing.M) { } func changeHomeDir(homeDir string) { - os.Setenv("HOME", homeDir) + _ = os.Setenv("HOME", homeDir) _ = config.LoadConfig(homeDir) } From a8dc4e25eefb6c163e0f3d2cbca74aa7c7c58d19 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Sun, 27 Aug 2023 19:18:08 +0900 Subject: [PATCH 3/4] update Makefile --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 06eb43bc..57652ea0 100644 --- a/Makefile +++ b/Makefile @@ -41,11 +41,11 @@ build-ui: @cp ui/dist/*.woff2 ./internal/web/handlers/assets/ test: - @go test -v ./... + @go test ./... test-clean: @go clean -testcache - @go test -v ./... + @go test ./... lint: @golangci-lint run ./... From 173464f6f1a4afec190f711c84cbc143e34ddf34 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Sun, 27 Aug 2023 19:38:10 +0900 Subject: [PATCH 4/4] fix bug --- internal/agent/agent.go | 2 +- internal/executor/http.go | 10 +++++----- internal/models/node.go | 18 +++++++++--------- internal/models/status.go | 30 +++++++++++++++--------------- internal/web/handlers/dag.go | 2 +- internal/web/handlers/list.go | 2 +- internal/web/handlers/routes.go | 2 +- 7 files changed, 33 insertions(+), 33 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 6bf506e4..2f230426 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -383,7 +383,7 @@ var ( ) func (a *Agent) handleHTTP(w http.ResponseWriter, r *http.Request) { - w.Header().Set("content-type", "application/jsondb") + w.Header().Set("content-type", "application/json") switch { case r.Method == http.MethodGet && statusRe.MatchString(r.URL.Path): status := a.Status() diff --git a/internal/executor/http.go b/internal/executor/http.go index 55464352..5cd333d7 100644 --- a/internal/executor/http.go +++ b/internal/executor/http.go @@ -24,11 +24,11 @@ type HTTPExecutor struct { } type HTTPConfig struct { - Timeout int `jsondb:"timeout"` - Headers map[string]string `jsondb:"headers"` - QueryParams map[string]string `jsondb:"query"` - Body string `jsondb:"body"` - Silent bool `jsondb:"silent"` + Timeout int `json:"timeout"` + Headers map[string]string `json:"headers"` + QueryParams map[string]string `json:"query"` + Body string `json:"body"` + Silent bool `json:"silent"` } func (e *HTTPExecutor) SetStdout(out io.Writer) { diff --git a/internal/models/node.go b/internal/models/node.go index 4ec316a6..7c109e62 100644 --- a/internal/models/node.go +++ b/internal/models/node.go @@ -9,15 +9,15 @@ import ( ) type Node struct { - *dag.Step `jsondb:"Step"` - Log string `jsondb:"Log"` - StartedAt string `jsondb:"StartedAt"` - FinishedAt string `jsondb:"FinishedAt"` - Status scheduler.NodeStatus `jsondb:"Status"` - RetryCount int `jsondb:"RetryCount"` - DoneCount int `jsondb:"DoneCount"` - Error string `jsondb:"Error"` - StatusText string `jsondb:"StatusText"` + *dag.Step `json:"Step"` + Log string `json:"Log"` + StartedAt string `json:"StartedAt"` + FinishedAt string `json:"FinishedAt"` + Status scheduler.NodeStatus `json:"Status"` + RetryCount int `json:"RetryCount"` + DoneCount int `json:"DoneCount"` + Error string `json:"Error"` + StatusText string `json:"StatusText"` } func (n *Node) ToNode() *scheduler.Node { diff --git a/internal/models/status.go b/internal/models/status.go index 1f9cbdf8..476051d9 100644 --- a/internal/models/status.go +++ b/internal/models/status.go @@ -12,7 +12,7 @@ import ( ) type StatusResponse struct { - Status *Status `jsondb:"status"` + Status *Status `json:"status"` } type Pid int @@ -31,20 +31,20 @@ func (p Pid) IsRunning() bool { } type Status struct { - RequestId string `jsondb:"RequestId"` - Name string `jsondb:"Name"` - Status scheduler.SchedulerStatus `jsondb:"Status"` - StatusText string `jsondb:"StatusText"` - Pid Pid `jsondb:"Pid"` - Nodes []*Node `jsondb:"Nodes"` - OnExit *Node `jsondb:"OnExit"` - OnSuccess *Node `jsondb:"OnSuccess"` - OnFailure *Node `jsondb:"OnFailure"` - OnCancel *Node `jsondb:"OnCancel"` - StartedAt string `jsondb:"StartedAt"` - FinishedAt string `jsondb:"FinishedAt"` - Log string `jsondb:"Log"` - Params string `jsondb:"Params"` + RequestId string `json:"RequestId"` + Name string `json:"Name"` + Status scheduler.SchedulerStatus `json:"Status"` + StatusText string `json:"StatusText"` + Pid Pid `json:"Pid"` + Nodes []*Node `json:"Nodes"` + OnExit *Node `json:"OnExit"` + OnSuccess *Node `json:"OnSuccess"` + OnFailure *Node `json:"OnFailure"` + OnCancel *Node `json:"OnCancel"` + StartedAt string `json:"StartedAt"` + FinishedAt string `json:"FinishedAt"` + Log string `json:"Log"` + Params string `json:"Params"` } type StatusFile struct { diff --git a/internal/web/handlers/dag.go b/internal/web/handlers/dag.go index 13e0b676..8b24c821 100644 --- a/internal/web/handlers/dag.go +++ b/internal/web/handlers/dag.go @@ -137,7 +137,7 @@ func handleGetDAG() http.HandlerFunc { } func isJsonRequest(r *http.Request) bool { - return r.Header.Get("Accept") == "application/jsondb" + return r.Header.Get("Accept") == "application/json" } func handlePostDAG() http.HandlerFunc { diff --git a/internal/web/handlers/list.go b/internal/web/handlers/list.go index 0a70e65f..dcd40212 100644 --- a/internal/web/handlers/list.go +++ b/internal/web/handlers/list.go @@ -47,7 +47,7 @@ func handleGetList() http.HandlerFunc { Errors: errs, HasError: hasErr, } - if r.Header.Get("Accept") == "application/jsondb" { + if r.Header.Get("Accept") == "application/json" { renderJson(w, data) } else { renderFunc(w, data) diff --git a/internal/web/handlers/routes.go b/internal/web/handlers/routes.go index 26792dc3..b66a6cad 100644 --- a/internal/web/handlers/routes.go +++ b/internal/web/handlers/routes.go @@ -44,7 +44,7 @@ func handleGetAssets() http.HandlerFunc { } func renderJson(w http.ResponseWriter, data interface{}) { - w.Header().Set("Content-Type", "application/jsondb; charset=utf-8") + w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(http.StatusOK) err := json.NewEncoder(w).Encode(data) if err != nil {