Skip to content

Commit

Permalink
move the receiver to invoker
Browse files Browse the repository at this point in the history
Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa committed May 13, 2024
1 parent 50b0a35 commit 882dddc
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
28 changes: 12 additions & 16 deletions v2/binding/to_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo
switch mt := m.(type) {
case *EventMessage:
e := (*event.Event)(mt)
return e, Transformers(transformers).Transform(mt, (*messageToEventBuilder)(e))
return e, Transformers(transformers).Transform(mt, (*MessageToEventBuilder)(e))
case MessageWrapper:
m = mt.GetWrappedMessage()
default:
Expand All @@ -52,7 +52,7 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo
}

e := event.New()
encoder := (*messageToEventBuilder)(&e)
encoder := (*MessageToEventBuilder)(&e)
_, err := DirectWrite(
context.Background(),
message,
Expand All @@ -63,11 +63,7 @@ func ToEvent(ctx context.Context, message MessageReader, transformers ...Transfo
return nil, err
}

if mt, ok := message.(MessageMetadataReader); ok {
return &e, Transformers(transformers).Transform(mt, encoder)
} else {
return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder)
}
return &e, Transformers(transformers).Transform((*EventMessage)(&e), encoder)
}

// ToEvents translates a Batch Message and corresponding Reader data to a slice of Events.
Expand All @@ -85,14 +81,14 @@ func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]eve
return events, json.NewDecoder(body).Decode(&events)
}

type messageToEventBuilder event.Event
type MessageToEventBuilder event.Event

var (
_ StructuredWriter = (*messageToEventBuilder)(nil)
_ BinaryWriter = (*messageToEventBuilder)(nil)
_ StructuredWriter = (*MessageToEventBuilder)(nil)
_ BinaryWriter = (*MessageToEventBuilder)(nil)
)

func (b *messageToEventBuilder) SetStructuredEvent(ctx context.Context, format format.Format, ev io.Reader) error {
func (b *MessageToEventBuilder) SetStructuredEvent(ctx context.Context, format format.Format, ev io.Reader) error {
var buf bytes.Buffer
_, err := io.Copy(&buf, ev)
if err != nil {
Expand All @@ -101,15 +97,15 @@ func (b *messageToEventBuilder) SetStructuredEvent(ctx context.Context, format f
return format.Unmarshal(buf.Bytes(), (*event.Event)(b))
}

func (b *messageToEventBuilder) Start(ctx context.Context) error {
func (b *MessageToEventBuilder) Start(ctx context.Context) error {
return nil
}

func (b *messageToEventBuilder) End(ctx context.Context) error {
func (b *MessageToEventBuilder) End(ctx context.Context) error {
return nil
}

func (b *messageToEventBuilder) SetData(data io.Reader) error {
func (b *MessageToEventBuilder) SetData(data io.Reader) error {
buf, ok := data.(*bytes.Buffer)
if !ok {
buf = new(bytes.Buffer)
Expand All @@ -124,7 +120,7 @@ func (b *messageToEventBuilder) SetData(data io.Reader) error {
return nil
}

func (b *messageToEventBuilder) SetAttribute(attribute spec.Attribute, value interface{}) error {
func (b *MessageToEventBuilder) SetAttribute(attribute spec.Attribute, value interface{}) error {
if value == nil {
_ = attribute.Delete(b.Context)
return nil
Expand All @@ -148,7 +144,7 @@ func (b *messageToEventBuilder) SetAttribute(attribute spec.Attribute, value int
return attribute.Set(b.Context, value)
}

func (b *messageToEventBuilder) SetExtension(name string, value interface{}) error {
func (b *MessageToEventBuilder) SetExtension(name string, value interface{}) error {
if value == nil {
return b.Context.SetExtension(name, nil)
}
Expand Down
11 changes: 10 additions & 1 deletion v2/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p
var respMsg binding.Message
var result protocol.Result

e, eventErr := binding.ToEvent(ctx, m, r.receiverTransformers...)
e, eventErr := binding.ToEvent(ctx, m)
switch {
case eventErr != nil && r.fn.hasEventIn:
r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr)
Expand All @@ -79,6 +79,15 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p
}
}

mt, ok := m.(binding.MessageMetadataReader)
if ok {
encoder := (*binding.MessageToEventBuilder)(e)
err := binding.Transformers(r.receiverTransformers).Transform(mt, encoder)
if err != nil {
cecontext.LoggerFrom(ctx).Errorf("apply receiver transformers error on response event: %v", err)
}
}

// Let's invoke the receiver fn
var resp *event.Event
resp, result = func() (resp *event.Event, result protocol.Result) {
Expand Down

0 comments on commit 882dddc

Please sign in to comment.