diff --git a/v2/binding/to_event.go b/v2/binding/to_event.go index 591e3d1cb..978df22c3 100644 --- a/v2/binding/to_event.go +++ b/v2/binding/to_event.go @@ -41,7 +41,7 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo switch mt := m.(type) { case *EventMessage: e := (*event.Event)(mt) - return e, Transformers(transformers).Transform(mt, (*messageToEventBuilder)(e)) + return e, Transformers(transformers).Transform(mt, (*MessageToEventBuilder)(e)) case MessageWrapper: m = mt.GetWrappedMessage() default: @@ -52,7 +52,7 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo } e := event.New() - encoder := (*messageToEventBuilder)(&e) + encoder := (*MessageToEventBuilder)(&e) _, err := DirectWrite( context.Background(), message, @@ -63,11 +63,7 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo return nil, err } - if mt, ok := message.(MessageMetadataReader); ok { - return &e, Transformers(transformers).Transform(mt, encoder) - } else { - return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder) - } + return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder) } // ToEvents translates a Batch Message and corresponding Reader data to a slice of Events. @@ -85,14 +81,14 @@ func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]eve return events, json.NewDecoder(body).Decode(&events) } -type messageToEventBuilder event.Event +type MessageToEventBuilder event.Event var ( - _ StructuredWriter = (*messageToEventBuilder)(nil) - _ BinaryWriter = (*messageToEventBuilder)(nil) + _ StructuredWriter = (*MessageToEventBuilder)(nil) + _ BinaryWriter = (*MessageToEventBuilder)(nil) ) -func (b *messageToEventBuilder) SetStructuredEvent(ctx context.Context, format format.Format, ev io.Reader) error { +func (b *MessageToEventBuilder) SetStructuredEvent(ctx context.Context, format format.Format, ev io.Reader) error { var buf bytes.Buffer _, err := io.Copy(&buf, ev) if err != nil { @@ -101,15 +97,15 @@ func (b *messageToEventBuilder) SetStructuredEvent(ctx context.Context, format f return format.Unmarshal(buf.Bytes(), (*event.Event)(b)) } -func (b *messageToEventBuilder) Start(ctx context.Context) error { +func (b *MessageToEventBuilder) Start(ctx context.Context) error { return nil } -func (b *messageToEventBuilder) End(ctx context.Context) error { +func (b *MessageToEventBuilder) End(ctx context.Context) error { return nil } -func (b *messageToEventBuilder) SetData(data io.Reader) error { +func (b *MessageToEventBuilder) SetData(data io.Reader) error { buf, ok := data.(*bytes.Buffer) if !ok { buf = new(bytes.Buffer) @@ -124,7 +120,7 @@ func (b *messageToEventBuilder) SetData(data io.Reader) error { return nil } -func (b *messageToEventBuilder) SetAttribute(attribute spec.Attribute, value interface{}) error { +func (b *MessageToEventBuilder) SetAttribute(attribute spec.Attribute, value interface{}) error { if value == nil { _ = attribute.Delete(b.Context) return nil @@ -148,7 +144,7 @@ func (b *messageToEventBuilder) SetAttribute(attribute spec.Attribute, value int return attribute.Set(b.Context, value) } -func (b *messageToEventBuilder) SetExtension(name string, value interface{}) error { +func (b *MessageToEventBuilder) SetExtension(name string, value interface{}) error { if value == nil { return b.Context.SetExtension(name, nil) } diff --git a/v2/client/invoker.go b/v2/client/invoker.go index 2ebb826b8..5947f26bc 100644 --- a/v2/client/invoker.go +++ b/v2/client/invoker.go @@ -65,7 +65,7 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p var respMsg binding.Message var result protocol.Result - e, eventErr := binding.ToEvent(ctx, m, r.receiverTransformers...) + e, eventErr := binding.ToEvent(ctx, m) switch { case eventErr != nil && r.fn.hasEventIn: r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr) @@ -79,6 +79,15 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p } } + mt, ok := m.(binding.MessageMetadataReader) + if ok { + encoder := (*binding.MessageToEventBuilder)(e) + err := binding.Transformers(r.receiverTransformers).Transform(mt, encoder) + if err != nil { + cecontext.LoggerFrom(ctx).Errorf("apply receiver transformers error on response event: %v", err) + } + } + // Let's invoke the receiver fn var resp *event.Event resp, result = func() (resp *event.Event, result protocol.Result) {