Skip to content

Commit

Permalink
psql/cleanup: Only use transaction in pretend mode
Browse files Browse the repository at this point in the history
The transaction is only needed for the pretend mode.
When pretend is disabled run the queries without a transaction. This
will be faster.

When the deletion fails the partial result is printed.
  • Loading branch information
fho committed Sep 26, 2024
1 parent 053fd3d commit dbff773
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 60 deletions.
26 changes: 19 additions & 7 deletions internal/command/cleanup_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ type cleanupDbCmd struct {
pretend bool
}

const timeFormat = "02 Jan 06 15:04:05 MST"
const cleanupDbGracetime = time.Second * 5
const (
timeFormat = "02 Jan 06 15:04:05 MST"
cleanupDbGracetime = time.Second * 5
)

const deleteTargetTaskRuns = "taskruns"
const deleteTargetReleases = "releases"
const (
deleteTargetTaskRuns = "taskruns"
deleteTargetReleases = "releases"
)

var cleanupDbLongHelp = fmt.Sprintf(`
Delete old data from the baur database.
Expand Down Expand Up @@ -162,12 +166,20 @@ func (c *cleanupDbCmd) deleteReleases(ctx context.Context, storageClt storage.St
}

func (c *cleanupDbCmd) deleteTaskRuns(ctx context.Context, storageClt storage.Storer) error {
var stateStr string

startTime := time.Now()
result, err := storageClt.TaskRunsDelete(ctx, c.before.Time, c.pretend)
if err != nil {
if err != nil && result == nil {
return err
}

if err == nil {
stateStr = term.GreenHighlight("successful")
} else {
stateStr = term.RedHighlight("failed")
}

stdout.Printf(
"\n"+
"deletion %s in %s, deleted records:\n"+
Expand All @@ -178,7 +190,7 @@ func (c *cleanupDbCmd) deleteTaskRuns(ctx context.Context, storageClt storage.St
"%-16s %s\n"+
"%-16s %s\n"+
"%-16s %s\n",
term.GreenHighlight("successful"),
stateStr,
term.FormatDuration(time.Since(startTime)),
"Task Runs:", term.Highlight(result.DeletedTaskRuns),
"Tasks:", term.Highlight(result.DeletedTasks),
Expand All @@ -189,5 +201,5 @@ func (c *cleanupDbCmd) deleteTaskRuns(ctx context.Context, storageClt storage.St
"VCSs:", term.Highlight(result.DeletedVCS),
)

return nil
return err
}
109 changes: 56 additions & 53 deletions pkg/storage/postgres/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,60 +49,63 @@ func (*Client) deleteOldReleases(ctx context.Context, tx pgx.Tx, before time.Tim
}

func (c *Client) TaskRunsDelete(ctx context.Context, before time.Time, pretend bool) (*storage.TaskRunsDeleteResult, error) {
var result storage.TaskRunsDeleteResult
var result *storage.TaskRunsDeleteResult

err := c.db.BeginFunc(ctx, func(tx pgx.Tx) error {
var err error
if pretend {
if pretend {
err := c.db.BeginFunc(ctx, func(tx pgx.Tx) (err error) {
result, err = c.taskRunsDelete(ctx, before, tx)
defer tx.Rollback(ctx) //nolint: errcheck
}

result.DeletedTaskRuns, err = c.deleteOldTaskRuns(ctx, tx, before)
if err != nil {
return err
})
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
return nil, err
}

result.DeletedTasks, err = c.deleteUnusedTasks(ctx, tx)
if err != nil {
return err
}
return result, nil
}

result.DeletedApps, err = c.deleteUnusedApps(ctx, tx)
if err != nil {
return err
}
return c.taskRunsDelete(ctx, before, c.db)
}

result.DeletedOutputs, err = c.deleteUnusedOutputs(ctx, tx)
if err != nil {
return err
}
func (c *Client) taskRunsDelete(ctx context.Context, before time.Time, con dbConn) (*storage.TaskRunsDeleteResult, error) {
var err error
var result storage.TaskRunsDeleteResult

result.DeletedUploads, err = c.deleteUnusedUploads(ctx, tx)
if err != nil {
return err
}
result.DeletedTaskRuns, err = c.deleteOldTaskRuns(ctx, con, before)
if err != nil {
return nil, err
}

result.DeletedInputs, err = c.deleteUnusedInputs(ctx, tx)
if err != nil {
return err
}
result.DeletedTasks, err = c.deleteUnusedTasks(ctx, con)
if err != nil {
return &result, err
}

result.DeletedVCS, err = c.deleteUnusedVCS(ctx, tx)
if err != nil {
return err
}
result.DeletedApps, err = c.deleteUnusedApps(ctx, con)
if err != nil {
return &result, err
}

return nil
})
result.DeletedOutputs, err = c.deleteUnusedOutputs(ctx, con)
if err != nil {
return &result, err
}

if err != nil && !(pretend && errors.Is(err, pgx.ErrTxClosed)) {
return nil, err
result.DeletedUploads, err = c.deleteUnusedUploads(ctx, con)
if err != nil {
return &result, err
}

return &result, nil
result.DeletedInputs, err = c.deleteUnusedInputs(ctx, con)
if err != nil {
return &result, err
}

result.DeletedVCS, err = c.deleteUnusedVCS(ctx, con)
return &result, err
}

func (*Client) deleteOldTaskRuns(ctx context.Context, tx pgx.Tx, before time.Time) (int64, error) {
func (*Client) deleteOldTaskRuns(ctx context.Context, con dbConn, before time.Time) (int64, error) {
const query = `
DELETE FROM task_run
WHERE start_timestamp < $1
Expand All @@ -112,94 +115,94 @@ func (*Client) deleteOldTaskRuns(ctx context.Context, tx pgx.Tx, before time.Tim
)
`

t, err := tx.Exec(ctx, query, before)
t, err := con.Exec(ctx, query, before)
if err != nil {
return 0, newQueryError(query, err, before)
}

return t.RowsAffected(), nil
}

func (*Client) deleteUnusedTasks(ctx context.Context, tx pgx.Tx) (int64, error) {
func (*Client) deleteUnusedTasks(ctx context.Context, con dbConn) (int64, error) {
const query = `
DELETE FROM task
WHERE NOT EXISTS (
SELECT 1 FROM task_run
WHERE task.id = task_run.task_id
)
`
t, err := tx.Exec(ctx, query)
t, err := con.Exec(ctx, query)
if err != nil {
return 0, newQueryError(query, err)
}

return t.RowsAffected(), nil
}

func (*Client) deleteUnusedApps(ctx context.Context, tx pgx.Tx) (int64, error) {
func (*Client) deleteUnusedApps(ctx context.Context, con dbConn) (int64, error) {
const query = `
DELETE FROM application
WHERE id NOT IN (
SELECT task.application_id FROM task
)
`
t, err := tx.Exec(ctx, query)
t, err := con.Exec(ctx, query)
if err != nil {
return 0, newQueryError(query, err)
}

return t.RowsAffected(), nil
}

func (*Client) deleteUnusedOutputs(ctx context.Context, tx pgx.Tx) (int64, error) {
func (*Client) deleteUnusedOutputs(ctx context.Context, con dbConn) (int64, error) {
const query = `
DELETE FROM output
WHERE id NOT IN (
SELECT task_run_output.output_id
FROM task_run_output
)
`
t, err := tx.Exec(ctx, query)
t, err := con.Exec(ctx, query)
if err != nil {
return 0, newQueryError(query, err)
}

return t.RowsAffected(), nil
}

func (*Client) deleteUnusedUploads(ctx context.Context, tx pgx.Tx) (int64, error) {
func (*Client) deleteUnusedUploads(ctx context.Context, con dbConn) (int64, error) {
const query = `
DELETE FROM upload
WHERE id NOT IN (
SELECT task_run_output.upload_id
FROM task_run_output
)
`
t, err := tx.Exec(ctx, query)
t, err := con.Exec(ctx, query)
if err != nil {
return 0, newQueryError(query, err)
}

return t.RowsAffected(), nil
}

func (*Client) deleteUnusedVCS(ctx context.Context, tx pgx.Tx) (int64, error) {
func (*Client) deleteUnusedVCS(ctx context.Context, con dbConn) (int64, error) {
const query = `
DELETE FROM vcs
WHERE NOT EXISTS (
SELECT 1 FROM task_run
WHERE vcs.id = task_run.vcs_id
)
`
t, err := tx.Exec(ctx, query)
t, err := con.Exec(ctx, query)
if err != nil {
return 0, newQueryError(query, err)
}

return t.RowsAffected(), nil
}

func (*Client) deleteUnusedInputs(ctx context.Context, tx pgx.Tx) (int64, error) {
func (*Client) deleteUnusedInputs(ctx context.Context, con dbConn) (int64, error) {
var cnt int64
const qInputFiles = `
DELETE FROM input_file
Expand All @@ -208,7 +211,7 @@ func (*Client) deleteUnusedInputs(ctx context.Context, tx pgx.Tx) (int64, error)
WHERE input_file.id = task_run_file_input.input_file_id
)
`
t, err := tx.Exec(ctx, qInputFiles)
t, err := con.Exec(ctx, qInputFiles)
if err != nil {
return 0, newQueryError(qInputFiles, err)
}
Expand All @@ -221,7 +224,7 @@ func (*Client) deleteUnusedInputs(ctx context.Context, tx pgx.Tx) (int64, error)
FROM task_run_string_input
)
`
t, err = tx.Exec(ctx, qInputStrings)
t, err = con.Exec(ctx, qInputStrings)
if err != nil {
return 0, newQueryError(qInputFiles, err)
}
Expand All @@ -234,7 +237,7 @@ func (*Client) deleteUnusedInputs(ctx context.Context, tx pgx.Tx) (int64, error)
FROM task_run_task_input
)
`
t, err = tx.Exec(ctx, qInputTasks)
t, err = con.Exec(ctx, qInputTasks)
if err != nil {
return 0, newQueryError(qInputFiles, err)
}
Expand Down

0 comments on commit dbff773

Please sign in to comment.