From bdd7988eb606cb1ee5138102912ae2a380000bbf Mon Sep 17 00:00:00 2001
From: Fabian Holler
Date: Thu, 20 Jun 2024 09:51:05 +0200
Subject: [PATCH] add cleanup db command

Add a "cleanup db" command.
The command removes all task runs that were started before a given
timestamp.
When run, it prints what it will do and gives the user 5 seconds to abort.
This can be skipped by passing the --force parameter.
It prints the number of deleted records.
--pretend can be passed to not delete anything and only print the number of
records that would be deleted.
---
 internal/command/cleanup.go         |  12 ++
 internal/command/cleanup_db.go      | 131 ++++++++++++++++++
 internal/command/helpers.go         |   2 +-
 pkg/storage/postgres/delete.go      | 204 ++++++++++++++++++++++++++++
 pkg/storage/postgres/delete_test.go | 135 ++++++++++++++++++
 pkg/storage/storage.go              |  11 ++
 6 files changed, 494 insertions(+), 1 deletion(-)
 create mode 100644 internal/command/cleanup.go
 create mode 100644 internal/command/cleanup_db.go
 create mode 100644 pkg/storage/postgres/delete.go
 create mode 100644 pkg/storage/postgres/delete_test.go

diff --git a/internal/command/cleanup.go b/internal/command/cleanup.go
new file mode 100644
index 00000000..db61167e
--- /dev/null
+++ b/internal/command/cleanup.go
@@ -0,0 +1,12 @@
+package command
+
+import "github.com/spf13/cobra"
+
+var cleanupCmd = &cobra.Command{
+	Use:   "cleanup",
+	Short: "delete old database records",
+}
+
+func init() {
+	rootCmd.AddCommand(cleanupCmd)
+}
diff --git a/internal/command/cleanup_db.go b/internal/command/cleanup_db.go
new file mode 100644
index 00000000..b6f16207
--- /dev/null
+++ b/internal/command/cleanup_db.go
@@ -0,0 +1,131 @@
+package command
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/simplesurance/baur/v4/internal/command/flag"
+	"github.com/simplesurance/baur/v4/internal/command/term"
+
+	"github.com/spf13/cobra"
+)
+
+type cleanupDbCmd struct {
+	cobra.Command
+	taskRunsBefore flag.DateTimeFlagValue // value of --task-runs-before
+	force          bool                   // value of --force
+	pretend        bool                   // value of --pretend
+}
+
+const timeFormat = "02 Jan 06 15:04:05 MST"
+const cleanupDbGracetime = time.Second * 5
+
+var cleanupDbLongHelp = fmt.Sprintf(`
+Delete old data from the baur database.
+The command deletes information about task runs that started to run before
+a given date from the database. It also removes records that became
+dangling because all task runs referencing them were deleted.
+Task runs that are referenced by a release are not deleted.
+
+The command can be run without access to the baur repository by specifying the
+PostgreSQL URI via the environment variable %s.
+`,
+	term.Highlight(envVarPSQLURL),
+)
+
+const cleanupDbCmdExample = `
+baur cleanup db --pretend --task-runs-before=2023.06.01-13:30
+`
+
+func init() {
+	cleanupCmd.AddCommand(&newCleanupDbCmd().Command)
+}
+
+// newCleanupDbCmd returns the "cleanup db" command with its flags registered,
+// --task-runs-before is marked as required.
+func newCleanupDbCmd() *cleanupDbCmd {
+	cmd := cleanupDbCmd{
+		Command: cobra.Command{
+			Args:    cobra.NoArgs,
+			Use:     "db --task-runs-before=DATETIME",
+			Short:   "delete old task runs and dangling records",
+			Long:    strings.TrimSpace(cleanupDbLongHelp),
+			Example: strings.TrimSpace(cleanupDbCmdExample),
+		},
+	}
+
+	cmd.Flags().Var(&cmd.taskRunsBefore, "task-runs-before",
+		fmt.Sprintf(
+			"delete task runs that started before DATETIME\nFormat: %s",
+			term.Highlight(flag.DateTimeFormatDescr),
+		),
+	)
+
+	cmd.Flags().BoolVarP(&cmd.pretend, "pretend", "p", false,
+		"do not delete anything, only print how many records would be deleted",
+	)
+
+	cmd.Flags().BoolVarP(&cmd.force, "force", "f", false,
+		fmt.Sprintf("do not wait %s before starting deletion, delete immediately", cleanupDbGracetime),
+	)
+
+	if err := cmd.MarkFlagRequired("task-runs-before"); err != nil {
+		panic(err)
+	}
+
+	cmd.Run = cmd.run
+
+	return &cmd
+}
+
+// run prints what will be deleted, waits cleanupDbGracetime unless --force was
+// passed and then deletes the records (or only counts them with --pretend).
+func (c *cleanupDbCmd) run(cmd *cobra.Command, _ []string) {
+	op := "deleting"
+	if c.pretend {
+		op = "pretending to delete"
+	}
+	stdout.Printf(
+		"%s task runs older than %s and dangling records,\n"+
+			"task runs referenced by releases are kept\n",
+		term.Highlight(op),
+		term.Highlight(c.taskRunsBefore.Format(timeFormat)),
+	)
+
+	if !c.force {
+		stdout.Printf("starting in %s, press %s to abort\n",
+			term.Highlight(cleanupDbGracetime), term.Highlight("CTRL+C"))
+		time.Sleep(cleanupDbGracetime)
+		stdout.Println("starting deleting...")
+	}
+
+	psqlURL, err := postgresqlURL()
+	exitOnErr(err)
+
+	storageClt := mustNewCompatibleStorage(psqlURL)
+	startTime := time.Now()
+	result, err := storageClt.TaskRunsDelete(cmd.Context(), c.taskRunsBefore.Time, c.pretend)
+	exitOnErr(err)
+
+	stdout.Printf(
+		"\n"+
+			"deletion %s in %s, deleted records:\n"+
+			"%-16s %s\n"+
+			"%-16s %s\n"+
+			"%-16s %s\n"+
+			"%-16s %s\n"+
+			"%-16s %s\n"+
+			"%-16s %s\n"+
+			"%-16s %s\n",
+		term.GreenHighlight("successful"),
+		term.FormatDuration(time.Since(startTime)),
+		"Task Runs:", term.Highlight(result.DeletedTaskRuns),
+		"Tasks:", term.Highlight(result.DeletedTasks),
+		"Apps:", term.Highlight(result.DeletedApps),
+		"Inputs:", term.Highlight(result.DeletedInputs),
+		"Outputs:", term.Highlight(result.DeletedOutputs),
+		"Uploads:", term.Highlight(result.DeletedUploads),
+		"VCSs:", term.Highlight(result.DeletedVCS),
+	)
+}
diff --git a/internal/command/helpers.go b/internal/command/helpers.go
index 826afaef..e11477f2 100644
--- a/internal/command/helpers.go
+++ b/internal/command/helpers.go
@@ -160,7 +160,7 @@ func getPSQLURIEnv() string {
 }
 
 // postgresqlURL returns the value of the environment variable [envVarPSQLURL],
-// if is set.
+// if set.
 // Otherwise it searches for a baur repository and returns the postgresql url
 // from the repository config.
// If the repository object is needed, use [mustNewCompatibleStorageRepo] diff --git a/pkg/storage/postgres/delete.go b/pkg/storage/postgres/delete.go new file mode 100644 index 00000000..913a4867 --- /dev/null +++ b/pkg/storage/postgres/delete.go @@ -0,0 +1,204 @@ +package postgres + +import ( + "context" + "errors" + "time" + + "github.com/jackc/pgx/v4" + + "github.com/simplesurance/baur/v4/pkg/storage" +) + +func (c *Client) TaskRunsDelete(ctx context.Context, before time.Time, pretend bool) (*storage.TaskRunsDeleteResult, error) { + var result storage.TaskRunsDeleteResult + + err := c.db.BeginFunc(ctx, func(tx pgx.Tx) error { + var err error + if pretend { + defer tx.Rollback(ctx) //nolint: errcheck + } + + result.DeletedTaskRuns, err = c.deleteUnusedTaskRuns(ctx, tx, before) + if err != nil { + return err + } + + result.DeletedTasks, err = c.deleteUnusedTasks(ctx, tx) + if err != nil { + return err + } + + result.DeletedApps, err = c.deleteUnusedApps(ctx, tx) + if err != nil { + return err + } + + result.DeletedOutputs, err = c.deleteUnusedOutputs(ctx, tx) + if err != nil { + return err + } + + result.DeletedUploads, err = c.deleteUnusedUploads(ctx, tx) + if err != nil { + return err + } + + result.DeletedInputs, err = c.deleteUnusedInputs(ctx, tx) + if err != nil { + return err + } + + result.DeletedVCS, err = c.deleteUnusedVCS(ctx, tx) + if err != nil { + return err + } + + return nil + }) + + if err != nil && !(pretend && errors.Is(err, pgx.ErrTxClosed)) { + return nil, err + } + + return &result, nil +} + +func (*Client) deleteUnusedTaskRuns(ctx context.Context, tx pgx.Tx, before time.Time) (int64, error) { + const query = ` + DELETE FROM task_run + WHERE start_timestamp < $1 + AND task_run.id NOT IN ( + SELECT task_run_id FROM release_task_run + ) + ` + + t, err := tx.Exec(ctx, query, before) + if err != nil { + return 0, newQueryError(query, err, before) + } + + return t.RowsAffected(), nil +} + +func (*Client) deleteUnusedTasks(ctx context.Context, tx 
pgx.Tx) (int64, error) { + const query = ` + DELETE FROM task + WHERE id NOT IN ( + SELECT task_run.task_id FROM task_run + ) + ` + t, err := tx.Exec(ctx, query) + if err != nil { + return 0, newQueryError(query, err) + } + + return t.RowsAffected(), nil +} + +func (*Client) deleteUnusedApps(ctx context.Context, tx pgx.Tx) (int64, error) { + const query = ` + DELETE FROM application + WHERE id NOT IN ( + SELECT task.application_id FROM task + ) + ` + t, err := tx.Exec(ctx, query) + if err != nil { + return 0, newQueryError(query, err) + } + + return t.RowsAffected(), nil +} + +func (*Client) deleteUnusedOutputs(ctx context.Context, tx pgx.Tx) (int64, error) { + const query = ` + DELETE FROM output + WHERE id NOT IN ( + SELECT task_run_output.output_id + FROM task_run_output + ) + ` + t, err := tx.Exec(ctx, query) + if err != nil { + return 0, newQueryError(query, err) + } + + return t.RowsAffected(), nil +} + +func (*Client) deleteUnusedUploads(ctx context.Context, tx pgx.Tx) (int64, error) { + const query = ` + DELETE FROM upload + WHERE id NOT IN ( + SELECT task_run_output.upload_id + FROM task_run_output + ) + ` + t, err := tx.Exec(ctx, query) + if err != nil { + return 0, newQueryError(query, err) + } + + return t.RowsAffected(), nil +} + +func (*Client) deleteUnusedVCS(ctx context.Context, tx pgx.Tx) (int64, error) { + const query = ` + DELETE FROM vcs + WHERE id NOT IN ( + SELECT task_run.vcs_id + FROM task_run + ) + ` + t, err := tx.Exec(ctx, query) + if err != nil { + return 0, newQueryError(query, err) + } + + return t.RowsAffected(), nil +} + +func (*Client) deleteUnusedInputs(ctx context.Context, tx pgx.Tx) (int64, error) { + var cnt int64 + const qInputFiles = ` + DELETE FROM input_file + WHERE id NOT IN ( + SELECT task_run_file_input.input_file_id + FROM task_run_file_input + ) + ` + t, err := tx.Exec(ctx, qInputFiles) + if err != nil { + return 0, newQueryError(qInputFiles, err) + } + cnt = t.RowsAffected() + + const qInputStrings = ` + DELETE FROM 
input_string + WHERE id NOT IN ( + SELECT task_run_string_input.input_string_id + FROM task_run_string_input + ) + ` + t, err = tx.Exec(ctx, qInputStrings) + if err != nil { + return 0, newQueryError(qInputFiles, err) + } + cnt += t.RowsAffected() + + const qInputTasks = ` + DELETE FROM input_task + WHERE id NOT IN ( + SELECT task_run_task_input.input_task_id + FROM task_run_task_input + ) + ` + t, err = tx.Exec(ctx, qInputTasks) + if err != nil { + return 0, newQueryError(qInputFiles, err) + } + cnt += t.RowsAffected() + + return cnt, nil +} diff --git a/pkg/storage/postgres/delete_test.go b/pkg/storage/postgres/delete_test.go new file mode 100644 index 00000000..fc1c1c24 --- /dev/null +++ b/pkg/storage/postgres/delete_test.go @@ -0,0 +1,135 @@ +//go:build dbtest + +package postgres + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/jackc/pgx/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/simplesurance/baur/v4/pkg/storage" +) + +func TestDelete(t *testing.T) { + startTime := time.Now().Add(-1 * time.Minute) + + tr := storage.TaskRunFull{ + TaskRun: storage.TaskRun{ + ApplicationName: "baurHimself", + TaskName: "build", + VCSRevision: "1", + VCSIsDirty: false, + StartTimestamp: startTime, + StopTimestamp: time.Now(), + Result: storage.ResultSuccess, + TotalInputDigest: "1234567890", + }, + Inputs: storage.Inputs{ + Files: []*storage.InputFile{ + { + Path: "main.go", + Digest: "45", + }, + { + Path: "abc.go", + Digest: "01", + }, + }, + }, + Outputs: []*storage.Output{ + { + Name: "binary", + Type: storage.ArtifactTypeFile, + Digest: "456", + SizeBytes: 300, + Uploads: []*storage.Upload{ + { + URI: "abc", + UploadStartTimestamp: time.Now(), + UploadStopTimestamp: time.Now().Add(5 * time.Second), + Method: storage.UploadMethodS3, + }, + { + URI: "efg", + UploadStartTimestamp: time.Now(), + UploadStopTimestamp: time.Now().Add(5 * time.Second), + Method: storage.UploadMethodS3, + }, + }, + }, + }, + 
} + + clt, cleanupFn := newTestClient(t) + defer cleanupFn() + + require.NoError(t, clt.Init(ctx)) + + _, err := clt.SaveTaskRun(ctx, &tr) + require.NoError(t, err) + + checkResultFn := func(result *storage.TaskRunsDeleteResult) { + assert.Equal(t, int64(1), result.DeletedVCS) + assert.Equal(t, int64(1), result.DeletedApps) + assert.Equal(t, int64(1), result.DeletedTasks) + assert.Equal(t, int64(2), result.DeletedInputs) + assert.Equal(t, int64(1), result.DeletedOutputs) + assert.Equal(t, int64(2), result.DeletedUploads) + } + + result, err := clt.TaskRunsDelete(ctx, startTime.Add(time.Minute), true) + require.NoError(t, err) + checkResultFn(result) + + result, err = clt.TaskRunsDelete(ctx, startTime.Add(time.Minute), false) + require.NoError(t, err) + checkResultFn(result) + + require.NotNil(t, result) + + tableNames := allTableNames(t, clt) + for _, tableName := range tableNames { + if tableName == "migrations" { + continue + } + assert.Truef( + t, + tableIsEmpty(t, clt, tableName), + "table %s is not empty", tableName, + ) + } +} + +func tableIsEmpty(t *testing.T, clt *Client, tableName string) bool { + var result bool + q := fmt.Sprintf(`SELECT EXISTS (SELECT * FROM %s LIMIT 1)`, pgx.Identifier{tableName}.Sanitize()) + + err := clt.db.QueryRow(context.Background(), q).Scan(&result) + require.NoErrorf(t, err, "checking if table is empty failed, query: %q", q) + return !result +} + +func allTableNames(t *testing.T, clt *Client) []string { + var result []string + + const q = `SELECT tablename + FROM pg_catalog.pg_tables + WHERE schemaname = 'public' + ` + + rows, err := clt.db.Query(context.Background(), q) + require.NoError(t, err, "querying table names failed") + for rows.Next() { + var tableName string + require.NoError(t, rows.Scan(&tableName), "scanning table name failed") + result = append(result, tableName) + } + + require.NoError(t, rows.Err(), "iterating over table name rows failed") + return result +} diff --git a/pkg/storage/storage.go 
b/pkg/storage/storage.go index 66686e01..ec6674a1 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -115,6 +115,16 @@ const ( NoLimit uint = 0 ) +type TaskRunsDeleteResult struct { + DeletedTaskRuns int64 + DeletedTasks int64 + DeletedApps int64 + DeletedOutputs int64 + DeletedUploads int64 + DeletedInputs int64 + DeletedVCS int64 +} + // Storer is an interface for storing and retrieving baur task runs type Storer interface { Close() error @@ -150,6 +160,7 @@ type Storer interface { limit uint, callback func(*TaskRunWithID) error, ) error + TaskRunsDelete(ctx context.Context, before time.Time, pretend bool) (*TaskRunsDeleteResult, error) // Inputs returns the inputs of a task run. If no records were found, // the method returns ErrNotExist.