Skip to content

Commit

Permalink
generalize pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
dwelch-spike committed Feb 26, 2024
1 parent 25c8e50 commit d7d219e
Show file tree
Hide file tree
Showing 20 changed files with 1,329 additions and 843 deletions.
42 changes: 29 additions & 13 deletions backup_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

datahandlers "github.com/aerospike/aerospike-tools-backup-lib/data_handlers"
"github.com/aerospike/aerospike-tools-backup-lib/encoding/asb"
"github.com/aerospike/aerospike-tools-backup-lib/pipeline"

a "github.com/aerospike/aerospike-client-go/v7"
)
Expand Down Expand Up @@ -54,8 +55,8 @@ func newBackupHandler(config *BackupBaseConfig, ac *a.Client, namespace string)
}

// TODO change the any typed pipeline to a message or token type
func (bh *backupHandler) run(writers []datahandlers.Writer[any]) error {
readers := make([]datahandlers.Reader[any], bh.config.Parallel)
func (bh *backupHandler) run(writers []*datahandlers.WriteWorker) error {
readWorkers := make([]pipeline.Worker[any], bh.config.Parallel)
for i := 0; i < bh.config.Parallel; i++ {
begin := (i * PARTITIONS) / bh.config.Parallel
count := PARTITIONS / bh.config.Parallel // TODO verify no off by 1 error
Expand All @@ -72,20 +73,25 @@ func (bh *backupHandler) run(writers []datahandlers.Writer[any]) error {
bh.aerospikeClient,
)

readers[i] = dataReader
readWorkers[i] = datahandlers.NewReadWorker[any](dataReader)
}

// TODO change the any typed pipeline to a message or token type
processors := make([]datahandlers.Processor[any], bh.config.Parallel)
processorWorkers := make([]pipeline.Worker[any], bh.config.Parallel)
for i := 0; i < bh.config.Parallel; i++ {
processor := datahandlers.NewNOOPProcessor()
processors[i] = processor
processorWorkers[i] = datahandlers.NewProcessorWorker(processor)
}

job := datahandlers.NewDataPipeline(
readers,
processors,
writers,
writeWorkers := make([]pipeline.Worker[any], len(writers))
for i, w := range writers {
writeWorkers[i] = w
}

job := pipeline.NewPipeline(
readWorkers,
processorWorkers,
writeWorkers,
)

return bh.worker.DoJob(job)
Expand Down Expand Up @@ -133,7 +139,7 @@ func (bwh *BackupToWriterHandler) run(writers []io.Writer) {

batchSize := bwh.config.Parallel
// TODO change the any typed pipeline to a message or token type
dataWriters := []datahandlers.Writer[any]{}
dataWriters := []*datahandlers.WriteWorker{}

for i, writer := range writers {

Expand Down Expand Up @@ -174,7 +180,7 @@ func (bwh *BackupToWriterHandler) Wait() error {
}

// TODO change the any typed pipeline to a message or token type
func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool) (datahandlers.Writer[any], error) {
func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool) (*datahandlers.WriteWorker, error) {
enc, err := eb.CreateEncoder()
if err != nil {
return nil, err
Expand All @@ -184,8 +190,18 @@ func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool)
case *asb.Encoder:
asbw := datahandlers.NewASBWriter(encT, w)
err := asbw.Init(namespace, first)
return asbw, err
if err != nil {
return nil, err
}

worker := datahandlers.NewWriteWorker(asbw)

return worker, err

default:
return datahandlers.NewGenericWriter(encT, w), nil
gw := datahandlers.NewGenericWriter(encT, w)
worker := datahandlers.NewWriteWorker(gw)

return worker, nil
}
}
90 changes: 90 additions & 0 deletions data_handlers/mocks/DataProcessor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 121 additions & 0 deletions data_handlers/mocks/DataReader.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d7d219e

Please sign in to comment.