Skip to content

Commit

Permalink
migrate coap to v3
Browse files Browse the repository at this point in the history
Signed-off-by: SammyOina <[email protected]>
  • Loading branch information
SammyOina committed Oct 17, 2023
1 parent d8c4b64 commit 293a9c4
Show file tree
Hide file tree
Showing 90 changed files with 46 additions and 12,834 deletions.
47 changes: 27 additions & 20 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package api

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -19,9 +20,9 @@ import (
mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/plgd-dev/go-coap/v2/message"
"github.com/plgd-dev/go-coap/v2/message/codes"
"github.com/plgd-dev/go-coap/v2/mux"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/plgd-dev/go-coap/v3/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand Down Expand Up @@ -65,20 +66,26 @@ func MakeCoAPHandler(svc coap.Service, l mflog.Logger) mux.HandlerFunc {
return handler
}

func sendResp(w mux.ResponseWriter, resp *message.Message) {
if err := w.Client().WriteMessage(resp); err != nil {
func sendResp(ctx context.Context, w mux.ResponseWriter, resp *message.Message) {
m := w.Conn().AcquireMessage(ctx)
m.SetCode(resp.Code)
m.SetBody(bytes.NewReader(resp.Payload))
m.SetToken(resp.Token)
for _, option := range resp.Options {
m.SetOptionBytes(option.ID, option.Value)
}
if err := w.Conn().WriteMessage(m); err != nil {
logger.Warn(fmt.Sprintf("Can't set response: %s", err))
}
}

func handler(w mux.ResponseWriter, m *mux.Message) {
resp := message.Message{
Code: codes.Content,
Token: m.Token,
Context: m.Context,
Token: m.Token(),
Options: make(message.Options, 0, 16),
}
defer sendResp(w, &resp)
defer sendResp(m.Context(), w, &resp)
msg, err := decodeMessage(m)
if err != nil {
logger.Warn(fmt.Sprintf("Error decoding message: %s", err))
Expand All @@ -91,9 +98,9 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
resp.Code = codes.Unauthorized
return
}
switch m.Code {
switch m.Code() {
case codes.GET:
err = handleGet(m.Context, m, w.Client(), msg, key)
err = handleGet(m.Context(), m, w.Conn(), msg, key)
case codes.POST:
resp.Code = codes.Created
err = nil
Expand All @@ -115,25 +122,25 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
}
}

func handleGet(ctx context.Context, m *mux.Message, c mux.Client, msg *messaging.Message, key string) error {
func handleGet(ctx context.Context, m *mux.Message, c mux.Conn, msg *messaging.Message, key string) error {
var obs uint32
obs, err := m.Options.Observe()
obs, err := m.Observe()
if err != nil {
logger.Warn(fmt.Sprintf("Error reading observe option: %s", err))
return errBadOptions
}
if obs == startObserve {
c := coap.NewClient(c, m.Token, logger)
c := coap.NewClient(c, m.Token(), logger)
return service.Subscribe(ctx, key, msg.Channel, msg.Subtopic, c)
}
return service.Unsubscribe(ctx, key, msg.Channel, msg.Subtopic, m.Token.String())
return service.Unsubscribe(ctx, key, msg.Channel, msg.Subtopic, m.Token().String())
}

func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
if msg.Options == nil {
if msg.Options() == nil {
return &messaging.Message{}, errBadOptions
}
path, err := msg.Options.Path()
path, err := msg.Path()
if err != nil {
return &messaging.Message{}, err
}
Expand All @@ -154,8 +161,8 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
Created: time.Now().UnixNano(),
}

if msg.Body != nil {
buff, err := io.ReadAll(msg.Body)
if msg.Body() != nil {
buff, err := io.ReadAll(msg.Body())
if err != nil {
return ret, err
}
Expand All @@ -165,10 +172,10 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
}

func parseKey(msg *mux.Message) (string, error) {
if obs, _ := msg.Options.Observe(); obs != 0 && msg.Code == codes.GET {
if obs, _ := msg.Observe(); obs != 0 && msg.Code() == codes.GET {
return "", nil
}
authKey, err := msg.Options.GetString(message.URIQuery)
authKey, err := msg.Options().GetString(message.URIQuery)
if err != nil {
return "", err
}
Expand Down
37 changes: 17 additions & 20 deletions coap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/plgd-dev/go-coap/v2/message"
"github.com/plgd-dev/go-coap/v2/message/codes"
mux "github.com/plgd-dev/go-coap/v2/mux"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
mux "github.com/plgd-dev/go-coap/v3/mux"
)

// Client wraps CoAP client.
Expand All @@ -36,14 +36,14 @@ type Client interface {
var ErrOption = errors.New("unable to set option")

type client struct {
client mux.Client
client mux.Conn
token message.Token
observe uint32
logger logger.Logger
}

// NewClient instantiates a new Observer.
func NewClient(c mux.Client, tkn message.Token, l logger.Logger) Client {
func NewClient(c mux.Conn, tkn message.Token, l logger.Logger) Client {
return &client{
client: c,
token: tkn,
Expand All @@ -57,13 +57,10 @@ func (c *client) Done() <-chan struct{} {
}

func (c *client) Cancel() error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: context.Background(),
Options: make(message.Options, 0, 16),
}
if err := c.client.WriteMessage(&m); err != nil {
pm := c.client.AcquireMessage(context.Background())
pm.SetCode(codes.Content)
pm.SetToken(c.token)
if err := c.client.WriteMessage(pm); err != nil {
c.logger.Error(fmt.Sprintf("Error sending message: %s.", err))
}
return c.client.Close()
Expand All @@ -74,12 +71,10 @@ func (c *client) Token() string {
}

func (c *client) Handle(msg *messaging.Message) error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: c.client.Context(),
Body: bytes.NewReader(msg.Payload),
}
pm := c.client.AcquireMessage(context.Background())
pm.SetCode(codes.Content)
pm.SetToken(c.token)
pm.SetBody(bytes.NewReader(msg.Payload))

atomic.AddUint32(&c.observe, 1)
var opts message.Options
Expand All @@ -103,6 +98,8 @@ func (c *client) Handle(msg *messaging.Message) error {
return fmt.Errorf("cannot set options to response: %w", err)
}

m.Options = opts
return c.client.WriteMessage(&m)
for _, option := range opts {
pm.SetOptionBytes(option.ID, option.Value)
}
return c.client.WriteMessage(pm)
}
Loading

0 comments on commit 293a9c4

Please sign in to comment.