因 MQTT 协议基于发布/订阅的异步通信模式,服务器控制设备后,将无法同步感知设备的返回结果。为解决此问题,物联网通信平台利用 RRPC(Revert RPC)实现同步通信机制。请参考 RRPC 通信
- 订阅消息Topic:
$rrpc/rxd/{productID}/{deviceName}/+
用于订阅云端下发(下行)的 RRPC 请求消息。 - 请求消息Topic:
$rrpc/rxd/{productID}/{deviceName}/{processID}
用于云端发布(下行)RRPC 请求消息。 - 应答消息Topic:
$rrpc/txd/{productID}/{deviceName}/{processID}
用于发布(上行)RRPC 应答消息。
- 设备端订阅 RRPC 订阅消息 Topic。
- 服务器通过调用 PublishRRPCMessage 接口发布 RRPC 请求消息。
- 设备端接收到消息之后截取请求消息 Topic 中云端下发的 processID,设备将应答消息 Topic 的 processID 设置为截取的 processID,并向应答消息 Topic 发布设备的返回消息 。
- 物联网通信平台接收到设备端返回消息之后,根据 processID 对消息进行匹配并将设备返回消息发送给服务器。
! RRPC请求4s超时,即4s内设备端没有应答就认为请求超时。
步骤一:在设备中订阅 RRPC 消息 Topic
运行 MqttSampleTest.java 的main函数,设备上线后调用subscribeRRPCTopic(),进行 通信原理 中的订阅消息Topic。示例代码如下:
private static void subscribeRRPCTopic() {
try {
Thread.sleep(2000);
// 用户上下文(请求实例)
MQTTRequest mqttRequest = new MQTTRequest("subscribeTopic", requestID.getAndIncrement());
// 订阅主题
mqttconnection.subscribeRRPCTopic(TXMqttConstants.QOS0, mqttRequest);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
以下是设备成功订阅 RRPC 消息 Topic 的日志
17/03/2021 00:16:34,000 [main] INFO TXMqttConnection subscribe 749 - Starting subscribe topic: $rrpc/rxd/XTV06F9MX4/test_111/+
17/03/2021 00:16:34,032 [MQTT Call: XTV06F9MX4test_111] DEBUG MqttSampleTest onSubscribeCompleted 263 - onSubscribeCompleted, status[OK], topics[[$rrpc/rxd/XTV06F9MX4/test_111/+]], userContext[], errMsg[subscribe success]
步骤二:调用云 API PublishRRPCMessage 发送 RRPC 请求消息。 打开腾讯云 API控制台,填写个人密钥和设备参数信息,选择在线调用并发送请求。
步骤三:观察设备端接收到发布 RRPC 请求消息Logcat日志,获取 processID
17/03/2021 00:22:19,769 [MQTT Call: XTV06F9MX4test_111] INFO TXMqttConnection messageArrived 1129 - Received topic: $rrpc/rxd/XTV06F9MX4/test_111/27041, id: 0, message: hello
以上日志为 设备端成功接收到发布 RRPC 请求消息,其中可以观察到此时 processID 27041。
步骤四:设备将应答消息 Topic 的 processID 设置为截取的 processID,并向应答消息 Topic 发布设备的返回消息。
SDK 内部 TXMqttConnection 类中,成功接收到发布的 RRPC 请求消息会发布应答消息。
/**
* 收到MQTT消息
*
* @param topic 消息主题
* @param message 消息内容结构体
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
if (topic != null && topic.contains("rrpc/rxd")) {
String[] items = topic.split("/");
String processId = items[items.length-1];
Map<String, String> replyMessage = new HashMap<>();
publishRRPCToCloud(null, processId, replyMessage);
}
}
以下是成功发送 RRPC 应答消息 Topic 的日志
17/03/2021 00:22:19,770 [MQTT Call: XTV06F9MX4test_111] INFO TXMqttConnection publish 567 - Starting publish topic: $rrpc/txd/XTV06F9MX4/test_111/27041 Message: {"test-key":"test-value"}
17/03/2021 00:22:19,771 [MQTT Call: XTV06F9MX4test_111] DEBUG MqttSampleTest onPublishCompleted 251 - onPublishCompleted, status[OK], topics[[$rrpc/txd/XTV06F9MX4/test_111/27041]], userContext[], errMsg[publish success]