diff --git a/docker-compose.yml b/docker-compose.yml index b51d229..a66e7ff 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,6 +21,24 @@ services: stdin_open: true healthcheck: test: "exit 0" + srv_rabbit: + image: rabbitmq:3.13-rc-management + ports: + - 35672:15672 + - 5672:5672 + networks: + - eensyaqp-nw + secrets: + - amqp_username + - amqp_password + environment: + - RABBITMQ_DEFAULT_USER=eensyiot-admin + - RABBITMQ_DEFAULT_PASS=33n5y4dm1n + stdin_open: true + tty: true + container_name: ctn_rabbit + healthcheck: + test: "exit 0" srv_gin: build: context: . @@ -47,12 +65,15 @@ services: depends_on: srv_mongo: condition: service_healthy + srv_rabbit: + condition: service_healthy networks: - eensyaqp-nw secrets: - db_root_username - db_root_password - mongo_uri + - amqp_uri container_name: ctn_gin stdin_open: true tty: true @@ -71,4 +92,6 @@ secrets: amqp_username: file: ./secrets/amqp_username.secret amqp_password: - file: ./secrets/amqp_password.secret \ No newline at end of file + file: ./secrets/amqp_password.secret + amqp_uri: + file: ./secrets/amqp_uri.secret \ No newline at end of file diff --git a/go.mod b/go.mod index d396c28..8ff27a5 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/eensymachines-in/patio v0.0.0-20240411082548-329da191c0fe github.com/gin-gonic/gin v1.9.1 github.com/sirupsen/logrus v1.9.3 + github.com/streadway/amqp v1.1.0 go.mongodb.org/mongo-driver v1.14.0 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 ) diff --git a/go.sum b/go.sum index cb49bc3..abd45bd 100644 --- a/go.sum +++ b/go.sum @@ -76,6 +76,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/handlers.go b/handlers.go index 31f8d3e..cb39d83 100644 --- a/handlers.go +++ b/handlers.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "net/http" @@ -9,6 +10,7 @@ import ( "github.com/eensymachines-in/patio/aquacfg" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" + "github.com/streadway/amqp" "go.mongodb.org/mongo-driver/mongo" ) @@ -100,6 +102,24 @@ func HndlOneDvc(c *gin.Context) { })) return } + val, _ := c.Get("amqp-ch") + amqpCh := val.(*amqp.Channel) + val, _ = c.Get("amqp-conn") + amqpConn := val.(*amqp.Connection) + + defer amqpConn.Close() + defer amqpCh.Close() + byt, _ := json.Marshal(newCfg) + err := amqpCh.Publish("configs_direct", string(deviceDetails.MacID), false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: byt, + }) + if err != nil { + httperr.HttpErrOrOkDispatch(c, httperr.ErrGatewayConnect(fmt.Errorf("failed to send message to amqp server %s", err)), log.WithFields(log.Fields{})) + // since amqp publish and db update should be atomic operation + DevicesCollc(db).PatchConfg(deviceDetails.MacID, *deviceDetails.Cfg, ctx) // reverting the old settings + return + } } else { c.AbortWithStatus(http.StatusMethodNotAllowed) return diff --git a/main.go b/main.go index 923e446..41dc86b 100644 --- a/main.go +++ b/main.go @@ -18,11 +18,14 @@ import ( const ( MONGO_URI_SECRET = "/run/secrets/mongo_uri" + AMQP_URI_SECRET = "/run/secrets/amqp_uri" ) var ( mongoConnectURI string = "" mongoDBName = "" + amqpConnectURI = "" + rabbitQname = "" // name of the rabbit queue ) func init() { @@ -72,6 +75,22 @@ func init() { if mongoDBName == "" { log.Fatal("invalid/empty name for mongo db, cannot proceed") } + + /* Making AMQP connections */ + f, err = os.Open(AMQP_URI_SECRET) + if err != nil || f == nil { + log.Fatalf("failed to open amqp connection uri from secret file %s", err) + } + byt, err = io.ReadAll(f) + if err != nil { + log.Fatalf("failed to read amqp connection uri from secret file %s", err) + } + amqpConnectURI = string(byt) + if amqpConnectURI == "" { + log.Fatal("amqp connect uri is empty, check secret file and rerun application") + } + rabbitQname = os.Getenv("AMQP_QNAME") + } func main() { @@ -93,7 +112,7 @@ func main() { // Patching device details - config or users // ?path=users&action=append // ?path=config - devices.PATCH("/:deviceid", DeviceOfID, HndlOneDvc) + devices.PATCH("/:deviceid", RabbitConnectWithChn(amqpConnectURI, "configs_direct"), DeviceOfID, HndlOneDvc) // Removing a device registration completely devices.DELETE("/:deviceid", HndlOneDvc) diff --git a/mddlwr.go b/mddlwr.go index 53fe84b..e85e741 100644 --- a/mddlwr.go +++ b/mddlwr.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "net/http" + "os" "time" "github.com/eensymachines-in/errx/httperr" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" + "github.com/streadway/amqp" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" @@ -33,6 +35,55 @@ func CORS(c *gin.Context) { c.AbortWithStatus(http.StatusOK) } } + +func RabbitConnectWithChn(connString, xname string) gin.HandlerFunc { + return func(c *gin.Context) { + conn, err := amqp.Dial(connString) + if err != nil { + httperr.HttpErrOrOkDispatch(c, httperr.ErrGatewayConnect(err), log.WithFields(log.Fields{ + "stack": "RabbitConnectWithChn", + "conn_string": connString, + })) + return + } + // defer conn.Close() + ch, err := conn.Channel() + if err != nil { + httperr.HttpErrOrOkDispatch(c, httperr.ErrGatewayConnect(err), log.WithFields(log.Fields{ + "stack": "RabbitConnectWithChn", + "login": os.Getenv("AMQP_LOGIN"), + "server": os.Getenv("AMQP_SERVER"), + })) + conn.Close() // incase no channel, we close the channel before we exit the stack + return + } + // NOTE: we shall be using a direct exchange with mac id specific routing key + err = ch.ExchangeDeclare( + xname, // name + "direct", // exhange type + true, // durable + false, //auto deleted + false, //internal + false, // nowait + nil, //amqp.table + ) + if err != nil { + httperr.HttpErrOrOkDispatch(c, httperr.ErrGatewayConnect(err), log.WithFields(log.Fields{ + "stack": "RabbitConnectWithChn", + "login": os.Getenv("AMQP_LOGIN"), + "server": os.Getenv("AMQP_SERVER"), + })) + // incase declaring the exchange fails we close the channel and connection on our way out + ch.Close() + conn.Close() + return + } + c.Set("amqp-ch", ch) + c.Set("amqp-conn", conn) + c.Next() + } +} + func MongoConnectURI(uri, dbname string) gin.HandlerFunc { return func(c *gin.Context) { ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/test/apitest.http b/test/apitest.http index 5db613c..852008b 100644 --- a/test/apitest.http +++ b/test/apitest.http @@ -42,7 +42,7 @@ Content-Type: application/json { "tickat": "12:00", - "config": 3, + "config": 0, "interval": 250, "pulsegap": 11