Skip to content

Commit

Permalink
SMQ-2546 - Add events to adapters (#2659)
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gateru <[email protected]>
  • Loading branch information
felixgateru authored Jan 30, 2025
1 parent 53cb10b commit 07dbb86
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 44 deletions.
9 changes: 9 additions & 0 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
coapserver "github.com/absmach/supermq/pkg/server/coap"
Expand All @@ -47,6 +48,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_COAP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -143,6 +145,13 @@ func main() {
defer nps.Close()
nps = brokerstracing.NewPubSub(coapServerConfig, tracer, nps)

nps, err = msgevents.NewPubSubMiddleware(ctx, nps, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc := coap.New(clientsClient, channelsClient, nps)

svc = tracing.New(tracer, svc)
Expand Down
9 changes: 9 additions & 0 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
Expand Down Expand Up @@ -59,6 +60,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_HTTP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -163,6 +165,13 @@ func main() {
defer pub.Close()
pub = brokerstracing.NewPublisher(httpServerConfig, tracer, pub)

pub, err = msgevents.NewPublisherMiddleware(ctx, pub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc := newService(pub, authn, clientsClient, channelsClient, logger, tracer)
targetServerCfg := server.Config{Port: targetHTTPPort}

Expand Down
22 changes: 22 additions & 0 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt"
"github.com/absmach/supermq/pkg/server"
Expand Down Expand Up @@ -134,6 +135,13 @@ func main() {
defer bsub.Close()
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)

bsub, err = msgevents.NewPubSubMiddleware(ctx, bsub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
Expand All @@ -142,6 +150,13 @@ func main() {
}
defer mpub.Close()

mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
Expand All @@ -159,6 +174,13 @@ func main() {
defer np.Close()
np = brokerstracing.NewPublisher(serverConfig, tracer, np)

np, err = msgevents.NewPublisherMiddleware(ctx, np, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

es, err := events.NewEventStore(ctx, cfg.ESURL, cfg.Instance)
if err != nil {
logger.Error(fmt.Sprintf("failed to create %s event store : %s", svcName, err))
Expand Down
9 changes: 9 additions & 0 deletions cmd/ws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
Expand Down Expand Up @@ -55,6 +56,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_WS_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -165,6 +167,13 @@ func main() {
defer nps.Close()
nps = brokerstracing.NewPubSub(targetServerConfig, tracer, nps)

nps, err = msgevents.NewPubSubMiddleware(ctx, nps, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc := newService(clientsClient, channelsClient, nps, logger, tracer)

hs := httpserver.NewServer(ctx, cancel, svcName, targetServerConfig, httpapi.MakeHandler(ctx, svc, logger, cfg.InstanceID), logger)
Expand Down
5 changes: 3 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,6 @@ services:
bind:
create_host_path: true


groups-db:
image: postgres:16.2-alpine
container_name: supermq-groups-db
Expand Down Expand Up @@ -948,7 +947,6 @@ services:
bind:
create_host_path: true


jaeger:
image: jaegertracing/all-in-one:1.60
container_name: supermq-jaeger
Expand Down Expand Up @@ -1067,6 +1065,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_HTTP_ADAPTER_INSTANCE_ID: ${SMQ_HTTP_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_HTTP_ADAPTER_PORT}:${SMQ_HTTP_ADAPTER_PORT}
networks:
Expand Down Expand Up @@ -1153,6 +1152,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_COAP_ADAPTER_INSTANCE_ID: ${SMQ_COAP_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_COAP_ADAPTER_PORT}:${SMQ_COAP_ADAPTER_PORT}/udp
- ${SMQ_COAP_ADAPTER_HTTP_PORT}:${SMQ_COAP_ADAPTER_HTTP_PORT}/tcp
Expand Down Expand Up @@ -1230,6 +1230,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_WS_ADAPTER_INSTANCE_ID: ${SMQ_WS_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_WS_ADAPTER_HTTP_PORT}:${SMQ_WS_ADAPTER_HTTP_PORT}
networks:
Expand Down
48 changes: 39 additions & 9 deletions mqtt/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,48 @@ package events

import "github.com/absmach/supermq/pkg/events"

var _ events.Event = (*mqttEvent)(nil)
const (
mqttPrefix = "mqtt"
clientSubscribe = mqttPrefix + ".client_subscribe"
clientConnect = mqttPrefix + ".client_connect"
clientDisconnect = mqttPrefix + ".client_disconnect"
)

type mqttEvent struct {
clientID string
operation string
instance string
var (
_ events.Event = (*connectEvent)(nil)
_ events.Event = (*subscribeEvent)(nil)
)

type connectEvent struct {
operation string
clientID string
subscriberID string
instance string
}

func (ce connectEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": ce.operation,
"client_id": ce.clientID,
"subscriber_id": ce.subscriberID,
"instance": ce.instance,
}, nil
}

type subscribeEvent struct {
operation string
clientID string
subscriberID string
channelID string
subtopic string
}

func (me mqttEvent) Encode() (map[string]interface{}, error) {
func (se subscribeEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"client_id": me.clientID,
"operation": me.operation,
"instance": me.instance,
"operation": se.operation,
"client_id": se.clientID,
"subscriber_id": se.subscriberID,
"channel_id": se.channelID,
"subtopic": se.subtopic,
}, nil
}
50 changes: 33 additions & 17 deletions mqtt/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ const streamID = "supermq.mqtt"

//go:generate mockery --name EventStore --output=../mocks --filename events.go --quiet --note "Copyright (c) Abstract Machines"
type EventStore interface {
Connect(ctx context.Context, clientID string) error
Disconnect(ctx context.Context, clientID string) error
Connect(ctx context.Context, clientID, subscriberID string) error
Disconnect(ctx context.Context, clientID, subscriberID string) error
Subscribe(ctx context.Context, clientID, channelID, subscriberID, subtopic string) error
}

// EventStore is a struct used to store event streams in Redis.
type eventStore struct {
events.Publisher
ep events.Publisher
instance string
}

Expand All @@ -33,29 +34,44 @@ func NewEventStore(ctx context.Context, url, instance string) (EventStore, error
}

return &eventStore{
instance: instance,
Publisher: publisher,
instance: instance,
ep: publisher,
}, nil
}

// Connect issues event on MQTT CONNECT.
func (es *eventStore) Connect(ctx context.Context, clientID string) error {
ev := mqttEvent{
clientID: clientID,
operation: "connect",
instance: es.instance,
func (es *eventStore) Connect(ctx context.Context, clientID, subscriberID string) error {
ev := connectEvent{
clientID: clientID,
operation: clientConnect,
subscriberID: subscriberID,
instance: es.instance,
}

return es.Publish(ctx, ev)
return es.ep.Publish(ctx, ev)
}

// Disconnect issues event on MQTT CONNECT.
func (es *eventStore) Disconnect(ctx context.Context, clientID string) error {
ev := mqttEvent{
clientID: clientID,
operation: "disconnect",
instance: es.instance,
func (es *eventStore) Disconnect(ctx context.Context, clientID, subscriberID string) error {
ev := connectEvent{
clientID: clientID,
operation: clientDisconnect,
subscriberID: subscriberID,
instance: es.instance,
}

return es.Publish(ctx, ev)
return es.ep.Publish(ctx, ev)
}

// Subscribe issues event on MQTT SUBSCRIBE.
func (es *eventStore) Subscribe(ctx context.Context, clientID, channelID, subscriberID, subtopic string) error {
ev := subscribeEvent{
operation: clientSubscribe,
clientID: clientID,
channelID: channelID,
subscriberID: subscriberID,
subtopic: subtopic,
}

return es.ep.Publish(ctx, ev)
}
33 changes: 31 additions & 2 deletions mqtt/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
ErrFailedPublishDisconnectEvent = errors.New("failed to publish disconnect event")
ErrFailedParseSubtopic = errors.New("failed to parse subtopic")
ErrFailedPublishConnectEvent = errors.New("failed to publish connect event")
ErrFailedSubscribeEvent = errors.New("failed to publish subscribe event")
ErrFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
)

Expand Down Expand Up @@ -106,7 +107,7 @@ func (h *handler) AuthConnect(ctx context.Context) error {
return errInvalidUserId
}

if err := h.es.Connect(ctx, pwd); err != nil {
if err := h.es.Connect(ctx, s.Username, s.ID); err != nil {
h.logger.Error(errors.Wrap(ErrFailedPublishConnectEvent, err).Error())
}

Expand Down Expand Up @@ -202,6 +203,17 @@ func (h *handler) Subscribe(ctx context.Context, topics *[]string) error {
if !ok {
return errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized)
}

for _, topic := range *topics {
channelID, subTopic, err := parseTopic(topic)
if err != nil {
return err
}
if err := h.es.Subscribe(ctx, s.Username, channelID, s.ID, subTopic); err != nil {
return errors.Wrap(ErrFailedSubscribeEvent, err)
}
}

h.logger.Info(fmt.Sprintf(LogInfoSubscribed, s.ID, strings.Join(*topics, ",")))
return nil
}
Expand All @@ -223,7 +235,7 @@ func (h *handler) Disconnect(ctx context.Context) error {
return errors.Wrap(ErrFailedDisconnect, ErrClientNotInitialized)
}
h.logger.Error(fmt.Sprintf(LogInfoDisconnected, s.ID, s.Password))
if err := h.es.Disconnect(ctx, string(s.Password)); err != nil {
if err := h.es.Disconnect(ctx, s.Username, s.ID); err != nil {
return errors.Wrap(ErrFailedPublishDisconnectEvent, err)
}
return nil
Expand Down Expand Up @@ -260,6 +272,23 @@ func (h *handler) authAccess(ctx context.Context, clientID, topic string, msgTyp
return nil
}

func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
}

chanID := channelParts[1]
subtopic := channelParts[2]

subtopic, err := parseSubtopic(subtopic)
if err != nil {
return "", "", errors.Wrap(ErrFailedParseSubtopic, err)
}

return chanID, subtopic, nil
}

func parseSubtopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
Expand Down
Loading

0 comments on commit 07dbb86

Please sign in to comment.