From 9212db4692b1887e879a464defb9e5972279d321 Mon Sep 17 00:00:00 2001 From: stephen-totty-hpe Date: Tue, 9 Jul 2024 10:22:26 -0400 Subject: [PATCH] add ability to use PullConsumers with NATS jetstream with a specified callback Signed-off-by: stephen-totty-hpe --- protocol/nats_jetstream/v2/options.go | 16 ++++++++ protocol/nats_jetstream/v2/receiver.go | 40 ++++++++++++++++++-- protocol/nats_jetstream/v2/subscriber.go | 16 ++++++++ test/integration/nats_jetstream/nats_test.go | 26 +++++++++++++ 4 files changed, 95 insertions(+), 3 deletions(-) diff --git a/protocol/nats_jetstream/v2/options.go b/protocol/nats_jetstream/v2/options.go index f86642334..5f919467b 100644 --- a/protocol/nats_jetstream/v2/options.go +++ b/protocol/nats_jetstream/v2/options.go @@ -12,6 +12,8 @@ import ( ) var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber") +var ErrInvalidDurableName = errors.New("invalid durable name for PullSubscriber") +var ErrInvalidCallbackFunc = errors.New("invalid callback function for PullSubscriber") // NatsOptions is a helper function to group a variadic nats.ProtocolOption into // []nats.Option that can be used by either Sender, Consumer or Protocol @@ -50,3 +52,17 @@ func WithQueueSubscriber(queue string) ConsumerOption { return nil } } + +// WithPullSubscriber configures the Consumer to pull messages when subscribing +func WithPullSubscriber(durable string, fetchCallback FetchCallbackFunc) ConsumerOption { + return func(c *Consumer) error { + if durable == "" { + return ErrInvalidDurableName + } + if fetchCallback == nil { + return ErrInvalidCallbackFunc + } + c.Subscriber = &PullSubscriber{Durable: durable, FetchCallback: fetchCallback} + return nil + } +} diff --git a/protocol/nats_jetstream/v2/receiver.go b/protocol/nats_jetstream/v2/receiver.go index 4f6aeaec3..e58a5ea34 100644 --- a/protocol/nats_jetstream/v2/receiver.go +++ b/protocol/nats_jetstream/v2/receiver.go @@ -128,9 +128,22 @@ func (c *Consumer) OpenInbound(ctx context.Context) error { } // Wait until external or internal context done - select { - case <-ctx.Done(): - case <-c.internalClose: + pullSubscriber, isPullSubscriber := c.Subscriber.(*PullSubscriber) + if isPullSubscriber { + err = c.pullSubscriptionProcessor(ctx, sub, pullSubscriber) + if err != nil { + errDrain := sub.Drain() + if errDrain != nil { + return errDrain + } + return err + } + } else { + // Wait until external or internal context done + select { + case <-ctx.Done(): + case <-c.internalClose: + } } // Finish to consume messages in the queue and close the subscription @@ -163,6 +176,27 @@ func (c *Consumer) applyOptions(opts ...ConsumerOption) error { return nil } +// pullSubscriptionProcessor allows the pull subscriber to control some aspects of the fetch calls +func (c *Consumer) pullSubscriptionProcessor(ctx context.Context, natsSub *nats.Subscription, ps *PullSubscriber) error { + for { + // Wait until external or internal context done + select { + case <-ctx.Done(): + return nil + case <-c.internalClose: + return nil + default: + msgs, err := ps.FetchCallback(natsSub) + if err != nil { + return err + } + for i := range msgs { + c.Receiver.MsgHandler(msgs[i]) + } + } + } +} + var _ protocol.Opener = (*Consumer)(nil) var _ protocol.Receiver = (*Consumer)(nil) var _ protocol.Closer = (*Consumer)(nil) diff --git a/protocol/nats_jetstream/v2/subscriber.go b/protocol/nats_jetstream/v2/subscriber.go index 3c9a22928..7d8df1fad 100644 --- a/protocol/nats_jetstream/v2/subscriber.go +++ b/protocol/nats_jetstream/v2/subscriber.go @@ -36,3 +36,19 @@ func (s *QueueSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, c } var _ Subscriber = (*QueueSubscriber)(nil) + +// FetchCallbackFunc defines a callback where Fetch should be called against nats.Subscription +type FetchCallbackFunc func(natsSub *nats.Subscription) ([]*nats.Msg, error) + +// PullSubscriber creates pull subscriptions +type PullSubscriber struct { + Durable string + FetchCallback FetchCallbackFunc +} + +// Subscribe implements Subscriber.Subscribe +func (s *PullSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) { + return jsm.PullSubscribe(subject, s.Durable, opts...) +} + +var _ Subscriber = (*PullSubscriber)(nil) diff --git a/test/integration/nats_jetstream/nats_test.go b/test/integration/nats_jetstream/nats_test.go index 0c9d5b554..cc3d57159 100644 --- a/test/integration/nats_jetstream/nats_test.go +++ b/test/integration/nats_jetstream/nats_test.go @@ -54,6 +54,17 @@ func TestSendReceiveStructuredAndBinary(t *testing.T) { bindingEncoding: binding.EncodingStructured, }, }, + { + name: "pull subscriber - structured", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerOptions( + ce_nats.WithPullSubscriber(uuid.New().String(), fetchCallback), + ), + }, + bindingEncoding: binding.EncodingStructured, + }, + }, { name: "regular subscriber - binary", args: args{ @@ -70,6 +81,17 @@ func TestSendReceiveStructuredAndBinary(t *testing.T) { bindingEncoding: binding.EncodingBinary, }, }, + { + name: "pull subscriber - binary", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerOptions( + ce_nats.WithPullSubscriber(uuid.New().String(), fetchCallback), + ), + }, + bindingEncoding: binding.EncodingBinary, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -137,6 +159,10 @@ func testProtocol(t testing.TB, natsConn *nats.Conn, opts ...ce_nats.ProtocolOpt }, p.Sender, p.Consumer } +func fetchCallback(natsSub *nats.Subscription) ([]*nats.Msg, error) { + return natsSub.Fetch(1) +} + func BenchmarkSendReceive(b *testing.B) { conn := testConn(b) defer conn.Close()