Skip to content

Commit

Permalink
feat: add conn id to connect and disconnect events
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gateru <[email protected]>
  • Loading branch information
felixgateru committed Jan 22, 2025
1 parent b39d060 commit f5d7ba7
Show file tree
Hide file tree
Showing 14 changed files with 112 additions and 117 deletions.
15 changes: 12 additions & 3 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
adapter "github.com/absmach/supermq/http"
httpapi "github.com/absmach/supermq/http/api"
"github.com/absmach/supermq/http/events"
smqlog "github.com/absmach/supermq/logger"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/authn/authsvc"
Expand Down Expand Up @@ -59,6 +60,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_HTTP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -141,6 +143,13 @@ func main() {
defer authnHandler.Close()
logger.Info("authn successfully connected to auth gRPC server " + authnHandler.Secure())

eventStore, err := events.NewEventStore(ctx, cfg.ESURL, cfg.InstanceID)
if err != nil {
logger.Error(fmt.Sprintf("failed to create %s event store : %s", svcName, err))
exitCode = 1
return
}

tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
Expand All @@ -163,7 +172,7 @@ func main() {
defer pub.Close()
pub = brokerstracing.NewPublisher(httpServerConfig, tracer, pub)

svc := newService(pub, authn, clientsClient, channelsClient, logger, tracer)
svc := newService(pub, eventStore, authn, clientsClient, channelsClient, logger, tracer)
targetServerCfg := server.Config{Port: targetHTTPPort}

hs := httpserver.NewServer(ctx, cancel, svcName, targetServerCfg, httpapi.MakeHandler(logger, cfg.InstanceID), logger)
Expand All @@ -190,8 +199,8 @@ func main() {
}
}

func newService(pub messaging.Publisher, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, logger *slog.Logger, tracer trace.Tracer) session.Handler {
svc := adapter.NewHandler(pub, authn, clients, channels, logger)
func newService(pub messaging.Publisher, es events.EventStore, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, logger *slog.Logger, tracer trace.Tracer) session.Handler {
svc := adapter.NewHandler(pub, es, authn, clients, channels, logger)
svc = handler.NewTracing(tracer, svc)
svc = handler.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics(svcName, "api")
Expand Down
11 changes: 10 additions & 1 deletion cmd/ws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/ws"
httpapi "github.com/absmach/supermq/ws/api"
"github.com/absmach/supermq/ws/events"
"github.com/absmach/supermq/ws/tracing"
"github.com/caarlos0/env/v11"
"go.opentelemetry.io/otel/trace"
Expand All @@ -55,6 +56,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_WS_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -143,6 +145,13 @@ func main() {
defer authnHandler.Close()
logger.Info("authn successfully connected to auth gRPC server " + authnHandler.Secure())

eventStore, err := events.NewEventStore(ctx, cfg.ESURL, cfg.InstanceID)
if err != nil {
logger.Error(fmt.Sprintf("failed to create %s event store : %s", svcName, err))
exitCode = 1
return
}

tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
Expand Down Expand Up @@ -178,7 +187,7 @@ func main() {
g.Go(func() error {
return hs.Start()
})
handler := ws.NewHandler(nps, logger, authn, clientsClient, channelsClient)
handler := ws.NewHandler(nps, eventStore, logger, authn, clientsClient, channelsClient)
return proxyWS(ctx, httpServerConfig, targetServerConfig, logger, handler)
})

Expand Down
2 changes: 1 addition & 1 deletion coap/events/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

// Package events provides the domain concept definitions needed to support
// coap events functionality.
package events
package events
53 changes: 12 additions & 41 deletions coap/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,21 @@ const (
clientUnsubscribe = coapPrefix + ".client_unsubscribe"
)

type clientPublishEvent struct {
ChannelID string
ClientID string
Topic string
type coapEvent struct {
operation string
channelID string
clientID string
connID string
topic string
}

func (cpe clientPublishEvent) Encode() (map[string]interface{}, error) {
func (ce coapEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientPublish,
"channel_id": cpe.ChannelID,
"client_id": cpe.ClientID,
"topic": cpe.Topic,
}
return val, nil
}

type clientSubscribeEvent struct {
ChannelID string
ClientID string
Topic string
}

func (cse clientSubscribeEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientSubscribe,
"channel_id": cse.ChannelID,
"client_id": cse.ClientID,
"topic": cse.Topic,
}
return val, nil
}

type clientUnsubscribeEvent struct {
ChannelID string
ClientID string
Topic string
}

func (cse clientUnsubscribeEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientUnsubscribe,
"channel_id": cse.ChannelID,
"client_id": cse.ClientID,
"topic": cse.Topic,
"operation": ce.operation,
"channel_id": ce.channelID,
"client_id": ce.clientID,
"conn_id": ce.connID,
"topic": ce.topic,
}
return val, nil
}
29 changes: 17 additions & 12 deletions coap/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ func (es *eventStore) Publish(ctx context.Context, clientID string, msg *messagi
return err
}

event := clientPublishEvent{
ClientID: clientID,
ChannelID: msg.GetChannel(),
Topic: msg.GetSubtopic(),
event := coapEvent{
operation: clientPublish,
clientID: clientID,
channelID: msg.GetChannel(),
topic: msg.GetSubtopic(),
}
if err := es.events.Publish(ctx, event); err != nil {
return err
Expand All @@ -57,10 +58,12 @@ func (es *eventStore) Subscribe(ctx context.Context, clientID, channelID, subtop
return err
}

event := clientSubscribeEvent{
ClientID: clientID,
ChannelID: channelID,
Topic: subtopic,
event := coapEvent{
operation: clientSubscribe,
clientID: clientID,
channelID: channelID,
connID: c.Token(),
topic: subtopic,
}
if err := es.events.Publish(ctx, event); err != nil {
return err
Expand All @@ -75,10 +78,12 @@ func (es *eventStore) Unsubscribe(ctx context.Context, clientID, channelID, subt
return err
}

event := clientUnsubscribeEvent{
ClientID: clientID,
ChannelID: channelID,
Topic: subtopic,
event := coapEvent{
operation: clientUnsubscribe,
clientID: clientID,
channelID: channelID,
connID: token,
topic: subtopic,
}
if err := es.events.Publish(ctx, event); err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_HTTP_ADAPTER_INSTANCE_ID: ${SMQ_HTTP_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_HTTP_ADAPTER_PORT}:${SMQ_HTTP_ADAPTER_PORT}
networks:
Expand Down Expand Up @@ -1129,6 +1130,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_WS_ADAPTER_INSTANCE_ID: ${SMQ_WS_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_WS_ADAPTER_HTTP_PORT}:${SMQ_WS_ADAPTER_HTTP_PORT}
networks:
Expand Down
2 changes: 1 addition & 1 deletion http/events/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

// Package events provides the domain concept definitions needed to support
// http events functionality.
package events
package events
2 changes: 0 additions & 2 deletions http/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

const streamID = "supermq.http"



//go:generate mockery --name EventStore --output=../mocks --filename events.go --quiet --note "Copyright (c) Abstract Machines"
type EventStore interface {
Connect(ctx context.Context, clientID string) error
Expand Down
4 changes: 3 additions & 1 deletion mqtt/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package events
import "github.com/absmach/supermq/pkg/events"

const (
mqttPrefix = "http"
mqttPrefix = "mqtt"
clientPublish = mqttPrefix + ".client_publish"
clientSubscribe = mqttPrefix + ".client_subscribe"
clientUnsubscribe = mqttPrefix + ".client_unsubscribe"
Expand All @@ -20,6 +20,7 @@ type mqttEvent struct {
operation string
channelID string
clientID string
connID string
topic string
instance string
}
Expand All @@ -29,6 +30,7 @@ func (me mqttEvent) Encode() (map[string]interface{}, error) {
"operation": me.operation,
"channel_id": me.channelID,
"client_id": me.clientID,
"conn_id": me.connID,
"topic": me.topic,
}, nil
}
20 changes: 12 additions & 8 deletions mqtt/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ const streamID = "supermq.mqtt"

//go:generate mockery --name EventStore --output=../mocks --filename events.go --quiet --note "Copyright (c) Abstract Machines"
type EventStore interface {
Connect(ctx context.Context, clientID string) error
Disconnect(ctx context.Context, clientID string) error
Connect(ctx context.Context, clientID, connID string) error
Disconnect(ctx context.Context, clientID, connID string) error
Publish(ctx context.Context, clientID, channelID, topic string) error
Subscribe(ctx context.Context, clientID, channelID, subtopic string) error
Unsubscribe(ctx context.Context, clientID, channelID, subtopic string) error
Subscribe(ctx context.Context, clientID, channelID, connID, subtopic string) error
Unsubscribe(ctx context.Context, clientID, channelID, connID, subtopic string) error
}

// EventStore is a struct used to store event streams in Redis.
Expand All @@ -42,21 +42,23 @@ func NewEventStore(ctx context.Context, url, instance string) (EventStore, error
}

// Connect issues event on MQTT CONNECT.
func (es *eventStore) Connect(ctx context.Context, clientID string) error {
func (es *eventStore) Connect(ctx context.Context, clientID, connID string) error {
ev := mqttEvent{
clientID: clientID,
operation: clientConnect,
connID: connID,
instance: es.instance,
}

return es.publisher.Publish(ctx, ev)
}

// Disconnect issues event on MQTT CONNECT.
func (es *eventStore) Disconnect(ctx context.Context, clientID string) error {
func (es *eventStore) Disconnect(ctx context.Context, clientID, connID string) error {
ev := mqttEvent{
clientID: clientID,
operation: clientDisconnect,
connID: connID,
instance: es.instance,
}

Expand All @@ -77,11 +79,12 @@ func (es *eventStore) Publish(ctx context.Context, clientID, channelID, topic st
}

// Subscribe issues event on MQTT SUBSCRIBE.
func (es *eventStore) Subscribe(ctx context.Context, clientID, channelID, subtopic string) error {
func (es *eventStore) Subscribe(ctx context.Context, clientID, channelID, connID, subtopic string) error {
ev := mqttEvent{
clientID: clientID,
operation: clientSubscribe,
channelID: channelID,
connID: connID,
topic: subtopic,
instance: es.instance,
}
Expand All @@ -90,11 +93,12 @@ func (es *eventStore) Subscribe(ctx context.Context, clientID, channelID, subtop
}

// Unsubscribe issues event on MQTT UNSUBSCRIBE.
func (es *eventStore) Unsubscribe(ctx context.Context, clientID, channelID, subtopic string) error {
func (es *eventStore) Unsubscribe(ctx context.Context, clientID, channelID, connID, subtopic string) error {
ev := mqttEvent{
clientID: clientID,
operation: clientUnsubscribe,
channelID: channelID,
connID: connID,
topic: subtopic,
instance: es.instance,
}
Expand Down
Loading

0 comments on commit f5d7ba7

Please sign in to comment.