Skip to content

Latest commit

 

History

History
executable file
·
374 lines (320 loc) · 20.4 KB

设备影子.md

File metadata and controls

executable file
·
374 lines (320 loc) · 20.4 KB

设备影子

设备影子简介

设备影子本质上是一份在服务器端缓存的设备数据(JSON 形式),为设备缓存的一份状态和配置数据。请参考官网 设备影子详情 设备影子数据流

作为中介,设备影子可以有效实现设备和用户应用之间的数据双向同步:

  • 对于设备配置,用户应用不需要直接修改设备,只需要修改服务器端的设备影子,由设备影子同步到设备。即使当时设备不在线,设备上线后仍能从设备影子同步到最新配置。
  • 对于设备状态,设备将状态上报到设备影子,用户应用查询时,只需查询设备影子即可。这样可以有效减少设备和服务器端的网络交互,尤其是低功耗设备。

填写认证连接设备的参数

示例中编辑 unit_test_config.json 文件中的参数配置信息

{
  "TESTSHADOWSAMPLE_PRODUCT_ID":         "",
  "TESTSHADOWSAMPLE_DEVICE_NAME":        "",
  "TESTSHADOWSAMPLE_DEVICE_PSK":         ""
}

以密钥认证方式为例,需要在 unit_test_config.json 填写 TESTSHADOWSAMPLE_PRODUCT_ID(产品ID)、TESTSHADOWSAMPLE_DEVICE_NAME(设备名称)、TESTSHADOWSAMPLE_DEVICE_PSK(设备密钥)。

如果使用的是证书认证方式,除了需要在 unit_test_config.json 填写 TESTSHADOWSAMPLE_PRODUCT_ID(产品ID)、TESTSHADOWSAMPLE_DEVICE_NAME(设备名称),还需在 ShadowSampleTest.java 将mDevPSK(设备密钥)设置为null之外,还需将证书和私钥放到 resources文件夹中,填写mCertFilePath (设备证书文件名称)、mPrivKeyFilePath(设备私钥文件名称)。

运行示例程序体验设备影子连接 IoT 云端

运行 ShadowSampleTest.java 的main函数。将设备进行认证连接到云端。示例代码如下:

public static void main(String[] args) {
    ...
    // init connection
    MqttConnectOptions options = new MqttConnectOptions();
    options.setConnectionTimeout(8);
    options.setKeepAliveInterval(60);
    options.setAutomaticReconnect(true);

    if (mDevPSK != null) {

    } else {
        String workDir = System.getProperty("user.dir") + "/hub/hub-device-java/src/test/resources/";
        options.setSocketFactory(AsymcSslUtils.getSocketFactoryByFile(workDir + mCertFilePath, workDir + mPrivKeyFilePath));
    }
    mShadowConnection = new TXShadowConnection(mProductID, mDevName, mDevPSK, new callback());
    mShadowConnection.connect(options, null);
}

以下是设备成功上线并订阅设备影子 Topic 的logcat日志,在控制台中可查看创建的 gateway1 设备的状态已更新为上线。

15/03/2021 20:01:07,918 [main] INFO  TXMqttConnection connect 348  - Start connecting to ssl://DVSVXI409C.iotcloud.tencentdevices.com:8883
15/03/2021 20:01:08,456 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onConnectCompleted 677  - onConnectCompleted, status[OK], reconnect[false], msg[connected to ssl://DVSVXI409C.iotcloud.tencentdevices.com:8883]
15/03/2021 20:01:08,456 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onConnectCompleted 679  - ******subscribe topic:$shadow/operation/result/DVSVXI409C/cert_test_1
15/03/2021 20:01:08,456 [MQTT Call: DVSVXI409Ccert_test_1] INFO  TXMqttConnection subscribe 684  - Starting subscribe topic: $shadow/operation/result/DVSVXI409C/cert_test_1
15/03/2021 20:01:08,471 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onSubscribeCompleted 719  - onSubscribeCompleted, status[OK], errMsg[subscribe success], topics[[$shadow/operation/result/DVSVXI409C/cert_test_1]]

对初始化TXShadowConnection传入的TXShadowActionCallBack为设备行为的回调说明如下:

/**
 * MQTT Connect完成回调
 *
 * @param status        Status.OK: 连接成功; Status.ERROR: 连接失败
 * @param reconnect     true: 重新连接      false: 首次连接
 * @param userContext   用户上下文
 * @param msg           连接信息
 */
public void onConnectCompleted(Status status, boolean reconnect, Object userContext, String msg) {}
    
/**
 * MQTT连接断开回调
 *
 * @param cause       连接断开原因
 */
public void onConnectionLost(Throwable cause) {}

/**
 * 文档请求响应的回调接口
 *
 * @param type 文档操作方式, get/update/delete
 * @param result 请求响应结果, 0: 成功;非0:失败
 * @param jsonDocument   云端返回的json文档
 */
public void onRequestCallback(String type, int result, String jsonDocument) {}

/**
 * 设备属性更新回调接口
 *
 * @param propertyJSONDocument 从云端收到的原始设备属性json文档
 * @param propertyList   更新后的设备属性集
 */
public void onDevicePropertyCallback(String propertyJSONDocument, List<? extends DeviceProperty> propertyList) {}

/**
 * 收到来自云端的消息
 *
 * @param topic   主题名称
 * @param message 消息内容
 */
public void onMessageReceived(String topic, MqttMessage message) {}

/**
 * 发布消息完成回调
 *
 * @param status        Status.OK: 发布消息成功; Status.ERROR: 发布消息失败
 * @param token         消息token,包含消息内容结构体
 * @param userContext   用户上下文
 * @param msg           详细信息
 */
public void onPublishCompleted(Status status, IMqttToken token, Object userContext, String msg) {}

/**
 * 订阅主题完成回调
 *
 * @param status           Status.OK: 订阅成功; Status.ERROR: 订阅失败
 * @param token            消息token,包含消息内容结构体
 * @param userContext      用户上下文
 * @param msg              详细信息
 */
public void onSubscribeCompleted(Status status, IMqttToken token, Object userContext, String msg) {}

/**
 * 取消订阅主题完成回调
 *
 * @param status           Status.OK: 取消订阅成功; Status.ERROR: 取消订阅失败
 * @param token            消息token,包含消息内容结构体
 * @param userContext      用户上下文
 * @param msg              详细信息
 */
public void onUnSubscribeCompleted(Status status, IMqttToken token, Object userContext, String msg) {}

可通过TXShadowConnection getConnectStatus()API获取设备连接状态

enum ConnectStatus {
    kConnectIdle,  //初始状态
    kConnecting,   // 连接中
    kConnected,    // 连接上/上线
    kConnectFailed,// 连接失败
    kDisconnected  // 已断开连接
}

体验设备影子断开连接

运行 ShadowSampleTest.java 的main函数,设备上线后调用closeConnect()。示例代码如下:

private static void closeConnect() {
    try {
        Thread.sleep(2000);
        mShadowConnection.disConnect(null);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

以下是设备成功 取消订阅设备影子 Topic 并下线的logcat日志,在控制台中可查看创建的 gateway1 设备的状态已更新为离线。

15/03/2021 20:07:20,405 [main] INFO  TXMqttConnection unSubscribe 722  - Starting unSubscribe topic: $shadow/operation/result/DVSVXI409C/cert_test_1
15/03/2021 20:07:20,421 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onUnSubscribeCompleted 738  - onUnSubscribeCompleted, status[OK], errMsg[unsubscribe success], topics[[$shadow/operation/result/DVSVXI409C/cert_test_1]]
15/03/2021 20:07:20,423 [MQTT Disc: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onDisconnectCompleted 694  - onDisconnectCompleted, status[OK], msg[disconnected to ssl://DVSVXI409C.iotcloud.tencentdevices.com:8883]

体验注册设备属性

运行 ShadowSampleTest.java 的main函数,设备上线后调用registerProperty()。会创建 DeviceProperty 属性实例并将其添加到属性数组中,等待被上传更新。示例代码如下:

private static void registerProperty() {
    try {
        Thread.sleep(2000);
        DeviceProperty deviceProperty1 = new DeviceProperty();
        deviceProperty1.key("updateCount").data(String.valueOf(mUpdateCount.getAndIncrement())).dataType(TXShadowConstants.JSONDataType.INT);
        mShadowConnection.registerProperty(deviceProperty1);

        DeviceProperty deviceProperty2 = new DeviceProperty();
        deviceProperty2.key("temperatureDesire").data(String.valueOf(mTemperatureDesire.getAndIncrement())).dataType(TXShadowConstants.JSONDataType.INT);
        mShadowConnection.registerProperty(deviceProperty2);

        mDevicePropertyList.add(deviceProperty1);
        mDevicePropertyList.add(deviceProperty2);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

以上方法会在 ShadowSampleTest 维护一个 mDevicePropertyList 装着 DeviceProperty(设备属性)的 List ,当更新设备影子时会将 DeviceProperty(设备属性) 更新到云端的设备影子 json 中。

体验定时更新设备影子

运行 ShadowSampleTest.java 的main函数,设备上线后调用update()。在示例程序中会每隔10秒钟,更新设备属性信息。示例代码如下:

private static void update() {
    try {
        while(true) {
            Thread.sleep(10000);

            for (DeviceProperty deviceProperty : mDevicePropertyList) {
                if ("updateCount".equals(deviceProperty.mKey)) {
                    deviceProperty.data(String.valueOf(mUpdateCount.getAndIncrement()));
                } else if ("temperatureDesire".equals(deviceProperty.mKey)) {
                    deviceProperty.data(String.valueOf(mTemperatureDesire.getAndIncrement()));
                }
            }

            LOG.info("update device property");
            mShadowConnection.update(mDevicePropertyList, null);
        }
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

如果上一步点击了注册设备属性按钮 ,则会把注册的属性信息更新到设备影子 json 中,观察Logcat日志。

16/03/2021 09:33:58,028 [main] DEBUG TXShadowConnection publish 409  - ******publish message id:18197
16/03/2021 09:33:58,028 [main] INFO  TXMqttConnection publish 502  - Starting publish topic: $shadow/operation/DVSVXI409C/cert_test_1 Message: {"clientToken":"DVSVXI409Ccert_test_1-0","state":{"reported":{"updateCount":1,"temperatureDesire":21}},"type":"update","version":0}
16/03/2021 09:33:58,029 [MQTT Call: DVSVXI409Ccert_test_1] INFO  TXMqttConnection deliveryComplete 965  - deliveryComplete, token.getMessageId:2
16/03/2021 09:33:58,029 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onPublishCompleted 708  - onPublishCompleted, status[OK], errMsg[publish success], topics[[$shadow/operation/DVSVXI409C/cert_test_1]]
16/03/2021 09:33:58,058 [MQTT Call: DVSVXI409Ccert_test_1] INFO  TXMqttConnection messageArrived 941  - Received topic: $shadow/operation/result/DVSVXI409C/cert_test_1, id: 0, message: {"clientToken":"DVSVXI409Ccert_test_1-0","payload":{"state":{"reported":{"temperatureDesire":21,"updateCount":1}},"timestamp":1615858438033,"version":0},"result":0,"timestamp":1615858438033,"type":"update"}
16/03/2021 09:33:58,060 [MQTT Call: DVSVXI409Ccert_test_1] INFO  ShadowSampleTest onRequestCallback 211  - onRequestCallback, type[update], result[0], document[{"state":{"reported":{"updateCount":1,"temperatureDesire":21}},"version":0,"timestamp":1615858438033}]
16/03/2021 09:33:58,060 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onMessageReceived 788  - ******update local mDocumentVersion to 0
...

从以上日志可以看出点击定时更新设备影子时,会将注册的属性信息先发布成一条带有 type 为 update 的 Topic 消息,发布消息成功回调后,由于 运行示例程序体验设备影子连接IoT云端 时,订阅过 $shadow/operation/result/${productId}/${deviceName} Topic ,所以会收到带有设备属性的订阅的消息,同时更新本地 version ,用来判断消息中的 version 是否与设备影子服务端中的 version 一致。如果一致,则设备影子服务端执行更新设备影子流程。

体验获取设备文档

运行 ShadowSampleTest.java 的main函数,设备上线后调用getDeviceDocument(),就会把设备影子最新的文档拉取下来。示例代码如下:

private static void getDeviceDocument() {
    try {
        Thread.sleep(2000);
        mShadowConnection.get(null);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

观察Logcat日志。

16/03/2021 09:38:26,216 [main] INFO  TXMqttConnection publish 502  - Starting publish topic: $shadow/operation/DVSVXI409C/cert_test_1 Message: {"clientToken":"DVSVXI409Ccert_test_1-0","type":"get"}
16/03/2021 09:38:26,216 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onPublishCompleted 708  - onPublishCompleted, status[OK], errMsg[publish success], topics[[$shadow/operation/DVSVXI409C/cert_test_1]]
16/03/2021 09:38:26,239 [MQTT Call: DVSVXI409Ccert_test_1] INFO  TXMqttConnection messageArrived 941  - Received topic: $shadow/operation/result/DVSVXI409C/cert_test_1, id: 0, message: {"clientToken":"DVSVXI409Ccert_test_1-0","payload":{"state":{"reported":{"temperatureDesire":47,"updateCount":27}},"timestamp":1615858698132,"version":26},"result":0,"timestamp":1615858706,"type":"get"}
16/03/2021 09:38:26,241 [MQTT Call: DVSVXI409Ccert_test_1] INFO  ShadowSampleTest onRequestCallback 213  - onRequestCallback, type[get], result[0], document[{"state":{"reported":{"updateCount":27,"temperatureDesire":47}},"version":26,"timestamp":1615858698132}]
16/03/2021 09:38:26,242 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onMessageReceived 788  - ******update local mDocumentVersion to 26

从以上日志可以看出,点击获取设备文档时,会发布一条带有 type 为 get 的 Topic 消息,由于 运行示例程序体验设备影子连接IoT云端 时,订阅过 $shadow/operation/result/${productId}/${deviceName} Topic ,所以会收到设备影子最新的文档订阅的消息,在控制台中查看最新的设备影子文档可发现,和拉取得到的文档是一致的。

体验订阅主题

运行示例程序前,需要把将要订阅的 Topic 主题配置在 app-config.json 文件中的SHADOW_TEST_TOPIC(Topic权限),Topic的生成请参考 基于TCP的MQTT设备接入 控制台创建设备 中权限的使用。

运行 ShadowSampleTest.java 的main函数,设备上线后调用subscribeTopic(),订阅设备主题。示例代码如下:

private static void subscribeTopic() {
    try {
        Thread.sleep(2000);
        // QOS等级
        int qos = TXMqttConstants.QOS1;
        // 用户上下文(请求实例)
        MQTTRequest mqttRequest = new MQTTRequest("subscribeTopic", requestID.getAndIncrement());
        LOG.debug("Start to subscribe" + mTestTopic);
        // 调用TXShadowConnection的subscribe方法订阅主题
        mShadowConnection.subcribe(mTestTopic, qos, mqttRequest);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

以下是设备成功订阅主题的logcat日志

16/03/2021 09:40:21,269 [main] INFO  TXMqttConnection subscribe 684  - Starting subscribe topic: DVSVXI409C/cert_test_1/data
16/03/2021 09:40:21,284 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onSubscribeCompleted 723  - onSubscribeCompleted, status[OK], errMsg[subscribe success], topics[[DVSVXI409C/cert_test_1/data]]

体验取消订阅主题

设备之前订阅过的主题,可以取消订阅。

运行 ShadowSampleTest.java 的main函数,设备上线后调用unSubscribeTopic(),取消订阅。示例代码如下:

private static void unSubscribeTopic() {
    try {
        Thread.sleep(2000);
        // 用户上下文(请求实例)
        MQTTRequest mqttRequest = new MQTTRequest("unSubscribeTopic", requestID.getAndIncrement());
        LOG.debug("Start to unSubscribe" + mTestTopic);
        // 取消订阅主题
        mShadowConnection.unSubscribe(mTestTopic, mqttRequest);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

以下是设备成功取消订阅的logcat日志

16/03/2021 09:42:42,628 [main] INFO  TXMqttConnection unSubscribe 722  - Starting unSubscribe topic: DVSVXI409C/cert_test_1/data
16/03/2021 09:42:42,640 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onUnSubscribeCompleted 742  - onUnSubscribeCompleted, status[OK], errMsg[unsubscribe success], topics[[DVSVXI409C/cert_test_1/data]]

体验发布主题

运行示例程序前,需要把将要发布的 Topic 主题配置在 ShadowSampleTest.java 中的mTestTopic(Topic权限),Topic的生成请参考 基于TCP的MQTT设备接入 控制台创建设备 中权限的使用。

运行 ShadowSampleTest.java 的main函数,设备上线后调用publishTopic(),发布设备主题。示例代码如下:

private static void publishTopic() {
    try {
        Thread.sleep(2000);
        // 要发布的数据
        Map<String, String> data = new HashMap<String, String>();
        // 车辆类型
        data.put("car_type", "suv");
        // 车辆油耗
        data.put("oil_consumption", "6.6");
        // 车辆最高速度
        data.put("maximum_speed", "205");

        // MQTT消息
        MqttMessage message = new MqttMessage();

        JSONObject jsonObject = new JSONObject();
        try {
            for (Map.Entry<String, String> entrys : data.entrySet()) {
                jsonObject.put(entrys.getKey(), entrys.getValue());
            }
        } catch (JSONException e) {
            LOG.error("pack json data failed!" + e.getMessage());
        }
        message.setQos(TXMqttConstants.QOS1);
        message.setPayload(jsonObject.toString().getBytes());

        // 用户上下文(请求实例)
        MQTTRequest mqttRequest = new MQTTRequest("publishTopic", requestID.getAndIncrement());

        LOG.debug("pub topic " + mTestTopic + message);
        // 发布主题
        mShadowConnection.publish(mTestTopic, message, mqttRequest);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

以下是成功发布主题的logcat日志。

16/03/2021 09:45:46,807 [main] INFO  TXMqttConnection publish 502  - Starting publish topic: DVSVXI409C/cert_test_1/data Message: {"oil_consumption":"6.6","maximum_speed":"205","car_type":"suv"}
16/03/2021 09:45:46,817 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG ShadowSampleTest onPublishCompleted 259  - onPublishCompleted, status[OK], topics[[DVSVXI409C/cert_test_1/data]],  userContext[], errMsg[publish success]