Skip to content

Commit

Permalink
refactor(event): Add subscriber config
Browse files Browse the repository at this point in the history
This commit adds event subscriber which combines stream, consumer and handler to a single struct to the codebase. The code is related to event handling, subscribing to event stores, and using message brokers like RabbitMQ, Redis, and NATS.

Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo committed Feb 12, 2024
1 parent e77d4ea commit 22bf1dc
Show file tree
Hide file tree
Showing 15 changed files with 173 additions and 122 deletions.
16 changes: 10 additions & 6 deletions cmd/bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
httpserver "github.com/absmach/magistrala/internal/server/http"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/auth"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
mgsdk "github.com/absmach/magistrala/pkg/sdk/go"
"github.com/absmach/magistrala/pkg/uuid"
Expand Down Expand Up @@ -142,6 +143,8 @@ func main() {
return
}

logger.Info("Subscribed to Event Store")

httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
Expand Down Expand Up @@ -196,14 +199,15 @@ func newService(ctx context.Context, authClient magistrala.AuthServiceClient, db
}

func subscribeToThingsES(ctx context.Context, svc bootstrap.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}

handler := consumer.NewEventHandler(svc)

logger.Info("Subscribed to Redis Event Store")

return subscriber.Subscribe(ctx, handler)
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Consumer: cfg.ESConsumerName,
Handler: consumer.NewEventHandler(svc),
}
return subscriber.Subscribe(ctx, subConfig)
}
20 changes: 12 additions & 8 deletions cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/lora"
"github.com/absmach/magistrala/lora/api"
"github.com/absmach/magistrala/lora/events"
loraevents "github.com/absmach/magistrala/lora/events"
"github.com/absmach/magistrala/lora/mqtt"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/absmach/magistrala/pkg/messaging/brokers"
Expand Down Expand Up @@ -147,6 +148,8 @@ func main() {
return
}

logger.Info("Subscribed to Event Store")

hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(cfg.InstanceID), logger)

if cfg.SendTelemetry {
Expand Down Expand Up @@ -198,21 +201,22 @@ func subscribeToLoRaBroker(svc lora.Service, mc mqttpaho.Client, timeout time.Du
}

func subscribeToThingsES(ctx context.Context, svc lora.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}

handler := events.NewEventHandler(svc)

logger.Info("Subscribed to Redis Event Store")

return subscriber.Subscribe(ctx, handler)
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Consumer: cfg.ESConsumerName,
Handler: loraevents.NewEventHandler(svc),
}
return subscriber.Subscribe(ctx, subConfig)
}

func newRouteMapRepository(client *redis.Client, prefix string, logger *slog.Logger) lora.RouteMapRepository {
logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix))
return events.NewRouteMapRepository(client, prefix)
return loraevents.NewRouteMapRepository(client, prefix)
}

func newService(pub messaging.Publisher, rmConn *redis.Client, thingsRMPrefix, channelsRMPrefix, connsRMPrefix string, logger *slog.Logger) lora.Service {
Expand Down
20 changes: 12 additions & 8 deletions cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"github.com/absmach/magistrala/opcua"
"github.com/absmach/magistrala/opcua/api"
"github.com/absmach/magistrala/opcua/db"
"github.com/absmach/magistrala/opcua/events"
opcuaevents "github.com/absmach/magistrala/opcua/events"
"github.com/absmach/magistrala/opcua/gopcua"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
Expand Down Expand Up @@ -142,6 +143,8 @@ func main() {
return
}

logger.Info("Subscribed to Event Store")

hs := httpserver.New(ctx, httpCancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger)

if cfg.SendTelemetry {
Expand Down Expand Up @@ -181,21 +184,22 @@ func subscribeToStoredSubs(ctx context.Context, sub opcua.Subscriber, cfg opcua.
}

func subscribeToThingsES(ctx context.Context, svc opcua.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}

handler := events.NewEventHandler(svc)

logger.Info("Subscribed to Redis Event Store")

return subscriber.Subscribe(ctx, handler)
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Consumer: cfg.ESConsumerName,
Handler: opcuaevents.NewEventHandler(svc),
}
return subscriber.Subscribe(ctx, subConfig)
}

func newRouteMapRepositoy(client *redis.Client, prefix string, logger *slog.Logger) opcua.RouteMapRepository {
logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix))
return events.NewRouteMapRepository(client, prefix)
return opcuaevents.NewRouteMapRepository(client, prefix)
}

func newService(sub opcua.Subscriber, browser opcua.Browser, thingRM, chanRM, connRM opcua.RouteMapRepository, opcuaConfig opcua.Config, logger *slog.Logger) opcua.Service {
Expand Down
9 changes: 8 additions & 1 deletion pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,19 @@ type EventHandler interface {
Handle(ctx context.Context, event Event) error
}

// SubscriberConfig represents event subscriber configuration.
type SubscriberConfig struct {
Consumer string
Stream string
Handler EventHandler
}

// Subscriber specifies event subscription API.
//
//go:generate mockery --name Subscriber --output=./mocks --filename subscriber.go --quiet --note "Copyright (c) Abstract Machines"
type Subscriber interface {
// Subscribe subscribes to the event stream and consumes events.
Subscribe(ctx context.Context, handler EventHandler) error
Subscribe(ctx context.Context, cfg SubscriberConfig) error

// Close gracefully closes event subscriber's connection.
Close() error
Expand Down
30 changes: 23 additions & 7 deletions pkg/events/nats/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,18 @@ func TestPublish(t *testing.T) {
publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

_, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", stream, consumer, logger)
_, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", logger)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(context.Background(), handler{})
cfg := events.SubscriberConfig{
Stream: stream,
Consumer: consumer,
Handler: handler{},
}
err = subcriber.Subscribe(context.Background(), cfg)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

cases := []struct {
Expand Down Expand Up @@ -209,14 +214,20 @@ func TestPubsub(t *testing.T) {
}

for _, pc := range subcases {
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, pc.stream, pc.consumer, logger)
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger)
if err != nil {
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err))

continue
}
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))

switch err := subcriber.Subscribe(context.Background(), pc.handler); {
cfg := events.SubscriberConfig{
Stream: pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
}
switch err := subcriber.Subscribe(context.Background(), cfg); {
case err == nil:
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
default:
Expand All @@ -232,10 +243,15 @@ func TestUnavailablePublish(t *testing.T) {
publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(context.Background(), handler{})
cfg := events.SubscriberConfig{
Stream: stream,
Consumer: consumer,
Handler: handler{},
}
err = subcriber.Subscribe(context.Background(), cfg)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

err = pool.Client.PauseContainer(container.Container.ID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/events/nats/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
}

if err := pool.Retry(func() error {
_, err = nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger)
_, err = nats.NewSubscriber(context.Background(), natsURL, logger)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
Expand Down
41 changes: 18 additions & 23 deletions pkg/events/nats/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,12 @@ var (
)

type subEventStore struct {
conn *nats.Conn
pubsub messaging.PubSub
stream string
consumer string
logger *slog.Logger
conn *nats.Conn
pubsub messaging.PubSub
logger *slog.Logger
}

func NewSubscriber(ctx context.Context, url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) {
if stream == "" {
return nil, ErrEmptyStream
}

if consumer == "" {
return nil, ErrEmptyConsumer
}

func NewSubscriber(ctx context.Context, url string, logger *slog.Logger) (events.Subscriber, error) {
conn, err := nats.Connect(url, nats.MaxReconnects(maxReconnects))
if err != nil {
return nil, err
Expand All @@ -82,20 +72,25 @@ func NewSubscriber(ctx context.Context, url, stream, consumer string, logger *sl
}

return &subEventStore{
conn: conn,
pubsub: pubsub,
stream: stream,
consumer: consumer,
logger: logger,
conn: conn,
pubsub: pubsub,
logger: logger,
}, nil
}

func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error {
func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error {
if cfg.Consumer == "" {
return ErrEmptyConsumer
}
if cfg.Stream == "" {
return ErrEmptyStream
}

subCfg := messaging.SubscriberConfig{
ID: es.consumer,
Topic: eventsPrefix + "." + es.stream,
ID: cfg.Consumer,
Topic: eventsPrefix + "." + cfg.Stream,
Handler: &eventHandler{
handler: handler,
handler: cfg.Handler,
ctx: ctx,
logger: es.logger,
},
Expand Down
30 changes: 23 additions & 7 deletions pkg/events/rabbitmq/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,18 @@ func TestPublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

_, err = rabbitmq.NewSubscriber("http://invaliurl.com", stream, consumer, logger)
_, err = rabbitmq.NewSubscriber("http://invaliurl.com", logger)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger)
subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(context.Background(), handler{})
cfg := events.SubscriberConfig{
Stream: stream,
Consumer: consumer,
Handler: handler{},
}
err = subcriber.Subscribe(context.Background(), cfg)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

cases := []struct {
Expand Down Expand Up @@ -210,14 +215,20 @@ func TestPubsub(t *testing.T) {
}

for _, pc := range subcases {
subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, pc.stream, pc.consumer, logger)
subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger)
if err != nil {
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err))

continue
}
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))

switch err := subcriber.Subscribe(context.Background(), pc.handler); {
cfg := events.SubscriberConfig{
Stream: pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
}
switch err := subcriber.Subscribe(context.Background(), cfg); {
case err == nil:
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
default:
Expand All @@ -233,10 +244,15 @@ func TestUnavailablePublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger)
subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = subcriber.Subscribe(context.Background(), handler{})
cfg := events.SubscriberConfig{
Stream: stream,
Consumer: consumer,
Handler: handler{},
}
err = subcriber.Subscribe(context.Background(), cfg)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err))

err = pool.Client.PauseContainer(container.Container.ID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/events/rabbitmq/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestMain(m *testing.M) {
}

if err := pool.Retry(func() error {
_, err = rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger)
_, err = rabbitmq.NewSubscriber(rabbitmqURL, logger)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
Expand Down
Loading

0 comments on commit 22bf1dc

Please sign in to comment.