From eac88c4a6c840f3a1ac9f510a959017e3cd0e0e2 Mon Sep 17 00:00:00 2001 From: Andrei Kurilov <18027129+akurilov@users.noreply.github.com> Date: Sat, 28 Dec 2024 21:38:13 +0200 Subject: [PATCH] fix: use non-streaming events API --- api/grpc/events/client_mock.go | 12 +++- api/grpc/events/client_pool.go | 10 ++- api/grpc/events/logging.go | 13 ++-- api/grpc/events/publish_stream_mock.go | 83 ------------------------- api/grpc/events/publisher.go | 47 -------------- api/grpc/events/publisher_mock.go | 32 ---------- api/grpc/events/publisher_test.go | 84 -------------------------- api/grpc/events/service.go | 17 +++--- api/grpc/events/service.proto | 2 +- api/grpc/events/service_mock.go | 17 ++---- api/grpc/events/service_test.go | 44 ++++++++------ api/grpc/publisher/service.go | 83 ++++--------------------- api/grpc/publisher/service.proto | 4 -- api/http/pub/handler.go | 71 +++++++++------------- helm/pub/templates/deployment.yaml | 3 + helm/pub/values.yaml | 1 + main.go | 7 ++- scripts/cover.sh | 2 +- 18 files changed, 114 insertions(+), 418 deletions(-) delete mode 100644 api/grpc/events/publish_stream_mock.go delete mode 100644 api/grpc/events/publisher.go delete mode 100644 api/grpc/events/publisher_mock.go delete mode 100644 api/grpc/events/publisher_test.go diff --git a/api/grpc/events/client_mock.go b/api/grpc/events/client_mock.go index 2a08ea8..a6e4bd8 100644 --- a/api/grpc/events/client_mock.go +++ b/api/grpc/events/client_mock.go @@ -26,6 +26,14 @@ func (cm clientMock) SetStream(ctx context.Context, req *SetStreamRequest, opts return } -func (cm clientMock) Publish(ctx context.Context, opts ...grpc.CallOption) (Service_PublishClient, error) { - return newPublishStreamMock(), nil +func (cm clientMock) PublishBatch(ctx context.Context, req *PublishRequest, opts ...grpc.CallOption) (resp *PublishResponse, err error) { + switch req.Topic { + case "fail": + err = status.Error(codes.Internal, "internal failure") + default: + resp = &PublishResponse{ + AckCount: 42, + } + } + return } diff --git a/api/grpc/events/client_pool.go b/api/grpc/events/client_pool.go index dc40938..c19f0e3 100644 --- a/api/grpc/events/client_pool.go +++ b/api/grpc/events/client_pool.go @@ -30,18 +30,16 @@ func (cp clientPool) SetStream(ctx context.Context, req *SetStreamRequest, opts return } -func (cp clientPool) Publish(ctx context.Context, opts ...grpc.CallOption) (stream Service_PublishClient, err error) { +func (cp clientPool) PublishBatch(ctx context.Context, req *PublishRequest, opts ...grpc.CallOption) (resp *PublishResponse, err error) { var conn *grpcpool.ClientConn conn, err = cp.connPool.Get(ctx) - var c *grpc.ClientConn if err == nil { - c = conn.ClientConn - conn.Close() // return back to the conn pool immediately + defer conn.Close() } var client ServiceClient if err == nil { - client = NewServiceClient(c) - stream, err = client.Publish(ctx, opts...) + client = NewServiceClient(conn) + resp, err = client.PublishBatch(ctx, req, opts...) } return } diff --git a/api/grpc/events/logging.go b/api/grpc/events/logging.go index 9ff7ffc..70bd568 100644 --- a/api/grpc/events/logging.go +++ b/api/grpc/events/logging.go @@ -3,7 +3,8 @@ package events import ( "context" "fmt" - "github.com/awakari/pub/model" + "github.com/awakari/pub/util" + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" "log/slog" ) @@ -25,11 +26,9 @@ func (lm loggingMiddleware) SetStream(ctx context.Context, topic string, limit u return } -func (lm loggingMiddleware) NewPublisher(ctx context.Context, topic string) (p model.MessagesWriter, err error) { - p, err = lm.svc.NewPublisher(ctx, topic) - lm.log.Debug(fmt.Sprintf("events.Publish(topic=%s): err=%s", topic, err)) - if err == nil { - p = model.NewMessagesWriterLogging(p, lm.log, topic) - } +func (lm loggingMiddleware) Publish(ctx context.Context, topic string, evts []*pb.CloudEvent) (ackCount uint32, err error) { + ackCount, err = lm.svc.Publish(ctx, topic, evts) + ll := util.LogLevel(err) + lm.log.Log(ctx, ll, fmt.Sprintf("events.Publish(%s, %d): ack=%d, err=%s", topic, len(evts), ackCount, err)) return } diff --git a/api/grpc/events/publish_stream_mock.go b/api/grpc/events/publish_stream_mock.go deleted file mode 100644 index c15946e..0000000 --- a/api/grpc/events/publish_stream_mock.go +++ /dev/null @@ -1,83 +0,0 @@ -package events - -import ( - "context" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - "io" -) - -type publishStreamMock struct { - lastReq *PublishRequest - lastErr error -} - -func newPublishStreamMock() Service_PublishClient { - return &publishStreamMock{} -} - -func (sm *publishStreamMock) Send(req *PublishRequest) (err error) { - switch sm.lastErr { - case nil: - switch req.Topic { - case "recv_fail": - sm.lastErr = status.Error(codes.Internal, "send failure") - case "send_eof": - sm.lastErr = io.EOF - err = io.EOF - case "recv_eof": - sm.lastErr = io.EOF - case "missing": - sm.lastErr = status.Error(codes.NotFound, "queue missing") - default: - sm.lastReq = req - } - default: - err = io.EOF - } - return -} - -func (sm *publishStreamMock) Recv() (resp *PublishResponse, err error) { - resp = &PublishResponse{} - switch sm.lastErr { - case nil: - resp.AckCount = uint32(len(sm.lastReq.Evts)) - if resp.AckCount > 2 { - resp.AckCount = 2 - } - default: - err = sm.lastErr - } - return -} - -func (sm *publishStreamMock) Header() (metadata.MD, error) { - //TODO implement me - panic("implement me") -} - -func (sm *publishStreamMock) Trailer() metadata.MD { - //TODO implement me - panic("implement me") -} - -func (sm *publishStreamMock) CloseSend() error { - return nil -} - -func (sm *publishStreamMock) Context() context.Context { - //TODO implement me - panic("implement me") -} - -func (sm *publishStreamMock) SendMsg(m interface{}) error { - //TODO implement me - panic("implement me") -} - -func (sm *publishStreamMock) RecvMsg(m interface{}) error { - //TODO implement me - panic("implement me") -} diff --git a/api/grpc/events/publisher.go b/api/grpc/events/publisher.go deleted file mode 100644 index 7881f1d..0000000 --- a/api/grpc/events/publisher.go +++ /dev/null @@ -1,47 +0,0 @@ -package events - -import ( - "context" - "github.com/awakari/pub/model" - "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" - "io" -) - -type publisher struct { - stream Service_PublishClient - queue string -} - -func newPublisher(stream Service_PublishClient, queue string) model.MessagesWriter { - return publisher{ - stream: stream, - queue: queue, - } -} - -func (mw publisher) Close() (err error) { - err = mw.stream.CloseSend() - if err != nil { - err = decodeError(err) - } - return -} - -func (mw publisher) Write(ctx context.Context, msgs []*pb.CloudEvent) (ackCount uint32, err error) { - req := PublishRequest{ - Topic: mw.queue, - Evts: msgs, - } - err = mw.stream.Send(&req) - var resp *PublishResponse - if err == nil || err == io.EOF { - resp, err = mw.stream.Recv() - } - if err != nil { - err = decodeError(err) - } - if resp != nil { - ackCount = resp.AckCount - } - return -} diff --git a/api/grpc/events/publisher_mock.go b/api/grpc/events/publisher_mock.go deleted file mode 100644 index a9891f7..0000000 --- a/api/grpc/events/publisher_mock.go +++ /dev/null @@ -1,32 +0,0 @@ -package events - -import ( - "context" - "github.com/awakari/pub/model" - "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" -) - -type publisherMock struct{} - -func NewPublisherMock() model.MessagesWriter { - return publisherMock{} -} - -func (mw publisherMock) Close() error { - return nil -} - -func (mw publisherMock) Write(ctx context.Context, msgs []*pb.CloudEvent) (ackCount uint32, err error) { - for _, msg := range msgs { - switch msg.Id { - case "queue_fail": - err = ErrInternal - default: - ackCount++ - } - if err != nil { - break - } - } - return -} diff --git a/api/grpc/events/publisher_test.go b/api/grpc/events/publisher_test.go deleted file mode 100644 index 49db83d..0000000 --- a/api/grpc/events/publisher_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package events - -import ( - "context" - "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" - "github.com/stretchr/testify/assert" - "io" - "testing" -) - -func TestMessagesWriter_Close(t *testing.T) { - mw := newPublisher(newPublishStreamMock(), "queue0") - err := mw.Close() - assert.Nil(t, err) -} - -func TestMessagesWriter_Write(t *testing.T) { - cases := map[string]struct { - queue string - msgs []*pb.CloudEvent - ackCount uint32 - err error - }{ - "1 => ack 1": { - queue: "queue0", - msgs: []*pb.CloudEvent{ - { - Id: "msg0", - }, - }, - ackCount: 1, - }, - "3 => ack 2": { - queue: "queue0", - msgs: []*pb.CloudEvent{ - { - Id: "msg0", - }, - { - Id: "msg1", - }, - { - Id: "msg2", - }, - }, - ackCount: 2, - }, - "send eof": { - queue: "send_eof", - msgs: []*pb.CloudEvent{ - { - Id: "msg0", - }, - }, - err: io.EOF, - }, - "recv fail": { - queue: "recv_fail", - msgs: []*pb.CloudEvent{ - { - Id: "msg0", - }, - }, - err: ErrInternal, - }, - "recv eof": { - queue: "recv_eof", - msgs: []*pb.CloudEvent{ - { - Id: "msg0", - }, - }, - err: io.EOF, - }, - } - for k, c := range cases { - t.Run(k, func(t *testing.T) { - mw := newPublisher(newPublishStreamMock(), c.queue) - ackCount, err := mw.Write(context.TODO(), c.msgs) - assert.Equal(t, c.ackCount, ackCount) - assert.ErrorIs(t, err, c.err) - }) - } -} diff --git a/api/grpc/events/service.go b/api/grpc/events/service.go index 6308fc2..f17834b 100644 --- a/api/grpc/events/service.go +++ b/api/grpc/events/service.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/awakari/pub/model" + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "io" @@ -12,7 +12,7 @@ import ( type Service interface { SetStream(ctx context.Context, topic string, limit uint32) (err error) - NewPublisher(ctx context.Context, topic string) (p model.MessagesWriter, err error) + Publish(ctx context.Context, topic string, evts []*pb.CloudEvent) (ackCount uint32, err error) } type service struct { @@ -39,11 +39,14 @@ func (svc service) SetStream(ctx context.Context, topic string, limit uint32) (e return } -func (svc service) NewPublisher(ctx context.Context, topic string) (p model.MessagesWriter, err error) { - var stream Service_PublishClient - stream, err = svc.client.Publish(ctx) - if err == nil { - p = newPublisher(stream, topic) +func (svc service) Publish(ctx context.Context, topic string, evts []*pb.CloudEvent) (ackCount uint32, err error) { + var resp *PublishResponse + resp, err = svc.client.PublishBatch(ctx, &PublishRequest{ + Topic: topic, + Evts: evts, + }) + if resp != nil { + ackCount = resp.AckCount } err = decodeError(err) return diff --git a/api/grpc/events/service.proto b/api/grpc/events/service.proto index 3b235e8..d0edfe4 100644 --- a/api/grpc/events/service.proto +++ b/api/grpc/events/service.proto @@ -11,7 +11,7 @@ service Service { rpc SetStream(SetStreamRequest) returns (SetStreamResponse); // Publish events to the specified topic. - rpc Publish(stream PublishRequest) returns (stream PublishResponse); + rpc PublishBatch(PublishRequest) returns (PublishResponse); } message SetStreamRequest { diff --git a/api/grpc/events/service_mock.go b/api/grpc/events/service_mock.go index 14367ec..b96f249 100644 --- a/api/grpc/events/service_mock.go +++ b/api/grpc/events/service_mock.go @@ -2,12 +2,16 @@ package events import ( "context" - "github.com/awakari/pub/model" + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" ) type serviceMock struct { } +func NewServiceMock() Service { + return serviceMock{} +} + func (sm serviceMock) SetStream(ctx context.Context, topic string, limit uint32) (err error) { switch topic { case "": @@ -18,16 +22,7 @@ func (sm serviceMock) SetStream(ctx context.Context, topic string, limit uint32) return } -func NewServiceMock() Service { - return serviceMock{} -} +func (sm serviceMock) Publish(ctx context.Context, topic string, evts []*pb.CloudEvent) (ackCount uint32, err error) { -func (sm serviceMock) NewPublisher(ctx context.Context, topic string) (mw model.MessagesWriter, err error) { - switch topic { - case "fail": - err = ErrInternal - default: - mw = NewPublisherMock() - } return } diff --git a/api/grpc/events/service_test.go b/api/grpc/events/service_test.go index bb34f2a..f5fb9da 100644 --- a/api/grpc/events/service_test.go +++ b/api/grpc/events/service_test.go @@ -2,28 +2,12 @@ package events import ( "context" + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" "github.com/stretchr/testify/assert" "log/slog" "testing" ) -func TestService_NewPublisher(t *testing.T) { - svc := NewService(NewClientMock()) - svc = NewLoggingMiddleware(svc, slog.Default()) - cases := map[string]error{ - "ok": nil, - } - for k, expectedErr := range cases { - t.Run(k, func(t *testing.T) { - qw, err := svc.NewPublisher(context.TODO(), k) - assert.ErrorIs(t, err, expectedErr) - if err == nil { - assert.NotNil(t, qw) - } - }) - } -} - func TestService_SetStream(t *testing.T) { svc := NewService(NewClientMock()) svc = NewLoggingMiddleware(svc, slog.Default()) @@ -50,3 +34,29 @@ func TestService_SetStream(t *testing.T) { }) } } + +func TestService_Publish(t *testing.T) { + svc := NewService(NewClientMock()) + svc = NewLoggingMiddleware(svc, slog.Default()) + cases := map[string]struct { + topic string + ackCount uint32 + err error + }{ + "ok": { + topic: "ok", + ackCount: 42, + }, + "fail": { + topic: "fail", + err: ErrInternal, + }, + } + for k, c := range cases { + t.Run(k, func(t *testing.T) { + ackCount, err := svc.Publish(context.TODO(), c.topic, []*pb.CloudEvent{}) + assert.Equal(t, c.ackCount, ackCount) + assert.ErrorIs(t, err, c.err) + }) + } +} diff --git a/api/grpc/publisher/service.go b/api/grpc/publisher/service.go index d936538..f4e2775 100644 --- a/api/grpc/publisher/service.go +++ b/api/grpc/publisher/service.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/awakari/pub/api/grpc/auth" "github.com/awakari/pub/api/grpc/events" "github.com/awakari/pub/api/grpc/limits" "github.com/awakari/pub/api/grpc/permits" @@ -14,13 +13,11 @@ import ( "github.com/segmentio/ksuid" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/timestamppb" - "time" ) type Service interface { - SubmitPermittedEvents(ctx context.Context, streamClient events.Service_PublishClient, req *SubmitMessagesRequest, groupId, userId string) (resp *SubmitMessagesResponse, err error) - SubmitInternalEvents(ctx context.Context, streamClient events.Service_PublishClient, req *SubmitMessagesRequest) (resp *SubmitMessagesResponse, err error) + SubmitPermittedEvents(ctx context.Context, req *SubmitMessagesRequest, groupId, userId string) (resp *SubmitMessagesResponse, err error) + SubmitInternalEvents(ctx context.Context, req *SubmitMessagesRequest) (resp *SubmitMessagesResponse, err error) } type svc struct { @@ -44,64 +41,14 @@ func NewService(client events.ServiceClient, svcPermits permits.Service, cfgEvts } } -func (s svc) SubmitMessages(streamSrv Service_SubmitMessagesServer) (err error) { - ctx := streamSrv.Context() - var groupId string - var userId string - groupId, userId, err = auth.GetIncomingAuthInfo(ctx) - var streamClient events.Service_PublishClient - if err == nil { - streamClient, err = s.client.Publish(ctx) - } - if err == nil { - var req *SubmitMessagesRequest - var resp *SubmitMessagesResponse - for { - req, err = streamSrv.Recv() - if err == nil { - for _, evt := range req.Msgs { - if evt.Attributes == nil { - evt.Attributes = make(map[string]*pb.CloudEventAttributeValue) - } - evt.Attributes[model.KeyCeGroupId] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeString{ - CeString: groupId, - }, - } - evt.Attributes[model.KeyCeUserId] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeString{ - CeString: userId, - }, - } - evt.Attributes[model.KeyCePubTime] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeTimestamp{ - CeTimestamp: timestamppb.New(time.Now().UTC()), - }, - } - } - resp, err = s.SubmitPermittedEvents(ctx, streamClient, req, groupId, userId) - } - // proxy a response - if err == nil { - err = streamSrv.Send(resp) - } - // exit the loop on an error - if err != nil { - break - } - } - } - return -} - -func (s svc) SubmitPermittedEvents(ctx context.Context, streamClient events.Service_PublishClient, req *SubmitMessagesRequest, groupId, userId string) (resp *SubmitMessagesResponse, err error) { +func (s svc) SubmitPermittedEvents(ctx context.Context, req *SubmitMessagesRequest, groupId, userId string) (resp *SubmitMessagesResponse, err error) { // allocate permit var permit model.Permit permit, err = s.svcPermits.Request(ctx, groupId, userId, subjPubMsgs, uint32(len(req.Msgs))) err = encodeError(err) // utilize permit if err == nil { - resp, err = s.utilizePermit(streamClient, req, permit, groupId) + resp, err = s.utilizePermit(ctx, req, permit, groupId) } var usedCount uint32 if err == nil { @@ -115,16 +62,13 @@ func (s svc) SubmitPermittedEvents(ctx context.Context, streamClient events.Serv return } -func (s svc) SubmitInternalEvents(_ context.Context, streamClient events.Service_PublishClient, req *SubmitMessagesRequest) (resp *SubmitMessagesResponse, err error) { +func (s svc) SubmitInternalEvents(ctx context.Context, req *SubmitMessagesRequest) (resp *SubmitMessagesResponse, err error) { // proxy a request - err = streamClient.Send(&events.PublishRequest{ + var dstResp *events.PublishResponse + dstResp, err = s.client.PublishBatch(ctx, &events.PublishRequest{ Topic: s.cfgEvts.Topic, Evts: req.Msgs, }) - var dstResp *events.PublishResponse - if err == nil { - dstResp, err = streamClient.Recv() - } if dstResp != nil { resp = &SubmitMessagesResponse{ AckCount: dstResp.AckCount, @@ -133,10 +77,10 @@ func (s svc) SubmitInternalEvents(_ context.Context, streamClient events.Service return } -func (s svc) utilizePermit(streamClient events.Service_PublishClient, srcReq *SubmitMessagesRequest, permit model.Permit, groupId string) (resp *SubmitMessagesResponse, err error) { +func (s svc) utilizePermit(ctx context.Context, srcReq *SubmitMessagesRequest, permit model.Permit, groupId string) (resp *SubmitMessagesResponse, err error) { // send the message if permit is just exhausted for the 1st time since last reset if permit.JustExhausted { - resp, err = s.notifyLimitReached(streamClient, srcReq, groupId, permit.UserId) + resp, err = s.notifyLimitReached(ctx, srcReq, groupId, permit.UserId) } // var dstReq *events.PublishRequest @@ -144,12 +88,9 @@ func (s svc) utilizePermit(streamClient events.Service_PublishClient, srcReq *Su dstReq, err = s.applyPermit(srcReq, permit) } // proxy a request - if err == nil { - err = streamClient.Send(dstReq) - } var dstResp *events.PublishResponse if err == nil { - dstResp, err = streamClient.Recv() + dstResp, err = s.client.PublishBatch(ctx, dstReq) } if dstResp != nil { resp = &SubmitMessagesResponse{ @@ -160,7 +101,7 @@ func (s svc) utilizePermit(streamClient events.Service_PublishClient, srcReq *Su } func (s svc) notifyLimitReached( - streamClient events.Service_PublishClient, + ctx context.Context, srcReq *SubmitMessagesRequest, groupId, userId string, ) ( @@ -200,7 +141,7 @@ func (s svc) notifyLimitReached( &evt, }, } - resp, err = s.SubmitInternalEvents(context.TODO(), streamClient, &req) + resp, err = s.SubmitInternalEvents(ctx, &req) fmt.Printf("user %s daily publishing limit reached notification %s status: %+v, %s\n", userId, evt.Id, resp, err) return } diff --git a/api/grpc/publisher/service.proto b/api/grpc/publisher/service.proto index 104cb39..2fd93f2 100644 --- a/api/grpc/publisher/service.proto +++ b/api/grpc/publisher/service.proto @@ -6,10 +6,6 @@ option go_package = "api/grpc/publisher"; import "api/grpc/ce/cloudevent.proto"; -service Service { - rpc SubmitMessages(stream SubmitMessagesRequest) returns (stream SubmitMessagesResponse); -} - message SubmitMessagesRequest { repeated ce.CloudEvent msgs = 1; } diff --git a/api/http/pub/handler.go b/api/http/pub/handler.go index 475871c..24d6918 100644 --- a/api/http/pub/handler.go +++ b/api/http/pub/handler.go @@ -2,7 +2,6 @@ package pub import ( "fmt" - "github.com/awakari/pub/api/grpc/events" "github.com/awakari/pub/api/grpc/publisher" "github.com/awakari/pub/api/http/grpc" "github.com/awakari/pub/config" @@ -10,7 +9,6 @@ import ( "github.com/bytedance/sonic" "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" "github.com/gin-gonic/gin" - grpcpool "github.com/processout/grpc-go-pool" "go.uber.org/ratelimit" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -30,7 +28,6 @@ type Handler interface { type handler struct { writer publisher.Service writerInternalCfg config.WriterInternalConfig - connPoolEvts *grpcpool.Pool writerInternalRateLimit ratelimit.Limiter blacklist model.Prefixes[model.BlacklistValue] log *slog.Logger @@ -39,14 +36,12 @@ type handler struct { func NewHandler( writer publisher.Service, writerInternalCfg config.WriterInternalConfig, - connPoolEvts *grpcpool.Pool, blacklist model.Prefixes[model.BlacklistValue], log *slog.Logger, ) Handler { return handler{ writer: writer, writerInternalCfg: writerInternalCfg, - connPoolEvts: connPoolEvts, writerInternalRateLimit: ratelimit.New(writerInternalCfg.RateLimitPerMinute, ratelimit.Per(time.Minute)), blacklist: blacklist, log: log, @@ -150,49 +145,37 @@ func (h handler) write(ctx *gin.Context, evts []*pb.CloudEvent, internal bool) { } } - conn, err := h.connPoolEvts.Get(ctx) - - var streamClient events.Service_PublishClient - if err == nil { - c := conn.ClientConn - conn.Close() // return back to the conn pool immediately - client := events.NewServiceClient(c) - streamClient, err = client.Publish(ctx) - } - - var resp *publisher.SubmitMessagesResponse - if err == nil { - defer streamClient.CloseSend() - grpcCtx, groupId, userId := grpc.AuthRequestContext(ctx) - for _, evt := range evts { - if evt.Attributes == nil { - evt.Attributes = make(map[string]*pb.CloudEventAttributeValue) - } - evt.Attributes[model.KeyCeGroupId] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeString{ - CeString: groupId, - }, - } - evt.Attributes[model.KeyCeUserId] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeString{ - CeString: userId, - }, - } - evt.Attributes[model.KeyCePubTime] = &pb.CloudEventAttributeValue{ - Attr: &pb.CloudEventAttributeValue_CeTimestamp{ - CeTimestamp: timestamppb.New(time.Now().UTC()), - }, - } + grpcCtx, groupId, userId := grpc.AuthRequestContext(ctx) + for _, evt := range evts { + if evt.Attributes == nil { + evt.Attributes = make(map[string]*pb.CloudEventAttributeValue) + } + evt.Attributes[model.KeyCeGroupId] = &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: groupId, + }, } - req := publisher.SubmitMessagesRequest{ - Msgs: evts, + evt.Attributes[model.KeyCeUserId] = &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: userId, + }, } - if internal { - resp, err = h.writer.SubmitInternalEvents(grpcCtx, streamClient, &req) - } else { - resp, err = h.writer.SubmitPermittedEvents(grpcCtx, streamClient, &req, groupId, userId) + evt.Attributes[model.KeyCePubTime] = &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeTimestamp{ + CeTimestamp: timestamppb.New(time.Now().UTC()), + }, } } + req := publisher.SubmitMessagesRequest{ + Msgs: evts, + } + var resp *publisher.SubmitMessagesResponse + var err error + if internal { + resp, err = h.writer.SubmitInternalEvents(grpcCtx, &req) + } else { + resp, err = h.writer.SubmitPermittedEvents(grpcCtx, &req, groupId, userId) + } if err == nil && resp.AckCount == 0 { ctx.String(http.StatusServiceUnavailable, "was unable to submit, retry later") diff --git a/helm/pub/templates/deployment.yaml b/helm/pub/templates/deployment.yaml index 83e2597..04aa606 100644 --- a/helm/pub/templates/deployment.yaml +++ b/helm/pub/templates/deployment.yaml @@ -107,6 +107,9 @@ spec: - name: http containerPort: {{ .Values.service.port.http }} protocol: TCP + - name: prof + containerPort: {{ .Values.service.port.prof }} + protocol: TCP livenessProbe: tcpSocket: port: {{ .Values.service.port.http }} diff --git a/helm/pub/values.yaml b/helm/pub/values.yaml index 97a4d4d..2351e4d 100644 --- a/helm/pub/values.yaml +++ b/helm/pub/values.yaml @@ -39,6 +39,7 @@ service: type: ClusterIP port: http: 8080 + prof: 6060 ingress: enabled: true diff --git a/main.go b/main.go index 4207e62..8abbb6a 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "log/slog" + //_ "net/http/pprof" "os" ) @@ -183,7 +184,6 @@ func main() { handlerPub := v2.NewHandler( publisher.NewService(clientEvts, svcPermits, cfg.Api.Events), cfg.Api.Writer.Internal, - connPoolEvts, blacklist, log, ) @@ -202,6 +202,11 @@ func main() { authSrcTg := auth2.NewTelegramValidator(svcSrcTg) + // expose the profiling + //go func() { + // _ = http.ListenAndServe("localhost:6060", nil) + //}() + r := gin.Default() r. Group("/v1/src/:type"). diff --git a/scripts/cover.sh b/scripts/cover.sh index d753eb9..d84780f 100755 --- a/scripts/cover.sh +++ b/scripts/cover.sh @@ -1,7 +1,7 @@ #!/bin/bash COVERAGE=$(cat cover.tmp) -THRESHOLD=10 +THRESHOLD=9 if [[ ${COVERAGE} -lt ${THRESHOLD} ]]; \ then \ echo "FAILED: test coverage ${COVERAGE}% < ${THRESHOLD}%"; \