From 9862e4aa1012b7775bab571d6821b3c6863f977e Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Mon, 27 Jan 2025 10:25:23 +0200 Subject: [PATCH 1/9] FMWK-658-retry-info-commands - retry for stop xdr and unblock mrt commands --- cmd/internal/app/asbackup.go | 34 ++++--- cmd/internal/app/validation.go | 50 +++++++++ cmd/internal/app/validation_test.go | 151 ++++++++++++++++++++++++++++ cmd/internal/flags/backup_xdr.go | 3 + cmd/internal/models/backup_xdr.go | 1 + 5 files changed, 227 insertions(+), 12 deletions(-) diff --git a/cmd/internal/app/asbackup.go b/cmd/internal/app/asbackup.go index f789803b..3f0c489a 100644 --- a/cmd/internal/app/asbackup.go +++ b/cmd/internal/app/asbackup.go @@ -137,8 +137,8 @@ func NewASBackup( if params.isStopXDR() { logger.Info("stopping XDR on the database") - if err := stopXDR(aerospikeClient, backupXDRConfig); err != nil { - return nil, err + if err = stopXDR(aerospikeClient, backupXDRConfig, params.BackupXDRParams.InfoRetryAttempts); err != nil { + return nil, fmt.Errorf("failed to stop XDR: %w", err) } return nil, nil @@ -148,8 +148,8 @@ func NewASBackup( if params.isUnblockMRT() { logger.Info("enabling MRT writes on the database") - if err := unblockMRT(aerospikeClient, backupXDRConfig); err != nil { - return nil, err + if err = unblockMRT(aerospikeClient, backupXDRConfig, params.BackupXDRParams.InfoRetryAttempts); err != nil { + return nil, fmt.Errorf("failed to enable MRT: %w", err) } return nil, nil @@ -309,24 +309,34 @@ func getSecretAgent(b *backup.ConfigBackup, bxdr *backup.ConfigBackupXDR) *backu } } -func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { +func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR, attempts int) error { infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy) - if err := infoClient.StopXDR(cfg.DC); err != nil { - return fmt.Errorf("failed to stop xdr: %w", err) + var err error + + for range attempts { + err = infoClient.StopXDR(cfg.DC) + if err == nil { + return nil + } } - return nil + return err } -func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { +func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR, attempts int) error { infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy) - if err := infoClient.UnBlockMRTWrites(cfg.Namespace); err != nil { - return fmt.Errorf("failed to unblock MRT: %w", err) + var err error + + for range attempts { + err = infoClient.UnBlockMRTWrites(cfg.Namespace) + if err == nil { + return nil + } } - return nil + return err } func checkVersion(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { diff --git a/cmd/internal/app/validation.go b/cmd/internal/app/validation.go index 1f7187e6..24d29ce7 100644 --- a/cmd/internal/app/validation.go +++ b/cmd/internal/app/validation.go @@ -37,6 +37,12 @@ func validateBackup(params *ASBackupParams) error { } } + if params.BackupXDRParams != nil { + if err := validateBackupXDRParams(params.BackupXDRParams); err != nil { + return err + } + } + if err := validateStorages(params.AwsS3, params.GcpStorage, params.AzureBlob); err != nil { return err } @@ -44,6 +50,50 @@ func validateBackup(params *ASBackupParams) error { return nil } +func validateBackupXDRParams(params *models.BackupXDR) error { + if params.InfoRetryAttempts < 1 { + return fmt.Errorf("backup xdr retry attempts can't be less than 1") + } + + if params.ReadTimeoutMilliseconds < 0 { + return fmt.Errorf("backup xdr read timeout can't be negative") + } + + if params.WriteTimeoutMilliseconds < 0 { + return fmt.Errorf("backup xdr write timeout can't be negative") + } + + if params.InfoPolingPeriodMilliseconds < 0 { + return fmt.Errorf("backup xdr info poling period can't be negative") + } + + if params.StartTimeoutMilliseconds < 0 { + return fmt.Errorf("backup xdr start timeout can't be negative") + } + + if params.ResultQueueSize < 0 { + return fmt.Errorf("backup xdr result queue size can't be negative") + } + + if params.AckQueueSize < 0 { + return fmt.Errorf("backup xdr ack queue size can't be negative") + } + + if params.MaxConnections < 1 { + return fmt.Errorf("backup xdr max connections can't be less than 1") + } + + if params.ParallelWrite < 1 { + return fmt.Errorf("backup xdr parallel write can't be less than 1") + } + + if params.FileLimit < 1 { + return fmt.Errorf("backup xdr file limit can't be less than 1") + } + + return nil +} + func validateRestore(params *ASRestoreParams) error { if params.RestoreParams != nil && params.CommonParams != nil { switch params.RestoreParams.Mode { diff --git a/cmd/internal/app/validation_test.go b/cmd/internal/app/validation_test.go index 9e691090..2781fe31 100644 --- a/cmd/internal/app/validation_test.go +++ b/cmd/internal/app/validation_test.go @@ -654,3 +654,154 @@ func TestValidateRestoreParams(t *testing.T) { }) } } + +func Test_validateBackupXDRParams(t *testing.T) { + tests := []struct { + name string + params *models.BackupXDR + wantErr string + }{ + { + name: "valid params", + params: &models.BackupXDR{ + InfoRetryAttempts: 3, + ReadTimeoutMilliseconds: 1000, + WriteTimeoutMilliseconds: 1000, + InfoPolingPeriodMilliseconds: 1000, + StartTimeoutMilliseconds: 1000, + ResultQueueSize: 100, + AckQueueSize: 100, + MaxConnections: 10, + ParallelWrite: 5, + FileLimit: 1000, + }, + wantErr: "", + }, + { + name: "invalid info retry attempts", + params: &models.BackupXDR{ + InfoRetryAttempts: 0, + }, + wantErr: "backup xdr retry attempts can't be less than 1", + }, + { + name: "negative read timeout", + params: &models.BackupXDR{ + InfoRetryAttempts: 1, + ReadTimeoutMilliseconds: -1, + }, + wantErr: "backup xdr read timeout can't be negative", + }, + { + name: "negative write timeout", + params: &models.BackupXDR{ + InfoRetryAttempts: 1, + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: -1, + }, + wantErr: "backup xdr write timeout can't be negative", + }, + { + name: "negative info polling period", + params: &models.BackupXDR{ + InfoRetryAttempts: 1, + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: -1, + }, + wantErr: "backup xdr info poling period can't be negative", + }, + { + name: "negative start timeout", + params: &models.BackupXDR{ + InfoRetryAttempts: 1, + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: -1, + }, + wantErr: "backup xdr start timeout can't be negative", + }, + { + name: "negative result queue size", + params: &models.BackupXDR{ + InfoRetryAttempts: 1, + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: 0, + ResultQueueSize: -1, + }, + wantErr: "backup xdr result queue size can't be negative", + }, + { + name: "negative ack queue size", + params: &models.BackupXDR{ + InfoRetryAttempts: 1, + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: 0, + ResultQueueSize: 0, + AckQueueSize: -1, + }, + wantErr: "backup xdr ack queue size can't be negative", + }, + { + name: "invalid max connections", + params: &models.BackupXDR{ + InfoRetryAttempts: 1, + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: 0, + ResultQueueSize: 0, + AckQueueSize: 0, + MaxConnections: 0, + }, + wantErr: "backup xdr max connections can't be less than 1", + }, + { + name: "invalid parallel write", + params: &models.BackupXDR{ + InfoRetryAttempts: 1, + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: 0, + ResultQueueSize: 0, + AckQueueSize: 0, + MaxConnections: 1, + ParallelWrite: 0, + }, + wantErr: "backup xdr parallel write can't be less than 1", + }, + { + name: "invalid file limit", + params: &models.BackupXDR{ + InfoRetryAttempts: 1, + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: 0, + ResultQueueSize: 0, + AckQueueSize: 0, + MaxConnections: 1, + ParallelWrite: 1, + FileLimit: 0, + }, + wantErr: "backup xdr file limit can't be less than 1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateBackupXDRParams(tt.params) + if tt.wantErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tt.wantErr) + } + }) + } +} diff --git a/cmd/internal/flags/backup_xdr.go b/cmd/internal/flags/backup_xdr.go index de19a693..dae05384 100644 --- a/cmd/internal/flags/backup_xdr.go +++ b/cmd/internal/flags/backup_xdr.go @@ -85,6 +85,9 @@ func (f *BackupXDR) NewFlagSet() *pflag.FlagSet { 1000, "How often (in milliseconds) a backup client will send info commands to check aerospike cluster stats.\n"+ "To measure recovery state and lag.") + flagSet.IntVar(&f.InfoRetryAttempts, "info-retry-attempts", 1, + "How many times to retry to send info commands before failing. "+ + "This parameter is applied to stop xdr and unblock MRT writes requests.") flagSet.Int64Var(&f.StartTimeoutMilliseconds, "start-timeout", 30000, "Timeout for starting TCP server for XDR.\n"+ diff --git a/cmd/internal/models/backup_xdr.go b/cmd/internal/models/backup_xdr.go index b8e6be9a..f86c56c7 100644 --- a/cmd/internal/models/backup_xdr.go +++ b/cmd/internal/models/backup_xdr.go @@ -32,6 +32,7 @@ type BackupXDR struct { AckQueueSize int MaxConnections int InfoPolingPeriodMilliseconds int64 + InfoRetryAttempts int StartTimeoutMilliseconds int64 TLSConfig *tls.Config From 68cf861d8be1c93e1b138af8feee73a31280c65f Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Mon, 27 Jan 2025 12:18:10 +0200 Subject: [PATCH 2/9] wip --- cmd/internal/flags/backup_xdr.go | 5 ++++- cmd/internal/models/backup_xdr.go | 35 ++++++++++++++++--------------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/cmd/internal/flags/backup_xdr.go b/cmd/internal/flags/backup_xdr.go index dae05384..b8e97123 100644 --- a/cmd/internal/flags/backup_xdr.go +++ b/cmd/internal/flags/backup_xdr.go @@ -85,9 +85,12 @@ func (f *BackupXDR) NewFlagSet() *pflag.FlagSet { 1000, "How often (in milliseconds) a backup client will send info commands to check aerospike cluster stats.\n"+ "To measure recovery state and lag.") - flagSet.IntVar(&f.InfoRetryAttempts, "info-retry-attempts", 1, + flagSet.IntVar(&f.InfoRetryAttempts, "info-retry-attempts", 3, "How many times to retry to send info commands before failing. "+ "This parameter is applied to stop xdr and unblock MRT writes requests.") + flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-interval", 1000, + "Delay between retrying to send info commands."+ + "This parameter is applied to stop xdr and unblock MRT writes requests.") flagSet.Int64Var(&f.StartTimeoutMilliseconds, "start-timeout", 30000, "Timeout for starting TCP server for XDR.\n"+ diff --git a/cmd/internal/models/backup_xdr.go b/cmd/internal/models/backup_xdr.go index f86c56c7..6bcddd0d 100644 --- a/cmd/internal/models/backup_xdr.go +++ b/cmd/internal/models/backup_xdr.go @@ -17,23 +17,24 @@ package models import "crypto/tls" type BackupXDR struct { - Directory string - FileLimit int64 - RemoveFiles bool - ParallelWrite int - DC string - LocalAddress string - LocalPort int - Namespace string - Rewind string - ReadTimeoutMilliseconds int64 - WriteTimeoutMilliseconds int64 - ResultQueueSize int - AckQueueSize int - MaxConnections int - InfoPolingPeriodMilliseconds int64 - InfoRetryAttempts int - StartTimeoutMilliseconds int64 + Directory string + FileLimit int64 + RemoveFiles bool + ParallelWrite int + DC string + LocalAddress string + LocalPort int + Namespace string + Rewind string + ReadTimeoutMilliseconds int64 + WriteTimeoutMilliseconds int64 + ResultQueueSize int + AckQueueSize int + MaxConnections int + InfoPolingPeriodMilliseconds int64 + InfoRetryAttempts int + InfoRetryIntervalMilliseconds int64 + StartTimeoutMilliseconds int64 TLSConfig *tls.Config From 9fa032a28c02d9b54c791d4b92790ff53e0ed393 Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Mon, 27 Jan 2025 15:15:20 +0200 Subject: [PATCH 3/9] wip --- internal/asinfo/info_client.go | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/internal/asinfo/info_client.go b/internal/asinfo/info_client.go index 90e6a5bd..95e06ecc 100644 --- a/internal/asinfo/info_client.go +++ b/internal/asinfo/info_client.go @@ -18,9 +18,11 @@ import ( "encoding/base64" "errors" "fmt" + "math" "regexp" "strconv" "strings" + "time" a "github.com/aerospike/aerospike-client-go/v7" "github.com/aerospike/backup-go/models" @@ -99,14 +101,17 @@ type aerospikeClient interface { } type InfoClient struct { - policy *a.InfoPolicy - cluster *a.Cluster + policy *a.InfoPolicy + cluster *a.Cluster + retryPolicy *models.RetryPolicy } -func NewInfoClientFromAerospike(aeroClient aerospikeClient, policy *a.InfoPolicy) *InfoClient { +func NewInfoClientFromAerospike(aeroClient aerospikeClient, policy *a.InfoPolicy, retryPolicy *models.RetryPolicy, +) *InfoClient { return &InfoClient{ - cluster: aeroClient.Cluster(), - policy: policy, + cluster: aeroClient.Cluster(), + policy: policy, + retryPolicy: retryPolicy, } } @@ -867,3 +872,22 @@ func parseUDFListResponse(resp string) ([]infoMap, error) { func parseUDFGetResponse(resp string) (infoMap, error) { return parseInfoObject(resp, ";", "=") } + +func executeWithRetry(policy *models.RetryPolicy, command func() error) error { + if policy == nil { + return fmt.Errorf("retry policy cannot be nil") + } + + var err error + for i := range policy.MaxRetries { + err = command() + if err == nil { + return nil + } + + duration := time.Duration(float64(policy.BaseTimeout) * math.Pow(policy.Multiplier, float64(i))) + time.Sleep(duration) + } + + return fmt.Errorf("after %d attempts: %w", policy.MaxRetries, err) +} From bf92672fb6ad22bc55acb6f3d3a48905e704ae3c Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Tue, 28 Jan 2025 10:17:17 +0200 Subject: [PATCH 4/9] FMWK-658-retry-info-commands - reworked reties --- cmd/internal/app/asbackup.go | 62 ++++------ cmd/internal/app/configs.go | 11 ++ cmd/internal/app/reports.go | 5 +- cmd/internal/app/validation.go | 6 +- cmd/internal/app/validation_test.go | 17 --- cmd/internal/flags/backup.go | 10 ++ cmd/internal/flags/backup_xdr.go | 12 +- cmd/internal/models/backup.go | 4 + cmd/internal/models/backup_xdr.go | 38 +++--- config_backup.go | 3 + config_backup_xdr.go | 3 + config_restore.go | 2 + handler_backup.go | 2 +- handler_backup_xdr.go | 10 +- internal/asinfo/info_client.go | 174 ++++++++++++++++++++-------- internal/asinfo/info_client_test.go | 7 +- models/retry_policy.go | 5 + processor_record_writer.go | 2 +- tests/test_client.go | 4 +- 19 files changed, 234 insertions(+), 143 deletions(-) diff --git a/cmd/internal/app/asbackup.go b/cmd/internal/app/asbackup.go index 3f0c489a..0c8fdcef 100644 --- a/cmd/internal/app/asbackup.go +++ b/cmd/internal/app/asbackup.go @@ -131,28 +131,28 @@ func NewASBackup( if err := checkVersion(aerospikeClient, backupXDRConfig); err != nil { return nil, err } - } - // Stop xdr. - if params.isStopXDR() { - logger.Info("stopping XDR on the database") + // Stop xdr. + if params.isStopXDR() { + logger.Info("stopping XDR on the database") + + if err = stopXDR(aerospikeClient, backupXDRConfig); err != nil { + return nil, fmt.Errorf("failed to stop XDR: %w", err) + } - if err = stopXDR(aerospikeClient, backupXDRConfig, params.BackupXDRParams.InfoRetryAttempts); err != nil { - return nil, fmt.Errorf("failed to stop XDR: %w", err) + return nil, nil } - return nil, nil - } + // Unblock mRT. + if params.isUnblockMRT() { + logger.Info("enabling MRT writes on the database") - // Unblock mRT. - if params.isUnblockMRT() { - logger.Info("enabling MRT writes on the database") + if err = unblockMRT(aerospikeClient, backupXDRConfig); err != nil { + return nil, fmt.Errorf("failed to enable MRT: %w", err) + } - if err = unblockMRT(aerospikeClient, backupXDRConfig, params.BackupXDRParams.InfoRetryAttempts); err != nil { - return nil, fmt.Errorf("failed to enable MRT: %w", err) + return nil, nil } - - return nil, nil } backupClient, err := backup.NewClient(aerospikeClient, backup.WithLogger(logger), backup.WithID(idBackup)) @@ -309,38 +309,20 @@ func getSecretAgent(b *backup.ConfigBackup, bxdr *backup.ConfigBackupXDR) *backu } } -func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR, attempts int) error { - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy) +func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { + infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy, cfg.InfoRetryPolicy) - var err error - - for range attempts { - err = infoClient.StopXDR(cfg.DC) - if err == nil { - return nil - } - } - - return err + return infoClient.StopXDR(cfg.DC) } -func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR, attempts int) error { - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy) - - var err error - - for range attempts { - err = infoClient.UnBlockMRTWrites(cfg.Namespace) - if err == nil { - return nil - } - } +func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { + infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy, cfg.InfoRetryPolicy) - return err + return infoClient.UnBlockMRTWrites(cfg.Namespace) } func checkVersion(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy) + infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy, cfg.InfoRetryPolicy) version, err := infoClient.GetVersion() if err != nil { diff --git a/cmd/internal/app/configs.go b/cmd/internal/app/configs.go index 873f3f41..776b9d15 100644 --- a/cmd/internal/app/configs.go +++ b/cmd/internal/app/configs.go @@ -118,6 +118,12 @@ func mapBackupConfig(params *ASBackupParams) (*backup.ConfigBackup, error) { c.ModAfter = &modAfterTime } + c.InfoRetryPolicy = mapRetryPolicy( + params.BackupParams.InfoRetryIntervalMilliseconds, + params.BackupParams.InfoRetriesMultiplier, + params.BackupParams.InfoMaxRetries, + ) + return c, nil } @@ -148,6 +154,11 @@ func mapBackupXDRConfig(params *ASBackupParams) *backup.ConfigBackupXDR { MaxConnections: params.BackupXDRParams.MaxConnections, InfoPolingPeriodMilliseconds: params.BackupXDRParams.InfoPolingPeriodMilliseconds, StartTimeoutMilliseconds: params.BackupXDRParams.StartTimeoutMilliseconds, + InfoRetryPolicy: mapRetryPolicy( + params.BackupXDRParams.InfoRetryIntervalMilliseconds, + params.BackupXDRParams.InfoRetriesMultiplier, + params.BackupXDRParams.InfoMaxRetries, + ), } return c diff --git a/cmd/internal/app/reports.go b/cmd/internal/app/reports.go index 298c0949..e1a0d41e 100644 --- a/cmd/internal/app/reports.go +++ b/cmd/internal/app/reports.go @@ -39,10 +39,11 @@ func printBackupReport(stats, xdrStats *bModels.BackupStats) { fmt.Printf("Records Read: %d\n", stats.GetReadRecords()) - var bw, fw uint64 + var bw, fw, tr uint64 if xdrStats != nil { bw = xdrStats.GetBytesWritten() fw = xdrStats.GetFileCount() + tr = xdrStats.TotalRecords fmt.Printf("Records Received: %d\n", xdrStats.GetReadRecords()) } @@ -52,7 +53,7 @@ func printBackupReport(stats, xdrStats *bModels.BackupStats) { fmt.Println() fmt.Printf("Bytes Written: %d bytes\n", stats.GetBytesWritten()+bw) - fmt.Printf("Total Records: %d\n", stats.TotalRecords) + fmt.Printf("Total Records: %d\n", stats.TotalRecords+tr) fmt.Printf("Files Written: %d\n", stats.GetFileCount()+fw) } diff --git a/cmd/internal/app/validation.go b/cmd/internal/app/validation.go index 67b15ece..a8375388 100644 --- a/cmd/internal/app/validation.go +++ b/cmd/internal/app/validation.go @@ -51,10 +51,6 @@ func validateBackup(params *ASBackupParams) error { } func validateBackupXDRParams(params *models.BackupXDR) error { - if params.InfoRetryAttempts < 1 { - return fmt.Errorf("backup xdr retry attempts can't be less than 1") - } - if params.ReadTimeoutMilliseconds < 0 { return fmt.Errorf("backup xdr read timeout can't be negative") } @@ -83,7 +79,7 @@ func validateBackupXDRParams(params *models.BackupXDR) error { return fmt.Errorf("backup xdr max connections can't be less than 1") } - if params.ParallelWrite < 1 { + if params.ParallelWrite < 0 { return fmt.Errorf("backup xdr parallel write can't be less than 1") } diff --git a/cmd/internal/app/validation_test.go b/cmd/internal/app/validation_test.go index 2781fe31..5aa1736a 100644 --- a/cmd/internal/app/validation_test.go +++ b/cmd/internal/app/validation_test.go @@ -664,7 +664,6 @@ func Test_validateBackupXDRParams(t *testing.T) { { name: "valid params", params: &models.BackupXDR{ - InfoRetryAttempts: 3, ReadTimeoutMilliseconds: 1000, WriteTimeoutMilliseconds: 1000, InfoPolingPeriodMilliseconds: 1000, @@ -677,17 +676,9 @@ func Test_validateBackupXDRParams(t *testing.T) { }, wantErr: "", }, - { - name: "invalid info retry attempts", - params: &models.BackupXDR{ - InfoRetryAttempts: 0, - }, - wantErr: "backup xdr retry attempts can't be less than 1", - }, { name: "negative read timeout", params: &models.BackupXDR{ - InfoRetryAttempts: 1, ReadTimeoutMilliseconds: -1, }, wantErr: "backup xdr read timeout can't be negative", @@ -695,7 +686,6 @@ func Test_validateBackupXDRParams(t *testing.T) { { name: "negative write timeout", params: &models.BackupXDR{ - InfoRetryAttempts: 1, ReadTimeoutMilliseconds: 0, WriteTimeoutMilliseconds: -1, }, @@ -704,7 +694,6 @@ func Test_validateBackupXDRParams(t *testing.T) { { name: "negative info polling period", params: &models.BackupXDR{ - InfoRetryAttempts: 1, ReadTimeoutMilliseconds: 0, WriteTimeoutMilliseconds: 0, InfoPolingPeriodMilliseconds: -1, @@ -714,7 +703,6 @@ func Test_validateBackupXDRParams(t *testing.T) { { name: "negative start timeout", params: &models.BackupXDR{ - InfoRetryAttempts: 1, ReadTimeoutMilliseconds: 0, WriteTimeoutMilliseconds: 0, InfoPolingPeriodMilliseconds: 0, @@ -725,7 +713,6 @@ func Test_validateBackupXDRParams(t *testing.T) { { name: "negative result queue size", params: &models.BackupXDR{ - InfoRetryAttempts: 1, ReadTimeoutMilliseconds: 0, WriteTimeoutMilliseconds: 0, InfoPolingPeriodMilliseconds: 0, @@ -737,7 +724,6 @@ func Test_validateBackupXDRParams(t *testing.T) { { name: "negative ack queue size", params: &models.BackupXDR{ - InfoRetryAttempts: 1, ReadTimeoutMilliseconds: 0, WriteTimeoutMilliseconds: 0, InfoPolingPeriodMilliseconds: 0, @@ -750,7 +736,6 @@ func Test_validateBackupXDRParams(t *testing.T) { { name: "invalid max connections", params: &models.BackupXDR{ - InfoRetryAttempts: 1, ReadTimeoutMilliseconds: 0, WriteTimeoutMilliseconds: 0, InfoPolingPeriodMilliseconds: 0, @@ -764,7 +749,6 @@ func Test_validateBackupXDRParams(t *testing.T) { { name: "invalid parallel write", params: &models.BackupXDR{ - InfoRetryAttempts: 1, ReadTimeoutMilliseconds: 0, WriteTimeoutMilliseconds: 0, InfoPolingPeriodMilliseconds: 0, @@ -779,7 +763,6 @@ func Test_validateBackupXDRParams(t *testing.T) { { name: "invalid file limit", params: &models.BackupXDR{ - InfoRetryAttempts: 1, ReadTimeoutMilliseconds: 0, WriteTimeoutMilliseconds: 0, InfoPolingPeriodMilliseconds: 0, diff --git a/cmd/internal/flags/backup.go b/cmd/internal/flags/backup.go index 56782c33..008be0f0 100644 --- a/cmd/internal/flags/backup.go +++ b/cmd/internal/flags/backup.go @@ -146,6 +146,16 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet { "Number of records will be read on one iteration for continuation backup.\n"+ "Affects size if overlap on resuming backup after an error.\n"+ "Used only with --state-file-dst or --continue.") + flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-timeout", 1000, + "Set the initial timeout for a retry in milliseconds when info commands are sent."+ + "This parameter is applied to stop xdr and unblock MRT writes requests.") + flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier", + 1, + "Used to increase the delay between subsequent retry attempts.\n"+ + "The actual delay is calculated as: retry-base-timeout * (retry-multiplier ^ attemptNumber)") + flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3, + "How many times to retry to send info commands before failing. "+ + "This parameter is applied to stop xdr and unblock MRT writes requests.") return flagSet } diff --git a/cmd/internal/flags/backup_xdr.go b/cmd/internal/flags/backup_xdr.go index ad6f54fc..1b19b9c3 100644 --- a/cmd/internal/flags/backup_xdr.go +++ b/cmd/internal/flags/backup_xdr.go @@ -82,11 +82,15 @@ func (f *BackupXDR) NewFlagSet() *pflag.FlagSet { 1000, "How often (in milliseconds) a backup client will send info commands to check aerospike cluster stats.\n"+ "To measure recovery state and lag.") - flagSet.IntVar(&f.InfoRetryAttempts, "info-retry-attempts", 3, - "How many times to retry to send info commands before failing. "+ + flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-timeout", 1000, + "Set the initial timeout for a retry in milliseconds when info commands are sent."+ "This parameter is applied to stop xdr and unblock MRT writes requests.") - flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-interval", 1000, - "Delay between retrying to send info commands."+ + flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier", + 1, + "Used to increase the delay between subsequent retry attempts.\n"+ + "The actual delay is calculated as: retry-base-timeout * (retry-multiplier ^ attemptNumber)") + flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3, + "How many times to retry to send info commands before failing. "+ "This parameter is applied to stop xdr and unblock MRT writes requests.") flagSet.Int64Var(&f.StartTimeoutMilliseconds, "start-timeout", 30000, diff --git a/cmd/internal/models/backup.go b/cmd/internal/models/backup.go index 7d8205b3..b5b87083 100644 --- a/cmd/internal/models/backup.go +++ b/cmd/internal/models/backup.go @@ -38,6 +38,10 @@ type Backup struct { Continue string ScanPageSize int64 OutputFilePrefix string + + InfoMaxRetries uint + InfoRetriesMultiplier float64 + InfoRetryIntervalMilliseconds int64 } // ShouldClearTarget check if we should clean target directory. diff --git a/cmd/internal/models/backup_xdr.go b/cmd/internal/models/backup_xdr.go index 6bcddd0d..f2c5182c 100644 --- a/cmd/internal/models/backup_xdr.go +++ b/cmd/internal/models/backup_xdr.go @@ -17,27 +17,29 @@ package models import "crypto/tls" type BackupXDR struct { - Directory string - FileLimit int64 - RemoveFiles bool - ParallelWrite int - DC string - LocalAddress string - LocalPort int - Namespace string - Rewind string - ReadTimeoutMilliseconds int64 - WriteTimeoutMilliseconds int64 - ResultQueueSize int - AckQueueSize int - MaxConnections int - InfoPolingPeriodMilliseconds int64 - InfoRetryAttempts int - InfoRetryIntervalMilliseconds int64 - StartTimeoutMilliseconds int64 + Directory string + FileLimit int64 + RemoveFiles bool + ParallelWrite int + DC string + LocalAddress string + LocalPort int + Namespace string + Rewind string + ReadTimeoutMilliseconds int64 + WriteTimeoutMilliseconds int64 + ResultQueueSize int + AckQueueSize int + MaxConnections int + InfoPolingPeriodMilliseconds int64 + StartTimeoutMilliseconds int64 TLSConfig *tls.Config StopXDR bool UnblockMRT bool + + InfoMaxRetries uint + InfoRetriesMultiplier float64 + InfoRetryIntervalMilliseconds int64 } diff --git a/config_backup.go b/config_backup.go index ee17c012..9264859c 100644 --- a/config_backup.go +++ b/config_backup.go @@ -19,6 +19,7 @@ import ( "time" a "github.com/aerospike/aerospike-client-go/v7" + "github.com/aerospike/backup-go/models" "github.com/aerospike/backup-go/pipeline" ) @@ -125,6 +126,8 @@ type ConfigBackup struct { PipelinesMode pipeline.Mode // When using directory parameter, prepend a prefix to the names of the generated files. OutputFilePrefix string + // Retry policy for info commands. + InfoRetryPolicy *models.RetryPolicy } // NewDefaultBackupConfig returns a new ConfigBackup with default values. diff --git a/config_backup_xdr.go b/config_backup_xdr.go index 2f635b75..9dac17ff 100644 --- a/config_backup_xdr.go +++ b/config_backup_xdr.go @@ -20,6 +20,7 @@ import ( "strconv" a "github.com/aerospike/aerospike-client-go/v7" + "github.com/aerospike/backup-go/models" ) // ConfigBackupXDR contains configuration for the xdr backup operation. @@ -78,6 +79,8 @@ type ConfigBackupXDR struct { // If the TCP server for XDR does not receive any data within this timeout period, it will shut down. // This situation can occur if the LocalAddress and LocalPort options are misconfigured. StartTimeoutMilliseconds int64 + // Retry policy for info commands. + InfoRetryPolicy *models.RetryPolicy } func (c *ConfigBackupXDR) validate() error { diff --git a/config_restore.go b/config_restore.go index 8f70f7cc..9f790c8a 100644 --- a/config_restore.go +++ b/config_restore.go @@ -75,6 +75,8 @@ type ConfigRestore struct { // E.g.: AEROSPIKE_RECORD_TOO_BIG. // By default, such errors are not ignored and restore terminates. IgnoreRecordError bool + // Retry policy for info commands. + InfoRetryPolicy *models.RetryPolicy } // NewDefaultRestoreConfig returns a new ConfigRestore with default values. diff --git a/handler_backup.go b/handler_backup.go index 7fb6b82f..11c3338c 100644 --- a/handler_backup.go +++ b/handler_backup.go @@ -135,7 +135,7 @@ func newBackupHandler( firstFileHeaderWritten: &atomic.Bool{}, encoder: NewEncoder[*models.Token](config.EncoderType, config.Namespace, config.Compact), limiter: limiter, - infoClient: asinfo.NewInfoClientFromAerospike(ac, config.InfoPolicy), + infoClient: asinfo.NewInfoClientFromAerospike(ac, config.InfoPolicy, config.InfoRetryPolicy), scanLimiter: scanLimiter, state: state, }, nil diff --git a/handler_backup_xdr.go b/handler_backup_xdr.go index 1cc986e5..5c6dfd84 100644 --- a/handler_backup_xdr.go +++ b/handler_backup_xdr.go @@ -67,7 +67,7 @@ func newBackupXDRHandler( stats := models.NewBackupStats() - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, config.InfoPolicy) + infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, config.InfoPolicy, config.InfoRetryPolicy) readProcessor := newRecordReaderProcessor[*models.ASBXToken]( config, @@ -123,6 +123,14 @@ func (bh *HandlerBackupXDR) run() { } func (bh *HandlerBackupXDR) backup(ctx context.Context) error { + // Count total records. + var err error + + bh.stats.TotalRecords, err = bh.infoClient.GetRecordCount(bh.config.Namespace, nil) + if err != nil { + return fmt.Errorf("failed to get records count: %w", err) + } + // Read workers. readWorkers, err := bh.readProcessor.newReadWorkersXDR(ctx) if err != nil { diff --git a/internal/asinfo/info_client.go b/internal/asinfo/info_client.go index 95e06ecc..0712ea04 100644 --- a/internal/asinfo/info_client.go +++ b/internal/asinfo/info_client.go @@ -108,6 +108,10 @@ type InfoClient struct { func NewInfoClientFromAerospike(aeroClient aerospikeClient, policy *a.InfoPolicy, retryPolicy *models.RetryPolicy, ) *InfoClient { + if retryPolicy == nil { + retryPolicy = models.NewDefaultRetryPolicy() + } + return &InfoClient{ cluster: aeroClient.Cluster(), policy: policy, @@ -116,82 +120,145 @@ func NewInfoClientFromAerospike(aeroClient aerospikeClient, policy *a.InfoPolicy } func (ic *InfoClient) GetInfo(names ...string) (map[string]string, error) { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return nil, err - } + var result map[string]string - return node.RequestInfo(ic.policy, names...) + err := executeWithRetry(ic.retryPolicy, func() error { + node, err := ic.cluster.GetRandomNode() + if err != nil { + return err + } + + result, err = node.RequestInfo(ic.policy, names...) + + return err + }) + + return result, err } func (ic *InfoClient) GetVersion() (AerospikeVersion, error) { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return AerospikeVersion{}, err - } + var ( + version AerospikeVersion + aErr error + ) + + err := executeWithRetry(ic.retryPolicy, func() error { + node, err := ic.cluster.GetRandomNode() + if err != nil { + return err + } - return getAerospikeVersion(node, ic.policy) + version, aErr = getAerospikeVersion(node, ic.policy) + + return aErr + }) + + return version, err } func (ic *InfoClient) GetSIndexes(namespace string) ([]*models.SIndex, error) { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return nil, err - } + var ( + indexes []*models.SIndex + aErr error + ) + + err := executeWithRetry(ic.retryPolicy, func() error { + node, err := ic.cluster.GetRandomNode() + if err != nil { + return err + } + + indexes, aErr = getSIndexes(node, namespace, ic.policy) + + return aErr + }) - return getSIndexes(node, namespace, ic.policy) + return indexes, err } func (ic *InfoClient) GetUDFs() ([]*models.UDF, error) { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return nil, err - } + var ( + udfs []*models.UDF + aErr error + ) - return getUDFs(node, ic.policy) + err := executeWithRetry(ic.retryPolicy, func() error { + node, err := ic.cluster.GetRandomNode() + if err != nil { + return err + } + + udfs, aErr = getUDFs(node, ic.policy) + + return aErr + }) + + return udfs, err } func (ic *InfoClient) SupportsBatchWrite() (bool, error) { - version, err := ic.GetVersion() - if err != nil { - return false, fmt.Errorf("failed to get aerospike version: %w", err) - } + var supports bool + + err := executeWithRetry(ic.retryPolicy, func() error { + version, err := ic.GetVersion() + if err != nil { + return fmt.Errorf("failed to get aerospike version: %w", err) + } - return version.IsGreaterOrEqual(AerospikeVersionSupportsBatchWrites), nil + supports = version.IsGreaterOrEqual(AerospikeVersionSupportsBatchWrites) + + return nil + }) + + return supports, err } // GetRecordCount counts number of records in given namespace and sets. func (ic *InfoClient) GetRecordCount(namespace string, sets []string) (uint64, error) { - node, aerr := ic.cluster.GetRandomNode() - if aerr != nil { - return 0, aerr - } - - effectiveReplicationFactor, err := getEffectiveReplicationFactor(node, ic.policy, namespace) - if err != nil { - return 0, err - } - - var recordsNumber uint64 + var count uint64 - for _, node := range ic.cluster.GetNodes() { - if !node.IsActive() { - continue + err := executeWithRetry(ic.retryPolicy, func() error { + node, aerr := ic.cluster.GetRandomNode() + if aerr != nil { + return aerr } - recordCountForNode, err := getRecordCountForNode(node, ic.policy, namespace, sets) + effectiveReplicationFactor, err := getEffectiveReplicationFactor(node, ic.policy, namespace) if err != nil { - return 0, err + return err } - recordsNumber += recordCountForNode - } + var recordsNumber uint64 + + for _, node := range ic.cluster.GetNodes() { + if !node.IsActive() { + continue + } + + recordCountForNode, err := getRecordCountForNode(node, ic.policy, namespace, sets) + if err != nil { + return err + } + + recordsNumber += recordCountForNode + } + + count = recordsNumber / uint64(effectiveReplicationFactor) - return recordsNumber / uint64(effectiveReplicationFactor), nil + return nil + }) + + return count, err } // StartXDR creates xdr config and starts replication. func (ic *InfoClient) StartXDR(dc, hostPort, namespace, rewind string) error { + return executeWithRetry(ic.retryPolicy, func() error { + return ic.startXDR(dc, hostPort, namespace, rewind) + }) +} + +func (ic *InfoClient) startXDR(dc, hostPort, namespace, rewind string) error { if err := ic.createXDRDC(dc); err != nil { return err } @@ -212,8 +279,13 @@ func (ic *InfoClient) StartXDR(dc, hostPort, namespace, rewind string) error { } // StopXDR disable replication and remove xdr config. - func (ic *InfoClient) StopXDR(dc string) error { + return executeWithRetry(ic.retryPolicy, func() error { + return ic.stopXDR(dc) + }) +} + +func (ic *InfoClient) stopXDR(dc string) error { if err := ic.deleteXDRDC(dc); err != nil { return err } @@ -298,6 +370,11 @@ func (ic *InfoClient) deleteXDRDC(dc string) error { // BlockMRTWrites blocks MRT writes on cluster. func (ic *InfoClient) BlockMRTWrites(namespace string) error { + return executeWithRetry(ic.retryPolicy, func() error { + return ic.blockMRTWrites(namespace) + }) +} +func (ic *InfoClient) blockMRTWrites(namespace string) error { cmd := fmt.Sprintf(cmdBlockMRTWrites, namespace) resp, err := ic.GetInfo(cmd) @@ -314,6 +391,11 @@ func (ic *InfoClient) BlockMRTWrites(namespace string) error { // UnBlockMRTWrites unblocks MRT writes on cluster. func (ic *InfoClient) UnBlockMRTWrites(namespace string) error { + return executeWithRetry(ic.retryPolicy, func() error { + return ic.unBlockMRTWrites(namespace) + }) +} +func (ic *InfoClient) unBlockMRTWrites(namespace string) error { cmd := fmt.Sprintf(cmdUnBlockMRTWrites, namespace) resp, err := ic.GetInfo(cmd) @@ -888,6 +970,6 @@ func executeWithRetry(policy *models.RetryPolicy, command func() error) error { duration := time.Duration(float64(policy.BaseTimeout) * math.Pow(policy.Multiplier, float64(i))) time.Sleep(duration) } - + return fmt.Errorf("after %d attempts: %w", policy.MaxRetries, err) } diff --git a/internal/asinfo/info_client_test.go b/internal/asinfo/info_client_test.go index 5aa516b6..63f3bded 100644 --- a/internal/asinfo/info_client_test.go +++ b/internal/asinfo/info_client_test.go @@ -1713,14 +1713,13 @@ func Test_parseInfoObject(t *testing.T) { func TestInfoCommander_EnableDisableXDR(t *testing.T) { t.Parallel() - infoPolicy := a.NewInfoPolicy() asPolicy := a.NewClientPolicy() asPolicy.User = testASLoginPassword asPolicy.Password = testASLoginPassword client, aerr := a.NewClientWithPolicy(asPolicy, testASHost, testASPort) require.NoError(t, aerr) - ic := NewInfoClientFromAerospike(client, infoPolicy) + ic := NewInfoClientFromAerospike(client, a.NewInfoPolicy(), models.NewDefaultRetryPolicy()) err := ic.StartXDR(testASDC, testXDRHostPort, testASNamespace, testASRewind) require.NoError(t, err) @@ -1734,15 +1733,13 @@ func TestInfoCommander_EnableDisableXDR(t *testing.T) { func TestInfoCommander_BlockUnblockMRTWrites(t *testing.T) { t.Parallel() - - infoPolicy := a.NewInfoPolicy() asPolicy := a.NewClientPolicy() asPolicy.User = testASLoginPassword asPolicy.Password = testASLoginPassword client, aerr := a.NewClientWithPolicy(asPolicy, testASHost, testASPort) require.NoError(t, aerr) - ic := NewInfoClientFromAerospike(client, infoPolicy) + ic := NewInfoClientFromAerospike(client, a.NewInfoPolicy(), models.NewDefaultRetryPolicy()) err := ic.BlockMRTWrites(testASNamespace) require.NoError(t, err) diff --git a/models/retry_policy.go b/models/retry_policy.go index baf7cbc7..9547b0b6 100644 --- a/models/retry_policy.go +++ b/models/retry_policy.go @@ -38,3 +38,8 @@ func NewRetryPolicy(baseTimeout time.Duration, multiplier float64, maxRetries ui MaxRetries: maxRetries, } } + +// NewDefaultRetryPolicy returns new default configuration for retry attempts +func NewDefaultRetryPolicy() *RetryPolicy { + return NewRetryPolicy(1000*time.Millisecond, 1, 1) +} diff --git a/processor_record_writer.go b/processor_record_writer.go index 76891510..9268813e 100644 --- a/processor_record_writer.go +++ b/processor_record_writer.go @@ -84,7 +84,7 @@ func (rw *recordWriterProcessor[T]) useBatchWrites() (bool, error) { return false, nil } - infoClient := asinfo.NewInfoClientFromAerospike(rw.aerospikeClient, rw.config.InfoPolicy) + infoClient := asinfo.NewInfoClientFromAerospike(rw.aerospikeClient, rw.config.InfoPolicy, rw.config.InfoRetryPolicy) return infoClient.SupportsBatchWrite() } diff --git a/tests/test_client.go b/tests/test_client.go index 9ad28ba7..e97a2b15 100644 --- a/tests/test_client.go +++ b/tests/test_client.go @@ -40,9 +40,7 @@ type RecordMap map[digest]*a.Record // NewTestClient creates a new TestClient. func NewTestClient(asc *a.Client) *TestClient { - infoPolicy := a.NewInfoPolicy() - - infoClient := asinfo.NewInfoClientFromAerospike(asc, infoPolicy) + infoClient := asinfo.NewInfoClientFromAerospike(asc, a.NewInfoPolicy(), models.NewDefaultRetryPolicy()) return &TestClient{ asc: asc, From cc40dcd6f61f4e1282b0691e39186b44f90cb3db Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Tue, 28 Jan 2025 10:45:50 +0200 Subject: [PATCH 5/9] FMWK-658-retry-info-commands - init info client fix --- cmd/internal/app/validation_test.go | 14 -------------- internal/asinfo/info_client.go | 2 +- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/cmd/internal/app/validation_test.go b/cmd/internal/app/validation_test.go index 5aa1736a..93c24969 100644 --- a/cmd/internal/app/validation_test.go +++ b/cmd/internal/app/validation_test.go @@ -746,20 +746,6 @@ func Test_validateBackupXDRParams(t *testing.T) { }, wantErr: "backup xdr max connections can't be less than 1", }, - { - name: "invalid parallel write", - params: &models.BackupXDR{ - ReadTimeoutMilliseconds: 0, - WriteTimeoutMilliseconds: 0, - InfoPolingPeriodMilliseconds: 0, - StartTimeoutMilliseconds: 0, - ResultQueueSize: 0, - AckQueueSize: 0, - MaxConnections: 1, - ParallelWrite: 0, - }, - wantErr: "backup xdr parallel write can't be less than 1", - }, { name: "invalid file limit", params: &models.BackupXDR{ diff --git a/internal/asinfo/info_client.go b/internal/asinfo/info_client.go index 0712ea04..bd406645 100644 --- a/internal/asinfo/info_client.go +++ b/internal/asinfo/info_client.go @@ -108,7 +108,7 @@ type InfoClient struct { func NewInfoClientFromAerospike(aeroClient aerospikeClient, policy *a.InfoPolicy, retryPolicy *models.RetryPolicy, ) *InfoClient { - if retryPolicy == nil { + if retryPolicy.MaxRetries == 0 { retryPolicy = models.NewDefaultRetryPolicy() } From e78a55d41b6d4ef35b5f49f1f4e46ff03677b30a Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Tue, 28 Jan 2025 10:48:18 +0200 Subject: [PATCH 6/9] FMWK-658-retry-info-commands - tests --- internal/asinfo/info_client.go | 2 +- models/retry_policy.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/asinfo/info_client.go b/internal/asinfo/info_client.go index bd406645..0712ea04 100644 --- a/internal/asinfo/info_client.go +++ b/internal/asinfo/info_client.go @@ -108,7 +108,7 @@ type InfoClient struct { func NewInfoClientFromAerospike(aeroClient aerospikeClient, policy *a.InfoPolicy, retryPolicy *models.RetryPolicy, ) *InfoClient { - if retryPolicy.MaxRetries == 0 { + if retryPolicy == nil { retryPolicy = models.NewDefaultRetryPolicy() } diff --git a/models/retry_policy.go b/models/retry_policy.go index 9547b0b6..18cfcba8 100644 --- a/models/retry_policy.go +++ b/models/retry_policy.go @@ -41,5 +41,5 @@ func NewRetryPolicy(baseTimeout time.Duration, multiplier float64, maxRetries ui // NewDefaultRetryPolicy returns new default configuration for retry attempts func NewDefaultRetryPolicy() *RetryPolicy { - return NewRetryPolicy(1000*time.Millisecond, 1, 1) + return NewRetryPolicy(1000*time.Millisecond, 1, 3) } From 2dff4fe7401657696964c4eb29abd03a214ed302 Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Tue, 28 Jan 2025 10:55:01 +0200 Subject: [PATCH 7/9] FMWK-658-retry-info-commands - err processing fix --- internal/asinfo/info_client.go | 42 +++++++++++++++++----------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/internal/asinfo/info_client.go b/internal/asinfo/info_client.go index 0712ea04..d125db4d 100644 --- a/internal/asinfo/info_client.go +++ b/internal/asinfo/info_client.go @@ -139,18 +139,18 @@ func (ic *InfoClient) GetInfo(names ...string) (map[string]string, error) { func (ic *InfoClient) GetVersion() (AerospikeVersion, error) { var ( version AerospikeVersion - aErr error + err error ) - err := executeWithRetry(ic.retryPolicy, func() error { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return err + err = executeWithRetry(ic.retryPolicy, func() error { + node, aErr := ic.cluster.GetRandomNode() + if aErr != nil { + return aErr.Unwrap() } - version, aErr = getAerospikeVersion(node, ic.policy) + version, err = getAerospikeVersion(node, ic.policy) - return aErr + return err }) return version, err @@ -159,18 +159,18 @@ func (ic *InfoClient) GetVersion() (AerospikeVersion, error) { func (ic *InfoClient) GetSIndexes(namespace string) ([]*models.SIndex, error) { var ( indexes []*models.SIndex - aErr error + err error ) - err := executeWithRetry(ic.retryPolicy, func() error { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return err + err = executeWithRetry(ic.retryPolicy, func() error { + node, aErr := ic.cluster.GetRandomNode() + if aErr != nil { + return aErr.Unwrap() } - indexes, aErr = getSIndexes(node, namespace, ic.policy) + indexes, err = getSIndexes(node, namespace, ic.policy) - return aErr + return err }) return indexes, err @@ -179,18 +179,18 @@ func (ic *InfoClient) GetSIndexes(namespace string) ([]*models.SIndex, error) { func (ic *InfoClient) GetUDFs() ([]*models.UDF, error) { var ( udfs []*models.UDF - aErr error + err error ) - err := executeWithRetry(ic.retryPolicy, func() error { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return err + err = executeWithRetry(ic.retryPolicy, func() error { + node, aErr := ic.cluster.GetRandomNode() + if aErr != nil { + return aErr.Unwrap() } - udfs, aErr = getUDFs(node, ic.policy) + udfs, err = getUDFs(node, ic.policy) - return aErr + return err }) return udfs, err From 73e9b25bf0462f7e3af8d9aa52e4f79695b5cfb3 Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Tue, 28 Jan 2025 10:59:16 +0200 Subject: [PATCH 8/9] FMWK-658-retry-info-commands - exec with retry fix --- internal/asinfo/info_client.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/asinfo/info_client.go b/internal/asinfo/info_client.go index d125db4d..512c0d03 100644 --- a/internal/asinfo/info_client.go +++ b/internal/asinfo/info_client.go @@ -971,5 +971,9 @@ func executeWithRetry(policy *models.RetryPolicy, command func() error) error { time.Sleep(duration) } - return fmt.Errorf("after %d attempts: %w", policy.MaxRetries, err) + if err != nil { + return fmt.Errorf("after %d attempts: %w", policy.MaxRetries, err) + } + + return nil } From 0c47fbca607687c8890d301e390393fb502ae788 Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Tue, 28 Jan 2025 12:04:01 +0200 Subject: [PATCH 9/9] FMWK-658-retry-info-commands - review fixes --- cmd/internal/app/asbackup.go | 51 +++++++++++--------------------- cmd/internal/app/validation.go | 2 +- cmd/internal/flags/backup.go | 2 +- cmd/internal/flags/backup_xdr.go | 2 +- models/retry_policy.go | 2 +- 5 files changed, 21 insertions(+), 38 deletions(-) diff --git a/cmd/internal/app/asbackup.go b/cmd/internal/app/asbackup.go index 0c8fdcef..d6d2832b 100644 --- a/cmd/internal/app/asbackup.go +++ b/cmd/internal/app/asbackup.go @@ -19,7 +19,6 @@ import ( "fmt" "log/slog" - "github.com/aerospike/aerospike-client-go/v7" "github.com/aerospike/backup-go" "github.com/aerospike/backup-go/cmd/internal/models" "github.com/aerospike/backup-go/internal/asinfo" @@ -128,15 +127,27 @@ func NewASBackup( } if params.BackupXDRParams != nil { - if err := checkVersion(aerospikeClient, backupXDRConfig); err != nil { - return nil, err + infoClient := asinfo.NewInfoClientFromAerospike( + aerospikeClient, + backupXDRConfig.InfoPolicy, + backupXDRConfig.InfoRetryPolicy, + ) + + version, err := infoClient.GetVersion() + if err != nil { + return nil, fmt.Errorf("failed to get version: %w", err) + } + + if version.Major < xdrSupportedVersion { + return nil, fmt.Errorf("version %s is unsupported, only databse version %d+ is supproted", + version.String(), xdrSupportedVersion) } // Stop xdr. if params.isStopXDR() { logger.Info("stopping XDR on the database") - if err = stopXDR(aerospikeClient, backupXDRConfig); err != nil { + if err = infoClient.StopXDR(backupXDRConfig.DC); err != nil { return nil, fmt.Errorf("failed to stop XDR: %w", err) } @@ -147,8 +158,8 @@ func NewASBackup( if params.isUnblockMRT() { logger.Info("enabling MRT writes on the database") - if err = unblockMRT(aerospikeClient, backupXDRConfig); err != nil { - return nil, fmt.Errorf("failed to enable MRT: %w", err) + if err = infoClient.UnBlockMRTWrites(backupXDRConfig.Namespace); err != nil { + return nil, fmt.Errorf("failed to enable MRT writes: %w", err) } return nil, nil @@ -308,31 +319,3 @@ func getSecretAgent(b *backup.ConfigBackup, bxdr *backup.ConfigBackupXDR) *backu return nil } } - -func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy, cfg.InfoRetryPolicy) - - return infoClient.StopXDR(cfg.DC) -} - -func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy, cfg.InfoRetryPolicy) - - return infoClient.UnBlockMRTWrites(cfg.Namespace) -} - -func checkVersion(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy, cfg.InfoRetryPolicy) - - version, err := infoClient.GetVersion() - if err != nil { - return fmt.Errorf("failed to get version: %w", err) - } - - if version.Major < xdrSupportedVersion { - return fmt.Errorf("version %s is unsupported, only databse version %d+ is supproted", - version.String(), xdrSupportedVersion) - } - - return nil -} diff --git a/cmd/internal/app/validation.go b/cmd/internal/app/validation.go index a8375388..740728d3 100644 --- a/cmd/internal/app/validation.go +++ b/cmd/internal/app/validation.go @@ -80,7 +80,7 @@ func validateBackupXDRParams(params *models.BackupXDR) error { } if params.ParallelWrite < 0 { - return fmt.Errorf("backup xdr parallel write can't be less than 1") + return fmt.Errorf("backup xdr parallel write can't be negative") } if params.FileLimit < 1 { diff --git a/cmd/internal/flags/backup.go b/cmd/internal/flags/backup.go index 008be0f0..7b35d4af 100644 --- a/cmd/internal/flags/backup.go +++ b/cmd/internal/flags/backup.go @@ -152,7 +152,7 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet { flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier", 1, "Used to increase the delay between subsequent retry attempts.\n"+ - "The actual delay is calculated as: retry-base-timeout * (retry-multiplier ^ attemptNumber)") + "The actual delay is calculated as: info-retry-timeout * (info-retry-multiplier ^ attemptNumber)") flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3, "How many times to retry to send info commands before failing. "+ "This parameter is applied to stop xdr and unblock MRT writes requests.") diff --git a/cmd/internal/flags/backup_xdr.go b/cmd/internal/flags/backup_xdr.go index 1b19b9c3..be72248a 100644 --- a/cmd/internal/flags/backup_xdr.go +++ b/cmd/internal/flags/backup_xdr.go @@ -88,7 +88,7 @@ func (f *BackupXDR) NewFlagSet() *pflag.FlagSet { flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier", 1, "Used to increase the delay between subsequent retry attempts.\n"+ - "The actual delay is calculated as: retry-base-timeout * (retry-multiplier ^ attemptNumber)") + "The actual delay is calculated as: info-retry-timeout * (info-retry-multiplier ^ attemptNumber)") flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3, "How many times to retry to send info commands before failing. "+ "This parameter is applied to stop xdr and unblock MRT writes requests.") diff --git a/models/retry_policy.go b/models/retry_policy.go index 18cfcba8..4e7720e1 100644 --- a/models/retry_policy.go +++ b/models/retry_policy.go @@ -39,7 +39,7 @@ func NewRetryPolicy(baseTimeout time.Duration, multiplier float64, maxRetries ui } } -// NewDefaultRetryPolicy returns new default configuration for retry attempts +// NewDefaultRetryPolicy returns a new RetryPolicy with default values. func NewDefaultRetryPolicy() *RetryPolicy { return NewRetryPolicy(1000*time.Millisecond, 1, 3) }