Skip to content

Commit

Permalink
APPS-1471-fix-dir-validation
Browse files Browse the repository at this point in the history
- review
  • Loading branch information
filkeith committed Jan 21, 2025
1 parent b941b74 commit fdc0fce
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
5 changes: 3 additions & 2 deletions cmd/internal/app/asrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ func initializeRestoreReader(ctx context.Context, params *ASRestoreParams, sa *b
return nil, nil, fmt.Errorf("failed to create asbx reader: %w", err)
}

// Restore ASBX from list of dirs or input file is not supported.
if params.RestoreParams.InputFile == "" && params.RestoreParams.DirectoryList == "" {
// Restore ASBX from a list of dirs or input file is not supported.
// So we preprocess lists only for directory restore.
if params.RestoreParams.IsDirectoryRestore() {
// Separate each file type for different lists.
asbList, asbxList, err := prepareLists(ctx, params, reader)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/internal/models/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ type Restore struct {

Mode string
}

func (restore *Restore) IsDirectoryRestore() bool {
return restore.DirectoryList == "" && restore.InputFile == ""
}
15 changes: 9 additions & 6 deletions io/aerospike/xdr/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ type TCPServer struct {
cancel context.CancelFunc

// Results will be sent here.
once sync.Once
resultChan chan *models.ASBXToken
isActive atomic.Bool

logger *slog.Logger
}
Expand Down Expand Up @@ -139,6 +139,8 @@ func (s *TCPServer) Start(ctx context.Context) (chan *models.ASBXToken, error) {
slog.String("address", s.config.Address),
slog.Bool("tls", s.config.TLSConfig != nil))

s.isActive.Store(true)

return s.resultChan, nil
}

Expand All @@ -154,9 +156,10 @@ func (s *TCPServer) Stop() {

s.wg.Wait()

s.once.Do(func() {
if s.isActive.Load() {
s.isActive.Store(false)
close(s.resultChan)
})
}

s.logger.Info("server shutdown complete")
}
Expand Down Expand Up @@ -343,7 +346,7 @@ func (h *ConnectionHandler) handleMessages(ctx context.Context) {

switch {
case err == nil:
// ok.
// ok.
case errors.Is(err, io.EOF):
// do nothing, wait for the next message.
continue
Expand Down Expand Up @@ -383,9 +386,9 @@ func (h *ConnectionHandler) processMessage(ctx context.Context) {

switch {
case err == nil:
// ok
// ok
case errors.Is(err, errSkipRecord):
// Make acknowledgement and skip record.
// Send acknowledgement and skip record.
h.ackQueue <- h.ackMsgSuccess
continue
default:
Expand Down
8 changes: 8 additions & 0 deletions porcessor_file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"

"github.com/aerospike/backup-go/io/encoding/asbx"
"github.com/aerospike/backup-go/io/encryption"
"github.com/aerospike/backup-go/models"
"github.com/aerospike/backup-go/pipeline"
Expand Down Expand Up @@ -165,7 +166,14 @@ func distributeFiles(input chan models.File, output []chan models.File, errors c
return
}

validator := asbx.NewValidator()

for file := range input {
// Skip non asbx files.
if err := validator.Run(file.Name); err != nil {
continue
}

parts := strings.SplitN(file.Name, "_", 2)

num, err := strconv.Atoi(parts[0])
Expand Down

0 comments on commit fdc0fce

Please sign in to comment.