From ab040db881e816a3668a4d7654e95607446fcf22 Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Fri, 2 Aug 2024 13:32:43 -0700 Subject: [PATCH] Add flexibility in subject matching for nats_jetstream Signed-off-by: Evan Anderson --- protocol/nats_jetstream/v2/receiver.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/protocol/nats_jetstream/v2/receiver.go b/protocol/nats_jetstream/v2/receiver.go index 4f6aeaec3..75122488f 100644 --- a/protocol/nats_jetstream/v2/receiver.go +++ b/protocol/nats_jetstream/v2/receiver.go @@ -8,6 +8,7 @@ package nats_jetstream import ( "context" "io" + "strings" "sync" "github.com/nats-io/nats.go" @@ -89,10 +90,21 @@ func NewConsumerFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats streamInfo, err := jsm.StreamInfo(stream, jsmOpts...) + subjectMatch := stream + ".*" + if strings.Count(strings.TrimPrefix(subject, stream), ".") > 1 { + // More than one "." in the remainder of subject, use ".>" to match + subjectMatch = stream + ".>" + } + if !strings.HasPrefix(subject, stream) { + // Use an empty subject parameter in conjunction with + // nats.ConsumerFilterSubjects + subjectMatch = "" + } + if streamInfo == nil || err != nil && err.Error() == "stream not found" { _, err = jsm.AddStream(&nats.StreamConfig{ Name: stream, - Subjects: []string{stream + ".*"}, + Subjects: []string{subjectMatch}, }) if err != nil { return nil, err