Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Confluent kafka binding #988

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ jobs:
- 9091:9091
- 9092:9092

kafka_confluent:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename the one above from "kafka" to "kafka_sarama" to be consistent with the rest of the naming?

image: confluentinc/confluent-local:7.6.0
ports:
- "9192:9192"
env:
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29192,PLAINTEXT_HOST://localhost:9192'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:29193'
KAFKA_LISTENERS: 'PLAINTEXT://localhost:29192,CONTROLLER://localhost:29193,PLAINTEXT_HOST://0.0.0.0:9192'

natss:
image: nats-streaming:0.22.1
ports:
Expand Down
3 changes: 2 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ err := json.Unmarshal(bytes, &event)
| AVRO Event Format | :x: | :x: |
| [HTTP Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/http) | :heavy_check_mark: | :heavy_check_mark: |
| [JSON Event Format](event_data_structure.md#marshalunmarshal-event-to-json) | :heavy_check_mark: | :heavy_check_mark: |
| [Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: |
| [Sarama Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: |
| [Confluent Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka_confluent) | :heavy_check_mark: | :heavy_check_mark: |
| MQTT Protocol Binding | :x: | :x: |
| [NATS Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/nats) | :heavy_check_mark: | :heavy_check_mark: |
| [STAN Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/stan) | :heavy_check_mark: | :heavy_check_mark: |
Expand Down
25 changes: 25 additions & 0 deletions protocol/kafka_confluent/v2/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2

go 1.18

replace github.com/cloudevents/sdk-go/v2 => ../../../v2

require (
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
68 changes: 68 additions & 0 deletions protocol/kafka_confluent/v2/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8=
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs=
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec=
github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
156 changes: 156 additions & 0 deletions protocol/kafka_confluent/v2/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package kafka_confluent

import (
"bytes"
"context"
"strconv"
"strings"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

const (
prefix = "ce-"
contentTypeKey = "content-type"
)

const (
KafkaOffsetKey = "kafkaoffset"
KafkaPartitionKey = "kafkapartition"
KafkaTopicKey = "kafkatopic"
KafkaMessageKey = "kafkamessagekey"
)

var specs = spec.WithPrefix(prefix)

// Message represents a Kafka message.
// This message *can* be read several times safely
type Message struct {
internal *kafka.Message
properties map[string][]byte
format format.Format
version spec.Version
}

// Check if Message implements binding.Message
var (
_ binding.Message = (*Message)(nil)
_ binding.MessageMetadataReader = (*Message)(nil)
)

// NewMessage returns a binding.Message that holds the provided kafka.Message.
// The returned binding.Message *can* be read several times safely
// This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance
func NewMessage(msg *kafka.Message) *Message {
if msg == nil {
panic("the kafka.Message shouldn't be nil")
}
if msg.TopicPartition.Topic == nil {
panic("the topic of kafka.Message shouldn't be nil")
}
if msg.TopicPartition.Partition < 0 || msg.TopicPartition.Offset < 0 {
panic("the partition or offset of the kafka.Message must be non-negative")
}

var contentType, contentVersion string
yanmxa marked this conversation as resolved.
Show resolved Hide resolved
properties := make(map[string][]byte, len(msg.Headers)+3)
for _, header := range msg.Headers {
k := strings.ToLower(string(header.Key))
if k == strings.ToLower(contentTypeKey) {
contentType = string(header.Value)
}
if k == specs.PrefixedSpecVersionName() {
contentVersion = string(header.Value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what if header.Value is 1.0.2 but MUST be 1.0 according to

Compliant event producers MUST use a value of 1.0 when referring to this version of the specification
https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#specversion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I‘m not sure how to handle the specversion with value 1.0.2. Maybe just validate the 'major' and 'minor' versions whether compliant with "1.0"? and if not, then print a warning message?

I checked the other protocol. Then all just assert whether the specversion key exists and ignore the value.

Do you have any other advice?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, IMHO this is a shortcoming in the other implementations. I'm fine leaving this here as is then given it's consistent with our other implementations. cc/ @duglin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like being forgiving and just ignoring the patch version number - if we want to do anything different at all. I'm ok with the current choice of only looking for 0.3 and 1.0.

}
properties[k] = header.Value
}

// add the kafka message key, topic, partition and partition key to the properties
properties[prefix+KafkaOffsetKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Offset), 10))
properties[prefix+KafkaPartitionKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Partition), 10))
properties[prefix+KafkaTopicKey] = []byte(*msg.TopicPartition.Topic)
if msg.Key != nil {
properties[prefix+KafkaMessageKey] = msg.Key
}

message := &Message{
internal: msg,
properties: properties,
}
if ft := format.Lookup(contentType); ft != nil {
message.format = ft
} else if v := specs.Version(contentVersion); v != nil {
message.version = v
}

return message
}

func (m *Message) ReadEncoding() binding.Encoding {
if m.version != nil {
return binding.EncodingBinary
}
if m.format != nil {
return binding.EncodingStructured
}
return binding.EncodingUnknown
}

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.format != nil {
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.internal.Value))
}
return binding.ErrNotStructured
}

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
if m.version == nil {
return binding.ErrNotBinary
}

var err error
for k, v := range m.properties {
if strings.HasPrefix(k, prefix) {
attr := m.version.Attribute(k)
if attr != nil {
err = encoder.SetAttribute(attr, string(v))
} else {
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), string(v))
}
} else if k == strings.ToLower(contentTypeKey) {
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(v))
}
if err != nil {
return err
}
}

if m.internal.Value != nil {
err = encoder.SetData(bytes.NewBuffer(m.internal.Value))
}
yanmxa marked this conversation as resolved.
Show resolved Hide resolved
return err
}

func (m *Message) Finish(error) error {
return nil
}

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) {
attr := m.version.AttributeFromKind(k)
if attr == nil {
return nil, nil
}
return attr, m.properties[attr.PrefixedName()]
}

func (m *Message) GetExtension(name string) interface{} {
return m.properties[prefix+name]
}
Loading