Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added queue functionality #690

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
{
"editor.formatOnSave": true,
"[go]": {
"editor.insertSpaces": true,
"editor.formatOnSave": true,
"editor.defaultFormatter": "golang.go"
},
"editor.rulers": [
80
],
"editor.renderWhitespace": "all",
"go.lintTool": "golangci-lint",
"go.lintFlags": [
"--fast"
],
"go.formatTool": "gofmt",
"editor.defaultFormatter": "esbenp.prettier-vscode",
"files.associations": {
"*.gohtml": "html"
},
}
"editor.formatOnSave": true,
"[go]": {
"editor.insertSpaces": true,
"editor.formatOnSave": true,
"editor.defaultFormatter": "golang.go"
},
"editor.rulers": [80],
"editor.renderWhitespace": "all",
"go.lintTool": "golangci-lint",
"go.lintFlags": ["--fast"],
"go.formatTool": "gofmt",
"editor.defaultFormatter": "esbenp.prettier-vscode",
"files.associations": {
"*.gohtml": "html"
},
"cSpell.words": [
"dagu",
"dsclient",
"filecache",
"gosec",
"nolint",
"stretchr"
]
}
36 changes: 36 additions & 0 deletions api.v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ paths:
in: query
required: false
type: string
- name: searchStatus
in: query
required: false
type: string
responses:
"200":
description: A successful response.
Expand Down Expand Up @@ -135,6 +139,7 @@ paths:
type: string
enum:
- start
- dequeue
- suspend
- stop
- retry
Expand Down Expand Up @@ -226,6 +231,23 @@ paths:
$ref: "#/definitions/ApiError"
tags:
- dags
/status:
get:
description: Returns a list of current status.
produces:
- application/json
operationId: listStatus
responses:
"200":
description: A successful response.
schema:
$ref: "#/definitions/listStatusResponse"
default:
description: Generic error response.
schema:
$ref: "#/definitions/ApiError"
tags:
- dags

definitions:
ApiError:
Expand Down Expand Up @@ -785,3 +807,17 @@ definitions:
required:
- Tags
- Errors
listStatusResponse:
type: object
properties:
Status:
type: array
items:
type: string
Errors:
type: array
items:
type: string
required:
- Status
- Errors
52 changes: 52 additions & 0 deletions cmd/dequeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (C) 2024 The Dagu Authors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package cmd

import (
"log"

"github.com/dagu-org/dagu/internal/config"
"github.com/spf13/cobra"
)

func dequeueCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "dequeue /path/to/spec.yaml",
Short: "dequeues the DAG",
Long: `dagu dequeue /path/to/spec.yaml`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {

Check failure on line 31 in cmd/dequeue.go

View workflow job for this annotation

GitHub Actions / Go Linter

unused-parameter: parameter 'cmd' seems to be unused, consider removing or renaming it as _ (revive)
cfg, err := config.Load()
if err != nil {
log.Fatalf("Configuration load failed: %v", err)
}
dataStore := newDataStores(cfg)
historyStore := dataStore.HistoryStore()
queueStore := newQueueStore(cfg)

found, err := queueStore.FindJobId(args[0])

Check failure on line 40 in cmd/dequeue.go

View workflow job for this annotation

GitHub Actions / Go Linter

ineffectual assignment to err (ineffassign)
if err := historyStore.RemoveEmptyQueue(args[0]); err != nil {
log.Fatal("Queue History data clean up failed", "error", err)
}
if found {
log.Print("job id dequeued ", args[0])
} else {
log.Print(args[0], " is not present in the queue.")
}
},
}
return cmd
}
5 changes: 5 additions & 0 deletions cmd/dry.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,20 @@ func dryCmd() *cobra.Command {

dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore, agentLogger)
queueStore := newQueueStore(cfg)
statsStore := newStatsStore(cfg)

agt := agent.New(
requestID,
workflow,
agentLogger,
filepath.Dir(logFile.Name()),
logFile.Name(),
cfg.DAGQueueLength,
cli,
dataStore,
queueStore,
statsStore,
&agent.Options{Dry: true})

ctx := cmd.Context()
Expand Down
11 changes: 11 additions & 0 deletions cmd/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,27 @@ import (
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/persistence"
dsclient "github.com/dagu-org/dagu/internal/persistence/client"
"github.com/dagu-org/dagu/internal/persistence/queue"
"github.com/dagu-org/dagu/internal/persistence/stats"
)

func newClient(cfg *config.Config, ds persistence.DataStores, lg logger.Logger) client.Client {
return client.New(ds, cfg.Executable, cfg.WorkDir, lg)
}

func newQueueStore(cfg *config.Config) persistence.QueueStore {
return queue.NewQueueStore(cfg.QueueDir)
}
func newStatsStore(cfg *config.Config) persistence.StatsStore {
return stats.NewStatsStore(cfg.StatsDir)
}

func newDataStores(cfg *config.Config) persistence.DataStores {
return dsclient.NewDataStores(
cfg.DAGs,
cfg.DataDir,
cfg.QueueDir,
cfg.StatsDir,
cfg.SuspendFlagsDir,
dsclient.DataStoreOptions{
LatestStatusToday: cfg.LatestStatusToday,
Expand Down
6 changes: 6 additions & 0 deletions cmd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func restartCmd() *cobra.Command {

dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore, initLogger)
queueStore := newQueueStore(cfg)
statsStore := newStatsStore(cfg)

if err := stopDAGIfRunning(cli, workflow, initLogger); err != nil {
initLogger.Fatal("Workflow stop operation failed",
Expand Down Expand Up @@ -126,8 +128,12 @@ func restartCmd() *cobra.Command {
agentLogger,
filepath.Dir(logFile.Name()),
logFile.Name(),
cfg.DAGQueueLength,

newClient(cfg, dataStore, agentLogger),
dataStore,
queueStore,
statsStore,
&agent.Options{Dry: false})

listenSignals(cmd.Context(), agt)
Expand Down
5 changes: 5 additions & 0 deletions cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func retryCmd() *cobra.Command {
})

cli := newClient(cfg, dataStore, agentLogger)
queueStore := newQueueStore(cfg)
statsStore := newStatsStore(cfg)

agentLogger.Info("Workflow retry initiated",
"workflow", workflow.Name,
Expand All @@ -118,8 +120,11 @@ func retryCmd() *cobra.Command {
agentLogger,
filepath.Dir(logFile.Name()),
logFile.Name(),
cfg.DAGQueueLength,
cli,
dataStore,
queueStore,
statsStore,
&agent.Options{RetryTarget: status.Status},
)

Expand Down
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func registerCommands() {
rootCmd.AddCommand(schedulerCmd())
rootCmd.AddCommand(retryCmd())
rootCmd.AddCommand(startAllCmd())
rootCmd.AddCommand(dequeueCmd())
}

func init() {
Expand Down
16 changes: 14 additions & 2 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func startCmd() *cobra.Command {
if err != nil {
log.Fatalf("Flag retrieval failed (quiet): %v", err)
}
waiting, err := cmd.Flags().GetBool("waiting")
if err != nil {
log.Fatalf("Flag retrieval failed (waiting): %v", err)
}

initLogger := logger.NewLogger(logger.NewLoggerArgs{
Debug: cfg.Debug,
Expand Down Expand Up @@ -90,6 +94,10 @@ func startCmd() *cobra.Command {
})

dataStore := newDataStores(cfg)

queueStore := newQueueStore(cfg)
statsStore := newStatsStore(cfg)

cli := newClient(cfg, dataStore, agentLogger)

agentLogger.Info("Workflow execution initiated",
Expand All @@ -103,10 +111,12 @@ func startCmd() *cobra.Command {
agentLogger,
filepath.Dir(logFile.Name()),
logFile.Name(),
cfg.DAGQueueLength,
cli,
dataStore,
&agent.Options{})

queueStore,
statsStore,
&agent.Options{FromWaitingQueue: waiting})
ctx := cmd.Context()

listenSignals(ctx, agt)
Expand All @@ -122,6 +132,8 @@ func startCmd() *cobra.Command {

cmd.Flags().StringP("params", "p", "", "parameters")
cmd.Flags().BoolP("quiet", "q", false, "suppress output")
cmd.Flags().BoolP("waiting", "w", false, "from waiting queue")

return cmd
}

Expand Down
Loading
Loading