From 629facc9162a809a5b8799ec73f9e860669fb09e Mon Sep 17 00:00:00 2001 From: microrain Date: Wed, 5 Jun 2024 07:00:27 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E5=A2=9E=E5=8A=A0=E7=BD=91=E5=85=B3?= =?UTF-8?q?=E9=BB=98=E8=AE=A4=E4=B8=8B=E5=8F=91=E8=8E=B7=E5=8F=96=E7=BD=91?= =?UTF-8?q?=E5=85=B3=E7=89=88=E6=9C=AC=E4=BF=A1=E6=81=AF=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E3=80=82=E4=BC=98=E5=8C=96=E6=97=A5=E5=BF=97=E8=BE=93=E5=87=BA?= =?UTF-8?q?=E3=80=82=E5=AE=8C=E5=96=84=E8=AF=B4=E6=98=8E=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 140 +++++++++++++++++++++++++++++++++++++++---- consts/consts.go | 6 +- events/const.go | 6 +- events/pushEvents.go | 36 +++++++++++ gateway.go | 6 +- service.go | 12 ++-- set.go | 8 +-- version/version.go | 13 ++++ 8 files changed, 202 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index cf71de0..a707469 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # SagooIOT 网关基础服务库代码 -开发SagooIOT专属网关时,可以引用此项目,以便快速开发。 +开发SagooIOT专属网关时,可以引用此项目,以便快速开发。创建一个空的工程,按下面的步骤完成自己专属网关的开发。 ## SagooIOT的网关开发说明 @@ -9,12 +9,63 @@ go get -u github.com/sagoo-cloud/iotgateway ``` +## 实现入口程序 + +参考如下: + +```go + +package main + +import ( + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/os/glog" + "github.com/sagoo-cloud/iotgateway" + "github.com/sagoo-cloud/iotgateway/version" +) + +// 定义编译时的版本信息 +var ( + BuildVersion = "0.0" + BuildTime = "" + CommitID = "" +) + +func main() { + //初始化日志 + glog.SetDefaultLogger(g.Log()) + //显示版本信息 + version.ShowLogo(BuildVersion, BuildTime, CommitID) + ctx := gctx.GetInitCtx() + + //需要解析的协议,可以根据需要添加,如果不需要实现自定义解析协议,可以不添加,可以为nil + chargeProtocol := protocol.ChargeProtocol{} + + //创建网关 + gateway, err := iotgateway.NewGateway(ctx, chargeProtocol) + if err != nil { + panic(err) + } + //初始化事件 + events.Init() + + // 初始化个性网关需要实现的其它服务 + + //启动网关 + gateway.Start() + +} + + +``` + + ## 实现protocol接口 实现protocol接口处理接收到的数据。在Decode方法中,需要将接收到的数据进行解析,然后返回解析后的数据。在Encode方法中,需要将需要发送的数据进行编码,然后返回编码后的数据。 ```go - type ChargeProtocol struct { } @@ -40,7 +91,7 @@ func (c *ChargeProtocol) Decode(conn net.Conn, buffer []byte) (res []byte, err e ```go - //定义事件返回数据 + //准备事件返回数据 var eventData = make(map[string]interface{}) eventData["XXX字段1"] = "XXX值1" eventData["XXX字段2"] = "XXX值2" @@ -48,14 +99,11 @@ func (c *ChargeProtocol) Decode(conn net.Conn, buffer []byte) (res []byte, err e var eventDataList = make(map[string]interface{}) eventDataList["XXX事件标识字串"] = eventData - - //推送数据到mqtt out := g.Map{ "DeviceKey": deviceKey, "EventDataList": eventDataList, } - - //触发向MQTT服务推送数据事件 + //触发属性上报事件 event.MustFire(consts.PushAttributeDataToMQTT, out) ``` @@ -64,24 +112,43 @@ func (c *ChargeProtocol) Decode(conn net.Conn, buffer []byte) (res []byte, err e 触发的是 `consts.PushAttributeDataToMQTT` 事件 ```go - + //准备上报的数据 var propertieData = make(map[string]interface{}) propertieData["XXX字段1"] = "XXX值1" propertieData["XXX字段2"] = "XXX值2" - //推送数据 out := g.Map{ "DeviceKey": deviceKey, "PropertieDataList": propertieData, } + //触发属性上报事件 event.MustFire(consts.PushAttributeDataToMQTT, out) ``` +## 从SagooIOT平台下发调用回复 + +在SagooIoT系统中向设备端下发有两种情况,1. 服务下发,2. 属性设置下发。 + +### 服务下发 + +如果需要完成SagooIoT端向设备进行服务调用,需要在网关程序中完成订阅服务下发事件。 +触发的是 `consts.PushServiceResDataToMQTT` 事件。 + +一 、在获取到设备key的地方订阅服务下发事件。 + +```go + //订阅网关设备服务下发事件 + iotgateway.ServerGateway.SubscribeServiceEvent(传入获取的设备key) + +``` +二、在对设备进行处理后,需要回复SagooIOT平台。 + 由SagooIOT平台端下发后回复: 触发的是 `consts.PushServiceResDataToMQTT` 事件 ```go + //准备回复数据 var replyData = make(map[string]interface{}) replyData["XXX字段1"] = "XXX值1" replyData["XXX字段2"] = "XXX值1" @@ -89,5 +156,58 @@ func (c *ChargeProtocol) Decode(conn net.Conn, buffer []byte) (res []byte, err e "DeviceKey": deviceKey, "ReplyData": replyData, } + //出发回复的事件 event.MustFire(consts.PushServiceResDataToMQTT, outData) -``` \ No newline at end of file +``` + +### 属性设置下发 + +如果需要完成SagooIoT端向设备进行服务调用,需要在网关程序中完成订阅服务下发事件。 +触发的是 `consts.PropertySetEvent` 事件。 + +一 、在获取到设备key的地方订阅服务下发事件。 + +```go + //订阅网关设备服务下发事件 + iotgateway.ServerGateway.SubscribeSetEvent(传入获取的设备key) + +``` +二、在对设备进行处理后,需要回复SagooIOT平台。 + +由SagooIOT平台端下发后回复: +触发的是 `consts.PushSetResDataToMQTT` 事件 + +```go + //准备回复数据 + var replyData = make(map[string]interface{}) + replyData["XXX字段1"] = "XXX值1" + replyData["XXX字段2"] = "XXX值1" + outData := g.Map{ + "DeviceKey": deviceKey, + "ReplyData": replyData, + } + //出发回复的事件 + event.MustFire(consts.PushSetResDataToMQTT, outData) +``` + +### SagooIoT平台接收到回复的数据处理 + +在SagooIoT平台,对服务下发后,会收到回复数据。需要在对应的功能定义设置输入参数。参数标识与数据类型要与回服务回复的数据保持一致。 + + +## 默认服务下发功能 + +网关中已经有一些默认的服务下发功能。 + +### 获取网关版本信息 + +功能标识:`getGatewayVersion` +功能描述:获取网关版本信息 +功能输入参数:无 +功能输出参数: + +| 参数标识 | 参数名称 | 类型 | +| --------- | -------- | ------ | +| Version | 版本 | string | +| BuildTime | 编译时间 | string | +| CommitID | 提交ID | string | \ No newline at end of file diff --git a/consts/consts.go b/consts/consts.go index d6c6fbc..08547af 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -1,9 +1,9 @@ package consts const ( - PushAttributeDataToMQTT = "PushAttributeDataToMQTT" - PushServiceResDataToMQTT = "PushServiceResDataToMQTT" - PushSetResDataToMQTT = "PushSetResDataToMQTT" + PushAttributeDataToMQTT = "PushAttributeDataToMQTT" //属性上报 + PushServiceResDataToMQTT = "PushServiceResDataToMQTT" //服务调用结果上报 + PushSetResDataToMQTT = "PushSetResDataToMQTT" //属性设置结果上报 NetTypeTcpServer = "tcp" NetTypeMqttServer = "mqtt" diff --git a/events/const.go b/events/const.go index 6e8c6ba..157b13c 100644 --- a/events/const.go +++ b/events/const.go @@ -1,6 +1,8 @@ package events const ( - // PropertySetEvent 属性设置下发事件,SagooIoT平台下发属性设置命令时触发 - PropertySetEvent = "property" + PropertySetEvent = "property" // PropertySetEvent 属性设置下发事件,SagooIoT平台下发属性设置命令时触发 + + GetGatewayVersionEvent = "getGatewayVersion" // ServiceCallEvent 服务调用下发事件,SagooIoT平台下发服务调用getGatewayVersion命令时触发 + GetGatewayConfig = "getGatewayConfig" // ServiceCallEvent 服务调用下发事件,SagooIoT平台下发服务调用getGatewayConfig命令时触发 ) diff --git a/events/pushEvents.go b/events/pushEvents.go index cc63bb1..2b096d0 100644 --- a/events/pushEvents.go +++ b/events/pushEvents.go @@ -4,17 +4,21 @@ import ( "context" "encoding/json" "errors" + "fmt" "github.com/gogf/gf/v2/encoding/gjson" + "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "github.com/gogf/gf/v2/util/guid" "github.com/gookit/event" "github.com/sagoo-cloud/iotgateway/consts" + "github.com/sagoo-cloud/iotgateway/lib" "github.com/sagoo-cloud/iotgateway/log" "github.com/sagoo-cloud/iotgateway/mqttClient" "github.com/sagoo-cloud/iotgateway/mqttProtocol" "github.com/sagoo-cloud/iotgateway/vars" + "github.com/sagoo-cloud/iotgateway/version" ) // LoadingPublishEvent 加载发布事件 @@ -25,6 +29,10 @@ func LoadingPublishEvent() { event.On(consts.PushServiceResDataToMQTT, event.ListenerFunc(pushServiceResDataToMQTT), event.High) //推送设置属性响应数据到mqtt服务事件 event.On(consts.PushSetResDataToMQTT, event.ListenerFunc(pushSetResDataToMQTT), event.High) + + // 服务下发获取网关配置信息事件 + event.On(GetGatewayVersionEvent, event.ListenerFunc(getGatewayVersionData), event.Normal) + } // pushAttributeDataToMQTT 推送属性数据到mqtt服务 @@ -169,3 +177,31 @@ func pushSetResDataToMQTT(e event.Event) (err error) { } return } + +// getGatewayVersionData 获取网关版本信息事件 +func getGatewayVersionData(e event.Event) (err error) { + // 获取设备KEY + ok, deviceKey := lib.GetMapValueForKey(e.Data(), "DeviceKey") + if !ok { + glog.Debug(context.Background(), "获取设备KEY失败") + return fmt.Errorf("获取设备KEY失败: %s", e.Data()) + } + //==== 平台端下发调用 应答==== + ra, err := vars.GetUpMessageMap(deviceKey.(string)) + if err == nil { + if ra.MessageID != "" { + + var rd = make(map[string]interface{}) + rd["Version"] = version.GetVersion() + rd["BuildTime"] = version.GetBuildTime() + rd["CommitID"] = version.CommitID + + outData := g.Map{ + "DeviceKey": deviceKey, + "ReplyData": rd, + } + event.Async(consts.PushServiceResDataToMQTT, outData) + } + } + return +} diff --git a/gateway.go b/gateway.go index ae0b0de..19f02f3 100644 --- a/gateway.go +++ b/gateway.go @@ -94,6 +94,10 @@ func (gw *gateway) Start() { if name == "" { name = "SagooIoT Gateway Server" } + + //订阅网关设备服务下发事件 + gw.SubscribeServiceEvent(gw.options.GatewayServerConfig.DeviceKey) + go gw.heartbeat(gw.options.GatewayServerConfig.Duration) //启动心跳 switch gw.options.GatewayServerConfig.NetType { case consts.NetTypeTcpServer: @@ -151,7 +155,7 @@ func (gw *gateway) sendHeartbeat() { glog.Debugf(context.Background(), "网关向平台发送心跳数据:%s", string(outData)) token := gw.MQTTClient.Publish(topic, 1, false, outData) if token.Error() != nil { - log.Error("publish error: %s", token.Error()) + glog.Errorf(context.Background(), "publish error: %s", token.Error()) } } diff --git a/service.go b/service.go index 3a2916e..0a90539 100644 --- a/service.go +++ b/service.go @@ -1,8 +1,10 @@ package iotgateway import ( + "context" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/util/gconv" "github.com/gookit/event" "github.com/sagoo-cloud/iotgateway/lib" @@ -21,10 +23,10 @@ func (gw *gateway) SubscribeServiceEvent(deviceKey string) { return } topic := fmt.Sprintf(serviceTopic, deviceKey) - log.Debug("topic: ", topic) + glog.Debugf(context.Background(), "%s 设备订阅了服务调用监听topic: %s", deviceKey, topic) token := gw.MQTTClient.Subscribe(topic, 1, onServiceMessage) if token.Error() != nil { - log.Debug("subscribe error: ", token.Error()) + glog.Debug(context.Background(), "subscribe error: ", token.Error()) } } @@ -43,12 +45,12 @@ var onServiceMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Mes //通过监听到的topic地址获取设备标识 deviceKey := lib.GetTopicInfo("deviceKey", msg.Topic()) var data = mqttProtocol.ServiceCallRequest{} - log.Debug("==111==收到服务下发的topic====", msg.Topic()) - log.Debug("====收到服务下发的信息====", msg.Payload()) + glog.Debug(context.Background(), "接收到服务下发的topic:", msg.Topic()) + glog.Debug(context.Background(), "接收到服务下发的数据:", msg.Payload()) err := gconv.Scan(msg.Payload(), &data) if err != nil { - log.Debug("解析服务功能数据出错: %s", err) + glog.Debug(context.Background(), "解析服务功能数据出错: %s", err) return } diff --git a/set.go b/set.go index 0ca1324..483fcc7 100644 --- a/set.go +++ b/set.go @@ -23,10 +23,10 @@ func (gw *gateway) SubscribeSetEvent(deviceKey string) { return } topic := fmt.Sprintf(setTopic, deviceKey) - log.Debug("topic: ", topic) + glog.Debugf(context.Background(), "%s 设备订阅了属性设置监听topic: %s", deviceKey, topic) token := gw.MQTTClient.Subscribe(topic, 1, onSetMessage) if token.Error() != nil { - log.Debug("subscribe error: ", token.Error()) + glog.Debug(context.Background(), "subscribe error: ", token.Error()) } } @@ -42,8 +42,8 @@ var onSetMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message //通过监听到的topic地址获取设备标识 deviceKey := lib.GetTopicInfo("deviceKey", msg.Topic()) var data = mqttProtocol.ServiceCallRequest{} - glog.Debug(ctx, "==111==收到属性设置下发的topic====", msg.Topic()) - glog.Debug(ctx, "====收到属性设置下发的信息====", msg.Payload()) + glog.Debug(ctx, "接收到属性设置下发的topic:", msg.Topic()) + glog.Debug(ctx, "接收收到属性设置下发的数据:", msg.Payload()) err := gconv.Scan(msg.Payload(), &data) if err != nil { diff --git a/version/version.go b/version/version.go index 6878967..fca052b 100644 --- a/version/version.go +++ b/version/version.go @@ -4,10 +4,14 @@ import "fmt" var ( BuildVersion string + BuildTime string + CommitID string ) func ShowLogo(buildVersion, buildTime, commitID string) { BuildVersion = buildVersion + BuildTime = buildTime + CommitID = commitID //版本号 //fmt.Println(" _____ \n / ____| \n | (___ __ _ __ _ ___ ___ \n \\___ \\ / _` |/ _` |/ _ \\ / _ \\ \n ____) | (_| | (_| | (_) | (_) |\n |_____/ \\__,_|\\__, |\\___/ \\___/ \n __/ | \n |___/ ") @@ -18,5 +22,14 @@ func ShowLogo(buildVersion, buildTime, commitID string) { fmt.Println("") } func GetVersion() string { + if BuildVersion == "" { + BuildVersion = "0.0" + } return BuildVersion } +func GetBuildTime() string { + return BuildTime +} +func GetCommitID() string { + return CommitID +}