Skip to content

Latest commit

 

History

History
86 lines (72 loc) · 4.67 KB

RRPC同步通信.md

File metadata and controls

86 lines (72 loc) · 4.67 KB

RRPC同步通信

RRPC同步通信简介

因 MQTT 协议基于发布/订阅的异步通信模式,服务器控制设备后,将无法同步感知设备的返回结果。为解决此问题,物联网通信平台利用 RRPC(Revert RPC)实现同步通信机制。请参考 RRPC 通信

通信原理

  • 订阅消息Topic: $rrpc/rxd/{productID}/{deviceName}/+ 用于订阅云端下发(下行)的 RRPC 请求消息。
  • 请求消息Topic: $rrpc/rxd/{productID}/{deviceName}/{processID} 用于云端发布(下行)RRPC 请求消息。
  • 应答消息Topic: $rrpc/txd/{productID}/{deviceName}/{processID} 用于发布(上行)RRPC 应答消息。

通信流程

  1. 设备端订阅 RRPC 订阅消息 Topic。
  2. 服务器通过调用 PublishRRPCMessage 接口发布 RRPC 请求消息。
  3. 设备端接收到消息之后截取请求消息 Topic 中云端下发的 processID,设备将应答消息 Topic 的 processID 设置为截取的 processID,并向应答消息 Topic 发布设备的返回消息 。
  4. 物联网通信平台接收到设备端返回消息之后,根据 processID 对消息进行匹配并将设备返回消息发送给服务器。

! RRPC请求4s超时,即4s内设备端没有应答就认为请求超时。

运行示例程序进行 RRPC 同步通信

步骤一:在设备中订阅 RRPC 消息 Topic

运行 MqttSampleTest.java 的main函数,设备上线后调用subscribeRRPCTopic(),进行 通信原理 中的订阅消息Topic。示例代码如下:

private static void subscribeRRPCTopic() {
    try {
        Thread.sleep(2000);
        // 用户上下文(请求实例)
        MQTTRequest mqttRequest = new MQTTRequest("subscribeTopic", requestID.getAndIncrement());
        // 订阅主题
        mqttconnection.subscribeRRPCTopic(TXMqttConstants.QOS0, mqttRequest);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

以下是设备成功订阅 RRPC 消息 Topic 的日志

17/03/2021 00:16:34,000 [main] INFO  TXMqttConnection subscribe 749  - Starting subscribe topic: $rrpc/rxd/XTV06F9MX4/test_111/+
17/03/2021 00:16:34,032 [MQTT Call: XTV06F9MX4test_111] DEBUG MqttSampleTest onSubscribeCompleted 263  - onSubscribeCompleted, status[OK], topics[[$rrpc/rxd/XTV06F9MX4/test_111/+]], userContext[], errMsg[subscribe success]

步骤二:调用云 API PublishRRPCMessage 发送 RRPC 请求消息。 打开腾讯云 API控制台,填写个人密钥和设备参数信息,选择在线调用并发送请求。

步骤三:观察设备端接收到发布 RRPC 请求消息Logcat日志,获取 processID

17/03/2021 00:22:19,769 [MQTT Call: XTV06F9MX4test_111] INFO  TXMqttConnection messageArrived 1129  - Received topic: $rrpc/rxd/XTV06F9MX4/test_111/27041, id: 0, message: hello

以上日志为 设备端成功接收到发布 RRPC 请求消息,其中可以观察到此时 processID 27041。

步骤四:设备将应答消息 Topic 的 processID 设置为截取的 processID,并向应答消息 Topic 发布设备的返回消息。

SDK 内部 TXMqttConnection 类中,成功接收到发布的 RRPC 请求消息会发布应答消息。

/**
 * 收到MQTT消息
 *
 * @param topic   消息主题
 * @param message 消息内容结构体
 * @throws Exception
 */
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    if (topic != null && topic.contains("rrpc/rxd")) {
        String[] items = topic.split("/");
        String processId = items[items.length-1];
        Map<String, String> replyMessage = new HashMap<>();
        publishRRPCToCloud(null, processId, replyMessage);
    }
}

以下是成功发送 RRPC 应答消息 Topic 的日志

17/03/2021 00:22:19,770 [MQTT Call: XTV06F9MX4test_111] INFO  TXMqttConnection publish 567  - Starting publish topic: $rrpc/txd/XTV06F9MX4/test_111/27041 Message: {"test-key":"test-value"}
17/03/2021 00:22:19,771 [MQTT Call: XTV06F9MX4test_111] DEBUG MqttSampleTest onPublishCompleted 251  - onPublishCompleted, status[OK], topics[[$rrpc/txd/XTV06F9MX4/test_111/27041]],  userContext[], errMsg[publish success]