-
Notifications
You must be signed in to change notification settings - Fork 219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Confluent kafka binding #988
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
module github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 | ||
|
||
go 1.18 | ||
|
||
replace github.com/cloudevents/sdk-go/v2 => ../../../v2 | ||
|
||
require ( | ||
github.com/cloudevents/sdk-go/v2 v2.15.2 | ||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 | ||
github.com/stretchr/testify v1.8.4 | ||
) | ||
|
||
require ( | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/google/go-cmp v0.5.9 // indirect | ||
github.com/google/uuid v1.3.0 // indirect | ||
github.com/json-iterator/go v1.1.11 // indirect | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
github.com/modern-go/reflect2 v1.0.1 // indirect | ||
github.com/pmezard/go-difflib v1.0.0 // indirect | ||
go.uber.org/atomic v1.4.0 // indirect | ||
go.uber.org/multierr v1.1.0 // indirect | ||
go.uber.org/zap v1.10.0 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= | ||
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= | ||
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I= | ||
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= | ||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts= | ||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8= | ||
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA= | ||
github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs= | ||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68= | ||
github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE= | ||
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= | ||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= | ||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= | ||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= | ||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= | ||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= | ||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= | ||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= | ||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= | ||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= | ||
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= | ||
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= | ||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= | ||
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= | ||
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= | ||
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= | ||
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc= | ||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= | ||
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= | ||
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= | ||
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= | ||
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= | ||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= | ||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= | ||
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec= | ||
github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w= | ||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= | ||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= | ||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | ||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= | ||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= | ||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= | ||
github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8= | ||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= | ||
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= | ||
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= | ||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= | ||
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= | ||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= | ||
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= | ||
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= | ||
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= | ||
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= | ||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= | ||
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08= | ||
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= | ||
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= | ||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= | ||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
/* | ||
Copyright 2023 The CloudEvents Authors | ||
SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package kafka_confluent | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/cloudevents/sdk-go/v2/binding" | ||
"github.com/cloudevents/sdk-go/v2/binding/format" | ||
"github.com/cloudevents/sdk-go/v2/binding/spec" | ||
"github.com/confluentinc/confluent-kafka-go/v2/kafka" | ||
) | ||
|
||
const ( | ||
prefix = "ce-" | ||
contentTypeKey = "content-type" | ||
) | ||
|
||
const ( | ||
KafkaOffsetKey = "kafkaoffset" | ||
KafkaPartitionKey = "kafkapartition" | ||
KafkaTopicKey = "kafkatopic" | ||
KafkaMessageKey = "kafkamessagekey" | ||
) | ||
|
||
var specs = spec.WithPrefix(prefix) | ||
|
||
// Message represents a Kafka message. | ||
// This message *can* be read several times safely | ||
type Message struct { | ||
internal *kafka.Message | ||
properties map[string][]byte | ||
format format.Format | ||
version spec.Version | ||
} | ||
|
||
// Check if Message implements binding.Message | ||
var ( | ||
_ binding.Message = (*Message)(nil) | ||
_ binding.MessageMetadataReader = (*Message)(nil) | ||
) | ||
|
||
// NewMessage returns a binding.Message that holds the provided kafka.Message. | ||
// The returned binding.Message *can* be read several times safely | ||
// This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance | ||
func NewMessage(msg *kafka.Message) *Message { | ||
if msg == nil { | ||
panic("the kafka.Message shouldn't be nil") | ||
} | ||
if msg.TopicPartition.Topic == nil { | ||
panic("the topic of kafka.Message shouldn't be nil") | ||
} | ||
if msg.TopicPartition.Partition < 0 || msg.TopicPartition.Offset < 0 { | ||
panic("the partition or offset of the kafka.Message must be non-negative") | ||
} | ||
|
||
var contentType, contentVersion string | ||
yanmxa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
properties := make(map[string][]byte, len(msg.Headers)+3) | ||
for _, header := range msg.Headers { | ||
k := strings.ToLower(string(header.Key)) | ||
if k == strings.ToLower(contentTypeKey) { | ||
contentType = string(header.Value) | ||
} | ||
if k == specs.PrefixedSpecVersionName() { | ||
contentVersion = string(header.Value) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: what if
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I‘m not sure how to handle the specversion with value 1.0.2. Maybe just validate the 'major' and 'minor' versions whether compliant with "1.0"? and if not, then print a warning message? I checked the other protocol. Then all just assert whether the specversion key exists and ignore the value. Do you have any other advice? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, IMHO this is a shortcoming in the other implementations. I'm fine leaving this here as is then given it's consistent with our other implementations. cc/ @duglin There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like being forgiving and just ignoring the patch version number - if we want to do anything different at all. I'm ok with the current choice of only looking for 0.3 and 1.0. |
||
} | ||
properties[k] = header.Value | ||
} | ||
|
||
// add the kafka message key, topic, partition and partition key to the properties | ||
properties[prefix+KafkaOffsetKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Offset), 10)) | ||
properties[prefix+KafkaPartitionKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Partition), 10)) | ||
properties[prefix+KafkaTopicKey] = []byte(*msg.TopicPartition.Topic) | ||
if msg.Key != nil { | ||
properties[prefix+KafkaMessageKey] = msg.Key | ||
} | ||
|
||
message := &Message{ | ||
internal: msg, | ||
properties: properties, | ||
} | ||
if ft := format.Lookup(contentType); ft != nil { | ||
message.format = ft | ||
} else if v := specs.Version(contentVersion); v != nil { | ||
message.version = v | ||
} | ||
|
||
return message | ||
} | ||
|
||
func (m *Message) ReadEncoding() binding.Encoding { | ||
if m.version != nil { | ||
return binding.EncodingBinary | ||
} | ||
if m.format != nil { | ||
return binding.EncodingStructured | ||
} | ||
return binding.EncodingUnknown | ||
} | ||
|
||
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { | ||
if m.format != nil { | ||
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.internal.Value)) | ||
} | ||
return binding.ErrNotStructured | ||
} | ||
|
||
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { | ||
if m.version == nil { | ||
return binding.ErrNotBinary | ||
} | ||
|
||
var err error | ||
for k, v := range m.properties { | ||
if strings.HasPrefix(k, prefix) { | ||
attr := m.version.Attribute(k) | ||
if attr != nil { | ||
err = encoder.SetAttribute(attr, string(v)) | ||
} else { | ||
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), string(v)) | ||
} | ||
} else if k == strings.ToLower(contentTypeKey) { | ||
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(v)) | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
if m.internal.Value != nil { | ||
err = encoder.SetData(bytes.NewBuffer(m.internal.Value)) | ||
} | ||
yanmxa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return err | ||
} | ||
|
||
func (m *Message) Finish(error) error { | ||
return nil | ||
} | ||
|
||
func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) { | ||
attr := m.version.AttributeFromKind(k) | ||
if attr == nil { | ||
return nil, nil | ||
} | ||
return attr, m.properties[attr.PrefixedName()] | ||
} | ||
|
||
func (m *Message) GetExtension(name string) interface{} { | ||
return m.properties[prefix+name] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename the one above from "kafka" to "kafka_sarama" to be consistent with the rest of the naming?