From c58b9331b4aed51a351635ec5b0d9abbfdb560df Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Fri, 2 Aug 2024 13:32:43 -0700 Subject: [PATCH] Add flexibility in subject matching for nats_jetstream Signed-off-by: Evan Anderson --- protocol/nats_jetstream/v2/receiver.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/protocol/nats_jetstream/v2/receiver.go b/protocol/nats_jetstream/v2/receiver.go index 4f6aeaec3..e05b7419a 100644 --- a/protocol/nats_jetstream/v2/receiver.go +++ b/protocol/nats_jetstream/v2/receiver.go @@ -8,6 +8,7 @@ package nats_jetstream import ( "context" "io" + "strings" "sync" "github.com/nats-io/nats.go" @@ -89,10 +90,20 @@ func NewConsumerFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats streamInfo, err := jsm.StreamInfo(stream, jsmOpts...) + subjectMatch := stream + ".*" + if !strings.HasPrefix(subject, stream) { + // Use an empty subject parameter in conjunction with + // nats.ConsumerFilterSubjects + subjectMatch = "" + } else if strings.Count(subject[len(stream):], ".") > 1 { + // More than one "." in the remainder of subject, use ".>" to match + subjectMatch = stream + ".>" + } + if streamInfo == nil || err != nil && err.Error() == "stream not found" { _, err = jsm.AddStream(&nats.StreamConfig{ Name: stream, - Subjects: []string{stream + ".*"}, + Subjects: []string{subjectMatch}, }) if err != nil { return nil, err