Skip to content

Commit

Permalink
Add flexibility in subject matching for nats_jetstream
Browse files Browse the repository at this point in the history
Signed-off-by: Evan Anderson <[email protected]>
  • Loading branch information
evankanderson committed Aug 4, 2024
1 parent 5f0cc73 commit ab040db
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion protocol/nats_jetstream/v2/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package nats_jetstream
import (
"context"
"io"
"strings"
"sync"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -89,10 +90,21 @@ func NewConsumerFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats

streamInfo, err := jsm.StreamInfo(stream, jsmOpts...)

subjectMatch := stream + ".*"
if strings.Count(strings.TrimPrefix(subject, stream), ".") > 1 {
// More than one "." in the remainder of subject, use ".>" to match
subjectMatch = stream + ".>"
}
if !strings.HasPrefix(subject, stream) {
// Use an empty subject parameter in conjunction with
// nats.ConsumerFilterSubjects
subjectMatch = ""
}

if streamInfo == nil || err != nil && err.Error() == "stream not found" {
_, err = jsm.AddStream(&nats.StreamConfig{
Name: stream,
Subjects: []string{stream + ".*"},
Subjects: []string{subjectMatch},
})
if err != nil {
return nil, err
Expand Down

0 comments on commit ab040db

Please sign in to comment.