From da3fdf3ff7bdddaf207c91588d0cd329dafeec5f Mon Sep 17 00:00:00 2001 From: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Date: Wed, 10 Jan 2024 16:18:48 +0300 Subject: [PATCH] feat(config): Update stream name and topic names The code changes involve updating the "thingsStream" variable and modifying configurations and log messages in multiple files across different directories and packages. The changes include adding "events." to the stream name, updating topic names, and modifying log messages for handling NATS and RabbitMQ events. Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> --- cmd/bootstrap/main.go | 2 +- cmd/lora/main.go | 2 +- cmd/opcua/main.go | 2 +- pkg/events/mocks/subscriber.go | 10 +- pkg/events/nats/publisher.go | 11 -- pkg/events/nats/publisher_test.go | 231 ++++++++++--------------- pkg/events/nats/setup_test.go | 15 +- pkg/events/nats/subscriber.go | 8 +- pkg/events/rabbitmq/publisher_test.go | 233 +++++++++++--------------- pkg/events/rabbitmq/setup_test.go | 7 +- pkg/events/rabbitmq/subscriber.go | 4 +- pkg/events/redis/publisher_test.go | 189 +++++++++++---------- pkg/events/redis/subscriber.go | 2 +- 13 files changed, 304 insertions(+), 412 deletions(-) diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index fe3bfa210a1..3c5efa711f2 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -45,7 +45,7 @@ const ( defDB = "bootstrap" defSvcHTTPPort = "9013" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" streamID = "magistrala.bootstrap" ) diff --git a/cmd/lora/main.go b/cmd/lora/main.go index 24d36f59813..c34c1afe363 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -44,7 +44,7 @@ const ( thingsRMPrefix = "thing" channelsRMPrefix = "channel" connsRMPrefix = "connection" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" ) type config struct { diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index 077f4340fe7..f5425530ab2 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -43,7 +43,7 @@ const ( channelsRMPrefix = "channel" connectionRMPrefix = "connection" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" ) type config struct { diff --git a/pkg/events/mocks/subscriber.go b/pkg/events/mocks/subscriber.go index 38d40023870..4de1ceb50f1 100644 --- a/pkg/events/mocks/subscriber.go +++ b/pkg/events/mocks/subscriber.go @@ -34,17 +34,17 @@ func (_m *Subscriber) Close() error { return r0 } -// Subscribe provides a mock function with given fields: ctx, handler -func (_m *Subscriber) Subscribe(ctx context.Context, handler events.EventHandler) error { - ret := _m.Called(ctx, handler) +// Subscribe provides a mock function with given fields: ctx, cfg +func (_m *Subscriber) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + ret := _m.Called(ctx, cfg) if len(ret) == 0 { panic("no return value specified for Subscribe") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, events.EventHandler) error); ok { - r0 = rf(ctx, handler) + if rf, ok := ret.Get(0).(func(context.Context, events.SubscriberConfig) error); ok { + r0 = rf(ctx, cfg) } else { r0 = ret.Error(0) } diff --git a/pkg/events/nats/publisher.go b/pkg/events/nats/publisher.go index 29c07f1e741..e711f9701ce 100644 --- a/pkg/events/nats/publisher.go +++ b/pkg/events/nats/publisher.go @@ -50,8 +50,6 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er stream: stream, } - go es.StartPublishingRoutine(ctx) - return es, nil } @@ -74,15 +72,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error return es.publisher.Publish(ctx, es.stream, record) } -func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) { - // Nats doesn't need to check for unpublished events - // since the events are published to a buffer. - // The buffer is flushed when the connection is reestablished. - // https://docs.nats.io/using-nats/developer/connecting/reconnect/buffer - - <-ctx.Done() -} - func (es *pubEventStore) Close() error { es.conn.Close() diff --git a/pkg/events/nats/publisher_test.go b/pkg/events/nats/publisher_test.go index cd308392cb8..98dd1e74db8 100644 --- a/pkg/events/nats/publisher_test.go +++ b/pkg/events/nats/publisher_test.go @@ -19,10 +19,9 @@ import ( ) var ( - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") ) type testEvent struct { @@ -52,15 +51,17 @@ func (te testEvent) Encode() (map[string]interface{}, error) { func TestPublish(t *testing.T) { publisher, err := nats.NewPublisher(ctx, natsURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() _, err = nats.NewSubscriber(ctx, "http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) subcriber, err := nats.NewSubscriber(ctx, natsURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() cfg := events.SubscriberConfig{ - Stream: stream, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } @@ -125,165 +126,117 @@ func TestPublish(t *testing.T) { } for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(ctx, event) - switch tc.err { - case nil: - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) - - receivedEvent := <-eventsChan - - val := int64(receivedEvent["occurred_at"].(float64)) - if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") - } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) - assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) - - default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) - } - } -} - -func TestUnavailablePublish(t *testing.T) { - _, err := nats.NewPublisher(ctx, "http://invaliurl.com", stream) - assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - - publisher, err := nats.NewPublisher(ctx, natsURL, stream) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - - err = pool.Client.PauseContainer(container.Container.ID) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) - - spawnGoroutines(publisher, t) - - err = pool.Client.UnpauseContainer(container.Container.ID) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) - - // Wait for the events to be published. - time.Sleep(events.UnpublishedEventsCheckInterval) - - err = publisher.Close() - assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) -} - -func generateRandomEvent() testEvent { - return testEvent{ - Data: map[string]interface{}{ - "temperature": fmt.Sprintf("%f", rand.Float64()), - "humidity": fmt.Sprintf("%f", rand.Float64()), - "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), - "location": fmt.Sprintf("%f", rand.Float64()), - "status": fmt.Sprintf("%d", rand.Intn(1000)), - "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), - "operation": "create", - }, - } -} - -func spawnGoroutines(publisher events.Publisher, t *testing.T) { - for i := 0; i < 1e4; i++ { - go func() { - for i := 0; i < 10; i++ { - event := generateRandomEvent() - err := publisher.Publish(ctx, event) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + t.Run(tc.desc, func(t *testing.T) { + event := testEvent{Data: tc.event} + + err := publisher.Publish(ctx, event) + switch tc.err { + case nil: + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, tc.err.Error()) } - }() + }) } } func TestPubsub(t *testing.T) { - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: nats.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: nats.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: nats.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: nats.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: "", - errorMessage: nats.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: nats.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := nats.NewSubscriber(ctx, natsURL, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - - continue - } + for _, pc := range cases { + t.Run(pc.desc, func(t *testing.T) { + subcriber, err := nats.NewSubscriber(ctx, natsURL, logger) + if err != nil { + assert.Equal(t, err, pc.err) - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + return + } - cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, - } - switch err := subcriber.Subscribe(context.TODO(), cfg); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: pc.stream, + Consumer: pc.consumer, + Handler: pc.handler, + } + switch err := subcriber.Subscribe(context.TODO(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, pc.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } diff --git a/pkg/events/nats/setup_test.go b/pkg/events/nats/setup_test.go index 19e5a3fb7a5..c7c36a61c0c 100644 --- a/pkg/events/nats/setup_test.go +++ b/pkg/events/nats/setup_test.go @@ -17,22 +17,19 @@ import ( ) var ( - natsURL string - stream = "tests.events" - consumer = "tests-consumer" - ctx = context.Background() - pool *dockertest.Pool - container *dockertest.Resource + natsURL string + stream = "tests.events" + consumer = "tests-consumer" + ctx = context.Background() ) func TestMain(m *testing.M) { - var err error - pool, err = dockertest.NewPool("") + pool, err := dockertest.NewPool("") if err != nil { log.Fatalf("Could not connect to docker: %s", err) } - container, err = pool.RunWithOptions(&dockertest.RunOptions{ + container, err := pool.RunWithOptions(&dockertest.RunOptions{ Name: "test-nats-events", Repository: "nats", Tag: "2.9.21-alpine", diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go index 361c5246473..1bf074349f5 100644 --- a/pkg/events/nats/subscriber.go +++ b/pkg/events/nats/subscriber.go @@ -18,9 +18,7 @@ import ( "github.com/nats-io/nats.go/jetstream" ) -const ( - maxReconnects = -1 -) +const maxReconnects = -1 var _ events.Subscriber = (*subEventStore)(nil) @@ -88,7 +86,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon subCfg := messaging.SubscriberConfig{ ID: cfg.Consumer, - Topic: eventsPrefix + "." + cfg.Stream, + Topic: cfg.Stream, Handler: &eventHandler{ handler: cfg.Handler, ctx: ctx, @@ -129,7 +127,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error { } if err := eh.handler.Handle(eh.ctx, event); err != nil { - eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + eh.logger.Warn(fmt.Sprintf("failed to handle nats event: %s", err)) } return nil diff --git a/pkg/events/rabbitmq/publisher_test.go b/pkg/events/rabbitmq/publisher_test.go index e69bb5fb2df..81357ecd130 100644 --- a/pkg/events/rabbitmq/publisher_test.go +++ b/pkg/events/rabbitmq/publisher_test.go @@ -19,10 +19,9 @@ import ( ) var ( - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") ) type testEvent struct { @@ -52,15 +51,17 @@ func (te testEvent) Encode() (map[string]interface{}, error) { func TestPublish(t *testing.T) { publisher, err := rabbitmq.NewPublisher(ctx, rabbitmqURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() _, err = rabbitmq.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() cfg := events.SubscriberConfig{ - Stream: stream, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } @@ -124,166 +125,120 @@ func TestPublish(t *testing.T) { }, } - for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(ctx, event) - switch tc.err { - case nil: - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) - - receivedEvent := <-eventsChan - - val := int64(receivedEvent["occurred_at"].(float64)) - if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + event := testEvent{Data: c.event} + + err := publisher.Publish(ctx, event) + switch c.err { + case nil: + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(c.event, "occurred_at") + } + + assert.Equal(t, c.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, c.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, c.event["status"], receivedEvent["status"]) + assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, c.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, c.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) - assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) - - default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) - } - } -} - -func TestUnavailablePublish(t *testing.T) { - _, err := rabbitmq.NewPublisher(ctx, "http://invaliurl.com", stream) - assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - - publisher, err := rabbitmq.NewPublisher(ctx, rabbitmqURL, stream) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - - err = pool.Client.PauseContainer(container.Container.ID) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) - - spawnGoroutines(publisher, t) - - err = pool.Client.UnpauseContainer(container.Container.ID) - assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) - - // Wait for the events to be published. - time.Sleep(2 * events.UnpublishedEventsCheckInterval) - - err = publisher.Close() - assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) -} - -func generateRandomEvent() testEvent { - return testEvent{ - Data: map[string]interface{}{ - "temperature": fmt.Sprintf("%f", rand.Float64()), - "humidity": fmt.Sprintf("%f", rand.Float64()), - "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), - "location": fmt.Sprintf("%f", rand.Float64()), - "status": fmt.Sprintf("%d", rand.Intn(1000)), - "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), - "operation": "create", - }, - } -} - -func spawnGoroutines(publisher events.Publisher, t *testing.T) { - for i := 0; i < 1e4; i++ { - go func() { - for i := 0; i < 10; i++ { - event := generateRandomEvent() - err := publisher.Publish(ctx, event) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - } - }() + }) } } func TestPubsub(t *testing.T) { - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: rabbitmq.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: rabbitmq.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: rabbitmq.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: rabbitmq.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: "", - errorMessage: rabbitmq.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: rabbitmq.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) + if err != nil { + assert.Equal(t, err, c.err) - continue - } + return + } - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + assert.Nil(t, err) - cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, - } - switch err := subcriber.Subscribe(ctx, cfg); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: c.stream, + Consumer: c.consumer, + Handler: c.handler, + } + switch err := subcriber.Subscribe(ctx, cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, c.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } diff --git a/pkg/events/rabbitmq/setup_test.go b/pkg/events/rabbitmq/setup_test.go index bbc1423fa2c..904fa3a564f 100644 --- a/pkg/events/rabbitmq/setup_test.go +++ b/pkg/events/rabbitmq/setup_test.go @@ -21,13 +21,10 @@ var ( stream = "tests.events" consumer = "tests-consumer" ctx = context.TODO() - pool = &dockertest.Pool{} - container = &dockertest.Resource{} ) func TestMain(m *testing.M) { - var err error - pool, err = dockertest.NewPool("") + pool, err := dockertest.NewPool("") if err != nil { log.Fatalf("Could not connect to docker: %s", err) } @@ -37,7 +34,7 @@ func TestMain(m *testing.M) { Repository: "rabbitmq", Tag: "3.9.20", } - container, err = pool.RunWithOptions(&opts) + container, err := pool.RunWithOptions(&opts) if err != nil { log.Fatalf("Could not start container: %s", err) } diff --git a/pkg/events/rabbitmq/subscriber.go b/pkg/events/rabbitmq/subscriber.go index af758e4cd82..e2e0ee7262b 100644 --- a/pkg/events/rabbitmq/subscriber.go +++ b/pkg/events/rabbitmq/subscriber.go @@ -70,7 +70,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon subCfg := messaging.SubscriberConfig{ ID: cfg.Consumer, - Topic: eventsPrefix + "." + cfg.Stream, + Topic: cfg.Stream, Handler: &eventHandler{ handler: cfg.Handler, ctx: ctx, @@ -111,7 +111,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error { } if err := eh.handler.Handle(eh.ctx, event); err != nil { - eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + eh.logger.Warn(fmt.Sprintf("failed to handle rabbitmq event: %s", err)) } return nil diff --git a/pkg/events/redis/publisher_test.go b/pkg/events/redis/publisher_test.go index b669a235a55..9c1a8c78cfc 100644 --- a/pkg/events/redis/publisher_test.go +++ b/pkg/events/redis/publisher_test.go @@ -23,13 +23,12 @@ import ( ) var ( - streamName = "magistrala.eventstest" - consumer = "test-consumer" - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - ctx = context.TODO() + stream = "tests.events" + consumer = "test-consumer" + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + ctx = context.TODO() ) type testEvent struct { @@ -60,17 +59,19 @@ func TestPublish(t *testing.T) { err := redisClient.FlushAll(ctx).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - publisher, err := redis.NewPublisher(ctx, redisURL, streamName) + publisher, err := redis.NewPublisher(ctx, redisURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() subcriber, err := redis.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) subcriber, err = redis.NewSubscriber(redisURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() cfg := events.SubscriberConfig{ - Stream: streamName, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } @@ -134,33 +135,35 @@ func TestPublish(t *testing.T) { }, } - for _, tc := range cases { - event := testEvent{Data: tc.event} + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + event := testEvent{Data: c.event} - err := publisher.Publish(ctx, event) - switch tc.err { - case nil: - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) + err := publisher.Publish(ctx, event) + switch c.err { + case nil: + assert.Nil(t, err) - receivedEvent := <-eventsChan + receivedEvent := <-eventsChan - roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) - if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") - } + roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) + assert.Nil(t, err) + if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(c.event, "occurred_at") + } - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) - assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) + assert.Equal(t, c.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, c.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, c.event["status"], receivedEvent["status"]) + assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, c.event["operation"], receivedEvent["operation"]) - default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) - } + default: + assert.ErrorContains(t, err, c.err.Error()) + } + }) } } @@ -168,88 +171,88 @@ func TestPubsub(t *testing.T) { err := redisClient.FlushAll(ctx).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: redis.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: redis.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: redis.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: redis.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: "", - errorMessage: redis.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: redis.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := redis.NewSubscriber(redisURL, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - - continue - } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + subcriber, err := redis.NewSubscriber(redisURL, logger) + if err != nil { + assert.Equal(t, err, c.err) - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + return + } - cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, - } - switch err := subcriber.Subscribe(context.TODO(), cfg); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: c.stream, + Consumer: c.consumer, + Handler: c.handler, + } + switch err := subcriber.Subscribe(context.TODO(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, c.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } diff --git a/pkg/events/redis/subscriber.go b/pkg/events/redis/subscriber.go index f76a8eae8c0..3edb2a79067 100644 --- a/pkg/events/redis/subscriber.go +++ b/pkg/events/redis/subscriber.go @@ -71,7 +71,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon Count: eventCount, }).Result() if err != nil { - es.logger.Warn(fmt.Sprintf("failed to read from Redis stream: %s", err)) + es.logger.Warn(fmt.Sprintf("failed to read from redis stream: %s", err)) continue }