From fcff04758768c270fe77f26dbd6f159dccb49a10 Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Tue, 28 Jan 2025 13:17:06 +0200 Subject: [PATCH] FMWK-658 Add retry mechanism to XDR info commands (#207) --- cmd/internal/app/asbackup.go | 83 ++++------- cmd/internal/app/configs.go | 11 ++ cmd/internal/app/reports.go | 5 +- cmd/internal/app/validation.go | 46 ++++++ cmd/internal/app/validation_test.go | 120 ++++++++++++++++ cmd/internal/flags/backup.go | 10 ++ cmd/internal/flags/backup_xdr.go | 10 ++ cmd/internal/models/backup.go | 4 + cmd/internal/models/backup_xdr.go | 4 + config_backup.go | 3 + config_backup_xdr.go | 3 + config_restore.go | 2 + handler_backup.go | 2 +- handler_backup_xdr.go | 10 +- internal/asinfo/info_client.go | 210 +++++++++++++++++++++------- internal/asinfo/info_client_test.go | 7 +- models/retry_policy.go | 5 + processor_record_writer.go | 2 +- tests/test_client.go | 4 +- 19 files changed, 424 insertions(+), 117 deletions(-) diff --git a/cmd/internal/app/asbackup.go b/cmd/internal/app/asbackup.go index f789803b..d6d2832b 100644 --- a/cmd/internal/app/asbackup.go +++ b/cmd/internal/app/asbackup.go @@ -19,7 +19,6 @@ import ( "fmt" "log/slog" - "github.com/aerospike/aerospike-client-go/v7" "github.com/aerospike/backup-go" "github.com/aerospike/backup-go/cmd/internal/models" "github.com/aerospike/backup-go/internal/asinfo" @@ -128,31 +127,43 @@ func NewASBackup( } if params.BackupXDRParams != nil { - if err := checkVersion(aerospikeClient, backupXDRConfig); err != nil { - return nil, err - } - } + infoClient := asinfo.NewInfoClientFromAerospike( + aerospikeClient, + backupXDRConfig.InfoPolicy, + backupXDRConfig.InfoRetryPolicy, + ) - // Stop xdr. - if params.isStopXDR() { - logger.Info("stopping XDR on the database") + version, err := infoClient.GetVersion() + if err != nil { + return nil, fmt.Errorf("failed to get version: %w", err) + } - if err := stopXDR(aerospikeClient, backupXDRConfig); err != nil { - return nil, err + if version.Major < xdrSupportedVersion { + return nil, fmt.Errorf("version %s is unsupported, only databse version %d+ is supproted", + version.String(), xdrSupportedVersion) } - return nil, nil - } + // Stop xdr. + if params.isStopXDR() { + logger.Info("stopping XDR on the database") - // Unblock mRT. - if params.isUnblockMRT() { - logger.Info("enabling MRT writes on the database") + if err = infoClient.StopXDR(backupXDRConfig.DC); err != nil { + return nil, fmt.Errorf("failed to stop XDR: %w", err) + } - if err := unblockMRT(aerospikeClient, backupXDRConfig); err != nil { - return nil, err + return nil, nil } - return nil, nil + // Unblock mRT. + if params.isUnblockMRT() { + logger.Info("enabling MRT writes on the database") + + if err = infoClient.UnBlockMRTWrites(backupXDRConfig.Namespace); err != nil { + return nil, fmt.Errorf("failed to enable MRT writes: %w", err) + } + + return nil, nil + } } backupClient, err := backup.NewClient(aerospikeClient, backup.WithLogger(logger), backup.WithID(idBackup)) @@ -308,39 +319,3 @@ func getSecretAgent(b *backup.ConfigBackup, bxdr *backup.ConfigBackupXDR) *backu return nil } } - -func stopXDR(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy) - - if err := infoClient.StopXDR(cfg.DC); err != nil { - return fmt.Errorf("failed to stop xdr: %w", err) - } - - return nil -} - -func unblockMRT(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy) - - if err := infoClient.UnBlockMRTWrites(cfg.Namespace); err != nil { - return fmt.Errorf("failed to unblock MRT: %w", err) - } - - return nil -} - -func checkVersion(aerospikeClient *aerospike.Client, cfg *backup.ConfigBackupXDR) error { - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, cfg.InfoPolicy) - - version, err := infoClient.GetVersion() - if err != nil { - return fmt.Errorf("failed to get version: %w", err) - } - - if version.Major < xdrSupportedVersion { - return fmt.Errorf("version %s is unsupported, only databse version %d+ is supproted", - version.String(), xdrSupportedVersion) - } - - return nil -} diff --git a/cmd/internal/app/configs.go b/cmd/internal/app/configs.go index 873f3f41..776b9d15 100644 --- a/cmd/internal/app/configs.go +++ b/cmd/internal/app/configs.go @@ -118,6 +118,12 @@ func mapBackupConfig(params *ASBackupParams) (*backup.ConfigBackup, error) { c.ModAfter = &modAfterTime } + c.InfoRetryPolicy = mapRetryPolicy( + params.BackupParams.InfoRetryIntervalMilliseconds, + params.BackupParams.InfoRetriesMultiplier, + params.BackupParams.InfoMaxRetries, + ) + return c, nil } @@ -148,6 +154,11 @@ func mapBackupXDRConfig(params *ASBackupParams) *backup.ConfigBackupXDR { MaxConnections: params.BackupXDRParams.MaxConnections, InfoPolingPeriodMilliseconds: params.BackupXDRParams.InfoPolingPeriodMilliseconds, StartTimeoutMilliseconds: params.BackupXDRParams.StartTimeoutMilliseconds, + InfoRetryPolicy: mapRetryPolicy( + params.BackupXDRParams.InfoRetryIntervalMilliseconds, + params.BackupXDRParams.InfoRetriesMultiplier, + params.BackupXDRParams.InfoMaxRetries, + ), } return c diff --git a/cmd/internal/app/reports.go b/cmd/internal/app/reports.go index 298c0949..e1a0d41e 100644 --- a/cmd/internal/app/reports.go +++ b/cmd/internal/app/reports.go @@ -39,10 +39,11 @@ func printBackupReport(stats, xdrStats *bModels.BackupStats) { fmt.Printf("Records Read: %d\n", stats.GetReadRecords()) - var bw, fw uint64 + var bw, fw, tr uint64 if xdrStats != nil { bw = xdrStats.GetBytesWritten() fw = xdrStats.GetFileCount() + tr = xdrStats.TotalRecords fmt.Printf("Records Received: %d\n", xdrStats.GetReadRecords()) } @@ -52,7 +53,7 @@ func printBackupReport(stats, xdrStats *bModels.BackupStats) { fmt.Println() fmt.Printf("Bytes Written: %d bytes\n", stats.GetBytesWritten()+bw) - fmt.Printf("Total Records: %d\n", stats.TotalRecords) + fmt.Printf("Total Records: %d\n", stats.TotalRecords+tr) fmt.Printf("Files Written: %d\n", stats.GetFileCount()+fw) } diff --git a/cmd/internal/app/validation.go b/cmd/internal/app/validation.go index 50137360..740728d3 100644 --- a/cmd/internal/app/validation.go +++ b/cmd/internal/app/validation.go @@ -37,6 +37,12 @@ func validateBackup(params *ASBackupParams) error { } } + if params.BackupXDRParams != nil { + if err := validateBackupXDRParams(params.BackupXDRParams); err != nil { + return err + } + } + if err := validateStorages(params.AwsS3, params.GcpStorage, params.AzureBlob); err != nil { return err } @@ -44,6 +50,46 @@ func validateBackup(params *ASBackupParams) error { return nil } +func validateBackupXDRParams(params *models.BackupXDR) error { + if params.ReadTimeoutMilliseconds < 0 { + return fmt.Errorf("backup xdr read timeout can't be negative") + } + + if params.WriteTimeoutMilliseconds < 0 { + return fmt.Errorf("backup xdr write timeout can't be negative") + } + + if params.InfoPolingPeriodMilliseconds < 0 { + return fmt.Errorf("backup xdr info poling period can't be negative") + } + + if params.StartTimeoutMilliseconds < 0 { + return fmt.Errorf("backup xdr start timeout can't be negative") + } + + if params.ResultQueueSize < 0 { + return fmt.Errorf("backup xdr result queue size can't be negative") + } + + if params.AckQueueSize < 0 { + return fmt.Errorf("backup xdr ack queue size can't be negative") + } + + if params.MaxConnections < 1 { + return fmt.Errorf("backup xdr max connections can't be less than 1") + } + + if params.ParallelWrite < 0 { + return fmt.Errorf("backup xdr parallel write can't be negative") + } + + if params.FileLimit < 1 { + return fmt.Errorf("backup xdr file limit can't be less than 1") + } + + return nil +} + func validateRestore(params *ASRestoreParams) error { if params.RestoreParams != nil && params.CommonParams != nil { switch params.RestoreParams.Mode { diff --git a/cmd/internal/app/validation_test.go b/cmd/internal/app/validation_test.go index 9e691090..93c24969 100644 --- a/cmd/internal/app/validation_test.go +++ b/cmd/internal/app/validation_test.go @@ -654,3 +654,123 @@ func TestValidateRestoreParams(t *testing.T) { }) } } + +func Test_validateBackupXDRParams(t *testing.T) { + tests := []struct { + name string + params *models.BackupXDR + wantErr string + }{ + { + name: "valid params", + params: &models.BackupXDR{ + ReadTimeoutMilliseconds: 1000, + WriteTimeoutMilliseconds: 1000, + InfoPolingPeriodMilliseconds: 1000, + StartTimeoutMilliseconds: 1000, + ResultQueueSize: 100, + AckQueueSize: 100, + MaxConnections: 10, + ParallelWrite: 5, + FileLimit: 1000, + }, + wantErr: "", + }, + { + name: "negative read timeout", + params: &models.BackupXDR{ + ReadTimeoutMilliseconds: -1, + }, + wantErr: "backup xdr read timeout can't be negative", + }, + { + name: "negative write timeout", + params: &models.BackupXDR{ + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: -1, + }, + wantErr: "backup xdr write timeout can't be negative", + }, + { + name: "negative info polling period", + params: &models.BackupXDR{ + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: -1, + }, + wantErr: "backup xdr info poling period can't be negative", + }, + { + name: "negative start timeout", + params: &models.BackupXDR{ + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: -1, + }, + wantErr: "backup xdr start timeout can't be negative", + }, + { + name: "negative result queue size", + params: &models.BackupXDR{ + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: 0, + ResultQueueSize: -1, + }, + wantErr: "backup xdr result queue size can't be negative", + }, + { + name: "negative ack queue size", + params: &models.BackupXDR{ + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: 0, + ResultQueueSize: 0, + AckQueueSize: -1, + }, + wantErr: "backup xdr ack queue size can't be negative", + }, + { + name: "invalid max connections", + params: &models.BackupXDR{ + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: 0, + ResultQueueSize: 0, + AckQueueSize: 0, + MaxConnections: 0, + }, + wantErr: "backup xdr max connections can't be less than 1", + }, + { + name: "invalid file limit", + params: &models.BackupXDR{ + ReadTimeoutMilliseconds: 0, + WriteTimeoutMilliseconds: 0, + InfoPolingPeriodMilliseconds: 0, + StartTimeoutMilliseconds: 0, + ResultQueueSize: 0, + AckQueueSize: 0, + MaxConnections: 1, + ParallelWrite: 1, + FileLimit: 0, + }, + wantErr: "backup xdr file limit can't be less than 1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateBackupXDRParams(tt.params) + if tt.wantErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tt.wantErr) + } + }) + } +} diff --git a/cmd/internal/flags/backup.go b/cmd/internal/flags/backup.go index 56782c33..7b35d4af 100644 --- a/cmd/internal/flags/backup.go +++ b/cmd/internal/flags/backup.go @@ -146,6 +146,16 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet { "Number of records will be read on one iteration for continuation backup.\n"+ "Affects size if overlap on resuming backup after an error.\n"+ "Used only with --state-file-dst or --continue.") + flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-timeout", 1000, + "Set the initial timeout for a retry in milliseconds when info commands are sent."+ + "This parameter is applied to stop xdr and unblock MRT writes requests.") + flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier", + 1, + "Used to increase the delay between subsequent retry attempts.\n"+ + "The actual delay is calculated as: info-retry-timeout * (info-retry-multiplier ^ attemptNumber)") + flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3, + "How many times to retry to send info commands before failing. "+ + "This parameter is applied to stop xdr and unblock MRT writes requests.") return flagSet } diff --git a/cmd/internal/flags/backup_xdr.go b/cmd/internal/flags/backup_xdr.go index 519f55be..be72248a 100644 --- a/cmd/internal/flags/backup_xdr.go +++ b/cmd/internal/flags/backup_xdr.go @@ -82,6 +82,16 @@ func (f *BackupXDR) NewFlagSet() *pflag.FlagSet { 1000, "How often (in milliseconds) a backup client will send info commands to check aerospike cluster stats.\n"+ "To measure recovery state and lag.") + flagSet.Int64Var(&f.InfoRetryIntervalMilliseconds, "info-retry-timeout", 1000, + "Set the initial timeout for a retry in milliseconds when info commands are sent."+ + "This parameter is applied to stop xdr and unblock MRT writes requests.") + flagSet.Float64Var(&f.InfoRetriesMultiplier, "info-retry-multiplier", + 1, + "Used to increase the delay between subsequent retry attempts.\n"+ + "The actual delay is calculated as: info-retry-timeout * (info-retry-multiplier ^ attemptNumber)") + flagSet.UintVar(&f.InfoMaxRetries, "info-max-retries", 3, + "How many times to retry to send info commands before failing. "+ + "This parameter is applied to stop xdr and unblock MRT writes requests.") flagSet.Int64Var(&f.StartTimeoutMilliseconds, "start-timeout", 30000, "Timeout for starting TCP server for XDR.\n"+ diff --git a/cmd/internal/models/backup.go b/cmd/internal/models/backup.go index 7d8205b3..b5b87083 100644 --- a/cmd/internal/models/backup.go +++ b/cmd/internal/models/backup.go @@ -38,6 +38,10 @@ type Backup struct { Continue string ScanPageSize int64 OutputFilePrefix string + + InfoMaxRetries uint + InfoRetriesMultiplier float64 + InfoRetryIntervalMilliseconds int64 } // ShouldClearTarget check if we should clean target directory. diff --git a/cmd/internal/models/backup_xdr.go b/cmd/internal/models/backup_xdr.go index b8e6be9a..f2c5182c 100644 --- a/cmd/internal/models/backup_xdr.go +++ b/cmd/internal/models/backup_xdr.go @@ -38,4 +38,8 @@ type BackupXDR struct { StopXDR bool UnblockMRT bool + + InfoMaxRetries uint + InfoRetriesMultiplier float64 + InfoRetryIntervalMilliseconds int64 } diff --git a/config_backup.go b/config_backup.go index ee17c012..9264859c 100644 --- a/config_backup.go +++ b/config_backup.go @@ -19,6 +19,7 @@ import ( "time" a "github.com/aerospike/aerospike-client-go/v7" + "github.com/aerospike/backup-go/models" "github.com/aerospike/backup-go/pipeline" ) @@ -125,6 +126,8 @@ type ConfigBackup struct { PipelinesMode pipeline.Mode // When using directory parameter, prepend a prefix to the names of the generated files. OutputFilePrefix string + // Retry policy for info commands. + InfoRetryPolicy *models.RetryPolicy } // NewDefaultBackupConfig returns a new ConfigBackup with default values. diff --git a/config_backup_xdr.go b/config_backup_xdr.go index 2f635b75..9dac17ff 100644 --- a/config_backup_xdr.go +++ b/config_backup_xdr.go @@ -20,6 +20,7 @@ import ( "strconv" a "github.com/aerospike/aerospike-client-go/v7" + "github.com/aerospike/backup-go/models" ) // ConfigBackupXDR contains configuration for the xdr backup operation. @@ -78,6 +79,8 @@ type ConfigBackupXDR struct { // If the TCP server for XDR does not receive any data within this timeout period, it will shut down. // This situation can occur if the LocalAddress and LocalPort options are misconfigured. StartTimeoutMilliseconds int64 + // Retry policy for info commands. + InfoRetryPolicy *models.RetryPolicy } func (c *ConfigBackupXDR) validate() error { diff --git a/config_restore.go b/config_restore.go index 8f70f7cc..9f790c8a 100644 --- a/config_restore.go +++ b/config_restore.go @@ -75,6 +75,8 @@ type ConfigRestore struct { // E.g.: AEROSPIKE_RECORD_TOO_BIG. // By default, such errors are not ignored and restore terminates. IgnoreRecordError bool + // Retry policy for info commands. + InfoRetryPolicy *models.RetryPolicy } // NewDefaultRestoreConfig returns a new ConfigRestore with default values. diff --git a/handler_backup.go b/handler_backup.go index 7fb6b82f..11c3338c 100644 --- a/handler_backup.go +++ b/handler_backup.go @@ -135,7 +135,7 @@ func newBackupHandler( firstFileHeaderWritten: &atomic.Bool{}, encoder: NewEncoder[*models.Token](config.EncoderType, config.Namespace, config.Compact), limiter: limiter, - infoClient: asinfo.NewInfoClientFromAerospike(ac, config.InfoPolicy), + infoClient: asinfo.NewInfoClientFromAerospike(ac, config.InfoPolicy, config.InfoRetryPolicy), scanLimiter: scanLimiter, state: state, }, nil diff --git a/handler_backup_xdr.go b/handler_backup_xdr.go index 1cc986e5..5c6dfd84 100644 --- a/handler_backup_xdr.go +++ b/handler_backup_xdr.go @@ -67,7 +67,7 @@ func newBackupXDRHandler( stats := models.NewBackupStats() - infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, config.InfoPolicy) + infoClient := asinfo.NewInfoClientFromAerospike(aerospikeClient, config.InfoPolicy, config.InfoRetryPolicy) readProcessor := newRecordReaderProcessor[*models.ASBXToken]( config, @@ -123,6 +123,14 @@ func (bh *HandlerBackupXDR) run() { } func (bh *HandlerBackupXDR) backup(ctx context.Context) error { + // Count total records. + var err error + + bh.stats.TotalRecords, err = bh.infoClient.GetRecordCount(bh.config.Namespace, nil) + if err != nil { + return fmt.Errorf("failed to get records count: %w", err) + } + // Read workers. readWorkers, err := bh.readProcessor.newReadWorkersXDR(ctx) if err != nil { diff --git a/internal/asinfo/info_client.go b/internal/asinfo/info_client.go index 90e6a5bd..512c0d03 100644 --- a/internal/asinfo/info_client.go +++ b/internal/asinfo/info_client.go @@ -18,9 +18,11 @@ import ( "encoding/base64" "errors" "fmt" + "math" "regexp" "strconv" "strings" + "time" a "github.com/aerospike/aerospike-client-go/v7" "github.com/aerospike/backup-go/models" @@ -99,94 +101,164 @@ type aerospikeClient interface { } type InfoClient struct { - policy *a.InfoPolicy - cluster *a.Cluster + policy *a.InfoPolicy + cluster *a.Cluster + retryPolicy *models.RetryPolicy } -func NewInfoClientFromAerospike(aeroClient aerospikeClient, policy *a.InfoPolicy) *InfoClient { +func NewInfoClientFromAerospike(aeroClient aerospikeClient, policy *a.InfoPolicy, retryPolicy *models.RetryPolicy, +) *InfoClient { + if retryPolicy == nil { + retryPolicy = models.NewDefaultRetryPolicy() + } + return &InfoClient{ - cluster: aeroClient.Cluster(), - policy: policy, + cluster: aeroClient.Cluster(), + policy: policy, + retryPolicy: retryPolicy, } } func (ic *InfoClient) GetInfo(names ...string) (map[string]string, error) { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return nil, err - } + var result map[string]string + + err := executeWithRetry(ic.retryPolicy, func() error { + node, err := ic.cluster.GetRandomNode() + if err != nil { + return err + } + + result, err = node.RequestInfo(ic.policy, names...) + + return err + }) - return node.RequestInfo(ic.policy, names...) + return result, err } func (ic *InfoClient) GetVersion() (AerospikeVersion, error) { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return AerospikeVersion{}, err - } + var ( + version AerospikeVersion + err error + ) + + err = executeWithRetry(ic.retryPolicy, func() error { + node, aErr := ic.cluster.GetRandomNode() + if aErr != nil { + return aErr.Unwrap() + } + + version, err = getAerospikeVersion(node, ic.policy) + + return err + }) - return getAerospikeVersion(node, ic.policy) + return version, err } func (ic *InfoClient) GetSIndexes(namespace string) ([]*models.SIndex, error) { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return nil, err - } + var ( + indexes []*models.SIndex + err error + ) - return getSIndexes(node, namespace, ic.policy) + err = executeWithRetry(ic.retryPolicy, func() error { + node, aErr := ic.cluster.GetRandomNode() + if aErr != nil { + return aErr.Unwrap() + } + + indexes, err = getSIndexes(node, namespace, ic.policy) + + return err + }) + + return indexes, err } func (ic *InfoClient) GetUDFs() ([]*models.UDF, error) { - node, err := ic.cluster.GetRandomNode() - if err != nil { - return nil, err - } + var ( + udfs []*models.UDF + err error + ) + + err = executeWithRetry(ic.retryPolicy, func() error { + node, aErr := ic.cluster.GetRandomNode() + if aErr != nil { + return aErr.Unwrap() + } + + udfs, err = getUDFs(node, ic.policy) - return getUDFs(node, ic.policy) + return err + }) + + return udfs, err } func (ic *InfoClient) SupportsBatchWrite() (bool, error) { - version, err := ic.GetVersion() - if err != nil { - return false, fmt.Errorf("failed to get aerospike version: %w", err) - } + var supports bool + + err := executeWithRetry(ic.retryPolicy, func() error { + version, err := ic.GetVersion() + if err != nil { + return fmt.Errorf("failed to get aerospike version: %w", err) + } + + supports = version.IsGreaterOrEqual(AerospikeVersionSupportsBatchWrites) + + return nil + }) - return version.IsGreaterOrEqual(AerospikeVersionSupportsBatchWrites), nil + return supports, err } // GetRecordCount counts number of records in given namespace and sets. func (ic *InfoClient) GetRecordCount(namespace string, sets []string) (uint64, error) { - node, aerr := ic.cluster.GetRandomNode() - if aerr != nil { - return 0, aerr - } - - effectiveReplicationFactor, err := getEffectiveReplicationFactor(node, ic.policy, namespace) - if err != nil { - return 0, err - } + var count uint64 - var recordsNumber uint64 - - for _, node := range ic.cluster.GetNodes() { - if !node.IsActive() { - continue + err := executeWithRetry(ic.retryPolicy, func() error { + node, aerr := ic.cluster.GetRandomNode() + if aerr != nil { + return aerr } - recordCountForNode, err := getRecordCountForNode(node, ic.policy, namespace, sets) + effectiveReplicationFactor, err := getEffectiveReplicationFactor(node, ic.policy, namespace) if err != nil { - return 0, err + return err } - recordsNumber += recordCountForNode - } + var recordsNumber uint64 + + for _, node := range ic.cluster.GetNodes() { + if !node.IsActive() { + continue + } + + recordCountForNode, err := getRecordCountForNode(node, ic.policy, namespace, sets) + if err != nil { + return err + } + + recordsNumber += recordCountForNode + } - return recordsNumber / uint64(effectiveReplicationFactor), nil + count = recordsNumber / uint64(effectiveReplicationFactor) + + return nil + }) + + return count, err } // StartXDR creates xdr config and starts replication. func (ic *InfoClient) StartXDR(dc, hostPort, namespace, rewind string) error { + return executeWithRetry(ic.retryPolicy, func() error { + return ic.startXDR(dc, hostPort, namespace, rewind) + }) +} + +func (ic *InfoClient) startXDR(dc, hostPort, namespace, rewind string) error { if err := ic.createXDRDC(dc); err != nil { return err } @@ -207,8 +279,13 @@ func (ic *InfoClient) StartXDR(dc, hostPort, namespace, rewind string) error { } // StopXDR disable replication and remove xdr config. - func (ic *InfoClient) StopXDR(dc string) error { + return executeWithRetry(ic.retryPolicy, func() error { + return ic.stopXDR(dc) + }) +} + +func (ic *InfoClient) stopXDR(dc string) error { if err := ic.deleteXDRDC(dc); err != nil { return err } @@ -293,6 +370,11 @@ func (ic *InfoClient) deleteXDRDC(dc string) error { // BlockMRTWrites blocks MRT writes on cluster. func (ic *InfoClient) BlockMRTWrites(namespace string) error { + return executeWithRetry(ic.retryPolicy, func() error { + return ic.blockMRTWrites(namespace) + }) +} +func (ic *InfoClient) blockMRTWrites(namespace string) error { cmd := fmt.Sprintf(cmdBlockMRTWrites, namespace) resp, err := ic.GetInfo(cmd) @@ -309,6 +391,11 @@ func (ic *InfoClient) BlockMRTWrites(namespace string) error { // UnBlockMRTWrites unblocks MRT writes on cluster. func (ic *InfoClient) UnBlockMRTWrites(namespace string) error { + return executeWithRetry(ic.retryPolicy, func() error { + return ic.unBlockMRTWrites(namespace) + }) +} +func (ic *InfoClient) unBlockMRTWrites(namespace string) error { cmd := fmt.Sprintf(cmdUnBlockMRTWrites, namespace) resp, err := ic.GetInfo(cmd) @@ -867,3 +954,26 @@ func parseUDFListResponse(resp string) ([]infoMap, error) { func parseUDFGetResponse(resp string) (infoMap, error) { return parseInfoObject(resp, ";", "=") } + +func executeWithRetry(policy *models.RetryPolicy, command func() error) error { + if policy == nil { + return fmt.Errorf("retry policy cannot be nil") + } + + var err error + for i := range policy.MaxRetries { + err = command() + if err == nil { + return nil + } + + duration := time.Duration(float64(policy.BaseTimeout) * math.Pow(policy.Multiplier, float64(i))) + time.Sleep(duration) + } + + if err != nil { + return fmt.Errorf("after %d attempts: %w", policy.MaxRetries, err) + } + + return nil +} diff --git a/internal/asinfo/info_client_test.go b/internal/asinfo/info_client_test.go index 5aa516b6..63f3bded 100644 --- a/internal/asinfo/info_client_test.go +++ b/internal/asinfo/info_client_test.go @@ -1713,14 +1713,13 @@ func Test_parseInfoObject(t *testing.T) { func TestInfoCommander_EnableDisableXDR(t *testing.T) { t.Parallel() - infoPolicy := a.NewInfoPolicy() asPolicy := a.NewClientPolicy() asPolicy.User = testASLoginPassword asPolicy.Password = testASLoginPassword client, aerr := a.NewClientWithPolicy(asPolicy, testASHost, testASPort) require.NoError(t, aerr) - ic := NewInfoClientFromAerospike(client, infoPolicy) + ic := NewInfoClientFromAerospike(client, a.NewInfoPolicy(), models.NewDefaultRetryPolicy()) err := ic.StartXDR(testASDC, testXDRHostPort, testASNamespace, testASRewind) require.NoError(t, err) @@ -1734,15 +1733,13 @@ func TestInfoCommander_EnableDisableXDR(t *testing.T) { func TestInfoCommander_BlockUnblockMRTWrites(t *testing.T) { t.Parallel() - - infoPolicy := a.NewInfoPolicy() asPolicy := a.NewClientPolicy() asPolicy.User = testASLoginPassword asPolicy.Password = testASLoginPassword client, aerr := a.NewClientWithPolicy(asPolicy, testASHost, testASPort) require.NoError(t, aerr) - ic := NewInfoClientFromAerospike(client, infoPolicy) + ic := NewInfoClientFromAerospike(client, a.NewInfoPolicy(), models.NewDefaultRetryPolicy()) err := ic.BlockMRTWrites(testASNamespace) require.NoError(t, err) diff --git a/models/retry_policy.go b/models/retry_policy.go index baf7cbc7..4e7720e1 100644 --- a/models/retry_policy.go +++ b/models/retry_policy.go @@ -38,3 +38,8 @@ func NewRetryPolicy(baseTimeout time.Duration, multiplier float64, maxRetries ui MaxRetries: maxRetries, } } + +// NewDefaultRetryPolicy returns a new RetryPolicy with default values. +func NewDefaultRetryPolicy() *RetryPolicy { + return NewRetryPolicy(1000*time.Millisecond, 1, 3) +} diff --git a/processor_record_writer.go b/processor_record_writer.go index 76891510..9268813e 100644 --- a/processor_record_writer.go +++ b/processor_record_writer.go @@ -84,7 +84,7 @@ func (rw *recordWriterProcessor[T]) useBatchWrites() (bool, error) { return false, nil } - infoClient := asinfo.NewInfoClientFromAerospike(rw.aerospikeClient, rw.config.InfoPolicy) + infoClient := asinfo.NewInfoClientFromAerospike(rw.aerospikeClient, rw.config.InfoPolicy, rw.config.InfoRetryPolicy) return infoClient.SupportsBatchWrite() } diff --git a/tests/test_client.go b/tests/test_client.go index 9ad28ba7..e97a2b15 100644 --- a/tests/test_client.go +++ b/tests/test_client.go @@ -40,9 +40,7 @@ type RecordMap map[digest]*a.Record // NewTestClient creates a new TestClient. func NewTestClient(asc *a.Client) *TestClient { - infoPolicy := a.NewInfoPolicy() - - infoClient := asinfo.NewInfoClientFromAerospike(asc, infoPolicy) + infoClient := asinfo.NewInfoClientFromAerospike(asc, a.NewInfoPolicy(), models.NewDefaultRetryPolicy()) return &TestClient{ asc: asc,