diff --git a/backup_handlers.go b/backup_handlers.go index eccd80ad..a711f2c5 100644 --- a/backup_handlers.go +++ b/backup_handlers.go @@ -19,6 +19,7 @@ import ( datahandlers "github.com/aerospike/aerospike-tools-backup-lib/data_handlers" "github.com/aerospike/aerospike-tools-backup-lib/encoding/asb" + "github.com/aerospike/aerospike-tools-backup-lib/pipeline" a "github.com/aerospike/aerospike-client-go/v7" ) @@ -54,8 +55,8 @@ func newBackupHandler(config *BackupBaseConfig, ac *a.Client, namespace string) } // TODO change the any typed pipeline to a message or token type -func (bh *backupHandler) run(writers []datahandlers.Writer[any]) error { - readers := make([]datahandlers.Reader[any], bh.config.Parallel) +func (bh *backupHandler) run(writers []*datahandlers.WriteWorker) error { + readWorkers := make([]pipeline.Worker[any], bh.config.Parallel) for i := 0; i < bh.config.Parallel; i++ { begin := (i * PARTITIONS) / bh.config.Parallel count := PARTITIONS / bh.config.Parallel // TODO verify no off by 1 error @@ -72,20 +73,25 @@ func (bh *backupHandler) run(writers []datahandlers.Writer[any]) error { bh.aerospikeClient, ) - readers[i] = dataReader + readWorkers[i] = datahandlers.NewReadWorker[any](dataReader) } // TODO change the any typed pipeline to a message or token type - processors := make([]datahandlers.Processor[any], bh.config.Parallel) + processorWorkers := make([]pipeline.Worker[any], bh.config.Parallel) for i := 0; i < bh.config.Parallel; i++ { processor := datahandlers.NewNOOPProcessor() - processors[i] = processor + processorWorkers[i] = datahandlers.NewProcessorWorker(processor) } - job := datahandlers.NewDataPipeline( - readers, - processors, - writers, + writeWorkers := make([]pipeline.Worker[any], len(writers)) + for i, w := range writers { + writeWorkers[i] = w + } + + job := pipeline.NewPipeline( + readWorkers, + processorWorkers, + writeWorkers, ) return bh.worker.DoJob(job) @@ -133,7 +139,7 @@ func (bwh *BackupToWriterHandler) run(writers []io.Writer) { batchSize := bwh.config.Parallel // TODO change the any typed pipeline to a message or token type - dataWriters := []datahandlers.Writer[any]{} + dataWriters := []*datahandlers.WriteWorker{} for i, writer := range writers { @@ -174,7 +180,7 @@ func (bwh *BackupToWriterHandler) Wait() error { } // TODO change the any typed pipeline to a message or token type -func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool) (datahandlers.Writer[any], error) { +func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool) (*datahandlers.WriteWorker, error) { enc, err := eb.CreateEncoder() if err != nil { return nil, err @@ -184,8 +190,18 @@ func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool) case *asb.Encoder: asbw := datahandlers.NewASBWriter(encT, w) err := asbw.Init(namespace, first) - return asbw, err + if err != nil { + return nil, err + } + + worker := datahandlers.NewWriteWorker(asbw) + + return worker, err + default: - return datahandlers.NewGenericWriter(encT, w), nil + gw := datahandlers.NewGenericWriter(encT, w) + worker := datahandlers.NewWriteWorker(gw) + + return worker, nil } } diff --git a/data_handlers/mocks/DataProcessor.go b/data_handlers/mocks/DataProcessor.go new file mode 100644 index 00000000..268f13c5 --- /dev/null +++ b/data_handlers/mocks/DataProcessor.go @@ -0,0 +1,90 @@ +// Code generated by mockery v2.41.0. DO NOT EDIT. 
+ +package mocks + +import mock "github.com/stretchr/testify/mock" + +// DataProcessor is an autogenerated mock type for the DataProcessor type +type DataProcessor struct { + mock.Mock +} + +type DataProcessor_Expecter struct { + mock *mock.Mock +} + +func (_m *DataProcessor) EXPECT() *DataProcessor_Expecter { + return &DataProcessor_Expecter{mock: &_m.Mock} +} + +// Process provides a mock function with given fields: _a0 +func (_m *DataProcessor) Process(_a0 interface{}) (interface{}, error) { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for Process") + } + + var r0 interface{} + var r1 error + if rf, ok := ret.Get(0).(func(interface{}) (interface{}, error)); ok { + return rf(_a0) + } + if rf, ok := ret.Get(0).(func(interface{}) interface{}); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interface{}) + } + } + + if rf, ok := ret.Get(1).(func(interface{}) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DataProcessor_Process_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Process' +type DataProcessor_Process_Call struct { + *mock.Call +} + +// Process is a helper method to define mock.On call +// - _a0 interface{} +func (_e *DataProcessor_Expecter) Process(_a0 interface{}) *DataProcessor_Process_Call { + return &DataProcessor_Process_Call{Call: _e.mock.On("Process", _a0)} +} + +func (_c *DataProcessor_Process_Call) Run(run func(_a0 interface{})) *DataProcessor_Process_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(interface{})) + }) + return _c +} + +func (_c *DataProcessor_Process_Call) Return(_a0 interface{}, _a1 error) *DataProcessor_Process_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DataProcessor_Process_Call) RunAndReturn(run func(interface{}) (interface{}, error)) *DataProcessor_Process_Call { + _c.Call.Return(run) + return _c +} + +// NewDataProcessor creates a new instance of DataProcessor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDataProcessor(t interface { + mock.TestingT + Cleanup(func()) +}) *DataProcessor { + mock := &DataProcessor{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/data_handlers/mocks/DataReader.go b/data_handlers/mocks/DataReader.go new file mode 100644 index 00000000..89e6a5d0 --- /dev/null +++ b/data_handlers/mocks/DataReader.go @@ -0,0 +1,121 @@ +// Code generated by mockery v2.41.0. DO NOT EDIT. 
+ +package mocks + +import mock "github.com/stretchr/testify/mock" + +// DataReader is an autogenerated mock type for the DataReader type +type DataReader struct { + mock.Mock +} + +type DataReader_Expecter struct { + mock *mock.Mock +} + +func (_m *DataReader) EXPECT() *DataReader_Expecter { + return &DataReader_Expecter{mock: &_m.Mock} +} + +// Cancel provides a mock function with given fields: +func (_m *DataReader) Cancel() { + _m.Called() +} + +// DataReader_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel' +type DataReader_Cancel_Call struct { + *mock.Call +} + +// Cancel is a helper method to define mock.On call +func (_e *DataReader_Expecter) Cancel() *DataReader_Cancel_Call { + return &DataReader_Cancel_Call{Call: _e.mock.On("Cancel")} +} + +func (_c *DataReader_Cancel_Call) Run(run func()) *DataReader_Cancel_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *DataReader_Cancel_Call) Return() *DataReader_Cancel_Call { + _c.Call.Return() + return _c +} + +func (_c *DataReader_Cancel_Call) RunAndReturn(run func()) *DataReader_Cancel_Call { + _c.Call.Return(run) + return _c +} + +// Read provides a mock function with given fields: +func (_m *DataReader) Read() (interface{}, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Read") + } + + var r0 interface{} + var r1 error + if rf, ok := ret.Get(0).(func() (interface{}, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() interface{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interface{}) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DataReader_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read' +type DataReader_Read_Call struct { + *mock.Call +} + +// Read is a helper method to define mock.On call +func (_e *DataReader_Expecter) Read() *DataReader_Read_Call { + return &DataReader_Read_Call{Call: _e.mock.On("Read")} +} + +func (_c *DataReader_Read_Call) Run(run func()) *DataReader_Read_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *DataReader_Read_Call) Return(_a0 interface{}, _a1 error) *DataReader_Read_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DataReader_Read_Call) RunAndReturn(run func() (interface{}, error)) *DataReader_Read_Call { + _c.Call.Return(run) + return _c +} + +// NewDataReader creates a new instance of DataReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDataReader(t interface { + mock.TestingT + Cleanup(func()) +}) *DataReader { + mock := &DataReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/data_handlers/mocks/DataWriter.go b/data_handlers/mocks/DataWriter.go new file mode 100644 index 00000000..0a1aede7 --- /dev/null +++ b/data_handlers/mocks/DataWriter.go @@ -0,0 +1,110 @@ +// Code generated by mockery v2.41.0. DO NOT EDIT. 
+ +package mocks + +import mock "github.com/stretchr/testify/mock" + +// DataWriter is an autogenerated mock type for the DataWriter type +type DataWriter struct { + mock.Mock +} + +type DataWriter_Expecter struct { + mock *mock.Mock +} + +func (_m *DataWriter) EXPECT() *DataWriter_Expecter { + return &DataWriter_Expecter{mock: &_m.Mock} +} + +// Cancel provides a mock function with given fields: +func (_m *DataWriter) Cancel() { + _m.Called() +} + +// DataWriter_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel' +type DataWriter_Cancel_Call struct { + *mock.Call +} + +// Cancel is a helper method to define mock.On call +func (_e *DataWriter_Expecter) Cancel() *DataWriter_Cancel_Call { + return &DataWriter_Cancel_Call{Call: _e.mock.On("Cancel")} +} + +func (_c *DataWriter_Cancel_Call) Run(run func()) *DataWriter_Cancel_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *DataWriter_Cancel_Call) Return() *DataWriter_Cancel_Call { + _c.Call.Return() + return _c +} + +func (_c *DataWriter_Cancel_Call) RunAndReturn(run func()) *DataWriter_Cancel_Call { + _c.Call.Return(run) + return _c +} + +// Write provides a mock function with given fields: _a0 +func (_m *DataWriter) Write(_a0 interface{}) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for Write") + } + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DataWriter_Write_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Write' +type DataWriter_Write_Call struct { + *mock.Call +} + +// Write is a helper method to define mock.On call +// - _a0 interface{} +func (_e *DataWriter_Expecter) Write(_a0 interface{}) *DataWriter_Write_Call { + return &DataWriter_Write_Call{Call: _e.mock.On("Write", _a0)} +} + +func (_c *DataWriter_Write_Call) Run(run func(_a0 interface{})) *DataWriter_Write_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(interface{})) + }) + return _c +} + +func (_c *DataWriter_Write_Call) Return(_a0 error) *DataWriter_Write_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DataWriter_Write_Call) RunAndReturn(run func(interface{}) error) *DataWriter_Write_Call { + _c.Call.Return(run) + return _c +} + +// NewDataWriter creates a new instance of DataWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDataWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *DataWriter { + mock := &DataWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/data_handlers/mocks/Processor.go b/data_handlers/mocks/Processor.go deleted file mode 100644 index af604462..00000000 --- a/data_handlers/mocks/Processor.go +++ /dev/null @@ -1,88 +0,0 @@ -// Code generated by mockery v2.41.0. DO NOT EDIT. 
- -package mocks - -import mock "github.com/stretchr/testify/mock" - -// Processor is an autogenerated mock type for the Processor type -type Processor[T interface{}] struct { - mock.Mock -} - -type Processor_Expecter[T interface{}] struct { - mock *mock.Mock -} - -func (_m *Processor[T]) EXPECT() *Processor_Expecter[T] { - return &Processor_Expecter[T]{mock: &_m.Mock} -} - -// Process provides a mock function with given fields: _a0 -func (_m *Processor[T]) Process(_a0 T) (T, error) { - ret := _m.Called(_a0) - - if len(ret) == 0 { - panic("no return value specified for Process") - } - - var r0 T - var r1 error - if rf, ok := ret.Get(0).(func(T) (T, error)); ok { - return rf(_a0) - } - if rf, ok := ret.Get(0).(func(T) T); ok { - r0 = rf(_a0) - } else { - r0 = ret.Get(0).(T) - } - - if rf, ok := ret.Get(1).(func(T) error); ok { - r1 = rf(_a0) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Processor_Process_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Process' -type Processor_Process_Call[T interface{}] struct { - *mock.Call -} - -// Process is a helper method to define mock.On call -// - _a0 T -func (_e *Processor_Expecter[T]) Process(_a0 interface{}) *Processor_Process_Call[T] { - return &Processor_Process_Call[T]{Call: _e.mock.On("Process", _a0)} -} - -func (_c *Processor_Process_Call[T]) Run(run func(_a0 T)) *Processor_Process_Call[T] { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(T)) - }) - return _c -} - -func (_c *Processor_Process_Call[T]) Return(_a0 T, _a1 error) *Processor_Process_Call[T] { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *Processor_Process_Call[T]) RunAndReturn(run func(T) (T, error)) *Processor_Process_Call[T] { - _c.Call.Return(run) - return _c -} - -// NewProcessor creates a new instance of Processor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewProcessor[T interface{}](t interface { - mock.TestingT - Cleanup(func()) -}) *Processor[T] { - mock := &Processor[T]{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/data_handlers/mocks/Reader.go b/data_handlers/mocks/Reader.go deleted file mode 100644 index 46c1b687..00000000 --- a/data_handlers/mocks/Reader.go +++ /dev/null @@ -1,119 +0,0 @@ -// Code generated by mockery v2.41.0. DO NOT EDIT. 
- -package mocks - -import mock "github.com/stretchr/testify/mock" - -// Reader is an autogenerated mock type for the Reader type -type Reader[T interface{}] struct { - mock.Mock -} - -type Reader_Expecter[T interface{}] struct { - mock *mock.Mock -} - -func (_m *Reader[T]) EXPECT() *Reader_Expecter[T] { - return &Reader_Expecter[T]{mock: &_m.Mock} -} - -// Cancel provides a mock function with given fields: -func (_m *Reader[T]) Cancel() { - _m.Called() -} - -// Reader_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel' -type Reader_Cancel_Call[T interface{}] struct { - *mock.Call -} - -// Cancel is a helper method to define mock.On call -func (_e *Reader_Expecter[T]) Cancel() *Reader_Cancel_Call[T] { - return &Reader_Cancel_Call[T]{Call: _e.mock.On("Cancel")} -} - -func (_c *Reader_Cancel_Call[T]) Run(run func()) *Reader_Cancel_Call[T] { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Reader_Cancel_Call[T]) Return() *Reader_Cancel_Call[T] { - _c.Call.Return() - return _c -} - -func (_c *Reader_Cancel_Call[T]) RunAndReturn(run func()) *Reader_Cancel_Call[T] { - _c.Call.Return(run) - return _c -} - -// Read provides a mock function with given fields: -func (_m *Reader[T]) Read() (T, error) { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Read") - } - - var r0 T - var r1 error - if rf, ok := ret.Get(0).(func() (T, error)); ok { - return rf() - } - if rf, ok := ret.Get(0).(func() T); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(T) - } - - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Reader_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read' -type Reader_Read_Call[T interface{}] struct { - *mock.Call -} - -// Read is a helper method to define mock.On call -func (_e *Reader_Expecter[T]) Read() *Reader_Read_Call[T] { - return &Reader_Read_Call[T]{Call: _e.mock.On("Read")} -} - -func (_c *Reader_Read_Call[T]) Run(run func()) *Reader_Read_Call[T] { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Reader_Read_Call[T]) Return(_a0 T, _a1 error) *Reader_Read_Call[T] { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *Reader_Read_Call[T]) RunAndReturn(run func() (T, error)) *Reader_Read_Call[T] { - _c.Call.Return(run) - return _c -} - -// NewReader creates a new instance of Reader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewReader[T interface{}](t interface { - mock.TestingT - Cleanup(func()) -}) *Reader[T] { - mock := &Reader[T]{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/data_handlers/mocks/Writer.go b/data_handlers/mocks/Writer.go deleted file mode 100644 index cbecdaff..00000000 --- a/data_handlers/mocks/Writer.go +++ /dev/null @@ -1,110 +0,0 @@ -// Code generated by mockery v2.41.0. DO NOT EDIT. 
- -package mocks - -import mock "github.com/stretchr/testify/mock" - -// Writer is an autogenerated mock type for the Writer type -type Writer[T interface{}] struct { - mock.Mock -} - -type Writer_Expecter[T interface{}] struct { - mock *mock.Mock -} - -func (_m *Writer[T]) EXPECT() *Writer_Expecter[T] { - return &Writer_Expecter[T]{mock: &_m.Mock} -} - -// Cancel provides a mock function with given fields: -func (_m *Writer[T]) Cancel() { - _m.Called() -} - -// Writer_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel' -type Writer_Cancel_Call[T interface{}] struct { - *mock.Call -} - -// Cancel is a helper method to define mock.On call -func (_e *Writer_Expecter[T]) Cancel() *Writer_Cancel_Call[T] { - return &Writer_Cancel_Call[T]{Call: _e.mock.On("Cancel")} -} - -func (_c *Writer_Cancel_Call[T]) Run(run func()) *Writer_Cancel_Call[T] { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Writer_Cancel_Call[T]) Return() *Writer_Cancel_Call[T] { - _c.Call.Return() - return _c -} - -func (_c *Writer_Cancel_Call[T]) RunAndReturn(run func()) *Writer_Cancel_Call[T] { - _c.Call.Return(run) - return _c -} - -// Write provides a mock function with given fields: _a0 -func (_m *Writer[T]) Write(_a0 T) error { - ret := _m.Called(_a0) - - if len(ret) == 0 { - panic("no return value specified for Write") - } - - var r0 error - if rf, ok := ret.Get(0).(func(T) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Writer_Write_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Write' -type Writer_Write_Call[T interface{}] struct { - *mock.Call -} - -// Write is a helper method to define mock.On call -// - _a0 T -func (_e *Writer_Expecter[T]) Write(_a0 interface{}) *Writer_Write_Call[T] { - return &Writer_Write_Call[T]{Call: _e.mock.On("Write", _a0)} -} - -func (_c *Writer_Write_Call[T]) Run(run func(_a0 T)) *Writer_Write_Call[T] { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(T)) - }) - return _c -} - -func (_c *Writer_Write_Call[T]) Return(_a0 error) *Writer_Write_Call[T] { - _c.Call.Return(_a0) - return _c -} - -func (_c *Writer_Write_Call[T]) RunAndReturn(run func(T) error) *Writer_Write_Call[T] { - _c.Call.Return(run) - return _c -} - -// NewWriter creates a new instance of Writer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewWriter[T interface{}](t interface { - mock.TestingT - Cleanup(func()) -}) *Writer[T] { - mock := &Writer[T]{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/data_handlers/pipeline.go b/data_handlers/pipeline.go deleted file mode 100644 index 2aa2a804..00000000 --- a/data_handlers/pipeline.go +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2024-2024 Aerospike, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package datahandlers - -import ( - "context" - "io" - "math" - "sync" -) - -// Reader is an interface for reading data -// -//go:generate mockery --name Reader -type Reader[T any] interface { - // Read must return io.EOF when there is no more data to read - Read() (T, error) - // Cancel tells the reader to clean up its resources - // usually this is a no-op - Cancel() -} - -// Writer is an interface for writing data -// -//go:generate mockery --name Writer -type Writer[T any] interface { - Write(T) error - // Cancel tells the writer to clean up its resources - // usually this is a no-op - Cancel() -} - -// Processor is an interface for processing data -// -//go:generate mockery --name Processor -type Processor[T any] interface { - Process(T) (T, error) -} - -// DataPipeline is a generic pipeline for reading, processing, and writing data. -// Pipelines are created with slices of Readers, Processors, and Writers. -// The Run method starts the pipeline. -// Each reader, writer, and processor runs in its own goroutine. -// Each type of stage (read, process, write) is connected by a single channel. -type DataPipeline[T any] struct { - readers []readStage[T] - processors []processStage[T] - writers []writeStage[T] - readSendChan chan T - processSendChan chan T -} - -// NewDataPipeline creates a new DataPipeline -func NewDataPipeline[T any](r []Reader[T], p []Processor[T], w []Writer[T]) *DataPipeline[T] { - if len(r) == 0 || len(p) == 0 || len(w) == 0 { - return nil - } - - chanSize := int(math.Max(math.Max(float64(len(r)), float64(len(p))), float64(len(w)))) - - readSendChan := make(chan T, chanSize) - readers := make([]readStage[T], len(r)) - for i, reader := range r { - readers[i] = readStage[T]{ - r: reader, - send: readSendChan, - } - } - - processSendChan := make(chan T, chanSize) - processors := make([]processStage[T], len(p)) - for i, processor := range p { - processors[i] = processStage[T]{ - p: processor, - receive: readSendChan, - send: processSendChan, - } - } - - writers := make([]writeStage[T], len(w)) - for i, writer := range w { - writers[i] = writeStage[T]{ - w: writer, - receive: processSendChan, - } - } - - return &DataPipeline[T]{ - readers: readers, - processors: processors, - writers: writers, - readSendChan: readSendChan, - processSendChan: processSendChan, - } -} - -// Run starts the pipeline -// The pipeline stops when all readers, processors, and writers finish or -// if any stage returns an error. 
-func (dp *DataPipeline[T]) Run() error { - chanLen := len(dp.readers) + len(dp.processors) + len(dp.writers) - - if chanLen == 0 { - return nil - } - - errc := make(chan error, chanLen) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - rg := &sync.WaitGroup{} - for _, reader := range dp.readers { - rg.Add(1) - go func(reader readStage[T]) { - defer rg.Done() - err := reader.Run(ctx) - if err != nil { - cancel() - errc <- err - } - }(reader) - } - - pg := &sync.WaitGroup{} - for _, processor := range dp.processors { - pg.Add(1) - go func(processor processStage[T]) { - defer pg.Done() - err := processor.Run(ctx) - if err != nil { - cancel() - errc <- err - } - }(processor) - } - - wg := &sync.WaitGroup{} - for _, writer := range dp.writers { - wg.Add(1) - go func(writer writeStage[T]) { - defer wg.Done() - err := writer.Run(ctx) - if err != nil { - cancel() - errc <- err - } - }(writer) - } - - rg.Wait() - close(dp.readSendChan) - pg.Wait() - close(dp.processSendChan) - wg.Wait() - close(errc) - - // TODO improve error handling - // this should return a slice of errors - // each error should identify the stage it came from - return <-errc -} - -type readStage[T any] struct { - r Reader[T] - send chan T -} - -// TODO support passing in a context -func (rs *readStage[T]) Run(ctx context.Context) error { - defer rs.r.Cancel() - for { - v, err := rs.r.Read() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - select { - case <-ctx.Done(): - return nil - case rs.send <- v: - } - } -} - -type processStage[T any] struct { - p Processor[T] - send chan T - receive chan T -} - -func (ps *processStage[T]) Run(ctx context.Context) error { - for { - var ( - v T - active bool - ) - select { - case <-ctx.Done(): - return nil - case v, active = <-ps.receive: - } - if !active { - return nil - } - v, err := ps.p.Process(v) - if err != nil { - return err - } - select { - case <-ctx.Done(): - return nil - case ps.send <- v: - } - } -} - -type writeStage[T any] struct { - w Writer[T] - receive chan T -} - -func (ws *writeStage[T]) Run(ctx context.Context) error { - defer ws.w.Cancel() - for { - var ( - v T - active bool - ) - select { - case <-ctx.Done(): - return nil - case v, active = <-ws.receive: - } - if !active { - return nil - } - err := ws.w.Write(v) - if err != nil { - return err - } - } -} diff --git a/data_handlers/pipline_test.go b/data_handlers/pipline_test.go deleted file mode 100644 index 076e6acc..00000000 --- a/data_handlers/pipline_test.go +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright 2024-2024 Aerospike, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package datahandlers - -import ( - "context" - "errors" - "io" - "testing" - - "github.com/aerospike/aerospike-tools-backup-lib/data_handlers/mocks" - - "github.com/stretchr/testify/suite" -) - -type pipelineTestSuite struct { - suite.Suite -} - -func (suite *pipelineTestSuite) TestNewDataPipeline() { - reader := mocks.NewReader[string](suite.T()) - processor := mocks.NewProcessor[string](suite.T()) - writer := mocks.NewWriter[string](suite.T()) - - readers := []Reader[string]{reader} - processors := []Processor[string]{processor} - writers := []Writer[string]{writer} - - pipeline := NewDataPipeline(readers, processors, writers) - suite.NotNil(pipeline) -} - -func (suite *pipelineTestSuite) TestNewDataPipelineNoArgs() { - pipeline := NewDataPipeline[string](nil, nil, nil) - suite.Nil(pipeline) -} - -func (suite *pipelineTestSuite) TestDataPipelineRun() { - reader := mocks.NewReader[string](suite.T()) - processor := mocks.NewProcessor[string](suite.T()) - writer := mocks.NewWriter[string](suite.T()) - - readers := []Reader[string]{reader} - processors := []Processor[string]{processor} - writers := []Writer[string]{writer} - - pipeline := NewDataPipeline(readers, processors, writers) - suite.NotNil(pipeline) - - readCalls := 0 - reader.EXPECT().Read().RunAndReturn(func() (string, error) { - readCalls++ - if readCalls <= 3 { - return "hi", nil - } - return "", io.EOF - }) - reader.EXPECT().Cancel() - - processor.EXPECT().Process("hi").Return("hi", nil) - - writer.EXPECT().Write("hi").Return(nil) - writer.EXPECT().Cancel() - - err := pipeline.Run() - suite.Nil(err) -} - -func (suite *pipelineTestSuite) TestDataPipelineReaderFails() { - reader := mocks.NewReader[string](suite.T()) - processor := mocks.NewProcessor[string](suite.T()) - writer := mocks.NewWriter[string](suite.T()) - - readers := []Reader[string]{reader} - processors := []Processor[string]{processor} - writers := []Writer[string]{writer} - - pipeline := NewDataPipeline(readers, processors, writers) - suite.NotNil(pipeline) - - readCalls := 0 - reader.EXPECT().Read().RunAndReturn(func() (string, error) { - readCalls++ - if readCalls <= 3 { - return "hi", nil - } - return "", errors.New("error") - }) - reader.EXPECT().Cancel() - - processor.EXPECT().Process("hi").Return("hi", nil) - - writer.EXPECT().Write("hi").Return(nil) - writer.EXPECT().Cancel() - - err := pipeline.Run() - suite.NotNil(err) -} - -func (suite *pipelineTestSuite) TestDataPipelineProcessorFails() { - reader := mocks.NewReader[string](suite.T()) - processor := mocks.NewProcessor[string](suite.T()) - writer := mocks.NewWriter[string](suite.T()) - - readers := []Reader[string]{reader} - processors := []Processor[string]{processor} - writers := []Writer[string]{writer} - - reader.EXPECT().Read().Return("hi", nil) - reader.EXPECT().Cancel() - - processCalls := 0 - processor.EXPECT().Process("hi").RunAndReturn(func(string) (string, error) { - processCalls++ - if processCalls <= 3 { - return "hi", nil - } - return "", errors.New("error") - }) - - writer.EXPECT().Write("hi").Return(nil) - writer.EXPECT().Cancel() - - pipeline := NewDataPipeline(readers, processors, writers) - suite.NotNil(pipeline) - - err := pipeline.Run() - suite.NotNil(err) -} - -func (suite *pipelineTestSuite) TestDataPipelineWriterFails() { - reader := mocks.NewReader[string](suite.T()) - processor := mocks.NewProcessor[string](suite.T()) - writer := mocks.NewWriter[string](suite.T()) - - readers := []Reader[string]{reader} - processors := []Processor[string]{processor} - writers := 
[]Writer[string]{writer} - - reader.EXPECT().Read().Return("hi", nil) - reader.EXPECT().Cancel() - - processor.EXPECT().Process("hi").Return("hi", nil) - - writeCalls := 0 - writer.EXPECT().Write("hi").RunAndReturn(func(string) error { - writeCalls++ - if writeCalls <= 3 { - return nil - } - return errors.New("error") - }) - writer.EXPECT().Cancel() - - pipeline := NewDataPipeline(readers, processors, writers) - suite.NotNil(pipeline) - - err := pipeline.Run() - suite.NotNil(err) -} - -func (suite *pipelineTestSuite) TestReadStageRun() { - ctx, cancel := context.WithCancel(context.Background()) - - reader := mocks.NewReader[string](suite.T()) - reader.EXPECT().Read().Return("hi", nil) - reader.EXPECT().Cancel() - - stage := readStage[string]{ - r: reader, - send: make(chan string), - } - - errChan := make(chan error) - go func() { - errChan <- stage.Run(ctx) - close(errChan) - }() - cancel() - - err := <-errChan - suite.Nil(err) -} - -func (suite *pipelineTestSuite) TestReadStageRunEOF() { - ctx := context.Background() - reader := mocks.NewReader[string](suite.T()) - reader.EXPECT().Read().Return("", io.EOF) - reader.EXPECT().Cancel() - - stage := readStage[string]{ - r: reader, - send: make(chan string), - } - - errChan := make(chan error) - go func() { - errChan <- stage.Run(ctx) - close(errChan) - }() - - err := <-errChan - suite.Nil(err) -} - -func (suite *pipelineTestSuite) TestReadStageError() { - ctx := context.Background() - reader := mocks.NewReader[string](suite.T()) - reader.EXPECT().Read().Return("", errors.New("error")) - reader.EXPECT().Cancel() - - stage := readStage[string]{ - r: reader, - send: make(chan string), - } - - errChan := make(chan error) - go func() { - errChan <- stage.Run(ctx) - close(errChan) - }() - - err := <-errChan - suite.NotNil(err) -} - -func TestPipelineTestSuite(t *testing.T) { - suite.Run(t, new(pipelineTestSuite)) -} diff --git a/data_handlers/processors.go b/data_handlers/processors.go index 218f299e..c62d47e1 100644 --- a/data_handlers/processors.go +++ b/data_handlers/processors.go @@ -14,9 +14,70 @@ package datahandlers +import "context" + // processors.go contains the implementations of the DataProcessor interface // used by dataPipelines in the backuplib package +// **** Processor Worker **** + +// DataProcessor is an interface for processing data +// +//go:generate mockery --name DataProcessor +type DataProcessor interface { + Process(any) (any, error) +} + +// ProcessorWorker implements the pipeline.Worker interface +// It wraps a DataProcessor and processes data with it +type ProcessorWorker struct { + processor DataProcessor + receive <-chan any + send chan<- any +} + +// NewProcessorWorker creates a new ProcessorWorker +func NewProcessorWorker(processor DataProcessor) *ProcessorWorker { + return &ProcessorWorker{ + processor: processor, + } +} + +// SetReceiveChan sets the receive channel for the ProcessorWorker +func (w *ProcessorWorker) SetReceiveChan(c <-chan any) { + w.receive = c +} + +// SetSendChan sets the send channel for the ProcessorWorker +func (w *ProcessorWorker) SetSendChan(c chan<- any) { + w.send = c +} + +// Run starts the ProcessorWorker +func (w *ProcessorWorker) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case data, active := <-w.receive: + if !active { + return nil + } + + processed, err := w.processor.Process(data) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case w.send <- processed: + } + } + } +} + // **** NOOP 
Processor **** // NOOPProcessor satisfies the DataProcessor interface diff --git a/data_handlers/processors_test.go b/data_handlers/processors_test.go index 5a3a3afd..6c0e2268 100644 --- a/data_handlers/processors_test.go +++ b/data_handlers/processors_test.go @@ -15,8 +15,13 @@ package datahandlers import ( + "context" + "errors" + "sync" "testing" + "time" + "github.com/aerospike/aerospike-tools-backup-lib/data_handlers/mocks" "github.com/stretchr/testify/suite" ) @@ -24,6 +29,145 @@ type proccessorTestSuite struct { suite.Suite } +func (suite *proccessorTestSuite) TestProcessorWorker() { + mockProcessor := mocks.NewDataProcessor(suite.T()) + mockProcessor.EXPECT().Process("test").Return("test", nil) + + worker := NewProcessorWorker(mockProcessor) + suite.NotNil(worker) + + receiver := make(chan any, 1) + receiver <- "test" + close(receiver) + + worker.SetReceiveChan(receiver) + + sender := make(chan any, 1) + worker.SetSendChan(sender) + + ctx := context.Background() + err := worker.Run(ctx) + suite.Nil(err) + + data := <-sender + suite.Equal("test", data) +} + +func (suite *proccessorTestSuite) TestProcessorWorkerCancelOnReceive() { + mockProcessor := mocks.NewDataProcessor(suite.T()) + mockProcessor.EXPECT().Process("test").Return("test", nil) + + worker := NewProcessorWorker(mockProcessor) + suite.NotNil(worker) + + receiver := make(chan any, 1) + receiver <- "test" + + worker.SetReceiveChan(receiver) + + sender := make(chan any, 1) + worker.SetSendChan(sender) + + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + wg.Add(1) + errors := make(chan error, 1) + go func() { + defer wg.Done() + err := worker.Run(ctx) + errors <- err + }() + + // give the worker some time to start + time.Sleep(100 * time.Millisecond) + + cancel() + wg.Wait() + + err := <-errors + suite.NotNil(err) + + data := <-sender + suite.Equal("test", data) +} + +func (suite *proccessorTestSuite) TestProcessorWorkerCancelOnSend() { + mockProcessor := mocks.NewDataProcessor(suite.T()) + mockProcessor.EXPECT().Process("test").Return("test", nil) + + worker := NewProcessorWorker(mockProcessor) + suite.NotNil(worker) + + receiver := make(chan any, 1) + receiver <- "test" + + worker.SetReceiveChan(receiver) + + sender := make(chan any) + worker.SetSendChan(sender) + + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + wg.Add(1) + errors := make(chan error, 1) + go func() { + defer wg.Done() + err := worker.Run(ctx) + errors <- err + }() + + // give the worker some time to start + time.Sleep(100 * time.Millisecond) + + cancel() + wg.Wait() + + err := <-errors + suite.NotNil(err) +} + +func (suite *proccessorTestSuite) TestProcessorWorkerReceiveClosed() { + mockProcessor := mocks.NewDataProcessor(suite.T()) + mockProcessor.EXPECT().Process("test").Return("test", nil) + + worker := NewProcessorWorker(mockProcessor) + suite.NotNil(worker) + + receiver := make(chan any, 1) + receiver <- "test" + close(receiver) + + worker.SetReceiveChan(receiver) + + sender := make(chan any, 1) + worker.SetSendChan(sender) + + ctx := context.Background() + err := worker.Run(ctx) + suite.Nil(err) +} + +func (suite *proccessorTestSuite) TestProcessorWorkerProcessFailed() { + mockProcessor := mocks.NewDataProcessor(suite.T()) + mockProcessor.EXPECT().Process("test").Return(nil, errors.New("test")) + + worker := NewProcessorWorker(mockProcessor) + suite.NotNil(worker) + + receiver := make(chan any, 1) + receiver <- "test" + close(receiver) + + worker.SetReceiveChan(receiver) + + sender 
:= make(chan any, 1) + worker.SetSendChan(sender) + + ctx := context.Background() + err := worker.Run(ctx) + suite.NotNil(err) +} + func (suite *proccessorTestSuite) TestNOOPProcessor() { noop := NewNOOPProcessor() suite.NotNil(noop) diff --git a/data_handlers/readers.go b/data_handlers/readers.go index 362b47d3..1d8e1811 100644 --- a/data_handlers/readers.go +++ b/data_handlers/readers.go @@ -15,6 +15,8 @@ package datahandlers import ( + "context" + "errors" "io" "github.com/aerospike/aerospike-tools-backup-lib/models" @@ -25,6 +27,60 @@ import ( // readers.go contains the implementations of the DataReader interface // used by dataPipelines in the backuplib package +// **** Read Worker **** + +// DataReader is an interface for reading data from a source. +// +//go:generate mockery --name DataReader +type DataReader interface { + Read() (any, error) + Cancel() +} + +// ReadWorker implements the pipeline.Worker interface +// It wraps a DataReader and reads data from it +type ReadWorker[T any] struct { + reader DataReader + send chan<- any +} + +// NewReadWorker creates a new ReadWorker +func NewReadWorker[T any](reader DataReader) *ReadWorker[T] { + return &ReadWorker[T]{ + reader: reader, + } +} + +// SetReceiveChan satisfies the pipeline.Worker interface +// but is a no-op for the ReadWorker +func (w *ReadWorker[T]) SetReceiveChan(c <-chan any) { + // no-op +} + +// SetSendChan sets the send channel for the ReadWorker +func (w *ReadWorker[T]) SetSendChan(c chan<- any) { + w.send = c +} + +// Run runs the ReadWorker +func (w *ReadWorker[T]) Run(ctx context.Context) error { + for { + defer w.reader.Cancel() + data, err := w.reader.Read() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case w.send <- data: + } + } +} + // Decoder is an interface for reading backup data as tokens. // It is used to support different data formats. 
// While the return type is `any`, the actual types returned should diff --git a/data_handlers/readers_test.go b/data_handlers/readers_test.go index 265256a8..f04b9a58 100644 --- a/data_handlers/readers_test.go +++ b/data_handlers/readers_test.go @@ -15,10 +15,13 @@ package datahandlers import ( + "context" "fmt" "io" "reflect" + "sync" "testing" + "time" "unsafe" "github.com/aerospike/aerospike-tools-backup-lib/data_handlers/mocks" @@ -32,6 +35,60 @@ type readersTestSuite struct { suite.Suite } +func (suite *readersTestSuite) TestReadWorker() { + mockReader := mocks.NewDataReader(suite.T()) + + readCalls := 0 + mockReader.EXPECT().Read().RunAndReturn(func() (any, error) { + readCalls++ + if readCalls <= 3 { + return "hi", nil + } + return "", io.EOF + }) + mockReader.EXPECT().Cancel() + + worker := NewReadWorker[string](mockReader) + suite.NotNil(worker) + + send := make(chan any, 3) + worker.SetSendChan(send) + + ctx := context.Background() + worker.Run(ctx) + close(send) + + suite.Equal(3, len(send)) + + for v := range send { + suite.Equal("hi", v) + } +} + +func (suite *readersTestSuite) TestReadWorkerCancel() { + mockReader := mocks.NewDataReader(suite.T()) + mockReader.EXPECT().Read().Return("hi", nil) + mockReader.EXPECT().Cancel() + + worker := NewReadWorker[string](mockReader) + suite.NotNil(worker) + + ctx, cancel := context.WithCancel(context.Background()) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + worker.Run(ctx) + }() + + // give the worker some time to start + time.Sleep(100 * time.Millisecond) + + cancel() + wg.Wait() +} + func (suite *readersTestSuite) TestGenericReader() { mockDecoder := mocks.NewDecoder(suite.T()) mockDecoder.EXPECT().NextToken().Return("hi", nil) diff --git a/data_handlers/writers.go b/data_handlers/writers.go index 0686ac85..93ecd0eb 100644 --- a/data_handlers/writers.go +++ b/data_handlers/writers.go @@ -16,6 +16,7 @@ package datahandlers import ( "bytes" + "context" "errors" "fmt" "io" @@ -28,6 +29,60 @@ import ( // writers.go contains the implementations of the DataWriter interface // used by dataPipelines in the backuplib package +// **** Write Worker **** + +// DataWriter is an interface for writing data to a destination. +// +//go:generate mockery --name DataWriter +type DataWriter interface { + Write(any) error + Cancel() +} + +// WriteWorker implements the pipeline.Worker interface +// It wraps a DataWriter and writes data to it +type WriteWorker struct { + writer DataWriter + receive <-chan any +} + +// NewWriteWorker creates a new WriteWorker +func NewWriteWorker(writer DataWriter) *WriteWorker { + return &WriteWorker{ + writer: writer, + } +} + +// SetReceiveChan sets the receive channel for the WriteWorker +func (w *WriteWorker) SetReceiveChan(c <-chan any) { + w.receive = c +} + +// SetSendChan satisfies the pipeline.Worker interface +// but is a no-op for the WriteWorker +func (w *WriteWorker) SetSendChan(c chan<- any) { + // no-op +} + +// Run runs the WriteWorker +func (w *WriteWorker) Run(ctx context.Context) error { + for { + defer w.writer.Cancel() + select { + case <-ctx.Done(): + return ctx.Err() + case data, active := <-w.receive: + if !active { + return nil + } + + if err := w.writer.Write(data); err != nil { + return err + } + } + } +} + // **** Generic Writer **** // Encoder is an interface for encoding the types from the models package. 
diff --git a/data_handlers/writers_test.go b/data_handlers/writers_test.go index 922467a5..7520db9b 100644 --- a/data_handlers/writers_test.go +++ b/data_handlers/writers_test.go @@ -16,6 +16,7 @@ package datahandlers import ( "bytes" + "context" "errors" "testing" @@ -30,6 +31,58 @@ type writersTestSuite struct { suite.Suite } +func (suite *writersTestSuite) TestWriteWorker() { + mockWriter := mocks.NewDataWriter(suite.T()) + mockWriter.EXPECT().Write("test").Return(nil) + mockWriter.EXPECT().Cancel() + + worker := NewWriteWorker(mockWriter) + suite.NotNil(worker) + + receiver := make(chan any, 1) + receiver <- "test" + close(receiver) + + worker.SetReceiveChan(receiver) + + ctx := context.Background() + err := worker.Run(ctx) + suite.Nil(err) +} + +func (suite *writersTestSuite) TestWriteWorkerCancel() { + mockWriter := mocks.NewDataWriter(suite.T()) + mockWriter.EXPECT().Cancel() + + worker := NewWriteWorker(mockWriter) + suite.NotNil(worker) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := worker.Run(ctx) + suite.NotNil(err) +} + +func (suite *writersTestSuite) TestWriteWorkerWriteFailed() { + mockWriter := mocks.NewDataWriter(suite.T()) + mockWriter.EXPECT().Write("test").Return(errors.New("error")) + mockWriter.EXPECT().Cancel() + + worker := NewWriteWorker(mockWriter) + suite.NotNil(worker) + + receiver := make(chan any, 1) + receiver <- "test" + close(receiver) + + worker.SetReceiveChan(receiver) + + ctx := context.Background() + err := worker.Run(ctx) + suite.NotNil(err) +} + func (suite *writersTestSuite) TestGenericWriter() { namespace := "test" set := "" diff --git a/pipeline/mocks/Worker.go b/pipeline/mocks/Worker.go new file mode 100644 index 00000000..252c3cfc --- /dev/null +++ b/pipeline/mocks/Worker.go @@ -0,0 +1,148 @@ +// Code generated by mockery v2.41.0. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// Worker is an autogenerated mock type for the Worker type +type Worker[T interface{}] struct { + mock.Mock +} + +type Worker_Expecter[T interface{}] struct { + mock *mock.Mock +} + +func (_m *Worker[T]) EXPECT() *Worker_Expecter[T] { + return &Worker_Expecter[T]{mock: &_m.Mock} +} + +// Run provides a mock function with given fields: _a0 +func (_m *Worker[T]) Run(_a0 context.Context) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for Run") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Worker_Run_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Run' +type Worker_Run_Call[T interface{}] struct { + *mock.Call +} + +// Run is a helper method to define mock.On call +// - _a0 context.Context +func (_e *Worker_Expecter[T]) Run(_a0 interface{}) *Worker_Run_Call[T] { + return &Worker_Run_Call[T]{Call: _e.mock.On("Run", _a0)} +} + +func (_c *Worker_Run_Call[T]) Run(run func(_a0 context.Context)) *Worker_Run_Call[T] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Worker_Run_Call[T]) Return(_a0 error) *Worker_Run_Call[T] { + _c.Call.Return(_a0) + return _c +} + +func (_c *Worker_Run_Call[T]) RunAndReturn(run func(context.Context) error) *Worker_Run_Call[T] { + _c.Call.Return(run) + return _c +} + +// SetReceiveChan provides a mock function with given fields: _a0 +func (_m *Worker[T]) SetReceiveChan(_a0 <-chan T) { + _m.Called(_a0) +} + +// Worker_SetReceiveChan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetReceiveChan' +type Worker_SetReceiveChan_Call[T interface{}] struct { + *mock.Call +} + +// SetReceiveChan is a helper method to define mock.On call +// - _a0 <-chan T +func (_e *Worker_Expecter[T]) SetReceiveChan(_a0 interface{}) *Worker_SetReceiveChan_Call[T] { + return &Worker_SetReceiveChan_Call[T]{Call: _e.mock.On("SetReceiveChan", _a0)} +} + +func (_c *Worker_SetReceiveChan_Call[T]) Run(run func(_a0 <-chan T)) *Worker_SetReceiveChan_Call[T] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(<-chan T)) + }) + return _c +} + +func (_c *Worker_SetReceiveChan_Call[T]) Return() *Worker_SetReceiveChan_Call[T] { + _c.Call.Return() + return _c +} + +func (_c *Worker_SetReceiveChan_Call[T]) RunAndReturn(run func(<-chan T)) *Worker_SetReceiveChan_Call[T] { + _c.Call.Return(run) + return _c +} + +// SetSendChan provides a mock function with given fields: _a0 +func (_m *Worker[T]) SetSendChan(_a0 chan<- T) { + _m.Called(_a0) +} + +// Worker_SetSendChan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSendChan' +type Worker_SetSendChan_Call[T interface{}] struct { + *mock.Call +} + +// SetSendChan is a helper method to define mock.On call +// - _a0 chan<- T +func (_e *Worker_Expecter[T]) SetSendChan(_a0 interface{}) *Worker_SetSendChan_Call[T] { + return &Worker_SetSendChan_Call[T]{Call: _e.mock.On("SetSendChan", _a0)} +} + +func (_c *Worker_SetSendChan_Call[T]) Run(run func(_a0 chan<- T)) *Worker_SetSendChan_Call[T] { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(chan<- T)) + }) + return _c +} + +func (_c *Worker_SetSendChan_Call[T]) Return() *Worker_SetSendChan_Call[T] { + _c.Call.Return() + return _c +} + +func (_c 
*Worker_SetSendChan_Call[T]) RunAndReturn(run func(chan<- T)) *Worker_SetSendChan_Call[T] { + _c.Call.Return(run) + return _c +} + +// NewWorker creates a new instance of Worker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewWorker[T interface{}](t interface { + mock.TestingT + Cleanup(func()) +}) *Worker[T] { + mock := &Worker[T]{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go new file mode 100644 index 00000000..2be97707 --- /dev/null +++ b/pipeline/pipeline.go @@ -0,0 +1,193 @@ +// Copyright 2024-2024 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "sync" +) + +// Worker is an interface for a pipeline item +// Each worker has a send and receive channel +// that connects it to the previous and next stage in the pipeline +// The Run method starts the worker +// +//go:generate mockery --name Worker +type Worker[T any] interface { + SetSendChan(chan<- T) + SetReceiveChan(<-chan T) + Run(context.Context) error +} + +// Pipeline is a generic pipeline for reading, processing, and writing data. +// Pipelines are created with slices of Readers, Processors, and Writers. +// The Run method starts the pipeline. +// Each reader, writer, and processor runs in its own goroutine. +// Each type of stage (read, process, write) is connected by a single channel. +type Pipeline[T any] struct { + stages []*stage[T] + receive <-chan T + send chan<- T +} + +// NewPipeline creates a new DataPipeline +func NewPipeline[T any](workGroups ...[]Worker[T]) *Pipeline[T] { + stages := make([]*stage[T], len(workGroups)) + + for i, workers := range workGroups { + stages[i] = NewStage(workers...) 
+ } + + return &Pipeline[T]{ + stages: stages, + } +} + +// SetReceiveChan sets the receive channel for the pipeline +// The receive channel is used as the input to the first stage +// This satisfies the Worker interface so that pipelines can be chained together +// Usually users will not need to call this method +func (dp *Pipeline[T]) SetReceiveChan(c <-chan T) { + dp.receive = c +} + +// SetSendChan sets the send channel for the pipeline +// The send channel is used as the output from the last stage +// This satisfies the Worker interface so that pipelines can be chained together +// Usually users will not need to call this method +func (dp *Pipeline[T]) SetSendChan(c chan<- T) { + dp.send = c +} + +// Run starts the pipeline +// The pipeline stops when a worker returns an error, +// all workers are done, or the context is canceled +func (dp *Pipeline[T]) Run(ctx context.Context) error { + if len(dp.stages) == 0 { + return nil + } + + errors := make(chan error, len(dp.stages)) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var lastSend chan T + for i, s := range dp.stages { + var ( + send chan T + ) + + if i == len(dp.stages)-1 { + s.SetSendChan(dp.send) + } else { + send = make(chan T, len(s.workers)) + s.SetSendChan(send) + } + + if i == 0 { + s.SetReceiveChan(dp.receive) + } else { + s.SetReceiveChan(lastSend) + } + + lastSend = send + } + + wg := &sync.WaitGroup{} + for _, s := range dp.stages { + wg.Add(1) + go func(s *stage[T]) { + defer wg.Done() + err := s.Run(ctx) + if err != nil { + errors <- err + cancel() + } + }(s) + } + + wg.Wait() + + close(errors) + if len(errors) > 0 { + return <-errors + } + + return nil +} + +type stage[T any] struct { + workers []Worker[T] + receive <-chan T + send chan<- T +} + +func (s *stage[T]) SetReceiveChan(c <-chan T) { + s.receive = c +} + +func (s *stage[T]) SetSendChan(c chan<- T) { + s.send = c +} + +func NewStage[T any](workers ...Worker[T]) *stage[T] { + s := stage[T]{ + workers: workers, + } + + return &s +} + +func (s *stage[T]) Run(ctx context.Context) error { + if len(s.workers) == 0 { + return nil + } + + for _, w := range s.workers { + w.SetReceiveChan(s.receive) + w.SetSendChan(s.send) + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + errors := make(chan error, len(s.workers)) + + wg := &sync.WaitGroup{} + for _, w := range s.workers { + wg.Add(1) + go func(w Worker[T]) { + defer wg.Done() + err := w.Run(ctx) + if err != nil { + errors <- err + cancel() + } + }(w) + } + + wg.Wait() + if s.send != nil { + close(s.send) + } + + close(errors) + if len(errors) > 0 { + return <-errors + } + + return nil +} diff --git a/pipeline/pipline_test.go b/pipeline/pipline_test.go new file mode 100644 index 00000000..61fd4520 --- /dev/null +++ b/pipeline/pipline_test.go @@ -0,0 +1,184 @@ +// Copyright 2024-2024 Aerospike, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package pipeline + +import ( + "context" + "errors" + "strconv" + "testing" + + "github.com/aerospike/aerospike-tools-backup-lib/pipeline/mocks" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type pipelineTestSuite struct { + suite.Suite +} + +func (suite *pipelineTestSuite) TestNewDataPipeline() { + w1 := mocks.NewWorker[string](suite.T()) + w2 := mocks.NewWorker[string](suite.T()) + w3 := mocks.NewWorker[string](suite.T()) + + workers := [][]Worker[string]{{w1, w2}, {w3}} + + pipeline := NewPipeline(workers...) + suite.NotNil(pipeline) +} + +func (suite *pipelineTestSuite) TestDataPipelineRun() { + w1 := mocks.NewWorker[string](suite.T()) + w1.EXPECT().SetReceiveChan(mock.Anything) + w1.EXPECT().SetSendChan(mock.Anything) + w1.EXPECT().Run(mock.Anything).Return(nil) + + w2 := mocks.NewWorker[string](suite.T()) + w2.EXPECT().SetReceiveChan(mock.Anything) + w2.EXPECT().SetSendChan(mock.Anything) + w2.EXPECT().Run(mock.Anything).Return(nil) + + w3 := mocks.NewWorker[string](suite.T()) + w3.EXPECT().SetReceiveChan(mock.Anything) + w3.EXPECT().SetSendChan(mock.Anything) + w3.EXPECT().Run(mock.Anything).Return(nil) + + workers := [][]Worker[string]{{w1, w2}, {w3}} + + pipeline := NewPipeline(workers...) + suite.NotNil(pipeline) + + ctx := context.Background() + err := pipeline.Run(ctx) + suite.Nil(err) +} + +type mockWorker struct { + mocks.Worker[string] + receive <-chan string + send chan<- string +} + +func newMockWorker(t *testing.T) *mockWorker { + return &mockWorker{ + Worker: *mocks.NewWorker[string](t), + } +} + +func (w *mockWorker) SetReceiveChan(c <-chan string) { + w.receive = c +} + +func (w *mockWorker) SetSendChan(c chan<- string) { + w.send = c +} + +func (w *mockWorker) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case msg, active := <-w.receive: + if !active { + return nil + } + w.send <- msg + } + } +} + +func (suite *pipelineTestSuite) TestDataPipelineRunWithChannels() { + ctx := context.Background() + + w1 := newMockWorker(suite.T()) + w1.EXPECT().SetReceiveChan(mock.Anything) + w1.EXPECT().SetSendChan(mock.Anything) + w1.EXPECT().Run(ctx) + + w2 := newMockWorker(suite.T()) + w2.EXPECT().SetReceiveChan(mock.Anything) + w2.EXPECT().SetSendChan(mock.Anything) + w2.EXPECT().Run(ctx) + + w3 := newMockWorker(suite.T()) + w3.EXPECT().SetReceiveChan(mock.Anything) + w3.EXPECT().SetSendChan(mock.Anything) + w3.EXPECT().Run(ctx) + + w4 := newMockWorker(suite.T()) + w4.EXPECT().SetReceiveChan(mock.Anything) + w4.EXPECT().SetSendChan(mock.Anything) + w4.EXPECT().Run(ctx) + + workers := [][]Worker[string]{{w1, w2}, {w3}, {w4}} + pipeline := NewPipeline(workers...) 
+ suite.NotNil(pipeline) + + receive := make(chan string, 2) + pipeline.SetReceiveChan(receive) + + send := make(chan string, 2) + pipeline.SetSendChan(send) + + receive <- "0" + receive <- "1" + close(receive) + + err := pipeline.Run(ctx) + suite.Nil(err) + + var count int + for res := range send { + suite.Equal(strconv.Itoa(count), res, "expected %s to be %s", res, strconv.Itoa(count)) + count++ + } +} + +func (suite *pipelineTestSuite) TestDataPipelineRunWorkerFails() { + w1 := mocks.NewWorker[string](suite.T()) + w1.EXPECT().SetReceiveChan(mock.Anything) + w1.EXPECT().SetSendChan(mock.Anything) + w1.EXPECT().Run(mock.Anything).Return(nil) + + w2 := mocks.NewWorker[string](suite.T()) + w2.EXPECT().SetReceiveChan(mock.Anything) + w2.EXPECT().SetSendChan(mock.Anything) + w2.EXPECT().Run(mock.Anything).Return(nil) + + w3 := mocks.NewWorker[string](suite.T()) + w3.EXPECT().SetReceiveChan(mock.Anything) + w3.EXPECT().SetSendChan(mock.Anything) + w3.EXPECT().Run(mock.Anything).Return(errors.New("error")) + + w4 := mocks.NewWorker[string](suite.T()) + w4.EXPECT().SetReceiveChan(mock.Anything) + w4.EXPECT().SetSendChan(mock.Anything) + w4.EXPECT().Run(mock.Anything).Return(nil) + + workers := [][]Worker[string]{{w1, w2}, {w3}, {w4}} + + pipeline := NewPipeline(workers...) + suite.NotNil(pipeline) + + ctx := context.Background() + err := pipeline.Run(ctx) + suite.NotNil(err) +} + +func TestPipelineTestSuite(t *testing.T) { + suite.Run(t, new(pipelineTestSuite)) +} diff --git a/restore_handlers.go b/restore_handlers.go index b39b88ed..0981f2f6 100644 --- a/restore_handlers.go +++ b/restore_handlers.go @@ -18,6 +18,7 @@ import ( "io" datahandlers "github.com/aerospike/aerospike-tools-backup-lib/data_handlers" + "github.com/aerospike/aerospike-tools-backup-lib/pipeline" ) // **** Generic Restore Handler **** @@ -35,7 +36,7 @@ type DBRestoreClient interface { // worker is an interface for running a job type worker interface { // TODO change the any typed pipeline to a message or token type - DoJob(*datahandlers.DataPipeline[any]) error + DoJob(*pipeline.Pipeline[any]) error } // restoreHandler handles generic restore jobs on data readers @@ -57,26 +58,31 @@ func newRestoreHandler(config *RestoreBaseConfig, ac DBRestoreClient, w worker) // run runs the restore job // TODO change the any typed pipeline to a message or token type -func (rh *restoreHandler) run(readers []datahandlers.Reader[any]) error { +func (rh *restoreHandler) run(readers []*datahandlers.ReadWorker[any]) error { // TODO change the any typed pipeline to a message or token type - processors := make([]datahandlers.Processor[any], rh.config.Parallel) + processorWorkers := make([]pipeline.Worker[any], rh.config.Parallel) for i := 0; i < rh.config.Parallel; i++ { processor := datahandlers.NewNOOPProcessor() - processors[i] = processor + processorWorkers[i] = datahandlers.NewProcessorWorker(processor) } // TODO change the any typed pipeline to a message or token type - writers := make([]datahandlers.Writer[any], rh.config.Parallel) + writeWorkers := make([]pipeline.Worker[any], rh.config.Parallel) for i := 0; i < rh.config.Parallel; i++ { writer := datahandlers.NewRestoreWriter(rh.dbClient) - writers[i] = writer + writeWorkers[i] = datahandlers.NewWriteWorker(writer) } - job := datahandlers.NewDataPipeline( - readers, - processors, - writers, + readWorkers := make([]pipeline.Worker[any], len(readers)) + for i, r := range readers { + readWorkers[i] = r + } + + job := pipeline.NewPipeline( + readWorkers, + processorWorkers, + writeWorkers, ) 
return rh.worker.DoJob(job) @@ -126,7 +132,7 @@ func (rrh *RestoreFromReaderHandler) run(readers []io.Reader) { batchSize := rrh.config.Parallel // TODO change the any type to a message or token type - dataReaders := []datahandlers.Reader[any]{} + dataReaders := []*datahandlers.ReadWorker[any]{} for i, reader := range readers { @@ -138,7 +144,8 @@ func (rrh *RestoreFromReaderHandler) run(readers []io.Reader) { } dr := datahandlers.NewGenericReader(decoder) - dataReaders = append(dataReaders, dr) + readWorker := datahandlers.NewReadWorker[any](dr) + dataReaders = append(dataReaders, readWorker) // if we have not reached the batch size and we have more readers // continue to the next reader // if we are at the end of readers then run no matter what diff --git a/work_handler.go b/work_handler.go index bde460b0..11a63f20 100644 --- a/work_handler.go +++ b/work_handler.go @@ -14,7 +14,11 @@ package backuplib -import datahandlers "github.com/aerospike/aerospike-tools-backup-lib/data_handlers" +import ( + "context" + + "github.com/aerospike/aerospike-tools-backup-lib/pipeline" +) // workHandler is a generic worker for running a data pipeline (job) type workHandler struct{} @@ -24,6 +28,8 @@ func newWorkHandler() *workHandler { } // TODO change the any typed pipeline to a message or token type -func (wh *workHandler) DoJob(job *datahandlers.DataPipeline[any]) error { - return job.Run() +func (wh *workHandler) DoJob(job *pipeline.Pipeline[any]) error { + // TODO allow for context to be passed in + ctx := context.Background() + return job.Run(ctx) }
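
For reference, a minimal sketch of how the new pipeline package composes with the worker wrappers added above; backup_handlers.go and restore_handlers.go wire things up in the same way. The sliceReader and printWriter types here are hypothetical stand-ins for illustration only and are not part of this change:

// usage_sketch.go — illustrative only; assumes the pipeline and data_handlers
// packages as introduced in this change.
package main

import (
	"context"
	"fmt"
	"io"

	datahandlers "github.com/aerospike/aerospike-tools-backup-lib/data_handlers"
	"github.com/aerospike/aerospike-tools-backup-lib/pipeline"
)

// sliceReader is a hypothetical datahandlers.DataReader over an in-memory slice.
type sliceReader struct{ items []any }

func (r *sliceReader) Read() (any, error) {
	if len(r.items) == 0 {
		return nil, io.EOF // io.EOF tells the ReadWorker to finish cleanly
	}
	v := r.items[0]
	r.items = r.items[1:]
	return v, nil
}

func (r *sliceReader) Cancel() {}

// printWriter is a hypothetical datahandlers.DataWriter that prints each token.
type printWriter struct{}

func (w *printWriter) Write(v any) error { fmt.Println(v); return nil }
func (w *printWriter) Cancel()           {}

func main() {
	readWorkers := []pipeline.Worker[any]{
		datahandlers.NewReadWorker[any](&sliceReader{items: []any{"a", "b", "c"}}),
	}
	processorWorkers := []pipeline.Worker[any]{
		datahandlers.NewProcessorWorker(datahandlers.NewNOOPProcessor()),
	}
	writeWorkers := []pipeline.Worker[any]{
		datahandlers.NewWriteWorker(&printWriter{}),
	}

	// Each variadic argument is one stage; workers within a stage run in
	// parallel and adjacent stages are connected by channels.
	job := pipeline.NewPipeline(readWorkers, processorWorkers, writeWorkers)

	if err := job.Run(context.Background()); err != nil {
		fmt.Println("pipeline error:", err)
	}
}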
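
The Worker contract in pipeline/pipeline.go assumes each worker returns when its context is canceled or its receive channel is closed, and that the enclosing stage (not the worker) closes the send channel. A minimal hand-written worker under those assumptions might look like the following; doubleWorker is a hypothetical example type, not part of this change:

package example

import (
	"context"

	"github.com/aerospike/aerospike-tools-backup-lib/pipeline"
)

// doubleWorker is a hypothetical pipeline.Worker[int] that doubles each value.
type doubleWorker struct {
	receive <-chan int
	send    chan<- int
}

func (w *doubleWorker) SetReceiveChan(c <-chan int) { w.receive = c }

func (w *doubleWorker) SetSendChan(c chan<- int) { w.send = c }

// Run processes values until the context is canceled or the receive channel
// is closed; the enclosing stage, not the worker, closes the send channel.
func (w *doubleWorker) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-w.receive:
			if !ok {
				return nil
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case w.send <- v * 2:
			}
		}
	}
}

// Compile-time check that doubleWorker satisfies pipeline.Worker[int].
var _ pipeline.Worker[int] = (*doubleWorker)(nil)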