Skip to content

Commit

Permalink
- #5 amqp direct exhange binding achieved, also atomic nature of patches
Browse files Browse the repository at this point in the history
  • Loading branch information
kneerunjun committed Apr 13, 2024
1 parent b5eb055 commit cadc9cf
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 3 deletions.
25 changes: 24 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ services:
stdin_open: true
healthcheck:
test: "exit 0"
srv_rabbit:
image: rabbitmq:3.13-rc-management
ports:
- 35672:15672
- 5672:5672
networks:
- eensyaqp-nw
secrets:
- amqp_username
- amqp_password
environment:
- RABBITMQ_DEFAULT_USER=eensyiot-admin
- RABBITMQ_DEFAULT_PASS=33n5y4dm1n
stdin_open: true
tty: true
container_name: ctn_rabbit
healthcheck:
test: "exit 0"
srv_gin:
build:
context: .
Expand All @@ -47,12 +65,15 @@ services:
depends_on:
srv_mongo:
condition: service_healthy
srv_rabbit:
condition: service_healthy
networks:
- eensyaqp-nw
secrets:
- db_root_username
- db_root_password
- mongo_uri
- amqp_uri
container_name: ctn_gin
stdin_open: true
tty: true
Expand All @@ -71,4 +92,6 @@ secrets:
amqp_username:
file: ./secrets/amqp_username.secret
amqp_password:
file: ./secrets/amqp_password.secret
file: ./secrets/amqp_password.secret
amqp_uri:
file: ./secrets/amqp_uri.secret
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/eensymachines-in/patio v0.0.0-20240411082548-329da191c0fe
github.com/gin-gonic/gin v1.9.1
github.com/sirupsen/logrus v1.9.3
github.com/streadway/amqp v1.1.0
go.mongodb.org/mongo-driver v1.14.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
20 changes: 20 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package main

import (
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/eensymachines-in/errx/httperr"
"github.com/eensymachines-in/patio/aquacfg"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"go.mongodb.org/mongo-driver/mongo"
)

Expand Down Expand Up @@ -100,6 +102,24 @@ func HndlOneDvc(c *gin.Context) {
}))
return
}
val, _ := c.Get("amqp-ch")
amqpCh := val.(*amqp.Channel)
val, _ = c.Get("amqp-conn")
amqpConn := val.(*amqp.Connection)

defer amqpConn.Close()
defer amqpCh.Close()
byt, _ := json.Marshal(newCfg)
err := amqpCh.Publish("configs_direct", string(deviceDetails.MacID), false, false, amqp.Publishing{
ContentType: "text/plain",
Body: byt,
})
if err != nil {
httperr.HttpErrOrOkDispatch(c, httperr.ErrGatewayConnect(fmt.Errorf("failed to send message to amqp server %s", err)), log.WithFields(log.Fields{}))
// since amqp publish and db update should be atomic operation
DevicesCollc(db).PatchConfg(deviceDetails.MacID, *deviceDetails.Cfg, ctx) // reverting the old settings
return
}
} else {
c.AbortWithStatus(http.StatusMethodNotAllowed)
return
Expand Down
21 changes: 20 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (

const (
MONGO_URI_SECRET = "/run/secrets/mongo_uri"
AMQP_URI_SECRET = "/run/secrets/amqp_uri"
)

var (
mongoConnectURI string = ""
mongoDBName = ""
amqpConnectURI = ""
rabbitQname = "" // name of the rabbit queue
)

func init() {
Expand Down Expand Up @@ -72,6 +75,22 @@ func init() {
if mongoDBName == "" {
log.Fatal("invalid/empty name for mongo db, cannot proceed")
}

/* Making AMQP connections */
f, err = os.Open(AMQP_URI_SECRET)
if err != nil || f == nil {
log.Fatalf("failed to open amqp connection uri from secret file %s", err)
}
byt, err = io.ReadAll(f)
if err != nil {
log.Fatalf("failed to read amqp connection uri from secret file %s", err)
}
amqpConnectURI = string(byt)
if amqpConnectURI == "" {
log.Fatal("amqp connect uri is empty, check secret file and rerun application")
}
rabbitQname = os.Getenv("AMQP_QNAME")

}

func main() {
Expand All @@ -93,7 +112,7 @@ func main() {
// Patching device details - config or users
// ?path=users&action=append
// ?path=config
devices.PATCH("/:deviceid", DeviceOfID, HndlOneDvc)
devices.PATCH("/:deviceid", RabbitConnectWithChn(amqpConnectURI, "configs_direct"), DeviceOfID, HndlOneDvc)
// Removing a device registration completely
devices.DELETE("/:deviceid", HndlOneDvc)

Expand Down
51 changes: 51 additions & 0 deletions mddlwr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"fmt"
"net/http"
"os"
"time"

"github.com/eensymachines-in/errx/httperr"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
Expand All @@ -33,6 +35,55 @@ func CORS(c *gin.Context) {
c.AbortWithStatus(http.StatusOK)
}
}

func RabbitConnectWithChn(connString, xname string) gin.HandlerFunc {
return func(c *gin.Context) {
conn, err := amqp.Dial(connString)
if err != nil {
httperr.HttpErrOrOkDispatch(c, httperr.ErrGatewayConnect(err), log.WithFields(log.Fields{
"stack": "RabbitConnectWithChn",
"conn_string": connString,
}))
return
}
// defer conn.Close()
ch, err := conn.Channel()
if err != nil {
httperr.HttpErrOrOkDispatch(c, httperr.ErrGatewayConnect(err), log.WithFields(log.Fields{
"stack": "RabbitConnectWithChn",
"login": os.Getenv("AMQP_LOGIN"),
"server": os.Getenv("AMQP_SERVER"),
}))
conn.Close() // incase no channel, we close the channel before we exit the stack
return
}
// NOTE: we shall be using a direct exchange with mac id specific routing key
err = ch.ExchangeDeclare(
xname, // name
"direct", // exhange type
true, // durable
false, //auto deleted
false, //internal
false, // nowait
nil, //amqp.table
)
if err != nil {
httperr.HttpErrOrOkDispatch(c, httperr.ErrGatewayConnect(err), log.WithFields(log.Fields{
"stack": "RabbitConnectWithChn",
"login": os.Getenv("AMQP_LOGIN"),
"server": os.Getenv("AMQP_SERVER"),
}))
// incase declaring the exchange fails we close the channel and connection on our way out
ch.Close()
conn.Close()
return
}
c.Set("amqp-ch", ch)
c.Set("amqp-conn", conn)
c.Next()
}
}

func MongoConnectURI(uri, dbname string) gin.HandlerFunc {
return func(c *gin.Context) {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion test/apitest.http
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Content-Type: application/json

{
"tickat": "12:00",
"config": 3,
"config": 0,
"interval": 250,
"pulsegap": 11

Expand Down

0 comments on commit cadc9cf

Please sign in to comment.