Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-651 Update documentation and improve test coverage #212

Merged
merged 8 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
561 changes: 476 additions & 85 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type Client struct {
type ClientOpt func(*Client)

// WithID sets the ID for the [Client].
// This ID is used for logging purposes.
func WithID(id string) ClientOpt {
return func(c *Client) {
c.id = id
Expand Down
222 changes: 117 additions & 105 deletions cmd/asbackup/readme.md

Large diffs are not rendered by default.

15 changes: 0 additions & 15 deletions cmd/internal/app/asrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,21 +189,6 @@ func initializeRestoreReader(ctx context.Context, params *ASRestoreParams, sa *b
return nil, nil, fmt.Errorf("failed to create asbx reader: %w", err)
}

// List all files first.
list, err := xdrReader.ListObjects(ctx, params.CommonParams.Directory)
if err != nil {
return nil, nil, fmt.Errorf("failed to list asbx files: %w", err)
}

// Sort files.
list, err = util.SortBackupFiles(list)
if err != nil {
return nil, nil, fmt.Errorf("failed to sort asbx files: %w", err)
}

// Load ASBX files for reading.
xdrReader.SetObjectsToStream(list)

return nil, xdrReader, nil
case models.RestoreModeAuto:
reader, err = newReader(ctx, params, sa, false)
Expand Down
22 changes: 14 additions & 8 deletions cmd/internal/app/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,17 @@ func newReader(

return newAzureReader(ctx, params.AzureBlob, directory, inputFile, parentDirectory, directoryList, isXdr)
default:
return newLocalReader(directory, inputFile, parentDirectory, directoryList, isXdr)
return newLocalReader(ctx, directory, inputFile, parentDirectory, directoryList, isXdr)
}
}

func newLocalReader(directory, inputFile, parentDirectory, directoryList string, isXDR bool,
func newLocalReader(
ctx context.Context,
directory,
inputFile,
parentDirectory,
directoryList string,
isXDR bool,
) (backup.StreamingReader, error) {
opts := make([]local.Opt, 0)

Expand All @@ -72,9 +78,9 @@ func newLocalReader(directory, inputFile, parentDirectory, directoryList string,
opts = append(opts, local.WithDir(directory), local.WithSkipDirCheck())
// Append Validator only for directory.
if isXDR {
opts = append(opts, local.WithValidator(asbx.NewValidator()), local.WithSorted(local.SortAsc))
opts = append(opts, local.WithValidator(asbx.NewValidator()), local.WithSorting())
// ASBX restore supports only restore from directory.
return local.NewReader(opts...)
return local.NewReader(ctx, opts...)
} else {
opts = append(opts, local.WithValidator(asb.NewValidator()))
}
Expand All @@ -85,7 +91,7 @@ func newLocalReader(directory, inputFile, parentDirectory, directoryList string,
opts = append(opts, local.WithDirList(dirList))
}

return local.NewReader(opts...)
return local.NewReader(ctx, opts...)
}

//nolint:dupl // This code is not duplicated, it is a different initialization.
Expand All @@ -111,7 +117,7 @@ func newS3Reader(
opts = append(opts, s3.WithDir(directory), s3.WithSkipDirCheck())
// Append Validator only for directory.
if isXDR {
opts = append(opts, s3.WithValidator(asbx.NewValidator()), s3.WithSorted(s3.SortAsc))
opts = append(opts, s3.WithValidator(asbx.NewValidator()), s3.WithSorting())
// ASBX restore supports only restore from directory.
return s3.NewReader(ctx, client, a.BucketName, opts...)
} else {
Expand Down Expand Up @@ -150,7 +156,7 @@ func newGcpReader(
opts = append(opts, storage.WithDir(directory), storage.WithSkipDirCheck())
// Append Validator only for directory.
if isXDR {
opts = append(opts, storage.WithValidator(asbx.NewValidator()), storage.WithSorted(storage.SortAsc))
opts = append(opts, storage.WithValidator(asbx.NewValidator()), storage.WithSorting())
// ASBX restore supports only restore from directory.
return storage.NewReader(ctx, client, g.BucketName, opts...)
} else {
Expand Down Expand Up @@ -188,7 +194,7 @@ func newAzureReader(
opts = append(opts, blob.WithDir(directory), blob.WithSkipDirCheck())
// Append Validator only for directory.
if isXDR {
opts = append(opts, blob.WithValidator(asbx.NewValidator()), blob.WithSorted(blob.SortAsc))
opts = append(opts, blob.WithValidator(asbx.NewValidator()), blob.WithSorting())
// ASBX restore supports only restore from directory.
return blob.NewReader(ctx, client, a.ContainerName, opts...)
} else {
Expand Down
9 changes: 9 additions & 0 deletions config_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ type ConfigRestore struct {
// Secret agent config.
SecretAgentConfig *SecretAgentConfig
// The sets to restore (optional, given an empty list, all sets will be restored).
// Not applicable for XDR restore.
SetList []string
// The bins to restore (optional, given an empty list, all bins will be restored).
// Not applicable for XDR restore.
BinList []string
// EncoderType describes an Encoder type that will be used on restoring.
// Default `EncoderTypeASB` = 0.
Expand All @@ -59,21 +61,28 @@ type ConfigRestore struct {
// Don't restore any records.
NoRecords bool
// Don't restore any secondary indexes.
// Not applicable for XDR restore.
NoIndexes bool
// Don't restore any UDFs.
// Not applicable for XDR restore.
NoUDFs bool
// Disables the use of batch writes when restoring records to the Aerospike cluster.
// Not applicable for XDR restore.
DisableBatchWrites bool
// The max allowed number of records per batch write call.
// Not applicable for XDR restore.
BatchSize int
// Max number of parallel writers to target AS cluster.
// Not applicable for XDR restore.
MaxAsyncBatches int
// Amount of extra time-to-live to add to records that have expirable void-times.
// Must be set in seconds.
// Not applicable for XDR restore.
ExtraTTL int64
// Ignore permanent record-specific error.
// E.g.: AEROSPIKE_RECORD_TOO_BIG.
// By default, such errors are not ignored and restore terminates.
// Not applicable for XDR restore.
IgnoreRecordError bool
// Retry policy for info commands.
InfoRetryPolicy *models.RetryPolicy
Expand Down
14 changes: 7 additions & 7 deletions examples/aws/s3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func initBackupClient() *backup.Client {

backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
if err != nil {
panic(err)
log.Fatal(err)
}

return backupClient
Expand All @@ -76,7 +76,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
"http://localhost:9000",
)
if err != nil {
panic(err)
log.Fatal(err)
}

writers, err := s3Storasge.NewWriter(
Expand All @@ -87,7 +87,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
s3Storasge.WithRemoveFiles(),
)
if err != nil {
panic(err)
log.Fatal(err)
}

backupCfg := backup.NewDefaultBackupConfig()
Expand All @@ -99,7 +99,7 @@ func runBackup(ctx context.Context, c *backup.Client) {

backupHandler, err := c.Backup(ctx, backupCfg, writers, nil)
if err != nil {
panic(err)
log.Fatal(err)
}

// Use backupHandler.Wait(ctx) to wait for the job to finish or fail.
Expand All @@ -121,7 +121,7 @@ func runRestore(ctx context.Context, c *backup.Client) {
"http://localhost:9000",
)
if err != nil {
panic(err)
log.Fatal(err)
}

reader, err := s3Storasge.NewReader(
Expand All @@ -132,7 +132,7 @@ func runRestore(ctx context.Context, c *backup.Client) {
s3Storasge.WithValidator(asb.NewValidator()),
)
if err != nil {
panic(err)
log.Fatal(err)
}

source := dbNameSource
Expand All @@ -149,7 +149,7 @@ func runRestore(ctx context.Context, c *backup.Client) {

restoreHandler, err := c.Restore(ctx, restoreCfg, reader)
if err != nil {
panic(err)
log.Fatal(err)
}

// Use restoreHandler.Wait(ctx) to wait for the job to finish or fail.
Expand Down
18 changes: 9 additions & 9 deletions examples/azure/blob/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func initBackupClient() *backup.Client {

backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
if err != nil {
panic(err)
log.Fatal(err)
}

return backupClient
Expand All @@ -74,12 +74,12 @@ func initBackupClient() *backup.Client {
func runBackup(ctx context.Context, c *backup.Client) {
cred, err := azblob.NewSharedKeyCredential(azureAccountName, azureAccountKey)
if err != nil {
panic(err)
log.Fatal(err)
}

client, err := azblob.NewClientWithSharedKeyCredential(azureAddress, cred, nil)
if err != nil {
panic(err)
log.Fatal(err)
}

// For backup to single file use gcpStorage.WithFile(fileName)
Expand All @@ -91,7 +91,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
blob.WithRemoveFiles(),
)
if err != nil {
panic(err)
log.Fatal(err)
}

backupCfg := backup.NewDefaultBackupConfig()
Expand All @@ -103,7 +103,7 @@ func runBackup(ctx context.Context, c *backup.Client) {

backupHandler, err := c.Backup(ctx, backupCfg, writers, nil)
if err != nil {
panic(err)
log.Fatal(err)
}

// use backupHandler.Wait() to wait for the job to finish or fail.
Expand All @@ -120,12 +120,12 @@ func runBackup(ctx context.Context, c *backup.Client) {
func runRestore(ctx context.Context, c *backup.Client) {
cred, err := azblob.NewSharedKeyCredential(azureAccountName, azureAccountKey)
if err != nil {
panic(err)
log.Fatal(err)
}

client, err := azblob.NewClientWithSharedKeyCredential(azureAddress, cred, nil)
if err != nil {
panic(err)
log.Fatal(err)
}

// For restore from single file use gcpStorage.WithFile(fileName)
Expand All @@ -137,7 +137,7 @@ func runRestore(ctx context.Context, c *backup.Client) {
blob.WithValidator(asb.NewValidator()),
)
if err != nil {
panic(err)
log.Fatal(err)
}

source := dbNameSource
Expand All @@ -154,7 +154,7 @@ func runRestore(ctx context.Context, c *backup.Client) {

restoreHandler, err := c.Restore(ctx, restoreCfg, readers)
if err != nil {
panic(err)
log.Fatal(err)
}

// use restoreHandler.Wait() to wait for the job to finish or fail.
Expand Down
14 changes: 7 additions & 7 deletions examples/gcp/storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func initBackupClient() *backup.Client {

backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
if err != nil {
panic(err)
log.Fatal(err)
}

return backupClient
Expand All @@ -70,7 +70,7 @@ func initBackupClient() *backup.Client {
func runBackup(ctx context.Context, c *backup.Client) {
client, err := storage.NewClient(ctx)
if err != nil {
panic(err)
log.Fatal(err)
}

// For backup to single file use gcpStorage.WithFile(fileName)
Expand All @@ -82,7 +82,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
gcpStorage.WithRemoveFiles(),
)
if err != nil {
panic(err)
log.Fatal(err)
}

backupCfg := backup.NewDefaultBackupConfig()
Expand All @@ -94,7 +94,7 @@ func runBackup(ctx context.Context, c *backup.Client) {

backupHandler, err := c.Backup(ctx, backupCfg, writers, nil)
if err != nil {
panic(err)
log.Fatal(err)
}

// use backupHandler.Wait() to wait for the job to finish or fail.
Expand All @@ -111,7 +111,7 @@ func runBackup(ctx context.Context, c *backup.Client) {
func runRestore(ctx context.Context, c *backup.Client) {
client, err := storage.NewClient(ctx)
if err != nil {
panic(err)
log.Fatal(err)
}

// For restore from single file use gcpStorage.WithFile(fileName)
Expand All @@ -123,7 +123,7 @@ func runRestore(ctx context.Context, c *backup.Client) {
gcpStorage.WithValidator(asb.NewValidator()),
)
if err != nil {
panic(err)
log.Fatal(err)
}

source := dbNameSource
Expand All @@ -140,7 +140,7 @@ func runRestore(ctx context.Context, c *backup.Client) {

restoreHandler, err := c.Restore(ctx, restoreCfg, readers)
if err != nil {
panic(err)
log.Fatal(err)
}

// use restoreHandler.Wait() to wait for the job to finish or fail.
Expand Down
14 changes: 8 additions & 6 deletions examples/readme/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {

backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
if err != nil {
panic(err)
log.Fatal(err)
}

ctx := context.Background()
Expand All @@ -44,16 +44,17 @@ func main() {
local.WithDir("backups_folder"),
)
if err != nil {
panic(err)
log.Fatal(err)
}

backupCfg := backup.NewDefaultBackupConfig()
backupCfg.Namespace = "test"
backupCfg.ParallelRead = 5
backupCfg.ParallelRead = 10
backupCfg.ParallelWrite = 10

backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil)
if err != nil {
panic(err)
log.Fatal(err)
}

// Use backupHandler.Wait(ctx) to wait for the job to finish or fail.
Expand All @@ -69,16 +70,17 @@ func main() {

// For restore from single file use local.WithFile(fileName)
reader, err := local.NewReader(
ctx,
local.WithDir("backups_folder"),
local.WithValidator(asb.NewValidator()),
)
if err != nil {
panic(err)
log.Fatal(err)
}

restoreHandler, err := backupClient.Restore(ctx, restoreCfg, reader)
if err != nil {
panic(err)
log.Fatal(err)
}

// Use restoreHandler.Wait(ctx) to wait for the job to finish or fail.
Expand Down
Loading