diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 0bdfa3b693f..ce7b92e0d0e 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -46,7 +46,7 @@ const ( defDB = "bootstrap" defSvcHTTPPort = "9013" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" streamID = "magistrala.bootstrap" ) diff --git a/cmd/lora/main.go b/cmd/lora/main.go index ed45c3721a2..78105c3e299 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -45,7 +45,7 @@ const ( thingsRMPrefix = "thing" channelsRMPrefix = "channel" connsRMPrefix = "connection" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" ) type config struct { diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index f38da90330c..54607d0dd90 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -44,7 +44,7 @@ const ( channelsRMPrefix = "channel" connectionRMPrefix = "connection" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" ) type config struct { diff --git a/pkg/events/mocks/subscriber.go b/pkg/events/mocks/subscriber.go index 38d40023870..4de1ceb50f1 100644 --- a/pkg/events/mocks/subscriber.go +++ b/pkg/events/mocks/subscriber.go @@ -34,17 +34,17 @@ func (_m *Subscriber) Close() error { return r0 } -// Subscribe provides a mock function with given fields: ctx, handler -func (_m *Subscriber) Subscribe(ctx context.Context, handler events.EventHandler) error { - ret := _m.Called(ctx, handler) +// Subscribe provides a mock function with given fields: ctx, cfg +func (_m *Subscriber) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + ret := _m.Called(ctx, cfg) if len(ret) == 0 { panic("no return value specified for Subscribe") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, events.EventHandler) error); ok { - r0 = rf(ctx, handler) + if rf, ok := ret.Get(0).(func(context.Context, events.SubscriberConfig) error); ok { + r0 = rf(ctx, cfg) } else { r0 = ret.Error(0) } diff --git a/pkg/events/nats/publisher_test.go b/pkg/events/nats/publisher_test.go index 30f14cab399..a53d855d0a3 100644 --- a/pkg/events/nats/publisher_test.go +++ b/pkg/events/nats/publisher_test.go @@ -19,11 +19,10 @@ import ( ) var ( - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - numEvents = 100 + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + numEvents = 100 ) type testEvent struct { @@ -56,15 +55,17 @@ func TestPublish(t *testing.T) { publisher, err := nats.NewPublisher(context.Background(), natsURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() _, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() cfg := events.SubscriberConfig{ - Stream: stream, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } @@ -129,113 +130,117 @@ func TestPublish(t *testing.T) { } for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(context.Background(), event) - switch tc.err { - case nil: - receivedEvent := <-eventsChan - - val := int64(receivedEvent["occurred_at"].(float64)) - if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + t.Run(tc.desc, func(t *testing.T) { + event := testEvent{Data: tc.event} + + err := publisher.Publish(context.Background(), event) + switch tc.err { + case nil: + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, tc.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, tc.event["status"], receivedEvent["status"]) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - default: - assert.ErrorContains(t, err, tc.err.Error()) - } + }) } } func TestPubsub(t *testing.T) { - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: nats.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: nats.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: nats.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: nats.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: "", - errorMessage: nats.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: nats.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + for _, pc := range cases { + t.Run(pc.desc, func(t *testing.T) { + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) + if err != nil { + assert.Equal(t, err, pc.err) - continue - } - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + return + } - cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, - } - switch err := subcriber.Subscribe(context.Background(), cfg); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: pc.stream, + Consumer: pc.consumer, + Handler: pc.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, pc.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go index 8df96070775..182f4260235 100644 --- a/pkg/events/nats/subscriber.go +++ b/pkg/events/nats/subscriber.go @@ -18,9 +18,7 @@ import ( "github.com/nats-io/nats.go/jetstream" ) -const ( - maxReconnects = -1 -) +const maxReconnects = -1 var _ events.Subscriber = (*subEventStore)(nil) @@ -88,7 +86,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon subCfg := messaging.SubscriberConfig{ ID: cfg.Consumer, - Topic: eventsPrefix + "." + cfg.Stream, + Topic: cfg.Stream, Handler: &eventHandler{ handler: cfg.Handler, ctx: ctx, @@ -129,7 +127,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error { } if err := eh.handler.Handle(eh.ctx, event); err != nil { - eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + eh.logger.Warn(fmt.Sprintf("failed to handle nats event: %s", err)) } return nil diff --git a/pkg/events/rabbitmq/publisher_test.go b/pkg/events/rabbitmq/publisher_test.go index e8a2c65de84..15763ac1cb2 100644 --- a/pkg/events/rabbitmq/publisher_test.go +++ b/pkg/events/rabbitmq/publisher_test.go @@ -19,11 +19,10 @@ import ( ) var ( - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - numEvents = 100 + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + numEvents = 100 ) type testEvent struct { @@ -56,15 +55,17 @@ func TestPublish(t *testing.T) { publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() _, err = rabbitmq.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() cfg := events.SubscriberConfig{ - Stream: stream, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } @@ -128,115 +129,120 @@ func TestPublish(t *testing.T) { }, } - for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(context.Background(), event) - switch tc.err { - case nil: - receivedEvent := <-eventsChan - - val := int64(receivedEvent["occurred_at"].(float64)) - if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + event := testEvent{Data: c.event} + + err := publisher.Publish(context.Background(), event) + switch c.err { + case nil: + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(c.event, "occurred_at") + } + + assert.Equal(t, c.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, c.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, c.event["status"], receivedEvent["status"]) + assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, c.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, c.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, tc.event["status"], receivedEvent["status"]) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - - default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) - } + }) } } func TestPubsub(t *testing.T) { - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: rabbitmq.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: rabbitmq.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: rabbitmq.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: rabbitmq.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: "", - errorMessage: rabbitmq.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: rabbitmq.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) + if err != nil { + assert.Equal(t, err, c.err) - continue - } - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + return + } - cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, - } - switch err := subcriber.Subscribe(context.Background(), cfg); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + assert.Nil(t, err) + + cfg := events.SubscriberConfig{ + Stream: c.stream, + Consumer: c.consumer, + Handler: c.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, c.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } diff --git a/pkg/events/rabbitmq/subscriber.go b/pkg/events/rabbitmq/subscriber.go index 4c78ad88d0e..dab686c103d 100644 --- a/pkg/events/rabbitmq/subscriber.go +++ b/pkg/events/rabbitmq/subscriber.go @@ -70,7 +70,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon subCfg := messaging.SubscriberConfig{ ID: cfg.Consumer, - Topic: eventsPrefix + "." + cfg.Stream, + Topic: cfg.Stream, Handler: &eventHandler{ handler: cfg.Handler, ctx: ctx, @@ -111,7 +111,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error { } if err := eh.handler.Handle(eh.ctx, event); err != nil { - eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + eh.logger.Warn(fmt.Sprintf("failed to handle rabbitmq event: %s", err)) } return nil diff --git a/pkg/events/redis/publisher.go b/pkg/events/redis/publisher.go index a6f5c90acaf..57f74758c3d 100644 --- a/pkg/events/redis/publisher.go +++ b/pkg/events/redis/publisher.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis import ( diff --git a/pkg/events/redis/publisher_test.go b/pkg/events/redis/publisher_test.go index cad5fd7a89f..2b503e7372c 100644 --- a/pkg/events/redis/publisher_test.go +++ b/pkg/events/redis/publisher_test.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis_test import ( @@ -23,13 +20,13 @@ import ( ) var ( - streamName = "magistrala.eventstest" - consumer = "test-consumer" - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - numEvents = 100 + streamName = "magistrala.eventstest" + consumer = "test-consumer" + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + numEvents = 100 + stream = "tests.events" ) type testEvent struct { @@ -65,15 +62,17 @@ func TestPublish(t *testing.T) { publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, events.UnpublishedEventsCheckInterval) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() _, err = redis.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) subcriber, err := redis.NewSubscriber(redisURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() cfg := events.SubscriberConfig{ - Stream: streamName, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } @@ -137,31 +136,35 @@ func TestPublish(t *testing.T) { }, } - for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(context.Background(), event) - switch tc.err { - case nil: - receivedEvent := <-eventsChan - - roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) - if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + event := testEvent{Data: c.event} + + err := publisher.Publish(context.Background(), event) + switch c.err { + case nil: + assert.Nil(t, err) + + receivedEvent := <-eventsChan + + roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) + assert.Nil(t, err) + if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(c.event, "occurred_at") + } + + assert.Equal(t, c.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, c.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, c.event["status"], receivedEvent["status"]) + assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, c.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, c.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, tc.event["status"], receivedEvent["status"]) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - - default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) - } + }) } } @@ -169,88 +172,88 @@ func TestPubsub(t *testing.T) { err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: redis.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: redis.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: redis.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: redis.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: "", - errorMessage: redis.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: redis.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := redis.NewSubscriber(redisURL, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - - continue - } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + subcriber, err := redis.NewSubscriber(redisURL, logger) + if err != nil { + assert.Equal(t, err, c.err) - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + return + } - cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, - } - switch err := subcriber.Subscribe(context.Background(), cfg); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: c.stream, + Consumer: c.consumer, + Handler: c.handler, + } + switch err := subcriber.Subscribe(context.TODO(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, c.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } diff --git a/pkg/events/redis/setup_test.go b/pkg/events/redis/setup_test.go index 541cb2a3d17..719e0996c3c 100644 --- a/pkg/events/redis/setup_test.go +++ b/pkg/events/redis/setup_test.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis_test import ( diff --git a/pkg/events/redis/subscriber.go b/pkg/events/redis/subscriber.go index ad3ce62f1d6..43898075b30 100644 --- a/pkg/events/redis/subscriber.go +++ b/pkg/events/redis/subscriber.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis import ( @@ -71,7 +68,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon Count: eventCount, }).Result() if err != nil { - es.logger.Warn(fmt.Sprintf("failed to read from Redis stream: %s", err)) + es.logger.Warn(fmt.Sprintf("failed to read from redis stream: %s", err)) continue }