Skip to content

Commit

Permalink
chore: bump benthos version to 4.45.1
Browse files Browse the repository at this point in the history
Bump Benthos version to latest by adopting license changes.
Rely only on components which have permissive OSS license:

* [MIT] Benthos Core: github.com/redpanda-data/benthos
* [Apache 2.0] Community maintained components: github.com/redpanda-data/connect/public/bundle/free/v4
  • Loading branch information
chrisgacsal committed Jan 22, 2025
1 parent 38e78a0 commit dd78f4a
Show file tree
Hide file tree
Showing 9 changed files with 1,967 additions and 599 deletions.
9 changes: 5 additions & 4 deletions cmd/benthos-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package main
import (
"context"

_ "github.com/benthosdev/benthos/v4/public/components/all" // import all benthos components
_ "github.com/benthosdev/benthos/v4/public/components/io" // import io benthos components
_ "github.com/benthosdev/benthos/v4/public/components/pure" // import pure benthos components
"github.com/benthosdev/benthos/v4/public/service"
_ "github.com/redpanda-data/benthos/v4/public/components/io"
_ "github.com/redpanda-data/benthos/v4/public/components/pure"
_ "github.com/redpanda-data/benthos/v4/public/components/pure/extended"
"github.com/redpanda-data/benthos/v4/public/service"
_ "github.com/redpanda-data/connect/public/bundle/free/v4"

_ "github.com/openmeterio/openmeter/collector/benthos/input" // import input plugins
_ "github.com/openmeterio/openmeter/collector/benthos/output" // import output plugins
Expand Down
2 changes: 1 addition & 1 deletion collector/benthos/input/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"strings"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/service"
"github.com/samber/lo"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down
2 changes: 1 addition & 1 deletion collector/benthos/input/otel_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"sync"
"time"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/service"
"github.com/samber/lo"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
Expand Down
2 changes: 1 addition & 1 deletion collector/benthos/input/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"time"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/service"
_ "k8s.io/client-go/plugin/pkg/client/auth" // import kubernetes auth plugins
)

Expand Down
2 changes: 1 addition & 1 deletion collector/benthos/internal/message/batch.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package message

import "github.com/benthosdev/benthos/v4/public/service"
import "github.com/redpanda-data/benthos/v4/public/service"

type Batch = service.MessageBatch
27 changes: 22 additions & 5 deletions collector/benthos/output/openmeter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"fmt"
"net/http"

"github.com/benthosdev/benthos/v4/public/bloblang"
"github.com/benthosdev/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/bloblang"
"github.com/redpanda-data/benthos/v4/public/service"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -160,18 +160,31 @@ func (o *openmeterOutput) WriteBatch(ctx context.Context, batch service.MessageB
span.End()
}()

o.logger.Debugf("received message batch [size=%d]", len(batch))

if len(batch) == 0 {
return nil
}

var events []any

walkFn := func(_ int, msg *service.Message) error {
if msg == nil {
return errors.New("message is nil")
o.logger.Error("received nil message in batch")

err = errors.New("received nil message in batch")

return err
}

var e any
e, err = msg.AsStructured()
if err != nil {
return fmt.Errorf("failed to convert message to structed data: %w", err)
err = fmt.Errorf("failed to convert message to structed data: %w", err)

return err
}

events = append(events, e)

o.UpdateMessageSpan(ctx, msg)
Expand All @@ -183,7 +196,11 @@ func (o *openmeterOutput) WriteBatch(ctx context.Context, batch service.MessageB
}

if len(events) == 0 {
return errors.New("no valid messages found in batch")
o.logger.Error("no valid messages found in batch")

err = errors.New("no valid messages found in batch")

return err
}

var data any
Expand Down
2 changes: 1 addition & 1 deletion collector/benthos/output/otel_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package output
import (
"context"

"github.com/benthosdev/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/service"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
logspb "go.opentelemetry.io/proto/otlp/logs/v1"
"google.golang.org/grpc"
Expand Down
Loading

0 comments on commit dd78f4a

Please sign in to comment.