Skip to content

Commit

Permalink
feat: use sonic event deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Dec 20, 2024
1 parent 0689b8c commit 7e88060
Show file tree
Hide file tree
Showing 17 changed files with 345 additions and 89 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ proto:
api/grpc/permits/*.proto \
api/grpc/source/*/*.proto \
api/grpc/events/*.proto \
api/grpc/auth/*.proto \
api/grpc/ce/*.proto
api/grpc/auth/*.proto

vet: proto
go vet
Expand Down
13 changes: 2 additions & 11 deletions api/grpc/ce/cloudevent.proto
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
syntax = "proto3";

package awakari.ce;
package ce;

option go_package = "github.com/awakari/pub/api/grpc/ce";
option go_package = "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb";

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
Expand Down Expand Up @@ -59,12 +59,3 @@ message CloudEventAttributeValue {
google.protobuf.Timestamp ce_timestamp = 7;
}
}

/**
* CloudEvent Protobuf Batch Format
*
*/

message CloudEventBatch {
repeated CloudEvent events = 1;
}
4 changes: 2 additions & 2 deletions api/grpc/events/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package events

import (
"context"
"github.com/awakari/pub/api/grpc/ce"
"github.com/awakari/pub/model"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"io"
)

Expand All @@ -27,7 +27,7 @@ func (mw publisher) Close() (err error) {
return
}

func (mw publisher) Write(ctx context.Context, msgs []*ce.CloudEvent) (ackCount uint32, err error) {
func (mw publisher) Write(ctx context.Context, msgs []*pb.CloudEvent) (ackCount uint32, err error) {
req := PublishRequest{
Topic: mw.queue,
Evts: msgs,
Expand Down
4 changes: 2 additions & 2 deletions api/grpc/events/publisher_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package events

import (
"context"
"github.com/awakari/pub/api/grpc/ce"
"github.com/awakari/pub/model"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
)

type publisherMock struct{}
Expand All @@ -16,7 +16,7 @@ func (mw publisherMock) Close() error {
return nil
}

func (mw publisherMock) Write(ctx context.Context, msgs []*ce.CloudEvent) (ackCount uint32, err error) {
func (mw publisherMock) Write(ctx context.Context, msgs []*pb.CloudEvent) (ackCount uint32, err error) {
for _, msg := range msgs {
switch msg.Id {
case "queue_fail":
Expand Down
14 changes: 7 additions & 7 deletions api/grpc/events/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package events

import (
"context"
"github.com/awakari/pub/api/grpc/ce"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"github.com/stretchr/testify/assert"
"io"
"testing"
Expand All @@ -17,13 +17,13 @@ func TestMessagesWriter_Close(t *testing.T) {
func TestMessagesWriter_Write(t *testing.T) {
cases := map[string]struct {
queue string
msgs []*ce.CloudEvent
msgs []*pb.CloudEvent
ackCount uint32
err error
}{
"1 => ack 1": {
queue: "queue0",
msgs: []*ce.CloudEvent{
msgs: []*pb.CloudEvent{
{
Id: "msg0",
},
Expand All @@ -32,7 +32,7 @@ func TestMessagesWriter_Write(t *testing.T) {
},
"3 => ack 2": {
queue: "queue0",
msgs: []*ce.CloudEvent{
msgs: []*pb.CloudEvent{
{
Id: "msg0",
},
Expand All @@ -47,7 +47,7 @@ func TestMessagesWriter_Write(t *testing.T) {
},
"send eof": {
queue: "send_eof",
msgs: []*ce.CloudEvent{
msgs: []*pb.CloudEvent{
{
Id: "msg0",
},
Expand All @@ -56,7 +56,7 @@ func TestMessagesWriter_Write(t *testing.T) {
},
"recv fail": {
queue: "recv_fail",
msgs: []*ce.CloudEvent{
msgs: []*pb.CloudEvent{
{
Id: "msg0",
},
Expand All @@ -65,7 +65,7 @@ func TestMessagesWriter_Write(t *testing.T) {
},
"recv eof": {
queue: "recv_eof",
msgs: []*ce.CloudEvent{
msgs: []*pb.CloudEvent{
{
Id: "msg0",
},
Expand Down
28 changes: 14 additions & 14 deletions api/grpc/publisher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"errors"
"fmt"
"github.com/awakari/pub/api/grpc/auth"
"github.com/awakari/pub/api/grpc/ce"
"github.com/awakari/pub/api/grpc/events"
"github.com/awakari/pub/api/grpc/limits"
"github.com/awakari/pub/api/grpc/permits"
"github.com/awakari/pub/config"
"github.com/awakari/pub/model"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"github.com/segmentio/ksuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -61,20 +61,20 @@ func (s svc) SubmitMessages(streamSrv Service_SubmitMessagesServer) (err error)
if err == nil {
for _, evt := range req.Msgs {
if evt.Attributes == nil {
evt.Attributes = make(map[string]*ce.CloudEventAttributeValue)
evt.Attributes = make(map[string]*pb.CloudEventAttributeValue)
}
evt.Attributes[model.KeyCeGroupId] = &ce.CloudEventAttributeValue{
Attr: &ce.CloudEventAttributeValue_CeString{
evt.Attributes[model.KeyCeGroupId] = &pb.CloudEventAttributeValue{
Attr: &pb.CloudEventAttributeValue_CeString{
CeString: groupId,
},
}
evt.Attributes[model.KeyCeUserId] = &ce.CloudEventAttributeValue{
Attr: &ce.CloudEventAttributeValue_CeString{
evt.Attributes[model.KeyCeUserId] = &pb.CloudEventAttributeValue{
Attr: &pb.CloudEventAttributeValue_CeString{
CeString: userId,
},
}
evt.Attributes[model.KeyCePubTime] = &ce.CloudEventAttributeValue{
Attr: &ce.CloudEventAttributeValue_CeTimestamp{
evt.Attributes[model.KeyCePubTime] = &pb.CloudEventAttributeValue{
Attr: &pb.CloudEventAttributeValue_CeTimestamp{
CeTimestamp: timestamppb.New(time.Now().UTC()),
},
}
Expand Down Expand Up @@ -174,20 +174,20 @@ func (s svc) notifyLimitReached(
if userId == "" || userId == src {
return // no limit owner, don't send anything
}
evt := ce.CloudEvent{
Attributes: map[string]*ce.CloudEventAttributeValue{
evt := pb.CloudEvent{
Attributes: map[string]*pb.CloudEventAttributeValue{
model.KeyToGroupId: {
Attr: &ce.CloudEventAttributeValue_CeString{
Attr: &pb.CloudEventAttributeValue_CeString{
CeString: groupId,
},
},
model.KeyToUserId: {
Attr: &ce.CloudEventAttributeValue_CeString{
Attr: &pb.CloudEventAttributeValue_CeString{
CeString: userId,
},
},
},
Data: &ce.CloudEvent_TextData{
Data: &pb.CloudEvent_TextData{
TextData: txtLimitReached,
},
Id: ksuid.New().String(),
Expand All @@ -196,7 +196,7 @@ func (s svc) notifyLimitReached(
Type: model.ValTypeLimitReached,
}
req := SubmitMessagesRequest{
Msgs: []*ce.CloudEvent{
Msgs: []*pb.CloudEvent{
&evt,
},
}
Expand Down
174 changes: 174 additions & 0 deletions api/http/pub/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package pub

import (
"encoding/base64"
"fmt"
"github.com/bytedance/sonic"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"google.golang.org/protobuf/types/known/timestamppb"
"time"
)

type event struct {
Id string `json:"id"`
SpecVersion1 *string `json:"specVersion,omitempty"`
SpecVersion2 *string `json:"spec_version,omitempty"`
Source string `json:"source"`
Type string `json:"type"`
Attributes map[string]attribute `json:"attributes"`
TextData1 *string `json:"textData,omitempty"`
TextData2 *string `json:"text_data,omitempty"`
}

type attribute struct {
CeBoolean1 *bool `json:"ceBoolean,omitempty"`
CeBoolean2 *bool `json:"ce_boolean,omitempty"`
CeBytes1 *string `json:"ceBytes,omitempty"`
CeBytes2 *string `json:"ce_bytes,omitempty"`
CeInteger1 *int32 `json:"ceInteger,omitempty"`
CeInteger2 *int32 `json:"ce_integer,omitempty"`
CeString1 *string `json:"ceString,omitempty"`
CeString2 *string `json:"ce_string,omitempty"`
CeTimestamp1 *time.Time `json:"ceTimestamp,omitempty"`
CeTimestamp2 *time.Time `json:"ce_timestamp,omitempty"`
CeUri1 *string `json:"ceUri,omitempty"`
CeUri2 *string `json:"ce_uri,omitempty"`
CeUriRef1 *string `json:"ceUriRef,omitempty"`
CeUriRef2 *string `json:"ce_uri_ref,omitempty"`
}

type eventBatch struct {
Events []event `json:"events"`
}

func Unmarshal(src []byte, dst *pb.CloudEvent) (err error) {
var raw event
err = sonic.Unmarshal(src, &raw)
if err == nil {
err = convert(raw, dst)
}
return
}

func UnmarshalBatch(src []byte) (dstBatch []*pb.CloudEvent, err error) {
var rawBatch eventBatch
err = sonic.Unmarshal(src, &rawBatch)
if err == nil {
for _, raw := range rawBatch.Events {
var dst pb.CloudEvent
err = convert(raw, &dst)
if err != nil {
break
}
dstBatch = append(dstBatch, &dst)
}
}
return
}

func convert(raw event, dst *pb.CloudEvent) (err error) {

dst.Id = raw.Id
dst.Source = raw.Source
dst.Type = raw.Type

if raw.SpecVersion1 != nil {
dst.SpecVersion = *raw.SpecVersion1
}
if raw.SpecVersion2 != nil {
dst.SpecVersion = *raw.SpecVersion2
}

if raw.TextData1 != nil {
dst.Data = &pb.CloudEvent_TextData{
TextData: *raw.TextData1,
}
}
if raw.TextData2 != nil {
dst.Data = &pb.CloudEvent_TextData{
TextData: *raw.TextData2,
}
}

dst.Attributes = make(map[string]*pb.CloudEventAttributeValue)
for name, srcAttr := range raw.Attributes {

var dstAttr pb.CloudEventAttributeValue

switch {
case srcAttr.CeBoolean1 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeBoolean{
CeBoolean: *srcAttr.CeBoolean1,
}
case srcAttr.CeBoolean2 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeBoolean{
CeBoolean: *srcAttr.CeBoolean2,
}
case srcAttr.CeBytes1 != nil:
var bytes []byte
bytes, err = base64.StdEncoding.DecodeString(*srcAttr.CeBytes1)
if err == nil {
dstAttr.Attr = &pb.CloudEventAttributeValue_CeBytes{
CeBytes: bytes,
}
}
case srcAttr.CeBytes2 != nil:
var bytes []byte
bytes, err = base64.StdEncoding.DecodeString(*srcAttr.CeBytes2)
if err == nil {
dstAttr.Attr = &pb.CloudEventAttributeValue_CeBytes{
CeBytes: bytes,
}
}
case srcAttr.CeInteger1 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeInteger{
CeInteger: *srcAttr.CeInteger1,
}
case srcAttr.CeInteger2 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeInteger{
CeInteger: *srcAttr.CeInteger2,
}
case srcAttr.CeString1 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeString{
CeString: *srcAttr.CeString1,
}
case srcAttr.CeString2 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeString{
CeString: *srcAttr.CeString2,
}
case srcAttr.CeTimestamp1 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeTimestamp{
CeTimestamp: timestamppb.New(*srcAttr.CeTimestamp1),
}
case srcAttr.CeTimestamp2 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeTimestamp{
CeTimestamp: timestamppb.New(*srcAttr.CeTimestamp2),
}
case srcAttr.CeUri1 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeUri{
CeUri: *srcAttr.CeUri1,
}
case srcAttr.CeUri2 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeUri{
CeUri: *srcAttr.CeUri2,
}
case srcAttr.CeUriRef1 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeUriRef{
CeUriRef: *srcAttr.CeUriRef1,
}
case srcAttr.CeUriRef2 != nil:
dstAttr.Attr = &pb.CloudEventAttributeValue_CeUriRef{
CeUriRef: *srcAttr.CeUriRef2,
}
default:
err = fmt.Errorf("failed to convert event %s, unknown attribute type: %+v", raw.Id, srcAttr)
}

if err != nil {
break
}
dst.Attributes[name] = &dstAttr
}

return
}
Loading

0 comments on commit 7e88060

Please sign in to comment.