Skip to content

Commit

Permalink
feat(config): Update stream name and topic names
Browse files Browse the repository at this point in the history
The code changes involve updating the "thingsStream" variable and modifying configurations and log messages in multiple files across different directories and packages. The changes include adding "events." to the stream name, updating topic names, and modifying log messages for handling NATS and RabbitMQ events.

Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo committed Feb 12, 2024
1 parent 22bf1dc commit 79bc9f9
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 300 deletions.
2 changes: 1 addition & 1 deletion cmd/bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
defDB = "bootstrap"
defSvcHTTPPort = "9013"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
streamID = "magistrala.bootstrap"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
connsRMPrefix = "connection"
thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down
2 changes: 1 addition & 1 deletion cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
channelsRMPrefix = "channel"
connectionRMPrefix = "connection"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/events/mocks/subscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

181 changes: 93 additions & 88 deletions pkg/events/nats/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ import (
)

var (
streamTopic = "test-topic"
eventsChan = make(chan map[string]interface{})
logger = mglog.NewMock()
errFailed = errors.New("failed")
numEvents = 100
eventsChan = make(chan map[string]interface{})
logger = mglog.NewMock()
errFailed = errors.New("failed")
numEvents = 100
)

type testEvent struct {
Expand Down Expand Up @@ -56,15 +55,17 @@ func TestPublish(t *testing.T) {

publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
defer publisher.Close()

_, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", logger)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
defer subcriber.Close()

cfg := events.SubscriberConfig{
Stream: stream,
Stream: "events." + stream,
Consumer: consumer,
Handler: handler{},
}
Expand Down Expand Up @@ -129,113 +130,117 @@ func TestPublish(t *testing.T) {
}

for _, tc := range cases {
event := testEvent{Data: tc.event}

err := publisher.Publish(context.Background(), event)
switch tc.err {
case nil:
receivedEvent := <-eventsChan

val := int64(receivedEvent["occurred_at"].(float64))
if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) {
delete(receivedEvent, "occurred_at")
delete(tc.event, "occurred_at")
t.Run(tc.desc, func(t *testing.T) {
event := testEvent{Data: tc.event}

err := publisher.Publish(context.Background(), event)
switch tc.err {
case nil:
receivedEvent := <-eventsChan

val := int64(receivedEvent["occurred_at"].(float64))
if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) {
delete(receivedEvent, "occurred_at")
delete(tc.event, "occurred_at")
}

assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"])
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"])
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"])
assert.Equal(t, tc.event["status"], receivedEvent["status"])
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, tc.event["operation"], receivedEvent["operation"])

default:
assert.ErrorContains(t, err, tc.err.Error())
}

assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"])
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"])
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"])
assert.Equal(t, tc.event["status"], receivedEvent["status"])
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, tc.event["operation"], receivedEvent["operation"])
default:
assert.ErrorContains(t, err, tc.err.Error())
}
})
}
}

func TestPubsub(t *testing.T) {
subcases := []struct {
desc string
stream string
consumer string
errorMessage error
handler events.EventHandler
cases := []struct {
desc string
stream string
consumer string
err error
handler events.EventHandler
}{
{
desc: "Subscribe to a stream",
stream: fmt.Sprintf("%s.%s", stream, streamTopic),
consumer: consumer,
errorMessage: nil,
handler: handler{false},
desc: "Subscribe to a stream",
stream: fmt.Sprintf("events.%s", stream),
consumer: consumer,
err: nil,
handler: handler{false},
},
{
desc: "Subscribe to the same stream",
stream: fmt.Sprintf("%s.%s", stream, streamTopic),
consumer: consumer,
errorMessage: nil,
handler: handler{false},
desc: "Subscribe to the same stream",
stream: fmt.Sprintf("events.%s", stream),
consumer: consumer,
err: nil,
handler: handler{false},
},
{
desc: "Subscribe to an empty stream with an empty consumer",
stream: "",
consumer: "",
errorMessage: nats.ErrEmptyStream,
handler: handler{false},
desc: "Subscribe to an empty stream with an empty consumer",
stream: "",
consumer: "",
err: nats.ErrEmptyConsumer,
handler: handler{false},
},
{
desc: "Subscribe to an empty stream with a valid consumer",
stream: "",
consumer: consumer,
errorMessage: nats.ErrEmptyStream,
handler: handler{false},
desc: "Subscribe to an empty stream with a valid consumer",
stream: "",
consumer: consumer,
err: nats.ErrEmptyStream,
handler: handler{false},
},
{
desc: "Subscribe to a valid stream with an empty consumer",
stream: fmt.Sprintf("%s.%s", stream, streamTopic),
consumer: "",
errorMessage: nats.ErrEmptyConsumer,
handler: handler{false},
desc: "Subscribe to a valid stream with an empty consumer",
stream: fmt.Sprintf("events.%s", stream),
consumer: "",
err: nats.ErrEmptyConsumer,
handler: handler{false},
},
{
desc: "Subscribe to another stream",
stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"),
consumer: consumer,
errorMessage: nil,
handler: handler{false},
desc: "Subscribe to another stream",
stream: fmt.Sprintf("events.%s.%d", stream, 1),
consumer: consumer,
err: nil,
handler: handler{false},
},
{
desc: "Subscribe to a stream with malformed handler",
stream: fmt.Sprintf("%s.%s", stream, streamTopic),
consumer: consumer,
errorMessage: nil,
handler: handler{true},
desc: "Subscribe to a stream with malformed handler",
stream: fmt.Sprintf("events.%s", stream),
consumer: consumer,
err: nil,
handler: handler{true},
},
}

for _, pc := range subcases {
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger)
if err != nil {
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err))
for _, pc := range cases {
t.Run(pc.desc, func(t *testing.T) {
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger)
if err != nil {
assert.Equal(t, err, pc.err)

continue
}
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
return
}

cfg := events.SubscriberConfig{
Stream: pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
}
switch err := subcriber.Subscribe(context.Background(), cfg); {
case err == nil:
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
default:
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err))
}
cfg := events.SubscriberConfig{
Stream: pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
}
switch err := subcriber.Subscribe(context.Background(), cfg); {
case err == nil:
assert.Nil(t, err)
default:
assert.Equal(t, err, pc.err)
}

err = subcriber.Close()
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
err = subcriber.Close()
assert.Nil(t, err)
})
}
}

Expand Down
8 changes: 3 additions & 5 deletions pkg/events/nats/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
"github.com/nats-io/nats.go/jetstream"
)

const (
maxReconnects = -1
)
const maxReconnects = -1

var _ events.Subscriber = (*subEventStore)(nil)

Expand Down Expand Up @@ -88,7 +86,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon

subCfg := messaging.SubscriberConfig{
ID: cfg.Consumer,
Topic: eventsPrefix + "." + cfg.Stream,
Topic: cfg.Stream,
Handler: &eventHandler{
handler: cfg.Handler,
ctx: ctx,
Expand Down Expand Up @@ -129,7 +127,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error {
}

if err := eh.handler.Handle(eh.ctx, event); err != nil {
eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err))
eh.logger.Warn(fmt.Sprintf("failed to handle nats event: %s", err))
}

return nil
Expand Down
Loading

0 comments on commit 79bc9f9

Please sign in to comment.