Skip to content

Commit

Permalink
[chore] Remove unused/unnecessary queue.Sizer types (#11455)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Oct 16, 2024
1 parent 527df61 commit 4f2a8d3
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 36 deletions.
26 changes: 6 additions & 20 deletions exporter/exporterqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ type Factory[T any] func(context.Context, Settings, Config) Queue[T]
// NewMemoryQueueFactory returns a factory to create a new memory queue.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewMemoryQueueFactory[T itemsCounter]() Factory[T] {
func NewMemoryQueueFactory[T any]() Factory[T] {
return func(_ context.Context, _ Settings, cfg Config) Queue[T] {
return queue.NewBoundedMemoryQueue[T](queue.MemoryQueueSettings[T]{
Sizer: sizerFromConfig[T](cfg),
Capacity: capacityFromConfig(cfg),
Sizer: &queue.RequestSizer[T]{},
Capacity: int64(cfg.QueueSize),
})
}
}
Expand All @@ -70,14 +70,14 @@ type PersistentQueueSettings[T any] struct {
// If cfg.StorageID is nil then it falls back to memory queue.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewPersistentQueueFactory[T itemsCounter](storageID *component.ID, factorySettings PersistentQueueSettings[T]) Factory[T] {
func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings PersistentQueueSettings[T]) Factory[T] {
if storageID == nil {
return NewMemoryQueueFactory[T]()
}
return func(_ context.Context, set Settings, cfg Config) Queue[T] {
return queue.NewPersistentQueue[T](queue.PersistentQueueSettings[T]{
Sizer: sizerFromConfig[T](cfg),
Capacity: capacityFromConfig(cfg),
Sizer: &queue.RequestSizer[T]{},
Capacity: int64(cfg.QueueSize),
Signal: set.Signal,
StorageID: *storageID,
Marshaler: factorySettings.Marshaler,
Expand All @@ -86,17 +86,3 @@ func NewPersistentQueueFactory[T itemsCounter](storageID *component.ID, factoryS
})
}
}

type itemsCounter interface {
ItemsCount() int
}

func sizerFromConfig[T itemsCounter](Config) queue.Sizer[T] {
// TODO: Handle other ways to measure the queue size once they are added.
return &queue.RequestSizer[T]{}
}

func capacityFromConfig(cfg Config) int64 {
// TODO: Handle other ways to measure the queue size once they are added.
return int64(cfg.QueueSize)
}
6 changes: 3 additions & 3 deletions exporter/internal/queue/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,20 @@ func Benchmark_QueueUsage_100000_requests(b *testing.B) {

func Benchmark_QueueUsage_10000_items(b *testing.B) {
// each request has 10 items: 1000 requests = 10000 items
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000)
benchmarkQueueUsage(b, &itemsSizer[fakeReq]{}, 1000)
}

func Benchmark_QueueUsage_1M_items(b *testing.B) {
// each request has 10 items: 100000 requests = 1M items
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 100000)
benchmarkQueueUsage(b, &itemsSizer[fakeReq]{}, 100000)
}

func TestQueueUsage(t *testing.T) {
t.Run("requests_based", func(t *testing.T) {
queueUsage(t, &RequestSizer[fakeReq]{}, 10)
})
t.Run("items_based", func(t *testing.T) {
queueUsage(t, &ItemsSizer[fakeReq]{}, 10)
queueUsage(t, &itemsSizer[fakeReq]{}, 10)
})
}

Expand Down
15 changes: 13 additions & 2 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ import (
"go.opentelemetry.io/collector/pipeline"
)

type itemsCounter interface {
ItemsCount() int
}

// itemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains.
type itemsSizer[T itemsCounter] struct{}

func (is *itemsSizer[T]) Sizeof(el T) int64 {
return int64(el.ItemsCount())
}

type tracesRequest struct {
traces ptrace.Traces
}
Expand Down Expand Up @@ -97,7 +108,7 @@ func createTestPersistentQueueWithRequestsCapacity(t testing.TB, ext storage.Ext
}

func createTestPersistentQueueWithItemsCapacity(t testing.TB, ext storage.Extension, capacity int64) *persistentQueue[tracesRequest] {
return createTestPersistentQueueWithCapacityLimiter(t, ext, &ItemsSizer[tracesRequest]{}, capacity)
return createTestPersistentQueueWithCapacityLimiter(t, ext, &itemsSizer[tracesRequest]{}, capacity)
}

func createTestPersistentQueueWithCapacityLimiter(t testing.TB, ext storage.Extension, sizer Sizer[tracesRequest],
Expand Down Expand Up @@ -130,7 +141,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) {
},
{
name: "items_capacity",
sizer: &ItemsSizer[tracesRequest]{},
sizer: &itemsSizer[tracesRequest]{},
capacity: 55,
sizeMultiplier: 10,
},
Expand Down
11 changes: 0 additions & 11 deletions exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,11 @@ type Queue[T any] interface {
OnProcessingFinished(index uint64, consumeErr error)
}

type itemsCounter interface {
ItemsCount() int
}

// Sizer is an interface that returns the size of the given element.
type Sizer[T any] interface {
Sizeof(T) int64
}

// ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains.
type ItemsSizer[T itemsCounter] struct{}

func (is *ItemsSizer[T]) Sizeof(el T) int64 {
return int64(el.ItemsCount())
}

// RequestSizer is a Sizer implementation that returns the size of a queue element as one request.
type RequestSizer[T any] struct{}

Expand Down

0 comments on commit 4f2a8d3

Please sign in to comment.