基于MQTT接入的设备靠心跳保活,但心跳具有周期性、自动收发包和超时重连等特性,这些特性给主动检测设备端是否在线带来了一定难度。服务端虽提供了GetDeviceStatus和BatchGetDeviceState接口查询设备状态,但是调用API获取设备状态是基于会话实现的,会话保活是建立在心跳基础上的。本文提供通过使用RRPC来判定设备是否在线的原理、流程、实现方式。
如果设备可以接收服务端下发的消息并作出响应,则该设备通信正常,设备一定在线。
消息收发是物联网平台的核心能力。因此,这种判定方法不会因为物联网平台架构升级或业务变动而变化,也不会因为设备使用的客户端不同而不同。这是服务端检测设备是否在线最通用的一种原理。
物联网平台提供的RRPC能力就是该原理的一种特殊实现。
服务端调用RRpc接口发送指令,例如{"id":123,"version":"1.0","time":1234567890123}。 消息内容可自定义,但建议使用此格式。
参数说明如下表。
字段 | 类新 | 说明 |
---|---|---|
id | Object | 消息ID,用于验证收发的消息是否是同一条。请自行在业务层生成,并保证其唯一性。 |
version | String | 版本号。目前版本号固定为1.0。 |
time | Long | 发送消息的时间戳,可以用于计算消息来回的延时,评估当前的通信质量。 |
设备端接收指令并响应RRPC请求。接收的消息格式:{"id":123,"version":"1.0","time":1234567890123} 离线判定逻辑如下。
为方便体验,本例基于设备端Java SDK Demo和服务端Java SDK Demo开发。
分别下载Demo工程,服务端添加CheckDeviceStatusOnServer类,设备端添加Device类,填写您的阿里云账号AccessKey信息和设备证书信息。
设备端代码如下: import java.io.UnsupportedEncodingException;
import com.aliyun.alink.dm.api.DeviceInfo; import com.aliyun.alink.dm.api.InitResult; import com.aliyun.alink.linkkit.api.ILinkKitConnectListener; import com.aliyun.alink.linkkit.api.IoTMqttClientConfig; import com.aliyun.alink.linkkit.api.LinkKit; import com.aliyun.alink.linkkit.api.LinkKitInitParams; import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest; import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest; import com.aliyun.alink.linksdk.cmp.core.base.AMessage; import com.aliyun.alink.linksdk.cmp.core.base.ARequest; import com.aliyun.alink.linksdk.cmp.core.base.AResponse; import com.aliyun.alink.linksdk.cmp.core.base.ConnectState; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener; import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener; import com.aliyun.alink.linksdk.tools.AError;
public class Device {
// ===================需要用户填写的参数,开始===========================
// 产品productKey,设备证书参数之一
private static String productKey = "";
// 设备名字deviceName,设备证书参数之一
private static String deviceName = "";
// 设备密钥deviceSecret,设备证书参数之一
private static String deviceSecret = "";
// 消息通信的Topic,无需创建和定义,直接使用即可
private static String rrpcTopic = "/sys/" + productKey + "/" + deviceName + "/rrpc/request/+";
// ===================需要用户填写的参数,结束===========================
public static void main(String[] args) throws InterruptedException {
Device device = new Device();
// 初始化
device.init(productKey, deviceName, deviceSecret);
// 下行数据监听
device.registerNotifyListener();
// 订阅Topic
device.subscribe(rrpcTopic);
}
/**
* 初始化
*
* @param pk productKey
* @param dn devcieName
* @param ds deviceSecret
* @throws InterruptedException
*/
public void init(String pk, String dn, String ds) throws InterruptedException {
LinkKitInitParams params = new LinkKitInitParams();
// 设置 MQTT 初始化参数
IoTMqttClientConfig config = new IoTMqttClientConfig();
config.productKey = pk;
config.deviceName = dn;
config.deviceSecret = ds;
params.mqttClientConfig = config;
// 设置初始化设备证书信息,用户传入
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.productKey = pk;
deviceInfo.deviceName = dn;
deviceInfo.deviceSecret = ds;
params.deviceInfo = deviceInfo;
LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
public void onError(AError aError) {
System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
+ aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
}
public void onInitDone(InitResult initResult) {
System.out.println("init success !!");
}
});
// 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时
Thread.sleep(2000);
}
/**
* 监听下行数据
*/
public void registerNotifyListener() {
LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
@Override
public boolean shouldHandle(String connectId, String topic) {
// 只处理特定Topic的消息
if (topic.contains("/rrpc/request/")) {
return true;
} else {
return false;
}
}
@Override
public void onNotify(String connectId, String topic, AMessage aMessage) {
// 接收RRPC请求并回复RRPC响应
try {
String response = topic.replace("/request/", "/response/");
publish(response, new String((byte[]) aMessage.getData(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void onConnectStateChange(String connectId, ConnectState connectState) {
}
});
}
/**
* 发布消息
*
* @param topic 发送消息的Topic
* @param payload 发送的消息内容
*/
public void publish(String topic, String payload) {
MqttPublishRequest request = new MqttPublishRequest();
request.topic = topic;
request.payloadObj = payload;
request.qos = 0;
LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
@Override
public void onResponse(ARequest aRequest, AResponse aResponse) {
}
@Override
public void onFailure(ARequest aRequest, AError aError) {
}
});
}
/**
* 订阅消息
*
* @param topic 订阅消息的Topic
*/
public void subscribe(String topic) {
MqttSubscribeRequest request = new MqttSubscribeRequest();
request.topic = topic;
LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
@Override
public void onSuccess() {
}
@Override
public void onFailure(AError aError) {
}
});
}
}
服务端代码如下: import java.io.UnsupportedEncodingException;
import org.apache.commons.codec.binary.Base64;
import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.exceptions.ClientException; import com.aliyuncs.exceptions.ServerException; import com.aliyuncs.iot.model.v20170420.RRpcRequest; import com.aliyuncs.iot.model.v20170420.RRpcResponse; import com.aliyuncs.profile.DefaultProfile; import com.aliyuncs.profile.IClientProfile;
public class CheckDeviceStatusOnServer extends BaseTest {
// ===================需要用户填写的参数,开始===========================
// 用户账号AccessKey
private static String accessKeyID = "";
// 用户账号AccesseKeySecret
private static String accessKeySecret = "";
// 产品productKey,设备证书参数之一
private static String productKey = "";
// 设备名字deviceName,设备证书参数之一
private static String deviceName = "";
// ===================需要用户填写的参数,结束===========================
public static void main(String[] args) throws ServerException, ClientException, UnsupportedEncodingException {
// -------------------------------------------------------------------
// 要发送的消息,可以自定义,建议使用当前格式
// -------------------------------------------------------------------
// Field | Tyep | Desc
// -------------------------------------------------------------------
// id | Object | 用于验证收发的消息是否是同一个,请自行业务层保证唯一
// -------------------------------------------------------------------
// version | String | 版本号固定1.0
// -------------------------------------------------------------------
// time | Long | 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量
// -------------------------------------------------------------------
String payload = "{\"id\":123, \"version\":\"1.0\",\"time\":" + System.currentTimeMillis() + "}";
// 构建RRPC请求
RRpcRequest request = new RRpcRequest();
request.setProductKey(productKey);
request.setDeviceName(deviceName);
request.setRequestBase64Byte(Base64.encodeBase64String(payload.getBytes()));
request.setTimeout(5000);
// 获取服务端请求客户端
DefaultAcsClient client = getClient();
// 发起RRPC请求
RRpcResponse response = (RRpcResponse) client.getAcsResponse(request);
// RRPC响应处理
// 这个不能看response.getSuccess(),这个仅表明RRPC请求发送成功,不代表设备接收成功和响应成功
// 需要根据RrpcCode来判定,参考文档https://help.aliyun.com/document_detail/69797.html
if (response != null && "SUCCESS".equals(response.getRrpcCode())) {
if (payload.equals(new String(Base64.decodeBase64(response.getPayloadBase64Byte()), "UTF-8"))) {
System.out.println("Device is online");
} else {
System.out.println("Device is offline1");
}
} else {
System.out.println("Device is offline");
}
}
public static DefaultAcsClient getClient() {
DefaultAcsClient client = null;
try {
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKeyID, accessKeySecret);
DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
client = new DefaultAcsClient(profile);
} catch (Exception e) {
System.out.println("init client failed !! exception:" + e.getMessage());
}
return client;
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。