Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-658 Add retry mechanism to XDR info commands #207

Merged
merged 10 commits into from
Jan 28, 2025
34 changes: 22 additions & 12 deletions cmd/internal/app/asbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ func NewASBackup(
if params.isStopXDR() {
logger.Info("stopping XDR on the database")

if err := stopXDR(aerospikeClient, backupXDRConfig); err != nil {
return nil, err
if err = stopXDR(aerospikeClient, backupXDRConfig, params.BackupXDRParams.InfoRetryAttempts); err != nil {
return nil, fmt.Errorf("failed to stop XDR: %w", err)
}

return nil, nil
Expand All @@ -148,8 +148,8 @@ func NewASBackup(
if params.isUnblockMRT() {
logger.Info("enabling MRT writes on the database")

if err := unblockMRT(aerospikeClient, backupXDRConfig); err != nil {
return nil, err
if err = unblockMRT(aerospikeClient, backupXDRConfig, params.BackupXDRParams.InfoRetryAttempts); err != nil {
return nil, fmt.Errorf("failed to enable MRT: %w", err)
}

return nil, nil
Expand Down Expand Up @@ -309,24 +309,34 @@ func getSecretAgent(b *backup.ConfigBackup, bxdr *backup.ConfigBackupXDR) *backu
}
}

func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error {
func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR, attempts int) error {
infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy)

if err := infoClient.StopXDR(cfg.DC); err != nil {
return fmt.Errorf("failed to stop xdr: %w", err)
var err error

for range attempts {
err = infoClient.StopXDR(cfg.DC)
if err == nil {
return nil
}
}

return nil
return err
}

func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error {
func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR, attempts int) error {
infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy)

if err := infoClient.UnBlockMRTWrites(cfg.Namespace); err != nil {
return fmt.Errorf("failed to unblock MRT: %w", err)
var err error

for range attempts {
err = infoClient.UnBlockMRTWrites(cfg.Namespace)
if err == nil {
return nil
}
}

return nil
return err
}

func checkVersion(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error {
Expand Down
50 changes: 50 additions & 0 deletions cmd/internal/app/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,63 @@ func validateBackup(params *ASBackupParams) error {
}
}

if params.BackupXDRParams != nil {
if err := validateBackupXDRParams(params.BackupXDRParams); err != nil {
return err
}
}

if err := validateStorages(params.AwsS3, params.GcpStorage, params.AzureBlob); err != nil {
return err
}

return nil
}

func validateBackupXDRParams(params *models.BackupXDR) error {
if params.InfoRetryAttempts < 1 {
return fmt.Errorf("backup xdr retry attempts can't be less than 1")
}

if params.ReadTimeoutMilliseconds < 0 {
return fmt.Errorf("backup xdr read timeout can't be negative")
}

if params.WriteTimeoutMilliseconds < 0 {
return fmt.Errorf("backup xdr write timeout can't be negative")
}

if params.InfoPolingPeriodMilliseconds < 0 {
return fmt.Errorf("backup xdr info poling period can't be negative")
}

if params.StartTimeoutMilliseconds < 0 {
return fmt.Errorf("backup xdr start timeout can't be negative")
}

if params.ResultQueueSize < 0 {
return fmt.Errorf("backup xdr result queue size can't be negative")
}

if params.AckQueueSize < 0 {
return fmt.Errorf("backup xdr ack queue size can't be negative")
}

if params.MaxConnections < 1 {
return fmt.Errorf("backup xdr max connections can't be less than 1")
}

if params.ParallelWrite < 1 {
return fmt.Errorf("backup xdr parallel write can't be less than 1")
filkeith marked this conversation as resolved.
Show resolved Hide resolved
}

if params.FileLimit < 1 {
return fmt.Errorf("backup xdr file limit can't be less than 1")
}

return nil
}

func validateRestore(params *ASRestoreParams) error {
if params.RestoreParams != nil && params.CommonParams != nil {
switch params.RestoreParams.Mode {
Expand Down
151 changes: 151 additions & 0 deletions cmd/internal/app/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,3 +654,154 @@ func TestValidateRestoreParams(t *testing.T) {
})
}
}

// Test_validateBackupXDRParams runs validateBackupXDRParams against one fully
// populated valid configuration and one failing configuration per rule.
//
// Failing cases set only the fields needed to pass the checks that come before
// the rule under test, plus the offending field itself; every field that is
// not listed keeps its zero value. An empty wantErr means no error is expected.
func Test_validateBackupXDRParams(t *testing.T) {
	cases := []struct {
		name    string
		params  *models.BackupXDR
		wantErr string
	}{
		{
			name: "valid params",
			params: &models.BackupXDR{
				InfoRetryAttempts:            3,
				ReadTimeoutMilliseconds:      1000,
				WriteTimeoutMilliseconds:     1000,
				InfoPolingPeriodMilliseconds: 1000,
				StartTimeoutMilliseconds:     1000,
				ResultQueueSize:              100,
				AckQueueSize:                 100,
				MaxConnections:               10,
				ParallelWrite:                5,
				FileLimit:                    1000,
			},
		},
		{
			name:    "invalid info retry attempts",
			params:  &models.BackupXDR{InfoRetryAttempts: 0},
			wantErr: "backup xdr retry attempts can't be less than 1",
		},
		{
			name: "negative read timeout",
			params: &models.BackupXDR{
				InfoRetryAttempts:       1,
				ReadTimeoutMilliseconds: -1,
			},
			wantErr: "backup xdr read timeout can't be negative",
		},
		{
			name: "negative write timeout",
			params: &models.BackupXDR{
				InfoRetryAttempts:        1,
				WriteTimeoutMilliseconds: -1,
			},
			wantErr: "backup xdr write timeout can't be negative",
		},
		{
			name: "negative info polling period",
			params: &models.BackupXDR{
				InfoRetryAttempts:            1,
				InfoPolingPeriodMilliseconds: -1,
			},
			wantErr: "backup xdr info poling period can't be negative",
		},
		{
			name: "negative start timeout",
			params: &models.BackupXDR{
				InfoRetryAttempts:        1,
				StartTimeoutMilliseconds: -1,
			},
			wantErr: "backup xdr start timeout can't be negative",
		},
		{
			name: "negative result queue size",
			params: &models.BackupXDR{
				InfoRetryAttempts: 1,
				ResultQueueSize:   -1,
			},
			wantErr: "backup xdr result queue size can't be negative",
		},
		{
			name: "negative ack queue size",
			params: &models.BackupXDR{
				InfoRetryAttempts: 1,
				AckQueueSize:      -1,
			},
			wantErr: "backup xdr ack queue size can't be negative",
		},
		{
			name: "invalid max connections",
			params: &models.BackupXDR{
				InfoRetryAttempts: 1,
				MaxConnections:    0,
			},
			wantErr: "backup xdr max connections can't be less than 1",
		},
		{
			name: "invalid parallel write",
			params: &models.BackupXDR{
				InfoRetryAttempts: 1,
				MaxConnections:    1,
				ParallelWrite:     0,
			},
			wantErr: "backup xdr parallel write can't be less than 1",
		},
		{
			name: "invalid file limit",
			params: &models.BackupXDR{
				InfoRetryAttempts: 1,
				MaxConnections:    1,
				ParallelWrite:     1,
				FileLimit:         0,
			},
			wantErr: "backup xdr file limit can't be less than 1",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := validateBackupXDRParams(tc.params)

			// A non-empty wantErr must match the returned message exactly.
			if tc.wantErr != "" {
				assert.EqualError(t, got, tc.wantErr)
				return
			}

			assert.NoError(t, got)
		})
	}
}
3 changes: 3 additions & 0 deletions cmd/internal/flags/backup_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func (f *BackupXDR) NewFlagSet() *pflag.FlagSet {
1000,
"How often (in milliseconds) a backup client will send info commands to check aerospike cluster stats.\n"+
"To measure recovery state and lag.")
flagSet.IntVar(&f.InfoRetryAttempts, "info-retry-attempts", 1,
filkeith marked this conversation as resolved.
Show resolved Hide resolved
filkeith marked this conversation as resolved.
Show resolved Hide resolved
"How many times to retry to send info commands before failing. "+
"This parameter is applied to stop xdr and unblock MRT writes requests.")
flagSet.Int64Var(&f.StartTimeoutMilliseconds, "start-timeout",
30000,
"Timeout for starting TCP server for XDR.\n"+
Expand Down
1 change: 1 addition & 0 deletions cmd/internal/models/backup_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type BackupXDR struct {
AckQueueSize int
MaxConnections int
InfoPolingPeriodMilliseconds int64
InfoRetryAttempts int
filkeith marked this conversation as resolved.
Show resolved Hide resolved
StartTimeoutMilliseconds int64

TLSConfig *tls.Config
Expand Down
Loading