设备影子本质上是一份在服务器端缓存的设备数据(JSON 形式),为设备缓存的一份状态和配置数据。请参考官网 设备影子详情 设备影子数据流
作为中介,设备影子可以有效实现设备和用户应用之间的数据双向同步:
- 对于设备配置,用户应用不需要直接修改设备,只需要修改服务器端的设备影子,由设备影子同步到设备。即使当时设备不在线,设备上线后仍能从设备影子同步到最新配置。
- 对于设备状态,设备将状态上报到设备影子,用户应用查询时,只需查询设备影子即可。这样可以有效减少设备和服务器端的网络交互,尤其是低功耗设备。
示例中编辑 unit_test_config.json 文件中的参数配置信息
{
"TESTSHADOWSAMPLE_PRODUCT_ID": "",
"TESTSHADOWSAMPLE_DEVICE_NAME": "",
"TESTSHADOWSAMPLE_DEVICE_PSK": ""
}
以密钥认证方式为例,需要在 unit_test_config.json 填写 TESTSHADOWSAMPLE_PRODUCT_ID(产品ID)、TESTSHADOWSAMPLE_DEVICE_NAME(设备名称)、TESTSHADOWSAMPLE_DEVICE_PSK(设备密钥)。
如果使用的是证书认证方式,除了需要在 unit_test_config.json 填写 TESTSHADOWSAMPLE_PRODUCT_ID(产品ID)、TESTSHADOWSAMPLE_DEVICE_NAME(设备名称),还需在 ShadowSampleTest.java 将mDevPSK(设备密钥)设置为null之外,还需将证书和私钥放到 resources文件夹中,填写mCertFilePath (设备证书文件名称)、mPrivKeyFilePath(设备私钥文件名称)。
运行 ShadowSampleTest.java 的main函数。将设备进行认证连接到云端。示例代码如下:
public static void main(String[] args) {
...
// init connection
MqttConnectOptions options = new MqttConnectOptions();
options.setConnectionTimeout(8);
options.setKeepAliveInterval(60);
options.setAutomaticReconnect(true);
if (mDevPSK != null) {
} else {
String workDir = System.getProperty("user.dir") + "/hub/hub-device-java/src/test/resources/";
options.setSocketFactory(AsymcSslUtils.getSocketFactoryByFile(workDir + mCertFilePath, workDir + mPrivKeyFilePath));
}
mShadowConnection = new TXShadowConnection(mProductID, mDevName, mDevPSK, new callback());
mShadowConnection.connect(options, null);
}
以下是设备成功上线并订阅设备影子 Topic 的logcat日志,在控制台中可查看创建的 gateway1 设备的状态已更新为上线。
15/03/2021 20:01:07,918 [main] INFO TXMqttConnection connect 348 - Start connecting to ssl://DVSVXI409C.iotcloud.tencentdevices.com:8883
15/03/2021 20:01:08,456 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onConnectCompleted 677 - onConnectCompleted, status[OK], reconnect[false], msg[connected to ssl://DVSVXI409C.iotcloud.tencentdevices.com:8883]
15/03/2021 20:01:08,456 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onConnectCompleted 679 - ******subscribe topic:$shadow/operation/result/DVSVXI409C/cert_test_1
15/03/2021 20:01:08,456 [MQTT Call: DVSVXI409Ccert_test_1] INFO TXMqttConnection subscribe 684 - Starting subscribe topic: $shadow/operation/result/DVSVXI409C/cert_test_1
15/03/2021 20:01:08,471 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onSubscribeCompleted 719 - onSubscribeCompleted, status[OK], errMsg[subscribe success], topics[[$shadow/operation/result/DVSVXI409C/cert_test_1]]
对初始化TXShadowConnection传入的TXShadowActionCallBack为设备行为的回调说明如下:
/**
* MQTT Connect完成回调
*
* @param status Status.OK: 连接成功; Status.ERROR: 连接失败
* @param reconnect true: 重新连接 false: 首次连接
* @param userContext 用户上下文
* @param msg 连接信息
*/
public void onConnectCompleted(Status status, boolean reconnect, Object userContext, String msg) {}
/**
* MQTT连接断开回调
*
* @param cause 连接断开原因
*/
public void onConnectionLost(Throwable cause) {}
/**
* 文档请求响应的回调接口
*
* @param type 文档操作方式, get/update/delete
* @param result 请求响应结果, 0: 成功;非0:失败
* @param jsonDocument 云端返回的json文档
*/
public void onRequestCallback(String type, int result, String jsonDocument) {}
/**
* 设备属性更新回调接口
*
* @param propertyJSONDocument 从云端收到的原始设备属性json文档
* @param propertyList 更新后的设备属性集
*/
public void onDevicePropertyCallback(String propertyJSONDocument, List<? extends DeviceProperty> propertyList) {}
/**
* 收到来自云端的消息
*
* @param topic 主题名称
* @param message 消息内容
*/
public void onMessageReceived(String topic, MqttMessage message) {}
/**
* 发布消息完成回调
*
* @param status Status.OK: 发布消息成功; Status.ERROR: 发布消息失败
* @param token 消息token,包含消息内容结构体
* @param userContext 用户上下文
* @param msg 详细信息
*/
public void onPublishCompleted(Status status, IMqttToken token, Object userContext, String msg) {}
/**
* 订阅主题完成回调
*
* @param status Status.OK: 订阅成功; Status.ERROR: 订阅失败
* @param token 消息token,包含消息内容结构体
* @param userContext 用户上下文
* @param msg 详细信息
*/
public void onSubscribeCompleted(Status status, IMqttToken token, Object userContext, String msg) {}
/**
* 取消订阅主题完成回调
*
* @param status Status.OK: 取消订阅成功; Status.ERROR: 取消订阅失败
* @param token 消息token,包含消息内容结构体
* @param userContext 用户上下文
* @param msg 详细信息
*/
public void onUnSubscribeCompleted(Status status, IMqttToken token, Object userContext, String msg) {}
可通过TXShadowConnection getConnectStatus()API获取设备连接状态
enum ConnectStatus {
kConnectIdle, //初始状态
kConnecting, // 连接中
kConnected, // 连接上/上线
kConnectFailed,// 连接失败
kDisconnected // 已断开连接
}
运行 ShadowSampleTest.java 的main函数,设备上线后调用closeConnect()。示例代码如下:
private static void closeConnect() {
try {
Thread.sleep(2000);
mShadowConnection.disConnect(null);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
以下是设备成功 取消订阅设备影子 Topic 并下线的logcat日志,在控制台中可查看创建的 gateway1 设备的状态已更新为离线。
15/03/2021 20:07:20,405 [main] INFO TXMqttConnection unSubscribe 722 - Starting unSubscribe topic: $shadow/operation/result/DVSVXI409C/cert_test_1
15/03/2021 20:07:20,421 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onUnSubscribeCompleted 738 - onUnSubscribeCompleted, status[OK], errMsg[unsubscribe success], topics[[$shadow/operation/result/DVSVXI409C/cert_test_1]]
15/03/2021 20:07:20,423 [MQTT Disc: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onDisconnectCompleted 694 - onDisconnectCompleted, status[OK], msg[disconnected to ssl://DVSVXI409C.iotcloud.tencentdevices.com:8883]
运行 ShadowSampleTest.java 的main函数,设备上线后调用registerProperty()。会创建 DeviceProperty 属性实例并将其添加到属性数组中,等待被上传更新。示例代码如下:
private static void registerProperty() {
try {
Thread.sleep(2000);
DeviceProperty deviceProperty1 = new DeviceProperty();
deviceProperty1.key("updateCount").data(String.valueOf(mUpdateCount.getAndIncrement())).dataType(TXShadowConstants.JSONDataType.INT);
mShadowConnection.registerProperty(deviceProperty1);
DeviceProperty deviceProperty2 = new DeviceProperty();
deviceProperty2.key("temperatureDesire").data(String.valueOf(mTemperatureDesire.getAndIncrement())).dataType(TXShadowConstants.JSONDataType.INT);
mShadowConnection.registerProperty(deviceProperty2);
mDevicePropertyList.add(deviceProperty1);
mDevicePropertyList.add(deviceProperty2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
以上方法会在 ShadowSampleTest 维护一个 mDevicePropertyList 装着 DeviceProperty(设备属性)的 List ,当更新设备影子时会将 DeviceProperty(设备属性) 更新到云端的设备影子 json 中。
运行 ShadowSampleTest.java 的main函数,设备上线后调用update()。在示例程序中会每隔10秒钟,更新设备属性信息。示例代码如下:
private static void update() {
try {
while(true) {
Thread.sleep(10000);
for (DeviceProperty deviceProperty : mDevicePropertyList) {
if ("updateCount".equals(deviceProperty.mKey)) {
deviceProperty.data(String.valueOf(mUpdateCount.getAndIncrement()));
} else if ("temperatureDesire".equals(deviceProperty.mKey)) {
deviceProperty.data(String.valueOf(mTemperatureDesire.getAndIncrement()));
}
}
LOG.info("update device property");
mShadowConnection.update(mDevicePropertyList, null);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
如果上一步点击了注册设备属性
按钮 ,则会把注册的属性信息更新到设备影子 json 中,观察Logcat日志。
16/03/2021 09:33:58,028 [main] DEBUG TXShadowConnection publish 409 - ******publish message id:18197
16/03/2021 09:33:58,028 [main] INFO TXMqttConnection publish 502 - Starting publish topic: $shadow/operation/DVSVXI409C/cert_test_1 Message: {"clientToken":"DVSVXI409Ccert_test_1-0","state":{"reported":{"updateCount":1,"temperatureDesire":21}},"type":"update","version":0}
16/03/2021 09:33:58,029 [MQTT Call: DVSVXI409Ccert_test_1] INFO TXMqttConnection deliveryComplete 965 - deliveryComplete, token.getMessageId:2
16/03/2021 09:33:58,029 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onPublishCompleted 708 - onPublishCompleted, status[OK], errMsg[publish success], topics[[$shadow/operation/DVSVXI409C/cert_test_1]]
16/03/2021 09:33:58,058 [MQTT Call: DVSVXI409Ccert_test_1] INFO TXMqttConnection messageArrived 941 - Received topic: $shadow/operation/result/DVSVXI409C/cert_test_1, id: 0, message: {"clientToken":"DVSVXI409Ccert_test_1-0","payload":{"state":{"reported":{"temperatureDesire":21,"updateCount":1}},"timestamp":1615858438033,"version":0},"result":0,"timestamp":1615858438033,"type":"update"}
16/03/2021 09:33:58,060 [MQTT Call: DVSVXI409Ccert_test_1] INFO ShadowSampleTest onRequestCallback 211 - onRequestCallback, type[update], result[0], document[{"state":{"reported":{"updateCount":1,"temperatureDesire":21}},"version":0,"timestamp":1615858438033}]
16/03/2021 09:33:58,060 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onMessageReceived 788 - ******update local mDocumentVersion to 0
...
从以上日志可以看出点击定时更新设备影子时,会将注册的属性信息先发布成一条带有 type 为 update 的 Topic 消息,发布消息成功回调后,由于 运行示例程序体验设备影子连接IoT云端 时,订阅过 $shadow/operation/result/${productId}/${deviceName}
Topic ,所以会收到带有设备属性的订阅的消息,同时更新本地 version ,用来判断消息中的 version 是否与设备影子服务端中的 version 一致。如果一致,则设备影子服务端执行更新设备影子流程。
运行 ShadowSampleTest.java 的main函数,设备上线后调用getDeviceDocument(),就会把设备影子最新的文档拉取下来。示例代码如下:
private static void getDeviceDocument() {
try {
Thread.sleep(2000);
mShadowConnection.get(null);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
观察Logcat日志。
16/03/2021 09:38:26,216 [main] INFO TXMqttConnection publish 502 - Starting publish topic: $shadow/operation/DVSVXI409C/cert_test_1 Message: {"clientToken":"DVSVXI409Ccert_test_1-0","type":"get"}
16/03/2021 09:38:26,216 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onPublishCompleted 708 - onPublishCompleted, status[OK], errMsg[publish success], topics[[$shadow/operation/DVSVXI409C/cert_test_1]]
16/03/2021 09:38:26,239 [MQTT Call: DVSVXI409Ccert_test_1] INFO TXMqttConnection messageArrived 941 - Received topic: $shadow/operation/result/DVSVXI409C/cert_test_1, id: 0, message: {"clientToken":"DVSVXI409Ccert_test_1-0","payload":{"state":{"reported":{"temperatureDesire":47,"updateCount":27}},"timestamp":1615858698132,"version":26},"result":0,"timestamp":1615858706,"type":"get"}
16/03/2021 09:38:26,241 [MQTT Call: DVSVXI409Ccert_test_1] INFO ShadowSampleTest onRequestCallback 213 - onRequestCallback, type[get], result[0], document[{"state":{"reported":{"updateCount":27,"temperatureDesire":47}},"version":26,"timestamp":1615858698132}]
16/03/2021 09:38:26,242 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onMessageReceived 788 - ******update local mDocumentVersion to 26
从以上日志可以看出,点击获取设备文档时,会发布一条带有 type 为 get 的 Topic 消息,由于 运行示例程序体验设备影子连接IoT云端 时,订阅过 $shadow/operation/result/${productId}/${deviceName}
Topic ,所以会收到设备影子最新的文档订阅的消息,在控制台中查看最新的设备影子文档可发现,和拉取得到的文档是一致的。
运行示例程序前,需要把将要订阅的 Topic 主题配置在 app-config.json 文件中的SHADOW_TEST_TOPIC(Topic权限),Topic的生成请参考 基于TCP的MQTT设备接入 控制台创建设备 中权限的使用。
运行 ShadowSampleTest.java 的main函数,设备上线后调用subscribeTopic(),订阅设备主题。示例代码如下:
private static void subscribeTopic() {
try {
Thread.sleep(2000);
// QOS等级
int qos = TXMqttConstants.QOS1;
// 用户上下文(请求实例)
MQTTRequest mqttRequest = new MQTTRequest("subscribeTopic", requestID.getAndIncrement());
LOG.debug("Start to subscribe" + mTestTopic);
// 调用TXShadowConnection的subscribe方法订阅主题
mShadowConnection.subcribe(mTestTopic, qos, mqttRequest);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
以下是设备成功订阅主题的logcat日志
16/03/2021 09:40:21,269 [main] INFO TXMqttConnection subscribe 684 - Starting subscribe topic: DVSVXI409C/cert_test_1/data
16/03/2021 09:40:21,284 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onSubscribeCompleted 723 - onSubscribeCompleted, status[OK], errMsg[subscribe success], topics[[DVSVXI409C/cert_test_1/data]]
设备之前订阅过的主题,可以取消订阅。
运行 ShadowSampleTest.java 的main函数,设备上线后调用unSubscribeTopic(),取消订阅。示例代码如下:
private static void unSubscribeTopic() {
try {
Thread.sleep(2000);
// 用户上下文(请求实例)
MQTTRequest mqttRequest = new MQTTRequest("unSubscribeTopic", requestID.getAndIncrement());
LOG.debug("Start to unSubscribe" + mTestTopic);
// 取消订阅主题
mShadowConnection.unSubscribe(mTestTopic, mqttRequest);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
以下是设备成功取消订阅的logcat日志
16/03/2021 09:42:42,628 [main] INFO TXMqttConnection unSubscribe 722 - Starting unSubscribe topic: DVSVXI409C/cert_test_1/data
16/03/2021 09:42:42,640 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG TXShadowConnection onUnSubscribeCompleted 742 - onUnSubscribeCompleted, status[OK], errMsg[unsubscribe success], topics[[DVSVXI409C/cert_test_1/data]]
运行示例程序前,需要把将要发布的 Topic 主题配置在 ShadowSampleTest.java 中的mTestTopic(Topic权限),Topic的生成请参考 基于TCP的MQTT设备接入 控制台创建设备 中权限的使用。
运行 ShadowSampleTest.java 的main函数,设备上线后调用publishTopic(),发布设备主题。示例代码如下:
private static void publishTopic() {
try {
Thread.sleep(2000);
// 要发布的数据
Map<String, String> data = new HashMap<String, String>();
// 车辆类型
data.put("car_type", "suv");
// 车辆油耗
data.put("oil_consumption", "6.6");
// 车辆最高速度
data.put("maximum_speed", "205");
// MQTT消息
MqttMessage message = new MqttMessage();
JSONObject jsonObject = new JSONObject();
try {
for (Map.Entry<String, String> entrys : data.entrySet()) {
jsonObject.put(entrys.getKey(), entrys.getValue());
}
} catch (JSONException e) {
LOG.error("pack json data failed!" + e.getMessage());
}
message.setQos(TXMqttConstants.QOS1);
message.setPayload(jsonObject.toString().getBytes());
// 用户上下文(请求实例)
MQTTRequest mqttRequest = new MQTTRequest("publishTopic", requestID.getAndIncrement());
LOG.debug("pub topic " + mTestTopic + message);
// 发布主题
mShadowConnection.publish(mTestTopic, message, mqttRequest);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
以下是成功发布主题的logcat日志。
16/03/2021 09:45:46,807 [main] INFO TXMqttConnection publish 502 - Starting publish topic: DVSVXI409C/cert_test_1/data Message: {"oil_consumption":"6.6","maximum_speed":"205","car_type":"suv"}
16/03/2021 09:45:46,817 [MQTT Call: DVSVXI409Ccert_test_1] DEBUG ShadowSampleTest onPublishCompleted 259 - onPublishCompleted, status[OK], topics[[DVSVXI409C/cert_test_1/data]], userContext[], errMsg[publish success]