Skip to content

Commit

Permalink
FMWK-658 Add retry mechanism to XDR info commands (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
filkeith authored Jan 28, 2025
1 parent 7b1c6ba commit fcff047
Show file tree
Hide file tree
Showing 19 changed files with 424 additions and 117 deletions.
83 changes: 29 additions & 54 deletions cmd/internal/app/asbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"log/slog"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go"
"github.com/aerospike/backup-go/cmd/internal/models"
"github.com/aerospike/backup-go/internal/asinfo"
Expand Down Expand Up @@ -128,31 +127,43 @@ func NewASBackup(
}

if params.BackupXDRParams != nil {
if err := checkVersion(aerospikeClient, backupXDRConfig); err != nil {
return nil, err
}
}
infoClient := asinfo.NewInfoClientFromAerospike(
aerospikeClient,
backupXDRConfig.InfoPolicy,
backupXDRConfig.InfoRetryPolicy,
)

// Stop xdr.
if params.isStopXDR() {
logger.Info("stopping XDR on the database")
version, err := infoClient.GetVersion()
if err != nil {
return nil, fmt.Errorf("failed to get version: %w", err)
}

if err := stopXDR(aerospikeClient, backupXDRConfig); err != nil {
return nil, err
if version.Major < xdrSupportedVersion {
return nil, fmt.Errorf("version %s is unsupported, only databse version %d+ is supproted",
version.String(), xdrSupportedVersion)
}

return nil, nil
}
// Stop xdr.
if params.isStopXDR() {
logger.Info("stopping XDR on the database")

// Unblock mRT.
if params.isUnblockMRT() {
logger.Info("enabling MRT writes on the database")
if err = infoClient.StopXDR(backupXDRConfig.DC); err != nil {
return nil, fmt.Errorf("failed to stop XDR: %w", err)
}

if err := unblockMRT(aerospikeClient, backupXDRConfig); err != nil {
return nil, err
return nil, nil
}

return nil, nil
// Unblock mRT.
if params.isUnblockMRT() {
logger.Info("enabling MRT writes on the database")

if err = infoClient.UnBlockMRTWrites(backupXDRConfig.Namespace); err != nil {
return nil, fmt.Errorf("failed to enable MRT writes: %w", err)
}

return nil, nil
}
}

backupClient, err := backup.NewClient(aerospikeClient, backup.WithLogger(logger), backup.WithID(idBackup))
Expand Down Expand Up @@ -308,39 +319,3 @@ func getSecretAgent(b *backup.ConfigBackup, bxdr *backup.ConfigBackupXDR) *backu
return nil
}
}

func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error {
infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy)

if err := infoClient.StopXDR(cfg.DC); err != nil {
return fmt.Errorf("failed to stop xdr: %w", err)
}

return nil
}

func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error {
infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy)

if err := infoClient.UnBlockMRTWrites(cfg.Namespace); err != nil {
return fmt.Errorf("failed to unblock MRT: %w", err)
}

return nil
}

func checkVersion(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error {
infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy)

version, err := infoClient.GetVersion()
if err != nil {
return fmt.Errorf("failed to get version: %w", err)
}

if version.Major < xdrSupportedVersion {
return fmt.Errorf("version %s is unsupported, only databse version %d+ is supproted",
version.String(), xdrSupportedVersion)
}

return nil
}
11 changes: 11 additions & 0 deletions cmd/internal/app/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func mapBackupConfig(params *ASBackupParams) (*backup.ConfigBackup, error) {
c.ModAfter = &modAfterTime
}

c.InfoRetryPolicy = mapRetryPolicy(
params.BackupParams.InfoRetryIntervalMilliseconds,
params.BackupParams.InfoRetriesMultiplier,
params.BackupParams.InfoMaxRetries,
)

return c, nil
}

Expand Down Expand Up @@ -148,6 +154,11 @@ func mapBackupXDRConfig(params *ASBackupParams) *backup.ConfigBackupXDR {
MaxConnections: params.BackupXDRParams.MaxConnections,
InfoPolingPeriodMilliseconds: params.BackupXDRParams.InfoPolingPeriodMilliseconds,
StartTimeoutMilliseconds: params.BackupXDRParams.StartTimeoutMilliseconds,
InfoRetryPolicy: mapRetryPolicy(
params.BackupXDRParams.InfoRetryIntervalMilliseconds,
params.BackupXDRParams.InfoRetriesMultiplier,
params.BackupXDRParams.InfoMaxRetries,
),
}

return c
Expand Down
5 changes: 3 additions & 2 deletions cmd/internal/app/reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ func printBackupReport(stats, xdrStats *bModels.BackupStats) {

fmt.Printf("Records Read: %d\n", stats.GetReadRecords())

var bw, fw uint64
var bw, fw, tr uint64
if xdrStats != nil {
bw = xdrStats.GetBytesWritten()
fw = xdrStats.GetFileCount()
tr = xdrStats.TotalRecords
fmt.Printf("Records Received: %d\n", xdrStats.GetReadRecords())
}

Expand All @@ -52,7 +53,7 @@ func printBackupReport(stats, xdrStats *bModels.BackupStats) {
fmt.Println()

fmt.Printf("Bytes Written: %d bytes\n", stats.GetBytesWritten()+bw)
fmt.Printf("Total Records: %d\n", stats.TotalRecords)
fmt.Printf("Total Records: %d\n", stats.TotalRecords+tr)
fmt.Printf("Files Written: %d\n", stats.GetFileCount()+fw)
}

Expand Down
46 changes: 46 additions & 0 deletions cmd/internal/app/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,59 @@ func validateBackup(params *ASBackupParams) error {
}
}

if params.BackupXDRParams != nil {
if err := validateBackupXDRParams(params.BackupXDRParams); err != nil {
return err
}
}

if err := validateStorages(params.AwsS3, params.GcpStorage, params.AzureBlob); err != nil {
return err
}

return nil
}

func validateBackupXDRParams(params *models.BackupXDR) error {
if params.ReadTimeoutMilliseconds < 0 {
return fmt.Errorf("backup xdr read timeout can't be negative")
}

if params.WriteTimeoutMilliseconds < 0 {
return fmt.Errorf("backup xdr write timeout can't be negative")
}

if params.InfoPolingPeriodMilliseconds < 0 {
return fmt.Errorf("backup xdr info poling period can't be negative")
}

if params.StartTimeoutMilliseconds < 0 {
return fmt.Errorf("backup xdr start timeout can't be negative")
}

if params.ResultQueueSize < 0 {
return fmt.Errorf("backup xdr result queue size can't be negative")
}

if params.AckQueueSize < 0 {
return fmt.Errorf("backup xdr ack queue size can't be negative")
}

if params.MaxConnections < 1 {
return fmt.Errorf("backup xdr max connections can't be less than 1")
}

if params.ParallelWrite < 0 {
return fmt.Errorf("backup xdr parallel write can't be negative")
}

if params.FileLimit < 1 {
return fmt.Errorf("backup xdr file limit can't be less than 1")
}

return nil
}

func validateRestore(params *ASRestoreParams) error {
if params.RestoreParams != nil && params.CommonParams != nil {
switch params.RestoreParams.Mode {
Expand Down
120 changes: 120 additions & 0 deletions cmd/internal/app/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,3 +654,123 @@ func TestValidateRestoreParams(t *testing.T) {
})
}
}

func Test_validateBackupXDRParams(t *testing.T) {
tests := []struct {
name string
params *models.BackupXDR
wantErr string
}{
{
name: "valid params",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 1000,
WriteTimeoutMilliseconds: 1000,
InfoPolingPeriodMilliseconds: 1000,
StartTimeoutMilliseconds: 1000,
ResultQueueSize: 100,
AckQueueSize: 100,
MaxConnections: 10,
ParallelWrite: 5,
FileLimit: 1000,
},
wantErr: "",
},
{
name: "negative read timeout",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: -1,
},
wantErr: "backup xdr read timeout can't be negative",
},
{
name: "negative write timeout",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: -1,
},
wantErr: "backup xdr write timeout can't be negative",
},
{
name: "negative info polling period",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: -1,
},
wantErr: "backup xdr info poling period can't be negative",
},
{
name: "negative start timeout",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: -1,
},
wantErr: "backup xdr start timeout can't be negative",
},
{
name: "negative result queue size",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: 0,
ResultQueueSize: -1,
},
wantErr: "backup xdr result queue size can't be negative",
},
{
name: "negative ack queue size",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: 0,
ResultQueueSize: 0,
AckQueueSize: -1,
},
wantErr: "backup xdr ack queue size can't be negative",
},
{
name: "invalid max connections",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: 0,
ResultQueueSize: 0,
AckQueueSize: 0,
MaxConnections: 0,
},
wantErr: "backup xdr max connections can't be less than 1",
},
{
name: "invalid file limit",
params: &models.BackupXDR{
ReadTimeoutMilliseconds: 0,
WriteTimeoutMilliseconds: 0,
InfoPolingPeriodMilliseconds: 0,
StartTimeoutMilliseconds: 0,
ResultQueueSize: 0,
AckQueueSize: 0,
MaxConnections: 1,
ParallelWrite: 1,
FileLimit: 0,
},
wantErr: "backup xdr file limit can't be less than 1",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateBackupXDRParams(tt.params)
if tt.wantErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tt.wantErr)
}
})
}
}
10 changes: 10 additions & 0 deletions cmd/internal/flags/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet {
"Number of records will be read on one iteration for continuation backup.\n"+
"Affects size if overlap on resuming backup after an error.\n"+
"Used only with --state-file-dst or --continue.")
flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-timeout", 1000,
"Set the initial timeout for a retry in milliseconds when info commands are sent."+
"This parameter is applied to stop xdr and unblock MRT writes requests.")
flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier",
1,
"Used to increase the delay between subsequent retry attempts.\n"+
"The actual delay is calculated as: info-retry-timeout * (info-retry-multiplier ^ attemptNumber)")
flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3,
"How many times to retry to send info commands before failing. "+
"This parameter is applied to stop xdr and unblock MRT writes requests.")

return flagSet
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/internal/flags/backup_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ func (f *BackupXDR) NewFlagSet() *pflag.FlagSet {
1000,
"How often (in milliseconds) a backup client will send info commands to check aerospike cluster stats.\n"+
"To measure recovery state and lag.")
flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-timeout", 1000,
"Set the initial timeout for a retry in milliseconds when info commands are sent."+
"This parameter is applied to stop xdr and unblock MRT writes requests.")
flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier",
1,
"Used to increase the delay between subsequent retry attempts.\n"+
"The actual delay is calculated as: info-retry-timeout * (info-retry-multiplier ^ attemptNumber)")
flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3,
"How many times to retry to send info commands before failing. "+
"This parameter is applied to stop xdr and unblock MRT writes requests.")
flagSet.Int64Var(&f.StartTimeoutMilliseconds, "start-timeout",
30000,
"Timeout for starting TCP server for XDR.\n"+
Expand Down
4 changes: 4 additions & 0 deletions cmd/internal/models/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type Backup struct {
Continue string
ScanPageSize int64
OutputFilePrefix string

InfoMaxRetries uint
InfoRetriesMultiplier float64
InfoRetryIntervalMilliseconds int64
}

// ShouldClearTarget check if we should clean target directory.
Expand Down
4 changes: 4 additions & 0 deletions cmd/internal/models/backup_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ type BackupXDR struct {

StopXDR bool
UnblockMRT bool

InfoMaxRetries uint
InfoRetriesMultiplier float64
InfoRetryIntervalMilliseconds int64
}
Loading

0 comments on commit fcff047

Please sign in to comment.