Skip to content

Commit

Permalink
don't export workers
Browse files Browse the repository at this point in the history
  • Loading branch information
dwelch-spike committed Feb 27, 2024
1 parent dddac59 commit 15e17bc
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 155 deletions.
24 changes: 12 additions & 12 deletions backup_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,32 @@ func newBackupHandlerBase(config *BackupBaseConfig, ac *a.Client, namespace stri
return handler
}

func (bh *backupHandlerBase) run(ctx context.Context, writers []*WriteWorker[*models.Token]) error {
func (bh *backupHandlerBase) run(ctx context.Context, writers []*writeWorker[*models.Token]) error {
readWorkers := make([]pipeline.Worker[*models.Token], bh.config.Parallel)
for i := 0; i < bh.config.Parallel; i++ {
begin := (i * PARTITIONS) / bh.config.Parallel
count := PARTITIONS / bh.config.Parallel // TODO verify no off by 1 error

ARRCFG := ARRConfig{
ARRCFG := arrConfig{
Namespace: bh.namespace,
Set: bh.config.Set,
FirstPartition: begin,
NumPartitions: count,
}

recordReader := NewAerospikeRecordReader(
recordReader := newAerospikeRecordReader(
bh.aerospikeClient,
ARRCFG,
bh.config.Policies.ScanPolicy,
)

readWorkers[i] = NewReadWorker(recordReader)
readWorkers[i] = newReadWorker(recordReader)
}

processorWorkers := make([]pipeline.Worker[*models.Token], bh.config.Parallel)
for i := 0; i < bh.config.Parallel; i++ {
processor := NewNoOpProcessor()
processorWorkers[i] = NewProcessorWorker(processor)
processor := newNoOpProcessor()
processorWorkers[i] = newProcessorWorker(processor)
}

writeWorkers := make([]pipeline.Worker[*models.Token], len(writers))
Expand Down Expand Up @@ -133,7 +133,7 @@ func (bwh *BackupHandler) run(ctx context.Context, writers []io.Writer) {
defer handlePanic(errChan)

batchSize := bwh.config.Parallel
dataWriters := []*WriteWorker[*models.Token]{}
dataWriters := []*writeWorker[*models.Token]{}

for i, writer := range writers {

Expand Down Expand Up @@ -178,27 +178,27 @@ func (bwh *BackupHandler) Wait(ctx context.Context) error {
}
}

func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool) (*WriteWorker[*models.Token], error) {
func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool) (*writeWorker[*models.Token], error) {
enc, err := eb.CreateEncoder()
if err != nil {
return nil, err
}

switch encT := enc.(type) {
case *asb.Encoder:
asbw := NewASBWriter(encT, w)
asbw := newAsbWriter(encT, w)
err := asbw.Init(namespace, first)
if err != nil {
return nil, err
}

worker := NewWriteWorker(asbw)
worker := newWriteWorker(asbw)

return worker, err

default:
gw := NewGenericWriter(encT, w)
worker := NewWriteWorker(gw)
gw := newGenericWriter(encT, w)
worker := newWriteWorker(gw)

return worker, nil
}
Expand Down
34 changes: 17 additions & 17 deletions processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,40 @@ import (

// **** Processor Worker ****

// DataProcessor is an interface for processing data
// dataProcessor is an interface for processing data
//
//go:generate mockery --name DataProcessor
type DataProcessor[T any] interface {
type dataProcessor[T any] interface {
Process(T) (T, error)
}

// ProcessorWorker implements the pipeline.Worker interface
// processorWorker implements the pipeline.Worker interface
// It wraps a DataProcessor and processes data with it
type ProcessorWorker[T any] struct {
processor DataProcessor[T]
type processorWorker[T any] struct {
processor dataProcessor[T]
receive <-chan T
send chan<- T
}

// NewProcessorWorker creates a new ProcessorWorker
func NewProcessorWorker[T any](processor DataProcessor[T]) *ProcessorWorker[T] {
return &ProcessorWorker[T]{
// newProcessorWorker creates a new ProcessorWorker
func newProcessorWorker[T any](processor dataProcessor[T]) *processorWorker[T] {
return &processorWorker[T]{
processor: processor,
}
}

// SetReceiveChan sets the receive channel for the ProcessorWorker
func (w *ProcessorWorker[T]) SetReceiveChan(c <-chan T) {
func (w *processorWorker[T]) SetReceiveChan(c <-chan T) {
w.receive = c
}

// SetSendChan sets the send channel for the ProcessorWorker
func (w *ProcessorWorker[T]) SetSendChan(c chan<- T) {
func (w *processorWorker[T]) SetSendChan(c chan<- T) {
w.send = c
}

// Run starts the ProcessorWorker
func (w *ProcessorWorker[T]) Run(ctx context.Context) error {
func (w *processorWorker[T]) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
Expand All @@ -84,16 +84,16 @@ func (w *ProcessorWorker[T]) Run(ctx context.Context) error {

// **** NoOp Processor ****

// NoOpProcessor satisfies the DataProcessor interface
// noOpProcessor satisfies the DataProcessor interface
// It does nothing to the data
type NoOpProcessor struct{}
type noOpProcessor struct{}

// NewNoOpProcessor creates a new NOOPProcessor
func NewNoOpProcessor() *NoOpProcessor {
return &NoOpProcessor{}
// newNoOpProcessor creates a new NOOPProcessor
func newNoOpProcessor() *noOpProcessor {
return &noOpProcessor{}
}

// Process processes the data
func (p *NoOpProcessor) Process(data *models.Token) (*models.Token, error) {
func (p *noOpProcessor) Process(data *models.Token) (*models.Token, error) {
return data, nil
}
12 changes: 6 additions & 6 deletions processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (suite *proccessorTestSuite) TestProcessorWorker() {
mockProcessor := mocks.NewDataProcessor[string](suite.T())
mockProcessor.EXPECT().Process("test").Return("test", nil)

worker := NewProcessorWorker(mockProcessor)
worker := newProcessorWorker(mockProcessor)

Check failure on line 37 in processors_test.go

View workflow job for this annotation

GitHub Actions / lint

cannot infer T (processors.go:44:25) (typecheck)
suite.NotNil(worker)

receiver := make(chan string, 1)
Expand All @@ -58,7 +58,7 @@ func (suite *proccessorTestSuite) TestProcessorWorkerCancelOnReceive() {
mockProcessor := mocks.NewDataProcessor[string](suite.T())
mockProcessor.EXPECT().Process("test").Return("test", nil)

worker := NewProcessorWorker(mockProcessor)
worker := newProcessorWorker(mockProcessor)

Check failure on line 61 in processors_test.go

View workflow job for this annotation

GitHub Actions / lint

cannot infer T (processors.go:44:25) (typecheck)
suite.NotNil(worker)

receiver := make(chan string, 1)
Expand Down Expand Up @@ -96,7 +96,7 @@ func (suite *proccessorTestSuite) TestProcessorWorkerCancelOnSend() {
mockProcessor := mocks.NewDataProcessor[string](suite.T())
mockProcessor.EXPECT().Process("test").Return("test", nil)

worker := NewProcessorWorker(mockProcessor)
worker := newProcessorWorker(mockProcessor)

Check failure on line 99 in processors_test.go

View workflow job for this annotation

GitHub Actions / lint

cannot infer T (processors.go:44:25) (typecheck)
suite.NotNil(worker)

receiver := make(chan string, 1)
Expand Down Expand Up @@ -131,7 +131,7 @@ func (suite *proccessorTestSuite) TestProcessorWorkerReceiveClosed() {
mockProcessor := mocks.NewDataProcessor[string](suite.T())
mockProcessor.EXPECT().Process("test").Return("test", nil)

worker := NewProcessorWorker(mockProcessor)
worker := newProcessorWorker(mockProcessor)
suite.NotNil(worker)

receiver := make(chan string, 1)
Expand All @@ -152,7 +152,7 @@ func (suite *proccessorTestSuite) TestProcessorWorkerProcessFailed() {
mockProcessor := mocks.NewDataProcessor[string](suite.T())
mockProcessor.EXPECT().Process("test").Return("", errors.New("test"))

worker := NewProcessorWorker(mockProcessor)
worker := newProcessorWorker(mockProcessor)
suite.NotNil(worker)

receiver := make(chan string, 1)
Expand All @@ -170,7 +170,7 @@ func (suite *proccessorTestSuite) TestProcessorWorkerProcessFailed() {
}

func (suite *proccessorTestSuite) TestNOOPProcessor() {
noop := NewNoOpProcessor()
noop := newNoOpProcessor()
suite.NotNil(noop)

data := &models.Token{
Expand Down
Loading

0 comments on commit 15e17bc

Please sign in to comment.