From e7b2cfe3516ca475cc1b89c729824fafcd8729cc Mon Sep 17 00:00:00 2001 From: Kevin Cao Date: Mon, 24 Feb 2025 13:18:00 -0500 Subject: [PATCH] backup: add resume and onfail/cancel logic to compactions Compactions were jobified in #141183, but missing support for job resumption and cancellation handling. This commit adds support for handling those cases. An additional PR should be made to add telemetry for compactions. Epic: none Release note: None --- pkg/backup/backup_compaction.go | 580 +++++++++++++------- pkg/backup/backup_compaction_test.go | 52 +- pkg/backup/backup_job.go | 18 +- pkg/backup/backupdest/backup_destination.go | 20 + pkg/backup/backupencryption/encryption.go | 8 +- 5 files changed, 446 insertions(+), 232 deletions(-) diff --git a/pkg/backup/backup_compaction.go b/pkg/backup/backup_compaction.go index 5e50faa99045..0177025a658b 100644 --- a/pkg/backup/backup_compaction.go +++ b/pkg/backup/backup_compaction.go @@ -20,7 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/backup/backuputils" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/joberror" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -38,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -92,128 +95,250 @@ func StartCompactionJob( return jobID, nil } -// compactBackups performs a compaction of the backups at the given collection -// URI within the start and end timestamps specified in the job details. -func compactBackups( - ctx context.Context, jobID jobspb.JobID, execCtx sql.JobExecContext, details jobspb.BackupDetails, +func (b *backupResumer) ResumeCompaction( + ctx context.Context, + initialDetails jobspb.BackupDetails, + execCtx sql.JobExecContext, + kmsEnv cloud.KMSEnv, ) error { - dest := details.Destination - resolvedBaseDirs, resolvedIncDirs, _, err := resolveBackupDirs( - ctx, execCtx, dest.To, dest.IncrementalStorage, dest.Subdir, - ) + // We interleave the computation of the compaction chain between the destination + // resolution and writing of backup lock due to the need to verify that the + // compaction chain is a valid chain. + prevManifests, localityInfo, encryption, allIters, err := getBackupChain(ctx, execCtx, initialDetails, kmsEnv) if err != nil { return err } - mkStore := execCtx.ExecCfg().DistSQLSrv.ExternalStorageFromURI - baseStores, baseCleanup, err := backupdest.MakeBackupDestinationStores( - ctx, execCtx.User(), mkStore, resolvedBaseDirs, + compactChain, err := newCompactionChain( + prevManifests, + initialDetails.StartTime, + initialDetails.EndTime, + localityInfo, + allIters, ) if err != nil { return err } - defer func() { - if err := baseCleanup(); err != nil { - log.Warningf(ctx, "failed to cleanup base backup stores: %+v", err) + + var backupManifest *backuppb.BackupManifest + updatedDetails := initialDetails + if initialDetails.URI == "" { + // Resolve the backup destination. 
If we have already resolved and persisted + // the destination during a previous resumption of this job, we can re-use + // the previous resolution. + backupDest, err := backupdest.ResolveDestForCompaction(ctx, execCtx, initialDetails) + if err != nil { + return err } - }() - incStores, incCleanup, err := backupdest.MakeBackupDestinationStores( - ctx, execCtx.User(), mkStore, resolvedIncDirs, - ) + if err = maybeWriteBackupLock(ctx, execCtx, backupDest, b.job.ID()); err != nil { + return err + } + updatedDetails, err = updateCompactionBackupDetails( + ctx, compactChain, initialDetails, backupDest, encryption, kmsEnv, + ) + if err != nil { + return err + } + backupManifest, err = createCompactionManifest(ctx, execCtx, updatedDetails, compactChain) + if err != nil { + return err + } + + if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_first_checkpoint"); err != nil { + return err + } + + if err := backupinfo.WriteBackupManifestCheckpoint( + ctx, updatedDetails.URI, updatedDetails.EncryptionOptions, kmsEnv, + backupManifest, execCtx.ExecCfg(), execCtx.User(), + ); err != nil { + return err + } + + if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_first_checkpoint"); err != nil { + return err + } + + description := maybeUpdateJobDescription( + initialDetails, updatedDetails, b.job.Payload().Description, + ) + + // Update the job payload (non-volatile job definition) once, with the now + // resolved destination, updated description, etc. If we resume again we'll + // skip this whole block so this isn't an excessive update of payload. + if err := b.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + if err := md.CheckRunningOrReverting(); err != nil { + return err + } + md.Payload.Details = jobspb.WrapPayloadDetails(updatedDetails) + md.Payload.Description = description + ju.UpdatePayload(md.Payload) + return nil + }); err != nil { + return err + } + + if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.details_has_checkpoint"); err != nil { + return err + } + // TODO (kev-cao): Add telemetry for backup compactions. + } + + // For all backups, partitioned or not, the main BACKUP manifest is stored at + // details.URI. + defaultConf, err := cloud.ExternalStorageConfFromURI(updatedDetails.URI, execCtx.User()) if err != nil { - return err + return errors.Wrapf(err, "export configuration") } - defer func() { - if err := incCleanup(); err != nil { - log.Warningf(ctx, "failed to cleanup incremental backup stores: %+v", err) - } - }() - ioConf := baseStores[0].ExternalIOConf() - kmsEnv := backupencryption.MakeBackupKMSEnv( - execCtx.ExecCfg().Settings, - &ioConf, - execCtx.ExecCfg().InternalDB, - execCtx.User(), - ) - encryption, err := backupencryption.GetEncryptionFromBaseStore( - ctx, baseStores[0], *details.EncryptionOptions, &kmsEnv, - ) + defaultStore, err := execCtx.ExecCfg().DistSQLSrv.ExternalStorage(ctx, defaultConf) if err != nil { - return err + return errors.Wrapf(err, "make storage") + } + defer defaultStore.Close() + + // EncryptionInfo is non-nil only when new encryption information has been + // generated during BACKUP planning. 
+ redactedURI := backuputils.RedactURIForErrorMessage(updatedDetails.URI) + if updatedDetails.EncryptionInfo != nil { + if err := backupencryption.WriteEncryptionInfoIfNotExists( + ctx, updatedDetails.EncryptionInfo, defaultStore, + ); err != nil { + return errors.Wrapf(err, "creating encryption info file to %s", redactedURI) + } + } + + storageByLocalityKV := make(map[string]*cloudpb.ExternalStorage) + for kv, uri := range updatedDetails.URIsByLocalityKV { + conf, err := cloud.ExternalStorageConfFromURI(uri, execCtx.User()) + if err != nil { + return err + } + storageByLocalityKV[kv] = &conf } + mem := execCtx.ExecCfg().RootMemoryMonitor.MakeBoundAccount() defer mem.Close(ctx) + var memSize int64 - _, manifests, localityInfo, memReserved, err := backupdest.ResolveBackupManifests( - ctx, &mem, baseStores, incStores, mkStore, resolvedBaseDirs, - resolvedIncDirs, details.EndTime, encryption, &kmsEnv, - execCtx.User(), false, - ) - if err != nil { + if backupManifest == nil { + backupManifest, memSize, err = b.readManifestOnResume(ctx, &mem, execCtx.ExecCfg(), defaultStore, + updatedDetails, execCtx.User(), kmsEnv) + if err != nil { + return err + } + } + + // We retry on pretty generic failures -- any rpc error. If a worker node were + // to restart, it would produce this kind of error, but there may be other + // errors that are also rpc errors. Don't retry too aggressively. + retryOpts := retry.Options{ + MaxBackoff: 1 * time.Second, + MaxRetries: 5, + } + + if execCtx.ExecCfg().BackupRestoreTestingKnobs != nil && + execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy != nil { + retryOpts = *execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy + } + + if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.flow"); err != nil { return err } - defer func() { - mem.Shrink(ctx, memReserved) - }() - return compactIncrementals( - ctx, jobID, execCtx, details, manifests, encryption, &kmsEnv, localityInfo, + + // We want to retry a backup if there are transient failures (i.e. worker nodes + // dying), so if we receive a retryable error, re-plan and retry the backup. + // TODO (kev-cao): Add progress tracking to compactions. + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + if err = compactChain.Compact(ctx, execCtx, updatedDetails, backupManifest, defaultStore, kmsEnv); err == nil { + break + } + + if joberror.IsPermanentBulkJobError(err) { + return errors.Wrap(err, "failed to run backup compaction") + } + + // If we are draining, it is unlikely we can start a + // new DistSQL flow. Exit with a retryable error so + // that another node can pick up the job. + if execCtx.ExecCfg().JobRegistry.IsDraining() { + return jobs.MarkAsRetryJobError(errors.Wrapf(err, "job encountered retryable error on draining node")) + } + + log.Warningf(ctx, "encountered retryable error: %+v", err) + + // Reload the backup manifest to pick up any spans we may have completed on + // previous attempts. + // TODO (kev-cao): Compactions currently do not create checkpoints, but this + // can be used to reload the manifest once we add checkpointing. 
+ var reloadBackupErr error + mem.Shrink(ctx, memSize) + backupManifest, memSize, reloadBackupErr = b.readManifestOnResume(ctx, &mem, execCtx.ExecCfg(), + defaultStore, updatedDetails, execCtx.User(), kmsEnv) + if reloadBackupErr != nil { + return errors.Wrap(reloadBackupErr, "could not reload backup manifest when retrying") + } + } + // We have exhausted retries without getting a "PermanentBulkJobError", but + // something must be wrong if we keep seeing errors so give up and fail to + // ensure that any alerting on failures is triggered and that any subsequent + // schedule runs are not blocked. + if err != nil { + return errors.Wrap(err, "exhausted retries") + } + + return b.maybeNotifyScheduledJobCompletion( + ctx, jobs.StateSucceeded, execCtx.ExecCfg().JobsKnobs(), execCtx.ExecCfg().InternalDB, ) } -func compactIncrementals( +type compactionChain struct { + // backupChain is the linear chain of backups up to the end time required + // for a restore. + backupChain []backuppb.BackupManifest + chainToCompact []backuppb.BackupManifest + // start refers to the start time of the first backup to be compacted. + // end refers to the end time of the last backup to be compacted. + start, end hlc.Timestamp + // Inclusive startIdx and exclusive endIdx of the sub-chain to compact. + startIdx, endIdx int + // Locality info per layer in the compacted chain. + compactedLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo + // All iter factories for all backups in the chain. + allIters backupinfo.LayerToBackupManifestFileIterFactory + // Iter factory for just the backups in the chain to compact. + compactedIterFactory backupinfo.LayerToBackupManifestFileIterFactory +} + +// Compact runs compaction on the chain according to the job details and +// associated backup manifest. 
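
Before the Compact method that follows, a quick orientation on how the pieces above fit together at resume time. The sketch below is illustrative only: it condenses ResumeCompaction from this patch (checkpointing, pausepoints, job-payload updates, and manifest reloading are elided), assumes this package's imports, and the function name is invented.

func compactionResumeSketch(
	ctx context.Context,
	execCtx sql.JobExecContext,
	details jobspb.BackupDetails,
	manifest *backuppb.BackupManifest,
	store cloud.ExternalStorage,
	kmsEnv cloud.KMSEnv,
) error {
	// Re-resolve the chain of backups covering the compaction window.
	manifests, localityInfo, _, iters, err := getBackupChain(ctx, execCtx, details, kmsEnv)
	if err != nil {
		return err
	}
	// Select the sub-chain [details.StartTime, details.EndTime] to compact.
	chain, err := newCompactionChain(manifests, details.StartTime, details.EndTime, localityInfo, iters)
	if err != nil {
		return err
	}
	// Run the compaction flow, retrying only transient (non-permanent) errors.
	retryOpts := retry.Options{MaxBackoff: 1 * time.Second, MaxRetries: 5}
	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
		if err = chain.Compact(ctx, execCtx, details, manifest, store, kmsEnv); err == nil {
			break
		}
		if joberror.IsPermanentBulkJobError(err) {
			return errors.Wrap(err, "failed to run backup compaction")
		}
	}
	if err != nil {
		return errors.Wrap(err, "exhausted retries")
	}
	return nil
}
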
+func (c *compactionChain) Compact( ctx context.Context, - jobID jobspb.JobID, execCtx sql.JobExecContext, details jobspb.BackupDetails, - backupChain []backuppb.BackupManifest, - encryption *jobspb.BackupEncryptionOptions, + backupManifest *backuppb.BackupManifest, + defaultStore cloud.ExternalStorage, kmsEnv cloud.KMSEnv, - localityInfo []jobspb.RestoreDetails_BackupLocalityInfo, ) error { ctx, span := tracing.ChildSpan(ctx, "backup.compaction") defer span.Finish() - allIters, err := backupinfo.GetBackupManifestIterFactories( - ctx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, backupChain, encryption, kmsEnv, - ) - if err != nil { - return err - } - compactChain, err := newCompactionChain(backupChain, details.StartTime, details.EndTime, allIters) - if err != nil { - return err - } - localityInfo = localityInfo[compactChain.startIdx:compactChain.endIdx] - chainToCompact := compactChain.chainToCompact log.Infof( ctx, "beginning compaction of %d backups: %s", - len(chainToCompact), util.Map(chainToCompact, func(m backuppb.BackupManifest) string { + len(c.chainToCompact), util.Map(c.chainToCompact, func(m backuppb.BackupManifest) string { return m.ID.String() }), ) - backupManifest, newDetails, err := prepareCompactedBackupMeta( - ctx, execCtx, jobID, details.Destination, compactChain, encryption, kmsEnv, allIters, - ) - if err != nil { - return err - } - if err := backupinfo.WriteBackupManifestCheckpoint( - ctx, newDetails.URI, encryption, kmsEnv, - backupManifest, execCtx.ExecCfg(), execCtx.User(), - ); err != nil { - return err - } - backupLocalityMap, err := makeBackupLocalityMap(localityInfo, execCtx.User()) + backupLocalityMap, err := makeBackupLocalityMap(c.compactedLocalityInfo, execCtx.User()) if err != nil { return err } - introducedSpanFrontier, err := createIntroducedSpanFrontier(backupChain, backupManifest.EndTime) + introducedSpanFrontier, err := createIntroducedSpanFrontier(c.backupChain, backupManifest.EndTime) if err != nil { return err } defer introducedSpanFrontier.Release() spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) - backupCodec, err := backupinfo.MakeBackupCodec(chainToCompact) + backupCodec, err := backupinfo.MakeBackupCodec(c.chainToCompact) if err != nil { return err } @@ -248,6 +373,14 @@ func compactIncrementals( if err != nil { return err } + completedSpans, completedIntroducedSpans, err := getCompletedSpans( + ctx, execCtx, backupManifest, defaultStore, details.EncryptionOptions, kmsEnv, + ) + if err != nil { + return err + } + spans = filterSpans(spans, completedSpans) + spans = filterSpans(spans, completedIntroducedSpans) genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error { defer close(spanCh) @@ -257,8 +390,8 @@ func compactIncrementals( return errors.Wrap(generateAndSendImportSpans( ctx, spans, - chainToCompact, - compactChain.compactedIterFactory, + c.chainToCompact, + c.compactedIterFactory, backupLocalityMap, filter, fsc, @@ -266,18 +399,14 @@ func compactIncrementals( ), "generate and send import spans") } - store, err := execCtx.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, newDetails.URI, execCtx.User()) - if err != nil { - return err - } - defer store.Close() progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) var tasks []func(context.Context) error + encryption := details.EncryptionOptions tasks = append(tasks, func(ctx context.Context) error { return genSpan(ctx, spanCh) }) tasks = append(tasks, func(ctx context.Context) error { - return runCompaction(ctx, execCtx, 
encryption, spanCh, newDetails, backupManifest, progCh, store) + return runCompaction(ctx, execCtx, encryption, spanCh, details, backupManifest, progCh, defaultStore) }) tasks = append(tasks, func(ctx context.Context) error { return processProgress(ctx, backupManifest, progCh) @@ -286,7 +415,62 @@ func compactIncrementals( if err := ctxgroup.GoAndWait(ctx, tasks...); err != nil { return err } - return concludeBackupCompaction(ctx, execCtx, store, encryption, kmsEnv, backupManifest) + return concludeBackupCompaction(ctx, execCtx, defaultStore, encryption, kmsEnv, backupManifest) +} + +// lastBackup returns the last backup of the chain to compact. +func (c *compactionChain) lastBackup() backuppb.BackupManifest { + return c.backupChain[c.endIdx-1] +} + +// newCompactionChain returns a new compacted backup chain based on the specified start and end +// timestamps from a chain of backups. The start and end times must specify specific backups. +func newCompactionChain( + manifests []backuppb.BackupManifest, + start, end hlc.Timestamp, + localityInfo []jobspb.RestoreDetails_BackupLocalityInfo, + layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, +) (compactionChain, error) { + // The start and end timestamps indicate a chain of incrementals and therefore should not + // include the full backup. + if start.Less(manifests[0].EndTime) { + return compactionChain{}, errors.Errorf( + "start time %s is before full backup end time %s", + start, manifests[0].EndTime, + ) + } + var startIdx, endIdx int + for idx, m := range manifests { + if m.StartTime.Equal(start) { + startIdx = idx + } + if m.EndTime.Equal(end) { + endIdx = idx + 1 + } + } + if startIdx == 0 { + return compactionChain{}, errors.Newf( + "no incrementals found with the specified start time %s", start, + ) + } else if endIdx == 0 { + return compactionChain{}, errors.Newf("no incrementals found with the specified end time %s", end) + } + + compactedIters := make(backupinfo.LayerToBackupManifestFileIterFactory) + for i := startIdx; i < endIdx; i++ { + compactedIters[i-startIdx] = layerToIterFactory[i] + } + return compactionChain{ + backupChain: manifests, + chainToCompact: manifests[startIdx:endIdx], + startIdx: startIdx, + endIdx: endIdx, + start: start, + end: end, + compactedLocalityInfo: localityInfo[startIdx:endIdx], + allIters: layerToIterFactory, + compactedIterFactory: compactedIters, + }, nil } func runCompaction( @@ -439,13 +623,13 @@ func openSSTs( }, nil } -// makeCompactionBackupDetails takes a backup chain (up until the end timestamp) +// updateCompactionBackupDetails takes a backup chain (up until the end timestamp) // and returns a corresponding BackupDetails for the compacted // backup of backups from the start timestamp to the end timestamp. 
-func makeCompactionBackupDetails( +func updateCompactionBackupDetails( ctx context.Context, compactionChain compactionChain, - dest jobspb.BackupDetails_Destination, + initialDetails jobspb.BackupDetails, resolvedDest backupdest.ResolvedDestination, encryption *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, @@ -456,10 +640,8 @@ func makeCompactionBackupDetails( var encryptionInfo *jobspb.EncryptionInfo if encryption != nil { var err error - _, encryptionInfo, err = backupencryption.MakeNewEncryptionOptions( - ctx, - *encryption, - kmsEnv, + encryption, encryptionInfo, err = backupencryption.MakeNewEncryptionOptions( + ctx, *encryption, kmsEnv, ) if err != nil { return jobspb.BackupDetails{}, err @@ -478,8 +660,10 @@ func makeCompactionBackupDetails( allDescsPb := util.Map(allDescs, func(desc catalog.Descriptor) descpb.Descriptor { return *desc.DescriptorProto() }) + destination := initialDetails.Destination + destination.Subdir = resolvedDest.ChosenSubdir compactedDetails := jobspb.BackupDetails{ - Destination: dest, + Destination: destination, StartTime: compactionChain.start, EndTime: compactionChain.end, URI: resolvedDest.DefaultURI, @@ -490,6 +674,7 @@ func makeCompactionBackupDetails( ResolvedTargets: allDescsPb, ResolvedCompleteDbs: lastBackup.CompleteDbs, FullCluster: lastBackup.DescriptorCoverage == tree.AllDescriptors, + Compact: true, } return compactedDetails, nil } @@ -554,57 +739,27 @@ func resolveBackupDirs( return resolvedBaseDirs, resolvedIncDirs, resolvedSubdir, nil } -// prepareCompactedBackupMeta prepares the manifest, job details, -// and resolved destination for the compacted backup based on the chain of backups. -func prepareCompactedBackupMeta( +// createCompactionManifest creates a new manifest for a compaction job and its +// compacted chain. +func createCompactionManifest( ctx context.Context, execCtx sql.JobExecContext, - jobID jobspb.JobID, - dest jobspb.BackupDetails_Destination, - compactionChain compactionChain, - encryption *jobspb.BackupEncryptionOptions, - kmsEnv cloud.KMSEnv, - layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, -) (*backuppb.BackupManifest, jobspb.BackupDetails, error) { - resolvedDest, err := backupdest.ResolveDest( - ctx, - execCtx.User(), - dest, - // While the end time of this compacted backup matches the end time of - // the last backup in the chain to compact, when resolving the - // destination we need to adjust the end time to ensure that the backup - // location doesn't clobber the last backup in the chain. We do this by - // adding a small duration (large enough to change the backup path) - // to the end time. 
- compactionChain.end.AddDuration(10*time.Millisecond), - execCtx.ExecCfg(), - ) - if err != nil { - return nil, jobspb.BackupDetails{}, err - } - details, err := makeCompactionBackupDetails( - ctx, compactionChain, dest, resolvedDest, encryption, kmsEnv, - ) - if err != nil { - return nil, jobspb.BackupDetails{}, err - } - if err = maybeWriteBackupLock(ctx, execCtx, resolvedDest, jobID); err != nil { - return nil, jobspb.BackupDetails{}, err - } - + details jobspb.BackupDetails, + compactChain compactionChain, +) (*backuppb.BackupManifest, error) { var tenantSpans []roachpb.Span var tenantInfos []mtinfopb.TenantInfoWithUsage - insqlDB := execCtx.ExecCfg().InternalDB - if err = insqlDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + var err error tenantSpans, tenantInfos, err = getTenantInfo(ctx, execCtx.ExecCfg().Codec, txn, details) return err }); err != nil { - return nil, jobspb.BackupDetails{}, err + return nil, err } // TODO (kev-cao): Will need to update the SSTSinkKeyWriter to support // range keys. if len(tenantSpans) != 0 || len(tenantInfos) != 0 { - return nil, jobspb.BackupDetails{}, errors.New("backup compactions does not yet support range keys") + return nil, errors.New("backup compactions does not yet support range keys") } m, err := createBackupManifest( ctx, @@ -612,15 +767,90 @@ func prepareCompactedBackupMeta( tenantSpans, tenantInfos, details, - compactionChain.backupChain, - layerToIterFactory, + compactChain.backupChain, + compactChain.allIters, + ) + if err != nil { + return nil, err + } + m.IntroducedSpans, err = compactIntroducedSpans(ctx, m, compactChain) + if err != nil { + return nil, err + } + return &m, nil +} + +// getBackupChain fetches the current shortest chain of backups (and its +// associated info) required to restore the to the end time specified in the details. 
+func getBackupChain( + ctx context.Context, + execCtx sql.JobExecContext, + details jobspb.BackupDetails, + kmsEnv cloud.KMSEnv, +) ( + []backuppb.BackupManifest, + []jobspb.RestoreDetails_BackupLocalityInfo, + *jobspb.BackupEncryptionOptions, + map[int]*backupinfo.IterFactory, + error, +) { + dest := details.Destination + resolvedBaseDirs, resolvedIncDirs, _, err := resolveBackupDirs( + ctx, execCtx, dest.To, dest.IncrementalStorage, dest.Subdir, + ) + if err != nil { + return nil, nil, nil, nil, err + } + mkStore := execCtx.ExecCfg().DistSQLSrv.ExternalStorageFromURI + baseStores, baseCleanup, err := backupdest.MakeBackupDestinationStores( + ctx, execCtx.User(), mkStore, resolvedBaseDirs, + ) + if err != nil { + return nil, nil, nil, nil, err + } + defer func() { + if err := baseCleanup(); err != nil { + log.Warningf(ctx, "failed to cleanup base backup stores: %+v", err) + } + }() + incStores, incCleanup, err := backupdest.MakeBackupDestinationStores( + ctx, execCtx.User(), mkStore, resolvedIncDirs, + ) + if err != nil { + return nil, nil, nil, nil, err + } + defer func() { + if err := incCleanup(); err != nil { + log.Warningf(ctx, "failed to cleanup incremental backup stores: %+v", err) + } + }() + encryption, err := backupencryption.GetEncryptionFromBaseStore( + ctx, baseStores[0], details.EncryptionOptions, kmsEnv, + ) + if err != nil { + return nil, nil, nil, nil, err + } + mem := execCtx.ExecCfg().RootMemoryMonitor.MakeBoundAccount() + defer mem.Close(ctx) + + _, manifests, localityInfo, memReserved, err := backupdest.ResolveBackupManifests( + ctx, &mem, baseStores, incStores, mkStore, resolvedBaseDirs, + resolvedIncDirs, details.EndTime, encryption, kmsEnv, + execCtx.User(), false, + ) + if err != nil { + return nil, nil, nil, nil, err + } + defer func() { + mem.Shrink(ctx, memReserved) + }() + allIters, err := backupinfo.GetBackupManifestIterFactories( + ctx, execCtx.ExecCfg().DistSQLSrv.ExternalStorage, manifests, encryption, kmsEnv, ) if err != nil { - return nil, jobspb.BackupDetails{}, err + return nil, nil, nil, nil, err } - manifest := &m - manifest.IntroducedSpans, err = compactIntroducedSpans(ctx, *manifest, compactionChain) - return manifest, details, err + return manifests, localityInfo, encryption, allIters, nil } // concludeBackupCompaction completes the backup compaction process after the backup has been @@ -676,72 +906,6 @@ func processProgress( return nil } -type compactionChain struct { - // backupChain is the linear chain of backups up to the end time required - // for a restore. - backupChain []backuppb.BackupManifest - chainToCompact []backuppb.BackupManifest - // start refers to the start time of the first backup to be compacted - // end refers to the end time of the last backup to be compacted - start, end hlc.Timestamp - // Inclusive startIdx and exclusive endIdx of the sub-chain to compact. - startIdx, endIdx int - // Iter factory for just the backups in the chain to compact. - compactedIterFactory backupinfo.LayerToBackupManifestFileIterFactory -} - -// lastBackup returns the last backup of the chain to compact. -func (c *compactionChain) lastBackup() backuppb.BackupManifest { - return c.backupChain[c.endIdx-1] -} - -// newCompactionChain returns a new compacted backup chain based on the specified start and end -// timestamps from a chain of backups. The start and end times must specify specific backups. 
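
To make the requirement above concrete ("the start and end times must specify specific backups"), here is a hypothetical walk-through against the new five-argument newCompactionChain added earlier in this diff. The timestamps, the ts helper, and the function name are invented for illustration; the selection and error behavior follow the code in this patch.

func chainSelectionExample() error {
	ts := func(wall int64) hlc.Timestamp { return hlc.Timestamp{WallTime: wall} }
	// A full backup ending at t=1 followed by three incrementals.
	manifests := []backuppb.BackupManifest{
		{EndTime: ts(1)},                   // full
		{StartTime: ts(1), EndTime: ts(2)}, // inc1
		{StartTime: ts(2), EndTime: ts(3)}, // inc2
		{StartTime: ts(3), EndTime: ts(4)}, // inc3
	}
	localityInfo := make([]jobspb.RestoreDetails_BackupLocalityInfo, len(manifests))
	iters := make(backupinfo.LayerToBackupManifestFileIterFactory)
	// start=1, end=3 selects inc1 and inc2: startIdx=1, endIdx=3.
	chain, err := newCompactionChain(manifests, ts(1), ts(3), localityInfo, iters)
	if err != nil {
		return err
	}
	_ = chain.chainToCompact // manifests[1:3]
	// An end time that does not line up with an incremental's end time is rejected.
	if _, err := newCompactionChain(manifests, ts(1), ts(5), localityInfo, iters); err == nil {
		return errors.New("expected error for unaligned end time")
	}
	return nil
}
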
-func newCompactionChain( - manifests []backuppb.BackupManifest, - start, end hlc.Timestamp, - layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, -) (compactionChain, error) { - // The start and end timestamps indicate a chain of incrementals and therefore should not - // include the full backup. - if start.Less(manifests[0].EndTime) { - return compactionChain{}, errors.Errorf( - "start time %s is before full backup end time %s", - start, manifests[0].EndTime, - ) - } - var startIdx, endIdx int - for idx, m := range manifests { - if m.StartTime.Equal(start) { - startIdx = idx - } - if m.EndTime.Equal(end) { - endIdx = idx + 1 - } - } - if startIdx == 0 { - return compactionChain{}, errors.Newf( - "no incrementals found with the specified start time %s", start, - ) - } else if endIdx == 0 { - return compactionChain{}, errors.Newf("no incrementals found with the specified end time %s", end) - } - - compactedIters := make(backupinfo.LayerToBackupManifestFileIterFactory) - for i := startIdx; i < endIdx; i++ { - compactedIters[i-startIdx] = layerToIterFactory[i] - } - return compactionChain{ - backupChain: manifests, - chainToCompact: manifests[startIdx:endIdx], - startIdx: startIdx, - endIdx: endIdx, - start: start, - end: end, - compactedIterFactory: compactedIters, - }, nil -} - func compactionJobDescription(details jobspb.BackupDetails) (string, error) { fmtCtx := tree.NewFmtCtx(tree.FmtSimple) redactedURIs, err := sanitizeURIList(details.Destination.To) diff --git a/pkg/backup/backup_compaction_test.go b/pkg/backup/backup_compaction_test.go index 077558266a45..893194b6861e 100644 --- a/pkg/backup/backup_compaction_test.go +++ b/pkg/backup/backup_compaction_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -45,12 +46,12 @@ func TestBackupCompaction(t *testing.T) { ) defer cleanupDB() - startAndAwaitCompaction := func(start, end string) { + startCompaction := func(start, end string) jobspb.JobID { compactionBuiltin := "SELECT crdb_internal.backup_compaction(ARRAY['nodelocal://1/backup'], 'LATEST', ''::BYTES, '%s'::DECIMAL, '%s'::DECIMAL)" row := db.QueryRow(t, fmt.Sprintf(compactionBuiltin, start, end)) var jobID jobspb.JobID row.Scan(&jobID) - waitForSuccessfulJob(t, tc, jobID) + return jobID } fullBackupAostCmd := "BACKUP INTO 'nodelocal://1/backup' AS OF SYSTEM TIME '%s'" incBackupCmd := "BACKUP INTO LATEST IN 'nodelocal://1/backup'" @@ -78,7 +79,7 @@ func TestBackupCompaction(t *testing.T) { t, fmt.Sprintf(incBackupAostCmd, end), ) - startAndAwaitCompaction(start, end) + waitForSuccessfulJob(t, tc, startCompaction(start, end)) validateCompactedBackupForTables(t, db, []string{"foo"}, "'nodelocal://1/backup'") start = end } @@ -117,7 +118,7 @@ func TestBackupCompaction(t *testing.T) { t, fmt.Sprintf(incBackupAostCmd, end), ) - startAndAwaitCompaction(start, end) + waitForSuccessfulJob(t, tc, startCompaction(start, end)) validateCompactedBackupForTables( t, db, []string{"foo", "bar", "baz"}, @@ -130,7 +131,7 @@ func TestBackupCompaction(t *testing.T) { t, fmt.Sprintf(incBackupAostCmd, end), ) - startAndAwaitCompaction(start, end) + waitForSuccessfulJob(t, tc, startCompaction(start, end)) db.Exec(t, "DROP TABLE foo, baz") 
db.Exec(t, "RESTORE FROM LATEST IN 'nodelocal://1/backup'") @@ -158,7 +159,7 @@ func TestBackupCompaction(t *testing.T) { t, fmt.Sprintf(incBackupAostCmd, end), ) - startAndAwaitCompaction(start, end) + waitForSuccessfulJob(t, tc, startCompaction(start, end)) var numIndexes, restoredNumIndexes int db.QueryRow(t, "SELECT count(*) FROM [SHOW INDEXES FROM foo]").Scan(&numIndexes) @@ -199,7 +200,7 @@ func TestBackupCompaction(t *testing.T) { db.Exec(t, "INSERT INTO foo VALUES (6, 6)") db.Exec(t, incBackupCmd) - startAndAwaitCompaction(start, end) + waitForSuccessfulJob(t, tc, startCompaction(start, end)) validateCompactedBackupForTables(t, db, []string{"foo"}, "'nodelocal://1/backup'") }) @@ -227,7 +228,7 @@ func TestBackupCompaction(t *testing.T) { ), ) - startAndAwaitCompaction(start, end) + waitForSuccessfulJob(t, tc, startCompaction(start, end)) validateCompactedBackupForTables(t, db, []string{"foo"}, "'nodelocal://1/backup'") }) @@ -270,6 +271,41 @@ crdb_internal.json_to_pb( t, db, []string{"foo"}, "'nodelocal://1/backup'", opts, ) }) + + t.Run("pause resume and cancel", func(t *testing.T) { + db.Exec(t, "CREATE TABLE foo (a INT, b INT)") + defer func() { + db.Exec(t, "DROP TABLE foo, bar") + }() + db.Exec(t, "INSERT INTO foo VALUES (1, 1)") + start := getTime() + db.Exec(t, fmt.Sprintf(fullBackupAostCmd, start)) + db.Exec(t, "INSERT INTO foo VALUES (2, 2)") + db.Exec(t, "CREATE TABLE bar (a INT, b INT)") + db.Exec(t, "INSERT INTO bar VALUES (1, 1)") + end := getTime() + db.Exec(t, fmt.Sprintf(incBackupAostCmd, end)) + db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'") + defer func() { + db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''") + }() + + jobID := startCompaction(start, end) + jobutils.WaitForJobToPause(t, db, jobID) + db.Exec(t, "RESUME JOB $1", jobID) + waitForSuccessfulJob(t, tc, jobID) + validateCompactedBackupForTables(t, db, []string{"foo", "bar"}, "'nodelocal://1/backup'") + + db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''") + db.Exec(t, "INSERT INTO bar VALUES (4, 4)") + end = getTime() + db.Exec(t, fmt.Sprintf(incBackupAostCmd, end)) + db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'") + jobID = startCompaction(start, end) + jobutils.WaitForJobToPause(t, db, jobID) + db.Exec(t, "CANCEL JOB $1", jobID) + jobutils.WaitForJobToCancel(t, db, jobID) + }) // TODO (kev-cao): Once range keys are supported by the compaction // iterator, add tests for dropped tables/indexes. } diff --git a/pkg/backup/backup_job.go b/pkg/backup/backup_job.go index f6520aeaf30e..1ec1814310be 100644 --- a/pkg/backup/backup_job.go +++ b/pkg/backup/backup_job.go @@ -535,8 +535,8 @@ func (b *backupResumer) DumpTraceAfterRun() bool { // Resume is part of the jobs.Resumer interface. func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { // The span is finished by the registry executing the job. - initialDetails := b.job.Details().(jobspb.BackupDetails) p := execCtx.(sql.JobExecContext) + initialDetails := b.job.Details().(jobspb.BackupDetails) if err := maybeRelocateJobExecution( ctx, b.job.ID(), p, initialDetails.ExecutionLocality, "BACKUP", @@ -548,14 +548,6 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { return err } - // TODO (kev-cao): there is a decent amount of overlap between the initial - // setup of classic backup and backup compaction with some minor differences, - // (e.g. lock writing, checkpointing, etc.). 
It would be nice to unify these - // with a common setup function. - if initialDetails.Compact { - return compactBackups(ctx, b.job.ID(), p, initialDetails) - } - kmsEnv := backupencryption.MakeBackupKMSEnv( p.ExecCfg().Settings, &p.ExecCfg().ExternalIODirConfig, @@ -563,6 +555,9 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { p.User(), ) + if initialDetails.Compact { + return b.ResumeCompaction(ctx, initialDetails, p, &kmsEnv) + } // Resolve the backup destination. We can skip this step if we // have already resolved and persisted the destination either // during a previous resumption of this job. @@ -707,7 +702,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { statsCache := p.ExecCfg().TableStatsCache // We retry on pretty generic failures -- any rpc error. If a worker node were // to restart, it would produce this kind of error, but there may be other - // errors that are also rpc errors. Don't retry to aggressively. + // errors that are also rpc errors. Don't retry too aggressively. retryOpts := retry.Options{ MaxBackoff: 1 * time.Second, MaxRetries: 5, @@ -764,7 +759,6 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { // previous attempts. var reloadBackupErr error mem.Shrink(ctx, memSize) - memSize = 0 backupManifest, memSize, reloadBackupErr = b.readManifestOnResume(ctx, &mem, p.ExecCfg(), defaultStore, details, p.User(), &kmsEnv) if reloadBackupErr != nil { @@ -1722,7 +1716,7 @@ func getBackupDetailAndManifest( if len(backupDestination.PrevBackupURIs) != 0 { var err error baseEncryptionOptions, err = backupencryption.GetEncryptionFromBase(ctx, user, makeCloudStorage, - backupDestination.PrevBackupURIs[0], *initialDetails.EncryptionOptions, &kmsEnv) + backupDestination.PrevBackupURIs[0], initialDetails.EncryptionOptions, &kmsEnv) if err != nil { return jobspb.BackupDetails{}, nil, err } diff --git a/pkg/backup/backupdest/backup_destination.go b/pkg/backup/backupdest/backup_destination.go index 87db2f45053b..730a6a023f53 100644 --- a/pkg/backup/backupdest/backup_destination.go +++ b/pkg/backup/backupdest/backup_destination.go @@ -13,6 +13,7 @@ import ( "path" "regexp" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/backup/backupbase" "github.com/cockroachdb/cockroach/pkg/backup/backupinfo" @@ -239,6 +240,25 @@ func ResolveDest( }, nil } +// ResolveDestForCompaction resolves the destination for a compacted backup. +// While the end time of this compacted backup matches the end time of +// the last backup in the chain to compact, when resolving the +// destination we need to adjust the end time to ensure that the backup +// location doesn't clobber the last backup in the chain. We do this by +// adding a small duration (large enough to change the backup path) +// to the end time. +func ResolveDestForCompaction( + ctx context.Context, execCtx sql.JobExecContext, details jobspb.BackupDetails, +) (ResolvedDestination, error) { + return ResolveDest( + ctx, + execCtx.User(), + details.Destination, + details.EndTime.AddDuration(10*time.Millisecond), + execCtx.ExecCfg(), + ) +} + // ReadLatestFile reads the LATEST file from collectionURI and returns the path // stored in the file. 
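
Returning to ResolveDestForCompaction just above: the 10ms nudge only affects where the compacted backup is written, not the time range it covers. A hypothetical restatement of that call, with the distinction spelled out in comments (names come from this patch; the wrapper itself is invented):

func resolveCompactedDestSketch(
	ctx context.Context, execCtx sql.JobExecContext, details jobspb.BackupDetails,
) (ResolvedDestination, error) {
	// The compacted backup's manifest still records [details.StartTime,
	// details.EndTime]; only the destination path is resolved as if the backup
	// ended 10ms later, so it cannot clobber the last incremental in the chain.
	return ResolveDest(
		ctx,
		execCtx.User(),
		details.Destination,
		details.EndTime.AddDuration(10*time.Millisecond), // path-only adjustment
		execCtx.ExecCfg(),
	)
}
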
func ReadLatestFile( diff --git a/pkg/backup/backupencryption/encryption.go b/pkg/backup/backupencryption/encryption.go index 34885b863b25..5081b3452f75 100644 --- a/pkg/backup/backupencryption/encryption.go +++ b/pkg/backup/backupencryption/encryption.go @@ -338,10 +338,10 @@ func GetEncryptionFromBase( user username.SQLUsername, makeCloudStorage cloud.ExternalStorageFromURIFactory, baseBackupURI string, - encryptionParams jobspb.BackupEncryptionOptions, + encryptionParams *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, ) (*jobspb.BackupEncryptionOptions, error) { - if encryptionParams.Mode == jobspb.EncryptionMode_None { + if encryptionParams == nil || encryptionParams.Mode == jobspb.EncryptionMode_None { return nil, nil } exportStore, err := makeCloudStorage(ctx, baseBackupURI, user) @@ -356,10 +356,10 @@ func GetEncryptionFromBase( func GetEncryptionFromBaseStore( ctx context.Context, baseStore cloud.ExternalStorage, - encryptionParams jobspb.BackupEncryptionOptions, + encryptionParams *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, ) (*jobspb.BackupEncryptionOptions, error) { - if encryptionParams.Mode == jobspb.EncryptionMode_None { + if encryptionParams == nil || encryptionParams.Mode == jobspb.EncryptionMode_None { return nil, nil } opts, err := ReadEncryptionOptions(ctx, baseStore)
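
Finally, the signature change in the last hunk makes the encryption helpers tolerate a nil options pointer, so callers such as getBackupChain can pass details.EncryptionOptions straight through. A minimal hypothetical caller, assuming the backup package's imports (the function name is invented):

func loadChainEncryptionSketch(
	ctx context.Context,
	baseStore cloud.ExternalStorage,
	details jobspb.BackupDetails,
	kmsEnv cloud.KMSEnv,
) (*jobspb.BackupEncryptionOptions, error) {
	// details.EncryptionOptions may be nil for an unencrypted backup; the
	// helper now returns (nil, nil) in that case rather than requiring the
	// caller to dereference and pass a value.
	return backupencryption.GetEncryptionFromBaseStore(ctx, baseStore, details.EncryptionOptions, kmsEnv)
}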