Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-658 Add retry mechanism to XDR info commands #207

Merged
merged 10 commits into from
Jan 28, 2025
Merged
83 changes: 29 additions & 54 deletions cmd/internal/app/asbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"log/slog"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go"
"github.com/aerospike/backup-go/cmd/internal/models"
"github.com/aerospike/backup-go/internal/asinfo"
Expand Down Expand Up @@ -128,31 +127,43 @@ func NewASBackup(
}

if params.BackupXDRParams != nil {
if err := checkVersion(aerospikeClient, backupXDRConfig); err != nil {
return nil, err
}
}
infoClient := asinfo.NewInfoClientFromAerospike(
aerospikeClient,
backupXDRConfig.InfoPolicy,
backupXDRConfig.InfoRetryPolicy,
)

// Stop xdr.
if params.isStopXDR() {
logger.Info("stopping XDR on the database")
version, err := infoClient.GetVersion()
if err != nil {
return nil, fmt.Errorf("failed to get version: %w", err)
}

if err := stopXDR(aerospikeClient, backupXDRConfig); err != nil {
return nil, err
if version.Major < xdrSupportedVersion {
return nil, fmt.Errorf("version %s is unsupported, only databse version %d+ is supproted",
version.String(), xdrSupportedVersion)
}

return nil, nil
}
// Stop xdr.
if params.isStopXDR() {
logger.Info("stopping XDR on the database")

// Unblock mRT.
if params.isUnblockMRT() {
logger.Info("enabling MRT writes on the database")
if err = infoClient.StopXDR(backupXDRConfig.DC); err != nil {
return nil, fmt.Errorf("failed to stop XDR: %w", err)
}

if err := unblockMRT(aerospikeClient, backupXDRConfig); err != nil {
return nil, err
return nil, nil
}

return nil, nil
// Unblock mRT.
if params.isUnblockMRT() {
logger.Info("enabling MRT writes on the database")

if err = infoClient.UnBlockMRTWrites(backupXDRConfig.Namespace); err != nil {
return nil, fmt.Errorf("failed to enable MRT writes: %w", err)
}

return nil, nil
}
}

backupClient, err := backup.NewClient(aerospikeClient, backup.WithLogger(logger), backup.WithID(idBackup))
Expand Down Expand Up @@ -308,39 +319,3 @@ func getSecretAgent(b *backup.ConfigBackup, bxdr *backup.ConfigBackupXDR) *backu
return nil
}
}

func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error {
infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy)

if err := infoClient.StopXDR(cfg.DC); err != nil {
return fmt.Errorf("failed to stop xdr: %w", err)
}

return nil
}

func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error {
infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy)

if err := infoClient.UnBlockMRTWrites(cfg.Namespace); err != nil {
return fmt.Errorf("failed to unblock MRT: %w", err)
}

return nil
}

func checkVersion(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error {
infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy)

version, err := infoClient.GetVersion()
if err != nil {
return fmt.Errorf("failed to get version: %w", err)
}

if version.Major < xdrSupportedVersion {
return fmt.Errorf("version %s is unsupported, only databse version %d+ is supproted",
version.String(), xdrSupportedVersion)
}

return nil
}
11 changes: 11 additions & 0 deletions cmd/internal/app/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func mapBackupConfig(params *ASBackupParams) (*backup.ConfigBackup, error) {
c.ModAfter = &modAfterTime
}

c.InfoRetryPolicy = mapRetryPolicy(
params.BackupParams.InfoRetryIntervalMilliseconds,
params.BackupParams.InfoRetriesMultiplier,
params.BackupParams.InfoMaxRetries,
)

return c, nil
}

Expand Down Expand Up @@ -148,6 +154,11 @@ func mapBackupXDRConfig(params *ASBackupParams) *backup.ConfigBackupXDR {
MaxConnections: params.BackupXDRParams.MaxConnections,
InfoPolingPeriodMilliseconds: params.BackupXDRParams.InfoPolingPeriodMilliseconds,
StartTimeoutMilliseconds: params.BackupXDRParams.StartTimeoutMilliseconds,
InfoRetryPolicy: mapRetryPolicy(
params.BackupXDRParams.InfoRetryIntervalMilliseconds,
params.BackupXDRParams.InfoRetriesMultiplier,
params.BackupXDRParams.InfoMaxRetries,
),
}

return c
Expand Down
5 changes: 3 additions & 2 deletions cmd/internal/app/reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ func printBackupReport(stats, xdrStats *bModels.BackupStats) {

fmt.Printf("Records Read: %d\n", stats.GetReadRecords())

var bw, fw uint64
var bw, fw, tr uint64
if xdrStats != nil {
bw = xdrStats.GetBytesWritten()
fw = xdrStats.GetFileCount()
tr = xdrStats.TotalRecords
fmt.Printf("Records Received: %d\n", xdrStats.GetReadRecords())
}

Expand All @@ -52,7 +53,7 @@ func printBackupReport(stats, xdrStats *bModels.BackupStats) {
fmt.Println()

fmt.Printf("Bytes Written: %d bytes\n", stats.GetBytesWritten()+bw)
fmt.Printf("Total Records: %d\n", stats.TotalRecords)
fmt.Printf("Total Records: %d\n", stats.TotalRecords+tr)
fmt.Printf("Files Written: %d\n", stats.GetFileCount()+fw)
}

Expand Down
46 changes: 46 additions & 0 deletions cmd/internal/app/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,59 @@ func validateBackup(params *ASBackupParams) error {
}
}

if params.BackupXDRParams != nil {
if err := validateBackupXDRParams(params.BackupXDRParams); err != nil {
return err
}
}

if err := validateStorages(params.AwsS3, params.GcpStorage, params.AzureBlob); err != nil {
return err
}

return nil
}

func validateBackupXDRParams(params *models.BackupXDR) error {
if params.ReadTimeoutMilliseconds < 0 {
return fmt.Errorf("backup xdr read timeout can't be negative")
}

if params.WriteTimeoutMilliseconds < 0 {
return fmt.Errorf("backup xdr write timeout can't be negative")
}

if params.InfoPolingPeriodMilliseconds < 0 {
return fmt.Errorf("backup xdr info poling period can't be negative")
}

if params.StartTimeoutMilliseconds < 0 {
return fmt.Errorf("backup xdr start timeout can't be negative")
}

if params.ResultQueueSize < 0 {
return fmt.Errorf("backup xdr result queue size can't be negative")
}

if params.AckQueueSize < 0 {
return fmt.Errorf("backup xdr ack queue size can't be negative")
}

if params.MaxConnections < 1 {
return fmt.Errorf("backup xdr max connections can't be less than 1")
}

if params.ParallelWrite < 0 {
return fmt.Errorf("backup xdr parallel write can't be negative")
}

if params.FileLimit < 1 {
return fmt.Errorf("backup xdr file limit can't be less than 1")
}

return nil
}

func validateRestore(params *ASRestoreParams) error {
if params.RestoreParams != nil && params.CommonParams != nil {
switch params.RestoreParams.Mode {
Expand Down
120 changes: 120 additions & 0 deletions cmd/internal/app/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,3 +654,123 @@ func TestValidateRestoreParams(t *testing.T) {
})
}
}

func Test_validateBackupXDRParams(t *testing.T) {
tests := []struct {
name string
params *models.BackupXDR
wantErr string
}{
{
name: "valid params",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 1000,
WriteTimeoutMilliseconds: 1000,
InfoPolingPeriodMilliseconds: 1000,
StartTimeoutMilliseconds: 1000,
ResultQueueSize: 100,
AckQueueSize: 100,
MaxConnections: 10,
ParallelWrite: 5,
FileLimit: 1000,
},
wantErr: "",
},
{
name: "negative read timeout",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: -1,
},
wantErr: "backup xdr read timeout can't be negative",
},
{
name: "negative write timeout",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: -1,
},
wantErr: "backup xdr write timeout can't be negative",
},
{
name: "negative info polling period",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: -1,
},
wantErr: "backup xdr info poling period can't be negative",
},
{
name: "negative start timeout",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: -1,
},
wantErr: "backup xdr start timeout can't be negative",
},
{
name: "negative result queue size",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: 0,
ResultQueueSize: -1,
},
wantErr: "backup xdr result queue size can't be negative",
},
{
name: "negative ack queue size",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: 0,
ResultQueueSize: 0,
AckQueueSize: -1,
},
wantErr: "backup xdr ack queue size can't be negative",
},
{
name: "invalid max connections",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: 0,
ResultQueueSize: 0,
AckQueueSize: 0,
MaxConnections: 0,
},
wantErr: "backup xdr max connections can't be less than 1",
},
{
name: "invalid file limit",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: 0,
ResultQueueSize: 0,
AckQueueSize: 0,
MaxConnections: 1,
ParallelWrite: 1,
FileLimit: 0,
},
wantErr: "backup xdr file limit can't be less than 1",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateBackupXDRParams(tt.params)
if tt.wantErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tt.wantErr)
}
})
}
}
10 changes: 10 additions & 0 deletions cmd/internal/flags/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet {
"Number of records will be read on one iteration for continuation backup.\n"+
"Affects size if overlap on resuming backup after an error.\n"+
"Used only with --state-file-dst or --continue.")
flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-timeout", 1000,
"Set the initial timeout for a retry in milliseconds when info commands are sent."+
"This parameter is applied to stop xdr and unblock MRT writes requests.")
flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier",
1,
"Used to increase the delay between subsequent retry attempts.\n"+
"The actual delay is calculated as: info-retry-timeout * (info-retry-multiplier ^ attemptNumber)")
flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3,
reugn marked this conversation as resolved.
Show resolved Hide resolved
"How many times to retry to send info commands before failing. "+
"This parameter is applied to stop xdr and unblock MRT writes requests.")

return flagSet
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/internal/flags/backup_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ func (f *BackupXDR) NewFlagSet() *pflag.FlagSet {
1000,
"How often (in milliseconds) a backup client will send info commands to check aerospike cluster stats.\n"+
"To measure recovery state and lag.")
flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-timeout", 1000,
"Set the initial timeout for a retry in milliseconds when info commands are sent."+
"This parameter is applied to stop xdr and unblock MRT writes requests.")
flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier",
1,
"Used to increase the delay between subsequent retry attempts.\n"+
"The actual delay is calculated as: info-retry-timeout * (info-retry-multiplier ^ attemptNumber)")
flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3,
filkeith marked this conversation as resolved.
Show resolved Hide resolved
"How many times to retry to send info commands before failing. "+
"This parameter is applied to stop xdr and unblock MRT writes requests.")
flagSet.Int64Var(&f.StartTimeoutMilliseconds, "start-timeout",
30000,
"Timeout for starting TCP server for XDR.\n"+
Expand Down
4 changes: 4 additions & 0 deletions cmd/internal/models/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type Backup struct {
Continue string
ScanPageSize int64
OutputFilePrefix string

InfoMaxRetries uint
InfoRetriesMultiplier float64
InfoRetryIntervalMilliseconds int64
}

// ShouldClearTarget check if we should clean target directory.
Expand Down
4 changes: 4 additions & 0 deletions cmd/internal/models/backup_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ type BackupXDR struct {

StopXDR bool
UnblockMRT bool

InfoMaxRetries uint
InfoRetriesMultiplier float64
InfoRetryIntervalMilliseconds int64
}
Loading