From fdc0fcebb72047bb0ea7a48e93d7b40af87c42d5 Mon Sep 17 00:00:00 2001 From: Dmitrii Neeman Date: Tue, 21 Jan 2025 17:11:11 +0200 Subject: [PATCH] APPS-1471-fix-dir-validation - review --- cmd/internal/app/asrestore.go | 5 +++-- cmd/internal/models/restore.go | 4 ++++ io/aerospike/xdr/tcp_server.go | 15 +++++++++------ porcessor_file_reader.go | 8 ++++++++ 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/cmd/internal/app/asrestore.go b/cmd/internal/app/asrestore.go index 79f020c1..c7f2eb29 100644 --- a/cmd/internal/app/asrestore.go +++ b/cmd/internal/app/asrestore.go @@ -196,8 +196,9 @@ func initializeRestoreReader(ctx context.Context, params *ASRestoreParams, sa *b return nil, nil, fmt.Errorf("failed to create asbx reader: %w", err) } - // Restore ASBX from list of dirs or input file is not supported. - if params.RestoreParams.InputFile == "" && params.RestoreParams.DirectoryList == "" { + // Restore ASBX from a list of dirs or input file is not supported. + // So we preprocess lists only for directory restore. + if params.RestoreParams.IsDirectoryRestore() { // Separate each file type for different lists. asbList, asbxList, err := prepareLists(ctx, params, reader) if err != nil { diff --git a/cmd/internal/models/restore.go b/cmd/internal/models/restore.go index 4e541598..c989295a 100644 --- a/cmd/internal/models/restore.go +++ b/cmd/internal/models/restore.go @@ -40,3 +40,7 @@ type Restore struct { Mode string } + +func (restore *Restore) IsDirectoryRestore() bool { + return restore.DirectoryList == "" && restore.InputFile == "" +} diff --git a/io/aerospike/xdr/tcp_server.go b/io/aerospike/xdr/tcp_server.go index b16ef37f..261fe53d 100644 --- a/io/aerospike/xdr/tcp_server.go +++ b/io/aerospike/xdr/tcp_server.go @@ -93,8 +93,8 @@ type TCPServer struct { cancel context.CancelFunc // Results will be sent here. - once sync.Once resultChan chan *models.ASBXToken + isActive atomic.Bool logger *slog.Logger } @@ -139,6 +139,8 @@ func (s *TCPServer) Start(ctx context.Context) (chan *models.ASBXToken, error) { slog.String("address", s.config.Address), slog.Bool("tls", s.config.TLSConfig != nil)) + s.isActive.Store(true) + return s.resultChan, nil } @@ -154,9 +156,10 @@ func (s *TCPServer) Stop() { s.wg.Wait() - s.once.Do(func() { + if s.isActive.Load() { + s.isActive.Store(false) close(s.resultChan) - }) + } s.logger.Info("server shutdown complete") } @@ -343,7 +346,7 @@ func (h *ConnectionHandler) handleMessages(ctx context.Context) { switch { case err == nil: - // ok. + // ok. case errors.Is(err, io.EOF): // do nothing, wait for the next message. continue @@ -383,9 +386,9 @@ func (h *ConnectionHandler) processMessage(ctx context.Context) { switch { case err == nil: - // ok + // ok case errors.Is(err, errSkipRecord): - // Make acknowledgement and skip record. + // Send acknowledgement and skip record. h.ackQueue <- h.ackMsgSuccess continue default: diff --git a/porcessor_file_reader.go b/porcessor_file_reader.go index 7210d8fd..2c5f1488 100644 --- a/porcessor_file_reader.go +++ b/porcessor_file_reader.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" + "github.com/aerospike/backup-go/io/encoding/asbx" "github.com/aerospike/backup-go/io/encryption" "github.com/aerospike/backup-go/models" "github.com/aerospike/backup-go/pipeline" @@ -165,7 +166,14 @@ func distributeFiles(input chan models.File, output []chan models.File, errors c return } + validator := asbx.NewValidator() + for file := range input { + // Skip non asbx files. + if err := validator.Run(file.Name); err != nil { + continue + } + parts := strings.SplitN(file.Name, "_", 2) num, err := strconv.Atoi(parts[0])