diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 000000000..38011303d --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @cloudevents/sdk-go-maintainers diff --git a/.github/workflows/go-lint.yaml b/.github/workflows/go-lint.yaml index 66f3181a4..33cdf95c9 100644 --- a/.github/workflows/go-lint.yaml +++ b/.github/workflows/go-lint.yaml @@ -30,7 +30,7 @@ jobs: - name: Go Lint on ./v2 if: steps.golangci_configuration.outputs.files_exists == 'true' - uses: golangci/golangci-lint-action@v4 + uses: golangci/golangci-lint-action@v6 with: version: v1.54 working-directory: v2 diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock index c165e1ff9..29f563429 100644 --- a/docs/Gemfile.lock +++ b/docs/Gemfile.lock @@ -201,14 +201,14 @@ GEM rb-fsevent (~> 0.10, >= 0.10.3) rb-inotify (~> 0.9, >= 0.9.10) mercenary (0.3.6) - mini_portile2 (2.8.5) + mini_portile2 (2.8.6) minima (2.5.1) jekyll (>= 3.5, < 5.0) jekyll-feed (~> 0.9) jekyll-seo-tag (~> 2.1) minitest (5.17.0) multipart-post (2.1.1) - nokogiri (1.16.2) + nokogiri (1.16.5) mini_portile2 (~> 2.8.2) racc (~> 1.4) octokit (4.18.0) diff --git a/observability/opencensus/v2/go.mod b/observability/opencensus/v2/go.mod index 73ce7da13..6b90506b9 100644 --- a/observability/opencensus/v2/go.mod +++ b/observability/opencensus/v2/go.mod @@ -22,7 +22,7 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/net v0.23.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/observability/opencensus/v2/go.sum b/observability/opencensus/v2/go.sum index a86dbdedf..ec15c5f41 100644 --- a/observability/opencensus/v2/go.sum +++ b/observability/opencensus/v2/go.sum @@ -80,8 +80,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -97,11 +97,11 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/protocol/amqp/v2/message.go b/protocol/amqp/v2/message.go index 698bf041d..0756ddfc7 100644 --- a/protocol/amqp/v2/message.go +++ b/protocol/amqp/v2/message.go @@ -50,8 +50,10 @@ func NewMessage(message *amqp.Message, receiver *amqp.Receiver) *Message { return &Message{AMQP: message, AMQPrcv: receiver, format: fmt, version: vn} } -var _ binding.Message = (*Message)(nil) -var _ binding.MessageMetadataReader = (*Message)(nil) +var ( + _ binding.Message = (*Message)(nil) + _ binding.MessageMetadataReader = (*Message)(nil) +) func getSpecVersion(message *amqp.Message) spec.Version { if sv, ok := message.ApplicationProperties[specs.PrefixedSpecVersionName()]; ok { @@ -74,7 +76,8 @@ func (m *Message) ReadEncoding() binding.Encoding { func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { if m.format != nil { - return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.AMQP.GetData())) + data := m.getAmqpData() + return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(data)) } return binding.ErrNotStructured } @@ -106,7 +109,7 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) } } - data := m.AMQP.GetData() + data := m.getAmqpData() if len(data) != 0 { // Some data err = encoder.SetData(bytes.NewBuffer(data)) if err != nil { @@ -137,3 +140,15 @@ func (m *Message) Finish(err error) error { } return m.AMQPrcv.AcceptMessage(context.Background(), m.AMQP) } + +// fixes: github.com/cloudevents/spec/issues/1275 +func (m *Message) getAmqpData() []byte { + var data []byte + amqpData := m.AMQP.Data + + // TODO: replace with slices.Concat once go mod bumped to 1.22 + for idx := range amqpData { + data = append(data, amqpData[idx]...) + } + return data +} diff --git a/protocol/amqp/v2/message_test.go b/protocol/amqp/v2/message_test.go index 74d3a4bad..71a9d8409 100644 --- a/protocol/amqp/v2/message_test.go +++ b/protocol/amqp/v2/message_test.go @@ -62,3 +62,58 @@ func TestNewMessage_message_unknown(t *testing.T) { got := NewMessage(message, &rcv) require.Equal(t, binding.EncodingUnknown, got.ReadEncoding()) } + +func TestMessage_getAmqpData(t *testing.T) { + tests := []struct { + name string + message *amqp.Message + want []byte + }{ + { + name: "nil data", + message: amqp.NewMessage(nil), + want: nil, + }, + { + name: "empty string", + message: amqp.NewMessage([]byte(`""`)), + want: []byte(`""`), + }, + { + name: "simple string", + message: amqp.NewMessage([]byte("hello world")), + want: []byte("hello world"), + }, + { + name: "multiple data with simple strings", + message: &amqp.Message{Data: [][]byte{ + []byte("hello"), + []byte(" "), + []byte("world"), + }}, + want: []byte("hello world"), + }, + { + name: "multiple data to build JSON array", + message: &amqp.Message{Data: [][]byte{ + []byte("["), + []byte("Foo"), + []byte(","), + []byte("Bar"), + []byte(","), + []byte("Baz"), + []byte("]"), + }}, + want: []byte("[Foo,Bar,Baz]"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &Message{ + AMQP: tt.message, + } + got := m.getAmqpData() + require.Equal(t, tt.want, got) + }) + } +} diff --git a/protocol/kafka_confluent/v2/message.go b/protocol/kafka_confluent/v2/message.go index 164879a11..43df8d4ff 100644 --- a/protocol/kafka_confluent/v2/message.go +++ b/protocol/kafka_confluent/v2/message.go @@ -11,14 +11,15 @@ import ( "strconv" "strings" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" "github.com/cloudevents/sdk-go/v2/binding/spec" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) const ( - prefix = "ce-" + prefix = "ce_" contentTypeKey = "content-type" ) diff --git a/protocol/kafka_confluent/v2/message_test.go b/protocol/kafka_confluent/v2/message_test.go index e7f599b63..9676fe7ac 100644 --- a/protocol/kafka_confluent/v2/message_test.go +++ b/protocol/kafka_confluent/v2/message_test.go @@ -42,14 +42,14 @@ var ( TopicPartition: topicPartition, Value: []byte("hello world!"), Headers: mapToKafkaHeaders(map[string]string{ - "ce-type": testEvent.Type(), - "ce-source": testEvent.Source(), - "ce-id": testEvent.ID(), - "ce-time": test.Timestamp.String(), - "ce-specversion": "1.0", - "ce-dataschema": test.Schema.String(), - "ce-datacontenttype": "text/json", - "ce-subject": "receiverTopic", + "ce_type": testEvent.Type(), + "ce_source": testEvent.Source(), + "ce_id": testEvent.ID(), + "ce_time": test.Timestamp.String(), + "ce_specversion": "1.0", + "ce_dataschema": test.Schema.String(), + "ce_datacontenttype": "text/json", + "ce_subject": "receiverTopic", "exta": "someext", }), } @@ -89,14 +89,14 @@ func TestNewMessage(t *testing.T) { TopicPartition: topicPartition, Value: nil, Headers: mapToKafkaHeaders(map[string]string{ - "ce-type": testEvent.Type(), - "ce-source": testEvent.Source(), - "ce-id": testEvent.ID(), - "ce-time": test.Timestamp.String(), - "ce-specversion": "1.0", - "ce-dataschema": test.Schema.String(), - "ce-datacontenttype": "text/json", - "ce-subject": "receiverTopic", + "ce_type": testEvent.Type(), + "ce_source": testEvent.Source(), + "ce_id": testEvent.ID(), + "ce_time": test.Timestamp.String(), + "ce_specversion": "1.0", + "ce_dataschema": test.Schema.String(), + "ce_datacontenttype": "text/json", + "ce_subject": "receiverTopic", }), }, expectedEncoding: binding.EncodingBinary, diff --git a/protocol/kafka_confluent/v2/option.go b/protocol/kafka_confluent/v2/option.go index e3b0b566f..7eef74007 100644 --- a/protocol/kafka_confluent/v2/option.go +++ b/protocol/kafka_confluent/v2/option.go @@ -15,7 +15,7 @@ import ( // Option is the function signature required to be considered an kafka_confluent.Option. type Option func(*Protocol) error -// WithConfigMap sets the configMap to init the kafka client. This option is not required. +// WithConfigMap sets the configMap to init the kafka client. func WithConfigMap(config *kafka.ConfigMap) Option { return func(p *Protocol) error { if config == nil { @@ -26,7 +26,7 @@ func WithConfigMap(config *kafka.ConfigMap) Option { } } -// WithSenderTopic sets the defaultTopic for the kafka.Producer. This option is not required. +// WithSenderTopic sets the defaultTopic for the kafka.Producer. func WithSenderTopic(defaultTopic string) Option { return func(p *Protocol) error { if defaultTopic == "" { @@ -37,7 +37,7 @@ func WithSenderTopic(defaultTopic string) Option { } } -// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required. +// WithReceiverTopics sets the topics for the kafka.Consumer. func WithReceiverTopics(topics []string) Option { return func(p *Protocol) error { if topics == nil { @@ -48,7 +48,7 @@ func WithReceiverTopics(topics []string) Option { } } -// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required. +// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option { return func(p *Protocol) error { if rebalanceCb == nil { @@ -59,7 +59,7 @@ func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option { } } -// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required. +// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. func WithPollTimeout(timeoutMs int) Option { return func(p *Protocol) error { p.consumerPollTimeout = timeoutMs @@ -67,7 +67,7 @@ func WithPollTimeout(timeoutMs int) Option { } } -// WithSender set a kafka.Producer instance to init the client directly. This option is not required. +// WithSender set a kafka.Producer instance to init the client directly. func WithSender(producer *kafka.Producer) Option { return func(p *Protocol) error { if producer == nil { @@ -78,7 +78,7 @@ func WithSender(producer *kafka.Producer) Option { } } -// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required. +// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option { return func(p *Protocol) error { p.consumerErrorHandler = handler @@ -86,7 +86,7 @@ func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option } } -// WithSender set a kafka.Consumer instance to init the client directly. This option is not required. +// WithReceiver set a kafka.Consumer instance to init the client directly. func WithReceiver(consumer *kafka.Consumer) Option { return func(p *Protocol) error { if consumer == nil { @@ -97,12 +97,12 @@ func WithReceiver(consumer *kafka.Consumer) Option { } } -// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required. +// Opaque key type used to store topicPartitionOffsets: assign them from ctx. type topicPartitionOffsetsType struct{} var offsetKey = topicPartitionOffsetsType{} -// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required. +// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context { if len(topicPartitionOffsets) == 0 { panic("the topicPartitionOffsets cannot be empty") diff --git a/protocol/kafka_confluent/v2/protocol.go b/protocol/kafka_confluent/v2/protocol.go index 8aa906853..c527f1061 100644 --- a/protocol/kafka_confluent/v2/protocol.go +++ b/protocol/kafka_confluent/v2/protocol.go @@ -33,15 +33,14 @@ type Protocol struct { consumerTopics []string consumerRebalanceCb kafka.RebalanceCb // optional consumerPollTimeout int // optional - consumerErrorHandler func(ctx context.Context, err kafka.Error) //optional + consumerErrorHandler func(ctx context.Context, err kafka.Error) // optional consumerMux sync.Mutex consumerIncoming chan *kafka.Message consumerCtx context.Context consumerCancel context.CancelFunc producer *kafka.Producer - producerDeliveryChan chan kafka.Event // optional - producerDefaultTopic string // optional + producerDefaultTopic string // optional closerMux sync.Mutex } @@ -85,12 +84,18 @@ func New(opts ...Option) (*Protocol, error) { if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil { return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer") } - if p.producer != nil { - p.producerDeliveryChan = make(chan kafka.Event) - } return p, nil } +// Events returns the events channel used by Confluent Kafka to deliver the result from a produce, i.e., send, operation. +// When using this SDK to produce (send) messages, this channel must be monitored to avoid resource leaks and this channel becoming full. See Confluent SDK for Go for details on the implementation. +func (p *Protocol) Events() (chan kafka.Event, error) { + if p.producer == nil { + return nil, errors.New("producer not set") + } + return p.producer.Events(), nil +} + func (p *Protocol) applyOptions(opts ...Option) error { for _, fn := range opts { if err := fn(p); err != nil { @@ -100,6 +105,7 @@ func (p *Protocol) applyOptions(opts ...Option) error { return nil } +// Send message by kafka.Producer. You must monitor the Events() channel when using this function. func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) { if p.producer == nil { return errors.New("producer client must be set") @@ -128,19 +134,12 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers .. kafkaMsg.Key = []byte(messageKey) } - err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...) - if err != nil { - return err + if err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...); err != nil { + return fmt.Errorf("create producer message: %w", err) } - err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan) - if err != nil { - return err - } - e := <-p.producerDeliveryChan - m := e.(*kafka.Message) - if m.TopicPartition.Error != nil { - return m.TopicPartition.Error + if err = p.producer.Produce(kafkaMsg, nil); err != nil { + return fmt.Errorf("produce message: %w", err) } return nil } @@ -231,15 +230,18 @@ func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) { func (p *Protocol) Close(ctx context.Context) error { p.closerMux.Lock() defer p.closerMux.Unlock() + logger := cecontext.LoggerFrom(ctx) if p.consumerCancel != nil { p.consumerCancel() } if p.producer != nil && !p.producer.IsClosed() { + // Flush and close the producer with a 10 seconds timeout (closes Events channel) + for p.producer.Flush(10000) > 0 { + logger.Info("Flushing outstanding messages") + } p.producer.Close() - close(p.producerDeliveryChan) } - return nil } diff --git a/protocol/kafka_sarama/v2/go.mod b/protocol/kafka_sarama/v2/go.mod index ff2fc11ed..24ed04fe8 100644 --- a/protocol/kafka_sarama/v2/go.mod +++ b/protocol/kafka_sarama/v2/go.mod @@ -37,7 +37,7 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/protocol/kafka_sarama/v2/go.sum b/protocol/kafka_sarama/v2/go.sum index 7a05f314b..38a1dd9ad 100644 --- a/protocol/kafka_sarama/v2/go.sum +++ b/protocol/kafka_sarama/v2/go.sum @@ -76,13 +76,13 @@ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/protocol/mqtt_paho/v2/message.go b/protocol/mqtt_paho/v2/message.go index 8dd938545..2bd7ed35d 100644 --- a/protocol/mqtt_paho/v2/message.go +++ b/protocol/mqtt_paho/v2/message.go @@ -17,8 +17,7 @@ import ( ) const ( - prefix = "ce-" - contentType = "Content-Type" + prefix = "ce-" ) var specs = spec.WithPrefix(prefix) @@ -41,8 +40,7 @@ func NewMessage(msg *paho.Publish) *Message { var f format.Format var v spec.Version if msg.Properties != nil { - // Use properties.User["Content-type"] to determine if message is structured - if s := msg.Properties.User.Get(contentType); format.IsFormat(s) { + if s := msg.Properties.ContentType; format.IsFormat(s) { f = format.Lookup(s) } else if s := msg.Properties.User.Get(specs.PrefixedSpecVersionName()); s != "" { v = specs.Version(s) @@ -88,14 +86,20 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) } else { err = encoder.SetExtension(strings.TrimPrefix(userProperty.Key, prefix), userProperty.Value) } - } else if userProperty.Key == contentType { - err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(userProperty.Value)) } if err != nil { return } } + contentType := m.internal.Properties.ContentType + if contentType != "" { + err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), contentType) + if err != nil { + return err + } + } + if m.internal.Payload != nil { return encoder.SetData(bytes.NewBuffer(m.internal.Payload)) } diff --git a/protocol/mqtt_paho/v2/message_test.go b/protocol/mqtt_paho/v2/message_test.go index 757f81f5c..ff51a0a0c 100644 --- a/protocol/mqtt_paho/v2/message_test.go +++ b/protocol/mqtt_paho/v2/message_test.go @@ -32,7 +32,7 @@ func TestReadStructured(t *testing.T) { msg: &paho.Publish{ Payload: []byte(""), Properties: &paho.PublishProperties{ - User: []paho.UserProperty{{Key: contentType, Value: event.ApplicationCloudEventsJSON}}, + ContentType: event.ApplicationCloudEventsJSON, }, }, }, diff --git a/protocol/mqtt_paho/v2/write_message.go b/protocol/mqtt_paho/v2/write_message.go index a4b87f4aa..9db47e918 100644 --- a/protocol/mqtt_paho/v2/write_message.go +++ b/protocol/mqtt_paho/v2/write_message.go @@ -42,11 +42,9 @@ var ( func (b *pubMessageWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error { if b.Properties == nil { - b.Properties = &paho.PublishProperties{ - User: make([]paho.UserProperty, 0), - } + b.Properties = &paho.PublishProperties{} } - b.Properties.User.Add(contentType, f.MediaType()) + b.Properties.ContentType = f.MediaType() var buf bytes.Buffer _, err := io.Copy(&buf, event) if err != nil { @@ -85,15 +83,13 @@ func (b *pubMessageWriter) SetData(reader io.Reader) error { func (b *pubMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { if attribute.Kind() == spec.DataContentType { if value == nil { - b.removeProperty(contentType) + b.Properties.ContentType = "" } s, err := types.Format(value) if err != nil { return err } - if err := b.addProperty(contentType, s); err != nil { - return err - } + b.Properties.ContentType = s } else { if value == nil { b.removeProperty(prefix + attribute.Name()) diff --git a/protocol/pubsub/v2/go.mod b/protocol/pubsub/v2/go.mod index 663693de4..e9cd57d6d 100644 --- a/protocol/pubsub/v2/go.mod +++ b/protocol/pubsub/v2/go.mod @@ -33,10 +33,10 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/net v0.23.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/protocol/pubsub/v2/go.sum b/protocol/pubsub/v2/go.sum index 84de05242..2928930ad 100644 --- a/protocol/pubsub/v2/go.sum +++ b/protocol/pubsub/v2/go.sum @@ -96,8 +96,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g= golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= @@ -110,13 +110,13 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/samples/http/go.mod b/samples/http/go.mod index 973513c05..a590f18e2 100644 --- a/samples/http/go.mod +++ b/samples/http/go.mod @@ -58,9 +58,9 @@ require ( go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect golang.org/x/arch v0.3.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.56.3 // indirect diff --git a/samples/http/go.sum b/samples/http/go.sum index 99a7484c2..ab2d5dcfd 100644 --- a/samples/http/go.sum +++ b/samples/http/go.sum @@ -212,8 +212,8 @@ golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -229,8 +229,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -257,8 +257,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= diff --git a/samples/kafka/go.mod b/samples/kafka/go.mod index d30381549..e42c46f64 100644 --- a/samples/kafka/go.mod +++ b/samples/kafka/go.mod @@ -32,8 +32,8 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect ) replace github.com/cloudevents/sdk-go/v2 => ../../v2 diff --git a/samples/kafka/go.sum b/samples/kafka/go.sum index 15429c866..7db1f7227 100644 --- a/samples/kafka/go.sum +++ b/samples/kafka/go.sum @@ -70,13 +70,13 @@ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/samples/kafka_confluent/receiver/main.go b/samples/kafka_confluent/receiver/main.go index 1817dd2f1..3b9efab6c 100644 --- a/samples/kafka_confluent/receiver/main.go +++ b/samples/kafka_confluent/receiver/main.go @@ -33,8 +33,9 @@ func main() { } defer receiver.Close(ctx) - // Setting the 'client.WithPollGoroutines(1)' to make sure the events from kafka partition are processed in order - c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1)) + // The 'WithBlockingCallback()' is to make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1). + // These two options make sure the events from kafka partition are processed in order + c, err := cloudevents.NewClient(receiver, client.WithBlockingCallback(), client.WithPollGoroutines(1)) if err != nil { log.Fatalf("failed to create client, %v", err) } diff --git a/samples/kafka_confluent/sender/main.go b/samples/kafka_confluent/sender/main.go index bfebf816c..777509224 100644 --- a/samples/kafka_confluent/sender/main.go +++ b/samples/kafka_confluent/sender/main.go @@ -8,6 +8,7 @@ package main import ( "context" "log" + "sync" confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -26,8 +27,49 @@ func main() { sender, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{ "bootstrap.servers": "127.0.0.1:9092", }), confluent.WithSenderTopic(topic)) + if err != nil { + log.Fatalf("failed to create protocol, %v", err) + } + + var wg sync.WaitGroup + wg.Add(1) - defer sender.Close(ctx) + // Listen to all the events on the default events channel + // It's important to read these events otherwise the events channel will eventually fill up + go func() { + defer wg.Done() + eventChan, err := sender.Events() + if err != nil { + log.Fatalf("failed to get events channel for sender, %v", err) + } + for e := range eventChan { + switch ev := e.(type) { + case *kafka.Message: + // The message delivery report, indicating success or + // permanent failure after retries have been exhausted. + // Application level retries won't help since the client + // is already configured to do that. + m := ev + if m.TopicPartition.Error != nil { + log.Printf("Delivery failed: %v\n", m.TopicPartition.Error) + } else { + log.Printf("Delivered message to topic %s [%d] at offset %v\n", + *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) + } + case kafka.Error: + // Generic client instance-level errors, such as + // broker connection failures, authentication issues, etc. + // + // These errors should generally be considered informational + // as the underlying client will automatically try to + // recover from any errors encountered, the application + // does not need to take action on them. + log.Printf("Error: %v\n", ev) + default: + log.Printf("Ignored event: %v\n", ev) + } + } + }() c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { @@ -43,14 +85,13 @@ func main() { "message": "Hello, World!", }) - if result := c.Send( - // Set the producer message key - confluent.WithMessageKey(ctx, e.ID()), - e, - ); cloudevents.IsUndelivered(result) { + if result := c.Send(confluent.WithMessageKey(ctx, e.ID()), e); cloudevents.IsUndelivered(result) { log.Printf("failed to send: %v", result) } else { log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result)) } } + + sender.Close(ctx) + wg.Wait() } diff --git a/samples/pubsub/go.mod b/samples/pubsub/go.mod index 7b8630fcc..2883ec801 100644 --- a/samples/pubsub/go.mod +++ b/samples/pubsub/go.mod @@ -27,11 +27,11 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/net v0.23.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/api v0.114.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect diff --git a/samples/pubsub/go.sum b/samples/pubsub/go.sum index 3625d93fd..1db53d982 100644 --- a/samples/pubsub/go.sum +++ b/samples/pubsub/go.sum @@ -98,8 +98,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g= golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= @@ -112,13 +112,13 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/sql/v2/README.md b/sql/v2/README.md index f45641d97..948f48f41 100644 --- a/sql/v2/README.md +++ b/sql/v2/README.md @@ -18,6 +18,54 @@ expression, err := cesqlparser.Parse("subject = 'Hello world'") res, err := expression.Evaluate(event) ``` +Add a user defined function +```go +import ( + cesql "github.com/cloudevents/sdk-go/sql/v2" + cefn "github.com/cloudevents/sdk-go/sql/v2/function" + cesqlparser "github.com/cloudevents/sdk-go/sql/v2/parser" + ceruntime "github.com/cloudevents/sdk-go/sql/v2/runtime" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +// Create a test event +event := cloudevents.NewEvent() +event.SetID("aaaa-bbbb-dddd") +event.SetSource("https://my-source") +event.SetType("dev.tekton.event") + +// Create and add a new user defined function +var HasPrefixFunction cesql.Function = cefn.NewFunction( + "HASPREFIX", + []cesql.Type{cesql.StringType, cesql.StringType}, + nil, + func(event cloudevents.Event, i []interface{}) (interface{}, error) { + str := i[0].(string) + prefix := i[1].(string) + + return strings.HasPrefix(str, prefix), nil + }, +) + +err := ceruntime.AddFunction(HasPrefixFunction) + +// parse the expression +expression, err := cesqlparser.Parse("HASPREFIX(type, 'dev.tekton.event')") + if err != nil { + fmt.Println("parser err: ", err) + os.Exit(1) + } + +// Evalute the expression with the test event +res, err := expression.Evaluate(event) + +if res.(bool) { + fmt.Println("Event type has the prefix") +} else { + fmt.Println("Event type doesn't have the prefix") +} +``` + ## Development guide To regenerate the parser, make sure you have [ANTLR4 installed](https://github.com/antlr/antlr4/blob/master/doc/getting-started.md) and then run: diff --git a/sql/v2/expression/like_expression.go b/sql/v2/expression/like_expression.go index 5f557fa5a..01734852a 100644 --- a/sql/v2/expression/like_expression.go +++ b/sql/v2/expression/like_expression.go @@ -6,9 +6,6 @@ package expression import ( - "regexp" - "strings" - cesql "github.com/cloudevents/sdk-go/sql/v2" "github.com/cloudevents/sdk-go/sql/v2/utils" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -16,7 +13,7 @@ import ( type likeExpression struct { baseUnaryExpression - pattern *regexp.Regexp + pattern string } func (l likeExpression) Evaluate(event cloudevents.Event) (interface{}, error) { @@ -30,70 +27,65 @@ func (l likeExpression) Evaluate(event cloudevents.Event) (interface{}, error) { return nil, err } - return l.pattern.MatchString(val.(string)), nil + return matchString(val.(string), l.pattern), nil + } func NewLikeExpression(child cesql.Expression, pattern string) (cesql.Expression, error) { - // Converting to regex is not the most performant impl, but it works - p, err := convertLikePatternToRegex(pattern) - if err != nil { - return nil, err - } - return likeExpression{ baseUnaryExpression: baseUnaryExpression{ child: child, }, - pattern: p, + pattern: pattern, }, nil } -func convertLikePatternToRegex(pattern string) (*regexp.Regexp, error) { - var chunks []string - chunks = append(chunks, "^") +func matchString(text, pattern string) bool { + textLen := len(text) + patternLen := len(pattern) + textIdx := 0 + patternIdx := 0 + lastWildcardIdx := -1 + lastMatchIdx := 0 - var chunk strings.Builder + if patternLen == 0 { + return patternLen == textLen + } - for i := 0; i < len(pattern); i++ { - if pattern[i] == '\\' && i < len(pattern)-1 { - if pattern[i+1] == '%' { - // \% case - chunk.WriteRune('%') - chunks = append(chunks, "\\Q"+chunk.String()+"\\E") - chunk.Reset() - i++ - continue - } else if pattern[i+1] == '_' { - // \_ case - chunk.WriteRune('_') - chunks = append(chunks, "\\Q"+chunk.String()+"\\E") - chunk.Reset() - i++ - continue - } else { - // if there is an actual literal \ character, we need to include that in the string - chunk.WriteRune('\\') - } - } else if pattern[i] == '_' { - // replace with . - chunks = append(chunks, "\\Q"+chunk.String()+"\\E") - chunk.Reset() - chunks = append(chunks, ".") - } else if pattern[i] == '%' { - // replace with .* - chunks = append(chunks, "\\Q"+chunk.String()+"\\E") - chunk.Reset() - chunks = append(chunks, ".*") + for textIdx < textLen { + if patternIdx < patternLen-1 && pattern[patternIdx] == '\\' && + ((pattern[patternIdx+1] == '_' || pattern[patternIdx+1] == '%') && + pattern[patternIdx+1] == text[textIdx]) { + // handle escaped characters -> pattern needs to increment two places here + patternIdx += 2 + textIdx += 1 + } else if patternIdx < patternLen && (pattern[patternIdx] == '_' || pattern[patternIdx] == text[textIdx]) { + // handle non escaped characters + textIdx += 1 + patternIdx += 1 + } else if patternIdx < patternLen && pattern[patternIdx] == '%' { + // handle wildcard characters + lastWildcardIdx = patternIdx + lastMatchIdx = textIdx + patternIdx += 1 + } else if lastWildcardIdx != -1 { + // greedy match didn't work, try again from the last known match + patternIdx = lastWildcardIdx + 1 + lastMatchIdx += 1 + textIdx = lastMatchIdx } else { - chunk.WriteByte(pattern[i]) + return false } } - if chunk.Len() != 0 { - chunks = append(chunks, "\\Q"+chunk.String()+"\\E") - } + // consume remaining pattern characters as long as they are wildcards + for patternIdx < patternLen { + if pattern[patternIdx] != '%' { + return false + } - chunks = append(chunks, "$") + patternIdx += 1 + } - return regexp.Compile(strings.Join(chunks, "")) + return true } diff --git a/sql/v2/function/function.go b/sql/v2/function/function.go index 4ad61faed..f43db3e9d 100644 --- a/sql/v2/function/function.go +++ b/sql/v2/function/function.go @@ -10,11 +10,13 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" ) +type FuncType func(cloudevents.Event, []interface{}) (interface{}, error) + type function struct { name string fixedArgs []cesql.Type variadicArgs *cesql.Type - fn func(cloudevents.Event, []interface{}) (interface{}, error) + fn FuncType } func (f function) Name() string { @@ -39,3 +41,15 @@ func (f function) ArgType(index int) *cesql.Type { func (f function) Run(event cloudevents.Event, arguments []interface{}) (interface{}, error) { return f.fn(event, arguments) } + +func NewFunction(name string, + fixedargs []cesql.Type, + variadicArgs *cesql.Type, + fn FuncType) cesql.Function { + return function{ + name: name, + fixedArgs: fixedargs, + variadicArgs: variadicArgs, + fn: fn, + } +} diff --git a/sql/v2/go.mod b/sql/v2/go.mod index 86d00e196..631a5b536 100644 --- a/sql/v2/go.mod +++ b/sql/v2/go.mod @@ -6,6 +6,7 @@ require ( github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 github.com/cloudevents/sdk-go/v2 v2.5.0 github.com/stretchr/testify v1.8.0 + gopkg.in/yaml.v2 v2.4.0 sigs.k8s.io/yaml v1.3.0 ) @@ -20,7 +21,6 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/sql/v2/parser/expression_visitor.go b/sql/v2/parser/expression_visitor.go index 8ca5fd07e..4fb863ac2 100644 --- a/sql/v2/parser/expression_visitor.go +++ b/sql/v2/parser/expression_visitor.go @@ -6,6 +6,7 @@ package parser import ( + "fmt" "strconv" "strings" @@ -175,9 +176,13 @@ func (v *expressionVisitor) VisitLikeExpression(ctx *gen.LikeExpressionContext) if patternContext.DQUOTED_STRING_LITERAL() != nil { // Parse double quoted string pattern = dQuotedStringToString(patternContext.DQUOTED_STRING_LITERAL().GetText()) - } else { + } else if patternContext.SQUOTED_STRING_LITERAL() != nil { // Parse single quoted string pattern = sQuotedStringToString(patternContext.SQUOTED_STRING_LITERAL().GetText()) + } else { + // not a string, return an error + v.parsingErrors = append(v.parsingErrors, fmt.Errorf("failed to parse LIKE expression: the pattern was not a string literal")) + return noopExpression{} } likeExpression, err := expression.NewLikeExpression(v.Visit(ctx.Expression()).(cesql.Expression), pattern) diff --git a/sql/v2/runtime/functions_resolver.go b/sql/v2/runtime/functions_resolver.go index b80136842..5ab964fb7 100644 --- a/sql/v2/runtime/functions_resolver.go +++ b/sql/v2/runtime/functions_resolver.go @@ -58,6 +58,11 @@ func (table functionTable) AddFunction(function cesql.Function) error { } } +// Adds user defined function +func AddFunction(fn cesql.Function) error { + return globalFunctionTable.AddFunction(fn) +} + func (table functionTable) ResolveFunction(name string, args int) cesql.Function { item := table[strings.ToUpper(name)] if item == nil { diff --git a/sql/v2/runtime/test/tck/user_defined_functions.yaml b/sql/v2/runtime/test/tck/user_defined_functions.yaml new file mode 100644 index 000000000..c2a3a922e --- /dev/null +++ b/sql/v2/runtime/test/tck/user_defined_functions.yaml @@ -0,0 +1,27 @@ +name: User defined functions +tests: + - name: HASPREFIX (1) + expression: "HASPREFIX('abcdef', 'ab')" + result: true + - name: HASPREFIX (2) + expression: "HASPREFIX('abcdef', 'abcdef')" + result: true + - name: HASPREFIX (3) + expression: "HASPREFIX('abcdef', '')" + result: true + - name: HASPREFIX (4) + expression: "HASPREFIX('abcdef', 'gh')" + result: false + - name: HASPREFIX (5) + expression: "HASPREFIX('abcdef', 'abcdefg')" + result: false + + - name: KONKAT (1) + expression: "KONKAT('a', 'b', 'c')" + result: abc + - name: KONKAT (2) + expression: "KONKAT()" + result: "" + - name: KONKAT (3) + expression: "KONKAT('a')" + result: "a" \ No newline at end of file diff --git a/sql/v2/runtime/test/user_defined_functions_test.go b/sql/v2/runtime/test/user_defined_functions_test.go new file mode 100644 index 000000000..944ba98dd --- /dev/null +++ b/sql/v2/runtime/test/user_defined_functions_test.go @@ -0,0 +1,209 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package runtime_test + +import ( + "io" + "os" + "path" + "runtime" + "strings" + "testing" + + cesql "github.com/cloudevents/sdk-go/sql/v2" + "github.com/cloudevents/sdk-go/sql/v2/function" + "github.com/cloudevents/sdk-go/sql/v2/parser" + ceruntime "github.com/cloudevents/sdk-go/sql/v2/runtime" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/test" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +var TCKFileNames = []string{ + "user_defined_functions", +} + +var TCKUserDefinedFunctions = []cesql.Function{ + function.NewFunction( + "HASPREFIX", + []cesql.Type{cesql.StringType, cesql.StringType}, + nil, + func(event cloudevents.Event, i []interface{}) (interface{}, error) { + str := i[0].(string) + prefix := i[1].(string) + + return strings.HasPrefix(str, prefix), nil + }, + ), + function.NewFunction( + "KONKAT", + []cesql.Type{}, + cesql.TypePtr(cesql.StringType), + func(event cloudevents.Event, i []interface{}) (interface{}, error) { + var sb strings.Builder + for _, v := range i { + sb.WriteString(v.(string)) + } + return sb.String(), nil + }, + ), +} + +type ErrorType string + +const ( + ParseError ErrorType = "parse" + MathError ErrorType = "math" + CastError ErrorType = "cast" + MissingAttributeError ErrorType = "missingAttribute" + MissingFunctionError ErrorType = "missingFunction" + FunctionEvaluationError ErrorType = "functionEvaluation" +) + +type TckFile struct { + Name string `json:"name"` + Tests []TckTestCase `json:"tests"` +} + +type TckTestCase struct { + Name string `json:"name"` + Expression string `json:"expression"` + + Result interface{} `json:"result"` + Error ErrorType `json:"error"` + + Event *cloudevents.Event `json:"event"` + EventOverrides map[string]interface{} `json:"eventOverrides"` +} + +func (tc TckTestCase) InputEvent(t *testing.T) cloudevents.Event { + var inputEvent cloudevents.Event + if tc.Event != nil { + inputEvent = *tc.Event + } else { + inputEvent = test.FullEvent() + } + + // Make sure the event is v1 + inputEvent.SetSpecVersion(event.CloudEventsVersionV1) + + for k, v := range tc.EventOverrides { + require.NoError(t, spec.V1.SetAttribute(inputEvent.Context, k, v)) + } + + return inputEvent +} + +func (tc TckTestCase) ExpectedResult() interface{} { + switch tc.Result.(type) { + case int: + return int32(tc.Result.(int)) + case float64: + return int32(tc.Result.(float64)) + case bool: + return tc.Result.(bool) + } + return tc.Result +} + +func TestFunctionTableAddFunction(t *testing.T) { + + type args struct { + functions []cesql.Function + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "Add user functions to global table", + + args: args{ + functions: TCKUserDefinedFunctions, + }, + wantErr: false, + }, + { + name: "Fail add user functions to global table", + args: args{ + functions: TCKUserDefinedFunctions, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for _, fn := range tt.args.functions { + if err := ceruntime.AddFunction(fn); (err != nil) != tt.wantErr { + t.Errorf("AddFunction() error = %v, wantErr %v", err, tt.wantErr) + } + } + }) + } +} + +func TestUserFunctions(t *testing.T) { + tckFiles := make([]TckFile, 0, len(TCKFileNames)) + + _, basePath, _, _ := runtime.Caller(0) + basePath, _ = path.Split(basePath) + + for _, testFile := range TCKFileNames { + testFilePath := path.Join(basePath, "tck", testFile+".yaml") + + t.Logf("Loading file %s", testFilePath) + file, err := os.Open(testFilePath) + require.NoError(t, err) + + fileBytes, err := io.ReadAll(file) + require.NoError(t, err) + + tckFileModel := TckFile{} + require.NoError(t, yaml.Unmarshal(fileBytes, &tckFileModel)) + + tckFiles = append(tckFiles, tckFileModel) + } + + for i, file := range tckFiles { + i := i + t.Run(file.Name, func(t *testing.T) { + for j, testCase := range tckFiles[i].Tests { + j := j + testCase := testCase + t.Run(testCase.Name, func(t *testing.T) { + t.Parallel() + testCase := tckFiles[i].Tests[j] + + t.Logf("Test expression: '%s'", testCase.Expression) + + if testCase.Error == ParseError { + _, err := parser.Parse(testCase.Expression) + require.NotNil(t, err) + return + } + + expr, err := parser.Parse(testCase.Expression) + require.NoError(t, err) + require.NotNil(t, expr) + + inputEvent := testCase.InputEvent(t) + result, err := expr.Evaluate(inputEvent) + + if testCase.Error != "" { + require.NotNil(t, err) + } else { + require.NoError(t, err) + require.Equal(t, testCase.ExpectedResult(), result) + } + }) + } + }) + } +} diff --git a/sql/v2/test/tck/like_expression.yaml b/sql/v2/test/tck/like_expression.yaml index b6bc5a18b..31e202638 100644 --- a/sql/v2/test/tck/like_expression.yaml +++ b/sql/v2/test/tck/like_expression.yaml @@ -115,4 +115,11 @@ tests: result: false - name: With type coercion from bool (4) expression: "FALSE LIKE 'fal%'" - result: true + result: true + + - name: Invalid string literal in comparison causes parse error + expression: "x LIKE 123" + result: false + error: parse + eventOverrides: + x: "123" diff --git a/sql/v2/test/tck_test.go b/sql/v2/test/tck_test.go index f215c8db4..d22555517 100644 --- a/sql/v2/test/tck_test.go +++ b/sql/v2/test/tck_test.go @@ -6,6 +6,7 @@ package test import ( + "fmt" "io" "os" "path" @@ -70,7 +71,7 @@ type TckTestCase struct { EventOverrides map[string]interface{} `json:"eventOverrides"` } -func (tc TckTestCase) InputEvent(t *testing.T) cloudevents.Event { +func (tc TckTestCase) InputEvent(tb testing.TB) cloudevents.Event { var inputEvent cloudevents.Event if tc.Event != nil { inputEvent = *tc.Event @@ -82,7 +83,7 @@ func (tc TckTestCase) InputEvent(t *testing.T) cloudevents.Event { inputEvent.SetSpecVersion(event.CloudEventsVersionV1) for k, v := range tc.EventOverrides { - require.NoError(t, spec.V1.SetAttribute(inputEvent.Context, k, v)) + require.NoError(tb, spec.V1.SetAttribute(inputEvent.Context, k, v)) } return inputEvent @@ -159,3 +160,60 @@ func TestTCK(t *testing.T) { }) } } + +func BenchmarkTCK(b *testing.B) { + tckFiles := make([]TckFile, 0, len(TCKFileNames)) + + _, basePath, _, _ := runtime.Caller(0) + basePath, _ = path.Split(basePath) + + for _, testFile := range TCKFileNames { + testFilePath := path.Join(basePath, "tck", testFile+".yaml") + + b.Logf("Loading file %s", testFilePath) + + file, err := os.Open(testFilePath) + require.NoError(b, err) + + fileBytes, err := io.ReadAll(file) + require.NoError(b, err) + + tckFileModel := TckFile{} + require.NoError(b, yaml.Unmarshal(fileBytes, &tckFileModel)) + + tckFiles = append(tckFiles, tckFileModel) + } + + for i, file := range tckFiles { + i := i + b.Run(file.Name, func(b *testing.B) { + for j, testCase := range tckFiles[i].Tests { + j := j + testCase := testCase + b.Run(fmt.Sprintf("%v parse", testCase.Name), func(b *testing.B) { + testCase := tckFiles[i].Tests[j] + for k := 0; k < b.N; k++ { + _, _ = parser.Parse(testCase.Expression) + } + }) + + if testCase.Error == ParseError { + return + } + + b.Run(fmt.Sprintf("%v evaluate", testCase.Name), func(b *testing.B) { + testCase := tckFiles[i].Tests[j] + + expr, _ := parser.Parse(testCase.Expression) + + inputEvent := testCase.InputEvent(b) + + for k := 0; k < b.N; k++ { + _, _ = expr.Evaluate(inputEvent) + } + + }) + } + }) + } +} diff --git a/test/benchmark/go.mod b/test/benchmark/go.mod index 0b8868605..fac555be1 100644 --- a/test/benchmark/go.mod +++ b/test/benchmark/go.mod @@ -49,7 +49,7 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/test/benchmark/go.sum b/test/benchmark/go.sum index 2b4e21ec9..baf8b10d9 100644 --- a/test/benchmark/go.sum +++ b/test/benchmark/go.sum @@ -78,13 +78,13 @@ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/test/conformance/go.mod b/test/conformance/go.mod index 484123e6b..66f9e4d7c 100644 --- a/test/conformance/go.mod +++ b/test/conformance/go.mod @@ -49,6 +49,6 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect ) diff --git a/test/conformance/go.sum b/test/conformance/go.sum index ae068f9ff..35cf0a280 100644 --- a/test/conformance/go.sum +++ b/test/conformance/go.sum @@ -99,8 +99,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -110,8 +110,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/test/integration/go.mod b/test/integration/go.mod index 492b65553..b80046249 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -77,10 +77,10 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect golang.org/x/sync v0.4.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/test/integration/go.sum b/test/integration/go.sum index 0976f57fb..422913dd6 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -179,8 +179,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -196,8 +196,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -219,8 +219,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= diff --git a/test/integration/kafka_confluent/kafka_test.go b/test/integration/kafka_confluent/kafka_test.go index ee3d77130..5ae429b2e 100644 --- a/test/integration/kafka_confluent/kafka_test.go +++ b/test/integration/kafka_confluent/kafka_test.go @@ -131,7 +131,8 @@ func protocolFactory(sendTopic string, receiveTopic []string, } if sendTopic != "" { p, err = confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{ - "bootstrap.servers": BOOTSTRAP_SERVER, + "bootstrap.servers": BOOTSTRAP_SERVER, + "go.delivery.reports": false, }), confluent.WithSenderTopic(sendTopic)) } return p, err diff --git a/v2/alias.go b/v2/alias.go index 2fbfaa9a7..0f484b33b 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -173,6 +173,7 @@ var ( WithTarget = http.WithTarget WithHeader = http.WithHeader + WithHost = http.WithHost WithShutdownTimeout = http.WithShutdownTimeout //WithEncoding = http.WithEncoding //WithStructuredEncoding = http.WithStructuredEncoding // TODO: expose new way diff --git a/v2/event/datacodec/codec.go b/v2/event/datacodec/codec.go index 3e077740b..6f5d1f4c5 100644 --- a/v2/event/datacodec/codec.go +++ b/v2/event/datacodec/codec.go @@ -8,6 +8,7 @@ package datacodec import ( "context" "fmt" + "strings" "github.com/cloudevents/sdk-go/v2/event/datacodec/json" "github.com/cloudevents/sdk-go/v2/event/datacodec/text" @@ -26,9 +27,20 @@ type Encoder func(ctx context.Context, in interface{}) ([]byte, error) var decoder map[string]Decoder var encoder map[string]Encoder +// ssDecoder is a map of content-type structured suffixes as defined in +// [Structured Syntax Suffixes](https://www.iana.org/assignments/media-type-structured-suffix/media-type-structured-suffix.xhtml), +// which may be used to match content types such as application/vnd.custom-app+json +var ssDecoder map[string]Decoder + +// ssEncoder is a map of content-type structured suffixes similar to ssDecoder. +var ssEncoder map[string]Encoder + func init() { decoder = make(map[string]Decoder, 10) + ssDecoder = make(map[string]Decoder, 10) + encoder = make(map[string]Encoder, 10) + ssEncoder = make(map[string]Encoder, 10) AddDecoder("", json.Decode) AddDecoder("application/json", json.Decode) @@ -37,12 +49,18 @@ func init() { AddDecoder("text/xml", xml.Decode) AddDecoder("text/plain", text.Decode) + AddStructuredSuffixDecoder("json", json.Decode) + AddStructuredSuffixDecoder("xml", xml.Decode) + AddEncoder("", json.Encode) AddEncoder("application/json", json.Encode) AddEncoder("text/json", json.Encode) AddEncoder("application/xml", xml.Encode) AddEncoder("text/xml", xml.Encode) AddEncoder("text/plain", text.Encode) + + AddStructuredSuffixEncoder("json", json.Encode) + AddStructuredSuffixEncoder("xml", xml.Encode) } // AddDecoder registers a decoder for a given content type. The codecs will use @@ -51,12 +69,34 @@ func AddDecoder(contentType string, fn Decoder) { decoder[contentType] = fn } +// AddStructuredSuffixDecoder registers a decoder for content-types which match the given structured +// syntax suffix as defined by +// [Structured Syntax Suffixes](https://www.iana.org/assignments/media-type-structured-suffix/media-type-structured-suffix.xhtml). +// This allows users to register custom decoders for non-standard content types which follow the +// structured syntax suffix standard (e.g. application/vnd.custom-app+json). +// +// Suffix should not include the "+" character, and "json" and "xml" are registered by default. +func AddStructuredSuffixDecoder(suffix string, fn Decoder) { + ssDecoder[suffix] = fn +} + // AddEncoder registers an encoder for a given content type. The codecs will // use these to encode the data payload for a cloudevent.Event object. func AddEncoder(contentType string, fn Encoder) { encoder[contentType] = fn } +// AddStructuredSuffixEncoder registers an encoder for content-types which match the given +// structured syntax suffix as defined by +// [Structured Syntax Suffixes](https://www.iana.org/assignments/media-type-structured-suffix/media-type-structured-suffix.xhtml). +// This allows users to register custom encoders for non-standard content types which follow the +// structured syntax suffix standard (e.g. application/vnd.custom-app+json). +// +// Suffix should not include the "+" character, and "json" and "xml" are registered by default. +func AddStructuredSuffixEncoder(suffix string, fn Encoder) { + ssEncoder[suffix] = fn +} + // Decode looks up and invokes the decoder registered for the given content // type. An error is returned if no decoder is registered for the given // content type. @@ -64,6 +104,11 @@ func Decode(ctx context.Context, contentType string, in []byte, out interface{}) if fn, ok := decoder[contentType]; ok { return fn(ctx, in, out) } + + if fn, ok := ssDecoder[structuredSuffix(contentType)]; ok { + return fn(ctx, in, out) + } + return fmt.Errorf("[decode] unsupported content type: %q", contentType) } @@ -74,5 +119,19 @@ func Encode(ctx context.Context, contentType string, in interface{}) ([]byte, er if fn, ok := encoder[contentType]; ok { return fn(ctx, in) } + + if fn, ok := ssEncoder[structuredSuffix(contentType)]; ok { + return fn(ctx, in) + } + return nil, fmt.Errorf("[encode] unsupported content type: %q", contentType) } + +func structuredSuffix(contentType string) string { + parts := strings.Split(contentType, "+") + if len(parts) >= 2 { + return parts[len(parts)-1] + } + + return "" +} diff --git a/v2/event/datacodec/codec_test.go b/v2/event/datacodec/codec_test.go index 0fd96ef5d..bc6ec3558 100644 --- a/v2/event/datacodec/codec_test.go +++ b/v2/event/datacodec/codec_test.go @@ -11,9 +11,10 @@ import ( "strings" "testing" + "github.com/google/go-cmp/cmp" + "github.com/cloudevents/sdk-go/v2/event/datacodec" "github.com/cloudevents/sdk-go/v2/types" - "github.com/google/go-cmp/cmp" ) func strptr(s string) *string { return &s } @@ -25,11 +26,12 @@ type Example struct { func TestCodecDecode(t *testing.T) { testCases := map[string]struct { - contentType string - decoder datacodec.Decoder - in []byte - want interface{} - wantErr string + contentType string + decoder datacodec.Decoder + structuredSuffix string + in []byte + want interface{} + wantErr string }{ "empty": {}, "invalid content type": { @@ -50,12 +52,24 @@ func TestCodecDecode(t *testing.T) { "b": "banana", }, }, + "application/vnd.custom-type+json": { + contentType: "application/vnd.custom-type+json", + in: []byte(`{"a":"apple","b":"banana"}`), + want: &map[string]string{ + "a": "apple", + "b": "banana", + }, + }, "application/xml": { contentType: "application/xml", in: []byte(`7Hello, Structured Encoding v1.0!`), want: &Example{Sequence: 7, Message: "Hello, Structured Encoding v1.0!"}, }, - + "application/vnd.custom-type+xml": { + contentType: "application/vnd.custom-type+xml", + in: []byte(`7Hello, Structured Encoding v1.0!`), + want: &Example{Sequence: 7, Message: "Hello, Structured Encoding v1.0!"}, + }, "custom content type": { contentType: "unit/testing", in: []byte("Hello, Testing"), @@ -82,12 +96,44 @@ func TestCodecDecode(t *testing.T) { }, wantErr: "expecting unit test error", }, + "custom structured suffix": { + contentType: "unit/testing+custom", + structuredSuffix: "custom", + in: []byte("Hello, Testing"), + decoder: func(ctx context.Context, in []byte, out interface{}) error { + if s, k := out.(*map[string]string); k { + if (*s) == nil { + (*s) = make(map[string]string) + } + (*s)["upper"] = strings.ToUpper(string(in)) + (*s)["lower"] = strings.ToLower(string(in)) + } + return nil + }, + want: &map[string]string{ + "upper": "HELLO, TESTING", + "lower": "hello, testing", + }, + }, + "custom structured suffix error": { + contentType: "unit/testing+custom", + structuredSuffix: "custom", + in: []byte("Hello, Testing"), + decoder: func(ctx context.Context, in []byte, out interface{}) error { + return fmt.Errorf("expecting unit test error") + }, + wantErr: "expecting unit test error", + }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { if tc.decoder != nil { - datacodec.AddDecoder(tc.contentType, tc.decoder) + if tc.structuredSuffix == "" { + datacodec.AddDecoder(tc.contentType, tc.decoder) + } else { + datacodec.AddStructuredSuffixDecoder(tc.structuredSuffix, tc.decoder) + } } got, _ := types.Allocate(tc.want) @@ -111,11 +157,12 @@ func TestCodecDecode(t *testing.T) { func TestCodecEncode(t *testing.T) { testCases := map[string]struct { - contentType string - encoder datacodec.Encoder - in interface{} - want []byte - wantErr string + contentType string + structuredSuffix string + encoder datacodec.Encoder + in interface{} + want []byte + wantErr string }{ "empty": {}, "invalid content type": { @@ -138,11 +185,24 @@ func TestCodecEncode(t *testing.T) { }, want: []byte(`{"a":"apple","b":"banana"}`), }, + "application/vnd.custom-type+json": { + contentType: "application/vnd.custom-type+json", + in: map[string]string{ + "a": "apple", + "b": "banana", + }, + want: []byte(`{"a":"apple","b":"banana"}`), + }, "application/xml": { contentType: "application/xml", in: &Example{Sequence: 7, Message: "Hello, Structured Encoding v1.0!"}, want: []byte(`7Hello, Structured Encoding v1.0!`), }, + "application/vnd.custom-type+xml": { + contentType: "application/vnd.custom-type+xml", + in: &Example{Sequence: 7, Message: "Hello, Structured Encoding v1.0!"}, + want: []byte(`7Hello, Structured Encoding v1.0!`), + }, "custom content type": { contentType: "unit/testing", @@ -173,12 +233,47 @@ func TestCodecEncode(t *testing.T) { }, wantErr: "expecting unit test error", }, + "custom structured suffix": { + contentType: "unit/testing+custom", + structuredSuffix: "custom", + in: []string{ + "Hello,", + "Testing", + }, + encoder: func(ctx context.Context, in interface{}) ([]byte, error) { + if s, ok := in.([]string); ok { + sb := strings.Builder{} + for _, v := range s { + if sb.Len() > 0 { + sb.WriteString(" ") + } + sb.WriteString(v) + } + return []byte(sb.String()), nil + } + return nil, fmt.Errorf("don't get here") + }, + want: []byte("Hello, Testing"), + }, + "custom structured suffix error": { + contentType: "unit/testing+custom", + structuredSuffix: "custom", + in: []byte("Hello, Testing"), + encoder: func(ctx context.Context, in interface{}) ([]byte, error) { + return nil, fmt.Errorf("expecting unit test error") + }, + wantErr: "expecting unit test error", + }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { if tc.encoder != nil { - datacodec.AddEncoder(tc.contentType, tc.encoder) + if tc.structuredSuffix == "" { + datacodec.AddEncoder(tc.contentType, tc.encoder) + } else { + datacodec.AddStructuredSuffixEncoder(tc.structuredSuffix, tc.encoder) + } } got, err := datacodec.Encode(context.TODO(), tc.contentType, tc.in) diff --git a/v2/protocol/http/options.go b/v2/protocol/http/options.go index 6582af3ea..91a45ce36 100644 --- a/v2/protocol/http/options.go +++ b/v2/protocol/http/options.go @@ -72,6 +72,26 @@ func WithHeader(key, value string) Option { } } +// WithHost sets the outbound host header for all cloud events when using an HTTP request +func WithHost(value string) Option { + return func(p *Protocol) error { + if p == nil { + return fmt.Errorf("http host option can not set nil protocol") + } + value = strings.TrimSpace(value) + if value != "" { + if p.RequestTemplate == nil { + p.RequestTemplate = &nethttp.Request{ + Method: nethttp.MethodPost, + } + } + p.RequestTemplate.Host = value + return nil + } + return fmt.Errorf("http host option was empty string") + } +} + // WithShutdownTimeout sets the shutdown timeout when the http server is being shutdown. func WithShutdownTimeout(timeout time.Duration) Option { return func(p *Protocol) error { @@ -83,6 +103,38 @@ func WithShutdownTimeout(timeout time.Duration) Option { } } +// WithReadTimeout overwrites the default read timeout (600s) of the http +// server. The specified timeout must not be negative. A timeout of 0 disables +// read timeouts in the http server. +func WithReadTimeout(timeout time.Duration) Option { + return func(p *Protocol) error { + if p == nil { + return fmt.Errorf("http read timeout option can not set nil protocol") + } + if timeout < 0 { + return fmt.Errorf("http read timeout must not be negative") + } + p.readTimeout = &timeout + return nil + } +} + +// WithWriteTimeout overwrites the default write timeout (600s) of the http +// server. The specified timeout must not be negative. A timeout of 0 disables +// write timeouts in the http server. +func WithWriteTimeout(timeout time.Duration) Option { + return func(p *Protocol) error { + if p == nil { + return fmt.Errorf("http write timeout option can not set nil protocol") + } + if timeout < 0 { + return fmt.Errorf("http write timeout must not be negative") + } + p.writeTimeout = &timeout + return nil + } +} + func checkListen(p *Protocol, prefix string) error { switch { case p.listener.Load() != nil: diff --git a/v2/protocol/http/options_test.go b/v2/protocol/http/options_test.go index fd0af7fcf..58d12d91a 100644 --- a/v2/protocol/http/options_test.go +++ b/v2/protocol/http/options_test.go @@ -271,6 +271,77 @@ func TestWithHeader(t *testing.T) { } } +func TestWithHost(t *testing.T) { + testCases := map[string]struct { + t *Protocol + value string + want *Protocol + wantErr string + }{ + "valid host": { + t: &Protocol{ + RequestTemplate: &http.Request{}, + }, + value: "test", + want: &Protocol{ + RequestTemplate: &http.Request{ + Host: "test", + }, + }, + }, + "valid host, unset req": { + t: &Protocol{}, + value: "test", + want: &Protocol{ + RequestTemplate: &http.Request{ + Method: http.MethodPost, + Host: "test", + }, + }, + }, + "empty host value": { + t: &Protocol{ + RequestTemplate: &http.Request{}, + }, + wantErr: `http host option was empty string`, + }, + "whitespace key": { + t: &Protocol{ + RequestTemplate: &http.Request{}, + }, + value: " \t\n", + wantErr: `http host option was empty string`, + }, + "nil protocol": { + wantErr: `http host option can not set nil protocol`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + err := tc.t.applyOptions(WithHost(tc.value)) + + if tc.wantErr != "" || err != nil { + var gotErr string + if err != nil { + gotErr = err.Error() + } + if diff := cmp.Diff(tc.wantErr, gotErr); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + got := tc.t + + if diff := cmp.Diff(tc.want, got, + cmpopts.IgnoreUnexported(Protocol{}), cmpopts.IgnoreUnexported(http.Request{})); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + } + }) + } +} + func TestWithShutdownTimeout(t *testing.T) { testCases := map[string]struct { t *Protocol @@ -315,6 +386,106 @@ func TestWithShutdownTimeout(t *testing.T) { } } +func TestWithReadTimeout(t *testing.T) { + expected := time.Minute * 4 + testCases := map[string]struct { + t *Protocol + timeout time.Duration + want *Protocol + wantErr string + }{ + "valid timeout": { + t: &Protocol{}, + timeout: time.Minute * 4, + want: &Protocol{ + readTimeout: &expected, + }, + }, + "negative timeout": { + t: &Protocol{}, + timeout: -1, + wantErr: "http read timeout must not be negative", + }, + "nil protocol": { + wantErr: "http read timeout option can not set nil protocol", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + err := tc.t.applyOptions(WithReadTimeout(tc.timeout)) + + if tc.wantErr != "" || err != nil { + var gotErr string + if err != nil { + gotErr = err.Error() + } + if diff := cmp.Diff(tc.wantErr, gotErr); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + got := tc.t + + if diff := cmp.Diff(tc.want, got, + cmpopts.IgnoreUnexported(Protocol{})); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + } + }) + } +} + +func TestWithWriteTimeout(t *testing.T) { + expected := time.Minute * 4 + + testCases := map[string]struct { + t *Protocol + timeout time.Duration + want *Protocol + wantErr string + }{ + "valid timeout": { + t: &Protocol{}, + timeout: time.Minute * 4, + want: &Protocol{ + writeTimeout: &expected, + }, + }, + "negative timeout": { + t: &Protocol{}, + timeout: -1, + wantErr: "http write timeout must not be negative", + }, + "nil protocol": { + wantErr: "http write timeout option can not set nil protocol", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + err := tc.t.applyOptions(WithWriteTimeout(tc.timeout)) + + if tc.wantErr != "" || err != nil { + var gotErr string + if err != nil { + gotErr = err.Error() + } + if diff := cmp.Diff(tc.wantErr, gotErr); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + got := tc.t + + if diff := cmp.Diff(tc.want, got, + cmpopts.IgnoreUnexported(Protocol{})); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + } + }) + } +} func TestWithPort(t *testing.T) { testCases := map[string]struct { t *Protocol @@ -389,9 +560,19 @@ func forceClose(tr *Protocol) { } func TestWithPort0(t *testing.T) { + noReadWriteTimeout := time.Duration(0) + testCases := map[string]func() (*Protocol, error){ - "WithPort0": func() (*Protocol, error) { return New(WithPort(0)) }, - "SetPort0": func() (*Protocol, error) { return &Protocol{Port: 0}, nil }, + "WithPort0": func() (*Protocol, error) { + return New(WithPort(0)) + }, + "SetPort0": func() (*Protocol, error) { + return &Protocol{ + Port: 0, + readTimeout: &noReadWriteTimeout, + writeTimeout: &noReadWriteTimeout, + }, nil + }, } for name, f := range testCases { t.Run(name, func(t *testing.T) { diff --git a/v2/protocol/http/protocol.go b/v2/protocol/http/protocol.go index 7ee3b8fe1..18bd604a6 100644 --- a/v2/protocol/http/protocol.go +++ b/v2/protocol/http/protocol.go @@ -70,6 +70,18 @@ type Protocol struct { // If 0, DefaultShutdownTimeout is used. ShutdownTimeout time.Duration + // readTimeout defines the http.Server ReadTimeout It is the maximum duration + // for reading the entire request, including the body. If not overwritten by an + // option, the default value (600s) is used + readTimeout *time.Duration + + // writeTimeout defines the http.Server WriteTimeout It is the maximum duration + // before timing out writes of the response. It is reset whenever a new + // request's header is read. Like ReadTimeout, it does not let Handlers make + // decisions on a per-request basis. If not overwritten by an option, the + // default value (600s) is used + writeTimeout *time.Duration + // Port is the port configured to bind the receiver to. Defaults to 8080. // If you want to know the effective port you're listening to, use GetListeningPort() Port int @@ -116,6 +128,17 @@ func New(opts ...Option) (*Protocol, error) { p.ShutdownTimeout = DefaultShutdownTimeout } + // use default timeout from abuse protection value + defaultTimeout := DefaultTimeout + + if p.readTimeout == nil { + p.readTimeout = &defaultTimeout + } + + if p.writeTimeout == nil { + p.writeTimeout = &defaultTimeout + } + if p.isRetriableFunc == nil { p.isRetriableFunc = defaultIsRetriableFunc } diff --git a/v2/protocol/http/protocol_lifecycle.go b/v2/protocol/http/protocol_lifecycle.go index 04ef96915..7551c31c5 100644 --- a/v2/protocol/http/protocol_lifecycle.go +++ b/v2/protocol/http/protocol_lifecycle.go @@ -40,8 +40,8 @@ func (p *Protocol) OpenInbound(ctx context.Context) error { p.server = &http.Server{ Addr: listener.Addr().String(), Handler: attachMiddleware(p.Handler, p.middleware), - ReadTimeout: DefaultTimeout, - WriteTimeout: DefaultTimeout, + ReadTimeout: *p.readTimeout, + WriteTimeout: *p.writeTimeout, } // Shutdown diff --git a/v2/protocol/http/protocol_test.go b/v2/protocol/http/protocol_test.go index 818ef60c2..4014989e6 100644 --- a/v2/protocol/http/protocol_test.go +++ b/v2/protocol/http/protocol_test.go @@ -26,6 +26,7 @@ import ( func TestNew(t *testing.T) { dst := DefaultShutdownTimeout + ot := DefaultTimeout testCases := map[string]struct { opts []Option @@ -36,6 +37,8 @@ func TestNew(t *testing.T) { want: &Protocol{ Client: http.DefaultClient, ShutdownTimeout: dst, + readTimeout: &ot, + writeTimeout: &ot, Port: -1, }, },