Skip to content

Commit

Permalink
feat: add events to adapters
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gateru <[email protected]>
  • Loading branch information
felixgateru committed Jan 21, 2025
1 parent 153d230 commit b39d060
Show file tree
Hide file tree
Showing 31 changed files with 958 additions and 202 deletions.
13 changes: 11 additions & 2 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/absmach/supermq"
"github.com/absmach/supermq/coap"
httpapi "github.com/absmach/supermq/coap/api"
"github.com/absmach/supermq/coap/events"
"github.com/absmach/supermq/coap/tracing"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/pkg/grpcclient"
Expand Down Expand Up @@ -47,6 +48,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_COAP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -143,7 +145,14 @@ func main() {
defer nps.Close()
nps = brokerstracing.NewPubSub(coapServerConfig, tracer, nps)

svc := coap.New(clientsClient, channelsClient, nps)
svc := coap.New(channelsClient, nps)

svc, err = events.NewEventStoreMiddleware(ctx, svc, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc = tracing.New(tracer, svc)

Expand All @@ -154,7 +163,7 @@ func main() {

hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(cfg.InstanceID), logger)

cs := coapserver.NewServer(ctx, cancel, svcName, coapServerConfig, httpapi.MakeCoAPHandler(svc, logger), logger)
cs := coapserver.NewServer(ctx, cancel, svcName, coapServerConfig, httpapi.MakeCoAPHandler(svc, logger, clientsClient), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, supermq.Version, logger, cancel)
Expand Down
57 changes: 10 additions & 47 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"fmt"

grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
"github.com/absmach/supermq/pkg/connections"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
Expand All @@ -27,52 +26,37 @@ const chansPrefix = "channels"
type Service interface {
// Publish publishes message to specified channel.
// Key is used to authorize publisher.
Publish(ctx context.Context, key string, msg *messaging.Message) error
Publish(ctx context.Context, clientID string, msg *messaging.Message) error

// Subscribes to channel with specified id, subtopic and adds subscription to
// service map of subscriptions under given ID.
Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error
Subscribe(ctx context.Context, clientID, chanID, subtopic string, c Client) error

// Unsubscribe method is used to stop observing resource.
Unsubscribe(ctx context.Context, key, chanID, subptopic, token string) error

// DisconnectHandler method is used to disconnected the client
DisconnectHandler(ctx context.Context, chanID, subptopic, token string) error
Unsubscribe(ctx context.Context, clientID, chanID, subptopic, token string) error
}

var _ Service = (*adapterService)(nil)

// Observers is a map of maps,.
type adapterService struct {
clients grpcClientsV1.ClientsServiceClient
channels grpcChannelsV1.ChannelsServiceClient
pubsub messaging.PubSub
}

// New instantiates the CoAP adapter implementation.
func New(clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, pubsub messaging.PubSub) Service {
func New(channels grpcChannelsV1.ChannelsServiceClient, pubsub messaging.PubSub) Service {
as := &adapterService{
clients: clients,
channels: channels,
pubsub: pubsub,
}

return as
}

func (svc *adapterService) Publish(ctx context.Context, key string, msg *messaging.Message) error {
authnRes, err := svc.clients.Authenticate(ctx, &grpcClientsV1.AuthnReq{
ClientSecret: key,
})
if err != nil {
return errors.Wrap(svcerr.ErrAuthentication, err)
}
if !authnRes.Authenticated {
return svcerr.ErrAuthentication
}

func (svc *adapterService) Publish(ctx context.Context, clientID string, msg *messaging.Message) error {
authzRes, err := svc.channels.Authorize(ctx, &grpcChannelsV1.AuthzReq{
ClientId: authnRes.GetId(),
ClientId: clientID,
ClientType: policies.ClientType,
Type: uint32(connections.Publish),
ChannelId: msg.GetChannel(),
Expand All @@ -84,23 +68,12 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg *messagi
return svcerr.ErrAuthorization
}

msg.Publisher = authnRes.GetId()
msg.Publisher = clientID

return svc.pubsub.Publish(ctx, msg.GetChannel(), msg)
}

func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error {
authnRes, err := svc.clients.Authenticate(ctx, &grpcClientsV1.AuthnReq{
ClientSecret: key,
})
if err != nil {
return errors.Wrap(svcerr.ErrAuthentication, err)
}
if !authnRes.Authenticated {
return svcerr.ErrAuthentication
}

clientID := authnRes.GetId()
func (svc *adapterService) Subscribe(ctx context.Context, clientID, chanID, subtopic string, c Client) error {
authzRes, err := svc.channels.Authorize(ctx, &grpcChannelsV1.AuthzReq{
ClientId: clientID,
ClientType: policies.ClientType,
Expand Down Expand Up @@ -128,20 +101,10 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic
return svc.pubsub.Subscribe(ctx, subCfg)
}

func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error {
authnRes, err := svc.clients.Authenticate(ctx, &grpcClientsV1.AuthnReq{
ClientSecret: key,
})
if err != nil {
return errors.Wrap(svcerr.ErrAuthentication, err)
}
if !authnRes.Authenticated {
return svcerr.ErrAuthentication
}

func (svc *adapterService) Unsubscribe(ctx context.Context, clientID, chanID, subtopic, token string) error {
authzRes, err := svc.channels.Authorize(ctx, &grpcChannelsV1.AuthzReq{
DomainId: "",
ClientId: authnRes.GetId(),
ClientId: clientID,
ClientType: policies.ClientType,
Type: uint32(connections.Subscribe),
ChannelId: chanID,
Expand Down
2 changes: 1 addition & 1 deletion coap/api/doc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

// Package api contains API-related concerns: endpoint definitions, middlewares
// Package api contains API-related concerns: endpoint definitions
// and all resource representations.
package api
23 changes: 0 additions & 23 deletions coap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,3 @@ func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, chanID, subto

return lm.svc.Unsubscribe(ctx, key, chanID, subtopic, token)
}

// DisconnectHandler logs the disconnect handler. It logs the channel ID, subtopic (if any) and the time it took to complete the request.
// If the request fails, it logs the error.
func (lm *loggingMiddleware) DisconnectHandler(ctx context.Context, chanID, subtopic, token string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", chanID),
slog.String("token", token),
}
if subtopic != "" {
args = append(args, slog.String("subtopic", subtopic))
}
if err != nil {
args = append(args, slog.Any("error", err))
lm.logger.Warn("Unsubscribe failed", args...)
return
}
lm.logger.Info("Unsubscribe completed successfully", args...)
}(time.Now())

return lm.svc.DisconnectHandler(ctx, chanID, subtopic, token)
}
10 changes: 0 additions & 10 deletions coap/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,3 @@ func (mm *metricsMiddleware) Unsubscribe(ctx context.Context, key, chanID, subto

return mm.svc.Unsubscribe(ctx, key, chanID, subtopic, token)
}

// DisconnectHandler instruments DisconnectHandler method with metrics.
func (mm *metricsMiddleware) DisconnectHandler(ctx context.Context, chanID, subtopic, token string) error {
defer func(begin time.Time) {
mm.counter.With("method", "disconnect_handler").Add(1)
mm.latency.With("method", "disconnect_handler").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.DisconnectHandler(ctx, chanID, subtopic, token)
}
37 changes: 30 additions & 7 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/absmach/supermq"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
"github.com/absmach/supermq/coap"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
Expand Down Expand Up @@ -49,6 +50,7 @@ var (
var (
logger *slog.Logger
service coap.Service
clients grpcClientsV1.ClientsServiceClient
)

// MakeHandler returns a HTTP handler for API endpoints.
Expand All @@ -61,9 +63,10 @@ func MakeHandler(instanceID string) http.Handler {
}

// MakeCoAPHandler creates handler for CoAP messages.
func MakeCoAPHandler(svc coap.Service, l *slog.Logger) mux.HandlerFunc {
func MakeCoAPHandler(svc coap.Service, l *slog.Logger, cli grpcClientsV1.ClientsServiceClient) mux.HandlerFunc {
logger = l
service = svc
clients = cli

return handler
}
Expand Down Expand Up @@ -94,14 +97,20 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
resp.SetCode(codes.Unauthorized)
return
}
clientID, err := authenticate(m.Context(), key)
if err != nil {
logger.Warn(fmt.Sprintf("Error authenticating: %s", err))
resp.SetCode(codes.Unauthorized)
return
}

switch m.Code() {
case codes.GET:
resp.SetCode(codes.Content)
err = handleGet(m, w, msg, key)
err = handleGet(m, w, msg, clientID)
case codes.POST:
resp.SetCode(codes.Created)
err = service.Publish(m.Context(), key, msg)
err = service.Publish(m.Context(), clientID, msg)
default:
err = errMethodNotAllowed
}
Expand All @@ -122,7 +131,7 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
}
}

func handleGet(m *mux.Message, w mux.ResponseWriter, msg *messaging.Message, key string) error {
func handleGet(m *mux.Message, w mux.ResponseWriter, msg *messaging.Message, clientID string) error {
var obs uint32
obs, err := m.Options().Observe()
if err != nil {
Expand All @@ -132,11 +141,11 @@ func handleGet(m *mux.Message, w mux.ResponseWriter, msg *messaging.Message, key
if obs == startObserve {
c := coap.NewClient(w.Conn(), m.Token(), logger)
w.Conn().AddOnClose(func() {
_ = service.DisconnectHandler(context.Background(), msg.GetChannel(), msg.GetSubtopic(), c.Token())
_ = service.Unsubscribe(context.Background(), clientID, msg.GetChannel(), msg.GetSubtopic(), c.Token())
})
return service.Subscribe(w.Conn().Context(), key, msg.GetChannel(), msg.GetSubtopic(), c)
return service.Subscribe(w.Conn().Context(), clientID, msg.GetChannel(), msg.GetSubtopic(), c)
}
return service.Unsubscribe(w.Conn().Context(), key, msg.GetChannel(), msg.GetSubtopic(), m.Token().String())
return service.Unsubscribe(w.Conn().Context(), clientID, msg.GetChannel(), msg.GetSubtopic(), m.Token().String())
}

func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
Expand Down Expand Up @@ -214,3 +223,17 @@ func parseSubtopic(subtopic string) (string, error) {
subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}

func authenticate(ctx context.Context, key string) (string, error) {
authnRes, err := clients.Authenticate(ctx, &grpcClientsV1.AuthnReq{
ClientSecret: key,
})
if err != nil {
return "", errors.Wrap(svcerr.ErrAuthentication, err)
}
if !authnRes.Authenticated {
return "", svcerr.ErrAuthentication
}

return authnRes.GetId(), nil
}
6 changes: 6 additions & 0 deletions coap/events/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

// Package events provides the domain concept definitions needed to support
// coap events functionality.
package events
59 changes: 59 additions & 0 deletions coap/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

package events

const (
coapPrefix = "coap"
clientPublish = coapPrefix + ".client_publish"
clientSubscribe = coapPrefix + ".client_subscribe"
clientUnsubscribe = coapPrefix + ".client_unsubscribe"
)

type clientPublishEvent struct {
ChannelID string
ClientID string
Topic string
}

func (cpe clientPublishEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientPublish,
"channel_id": cpe.ChannelID,
"client_id": cpe.ClientID,
"topic": cpe.Topic,
}
return val, nil
}

type clientSubscribeEvent struct {
ChannelID string
ClientID string
Topic string
}

func (cse clientSubscribeEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientSubscribe,
"channel_id": cse.ChannelID,
"client_id": cse.ClientID,
"topic": cse.Topic,
}
return val, nil
}

type clientUnsubscribeEvent struct {
ChannelID string
ClientID string
Topic string
}

func (cse clientUnsubscribeEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientUnsubscribe,
"channel_id": cse.ChannelID,
"client_id": cse.ClientID,
"topic": cse.Topic,
}
return val, nil
}
Loading

0 comments on commit b39d060

Please sign in to comment.