diff --git a/protocol/mqtt_paho/v2/message.go b/protocol/mqtt_paho/v2/message.go index 8dd938545..2bd7ed35d 100644 --- a/protocol/mqtt_paho/v2/message.go +++ b/protocol/mqtt_paho/v2/message.go @@ -17,8 +17,7 @@ import ( ) const ( - prefix = "ce-" - contentType = "Content-Type" + prefix = "ce-" ) var specs = spec.WithPrefix(prefix) @@ -41,8 +40,7 @@ func NewMessage(msg *paho.Publish) *Message { var f format.Format var v spec.Version if msg.Properties != nil { - // Use properties.User["Content-type"] to determine if message is structured - if s := msg.Properties.User.Get(contentType); format.IsFormat(s) { + if s := msg.Properties.ContentType; format.IsFormat(s) { f = format.Lookup(s) } else if s := msg.Properties.User.Get(specs.PrefixedSpecVersionName()); s != "" { v = specs.Version(s) @@ -88,14 +86,20 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) } else { err = encoder.SetExtension(strings.TrimPrefix(userProperty.Key, prefix), userProperty.Value) } - } else if userProperty.Key == contentType { - err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(userProperty.Value)) } if err != nil { return } } + contentType := m.internal.Properties.ContentType + if contentType != "" { + err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), contentType) + if err != nil { + return err + } + } + if m.internal.Payload != nil { return encoder.SetData(bytes.NewBuffer(m.internal.Payload)) } diff --git a/protocol/mqtt_paho/v2/message_test.go b/protocol/mqtt_paho/v2/message_test.go index 757f81f5c..ff51a0a0c 100644 --- a/protocol/mqtt_paho/v2/message_test.go +++ b/protocol/mqtt_paho/v2/message_test.go @@ -32,7 +32,7 @@ func TestReadStructured(t *testing.T) { msg: &paho.Publish{ Payload: []byte(""), Properties: &paho.PublishProperties{ - User: []paho.UserProperty{{Key: contentType, Value: event.ApplicationCloudEventsJSON}}, + ContentType: event.ApplicationCloudEventsJSON, }, }, }, diff --git a/protocol/mqtt_paho/v2/write_message.go b/protocol/mqtt_paho/v2/write_message.go index a4b87f4aa..9db47e918 100644 --- a/protocol/mqtt_paho/v2/write_message.go +++ b/protocol/mqtt_paho/v2/write_message.go @@ -42,11 +42,9 @@ var ( func (b *pubMessageWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error { if b.Properties == nil { - b.Properties = &paho.PublishProperties{ - User: make([]paho.UserProperty, 0), - } + b.Properties = &paho.PublishProperties{} } - b.Properties.User.Add(contentType, f.MediaType()) + b.Properties.ContentType = f.MediaType() var buf bytes.Buffer _, err := io.Copy(&buf, event) if err != nil { @@ -85,15 +83,13 @@ func (b *pubMessageWriter) SetData(reader io.Reader) error { func (b *pubMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { if attribute.Kind() == spec.DataContentType { if value == nil { - b.removeProperty(contentType) + b.Properties.ContentType = "" } s, err := types.Format(value) if err != nil { return err } - if err := b.addProperty(contentType, s); err != nil { - return err - } + b.Properties.ContentType = s } else { if value == nil { b.removeProperty(prefix + attribute.Name())