阿里云物联网平台之利用云平台流转如何实现同一款产品下任意俩个设备的通信?

简介: 大部分同学应该都知道这种正常的基于云平台流转的M2M设备间通信,可以指定A产品的某个设备的消息流转到B产品的某个设备,或者从B产品的某个设备流转到A产品的某个设备,具有一定的指向性。那么如何才能够体现任意俩个字呢?假设把产品下每个产品都当成群成员,如何才能让他们之间自由的发言呢?也就是说我如何只配置一条规则,就可以实现这个产品下的设备进行自由通信,而不是A->B要配置一条规则,B->C还要配置一条规则,甚至B->A也要配置一条规则。

原理

实现上述需求,其核心就是任意俩个字,而解决这种问题的手段一般就是“通配”。虽然是通配,但是每一条消息实际上还是有个“明确”的目的地的

(1)规则引擎配置:

源端:配置通配符 "+"

这一条保证消息源可以是任意的

目的端:配置通配符"${TargetDevice}"

这一条保证目的地是任意的,而真正的目的地可以根据TargetDevice变化而变化。

(2)设备端

源设备上报TargetDevice字段 ,表明该消息去向

Step By Step

前提:测试相关的环境准备好,两个程序(设备)(A作为源,B作为目的地),物联网平台上topic啊,物模型啊之类的都定义好。

1、配置规则引擎
(1)编写SQL语句
这里选择的是自定义topic消息,也可以选其他的。(注意topic需要有发布权限)

image.png

SELECT TargetDevice FROM "/a16*JpRCl/+/user/update"

(2)添加操作(配置目的地)
这里选下发到另一个自定义topic(注意,测试设备需要成功订阅这个topic)
image.png

/a16*pRCl/${TargetDevice}/user/get

(3)启动规则
image.png

2、设备端开发
源设备A:上报消息(消息中一定要包含TargetDevice字段)
这里体现TargetDevice必须要有,你们自己测试的时候可以再加些其他字段
image.png

源码:
AliyunIoTSignUtil:

package com.alibaba.taro;

import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**
 * AliyunIoTSignUtil
 */

public class AliyunIoTSignUtil {
    public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
        //将参数Key按字典顺序排序
        String[] sortedKeys = params.keySet().toArray(new String[] {});
        Arrays.sort(sortedKeys);

        //生成规范化请求字符串
        StringBuilder canonicalizedQueryString = new StringBuilder();
        for (String key : sortedKeys) {
            if ("sign".equalsIgnoreCase(key)) {
                continue;
            }
            canonicalizedQueryString.append(key).append(params.get(key));
        }

        try {
            String key = deviceSecret;
            return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * HMACSHA1加密
     *
     */
    public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
        SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
        Mac mac = Mac.getInstance(secretKey.getAlgorithm());
        mac.init(secretKey);
        byte[] data = mac.doFinal(content.getBytes("utf-8"));
        return bytesToHexString(data);
    }

    public static final String bytesToHexString(byte[] bArray) {

        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i]);
            if (sTemp.length() < 2) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }
}

设备A:

package com.alibaba;

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;


public class CustomTopicMessageDemo {

    public static String productKey = "a16***pRCl";
    public static String deviceName = "IoTDeviceDemo1";
    public static String deviceSecret = "a3b15a11*****c4952";
    public static String regionId = "cn-shanghai";

    // 自定义topic,在产品Topic列表位置定义
    private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/update";
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/update";

    private static MqttClient mqttClient;
    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
        // 汇报属性
        String payloadJson = "{\"TargetDevice\":\"IoTDeviceDemo2\"}";
        postDeviceProperties(payloadJson);

        try {
            mqttClient.subscribe(subTopic); // 订阅Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 设置订阅监听
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                String payload =  new String(mqttMessage.getPayload());
                System.out.println(" 接收消息:");
                System.out.println("Topic : " + s);
                System.out.println(payload); //打印输出消息payLoad             
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<String, String>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            //String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:443";
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(false);
        //connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties(String payloadJson) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("上报属性值:");
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * 服务返回
     */
    private static void postServiceReply(String payloadJson,String relpyTopic) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("服务调用返回:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            System.out.println("Topic:");
            System.out.println(relpyTopic);
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(relpyTopic, message);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

}



目的设备B:
订阅规则引擎配置的topic
/a16*pRCl/${TargetDevice}/user/get

image.png

源码:
B设备


package com.alibaba;

import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;


public class CustomTopicMessageDemo2 {

    public static String productKey = "a1*****pRCl";
    public static String deviceName = "IoTDeviceDemo2";
    public static String deviceSecret = "0895205*****7e2b4bf2";
    public static String regionId = "cn-shanghai";

    private static String pubTopic = "/"+productKey + "/" + deviceName+"/user/get";
    private static String subTopic = "/"+productKey + "/" + deviceName+"/user/get";

    private static MqttClient mqttClient;

    public static void main(String [] args){

        initAliyunIoTClient();
//        ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
//                new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
//        scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
       
         String payloadJson = "{\"tts\":\"ss\"}";

        postDeviceProperties(payloadJson);

        try {
            mqttClient.subscribe(subTopic); // 订阅Topic
        } catch (MqttException e) {
            System.out.println("error:" + e.getMessage());
            e.printStackTrace();
        }

        // 设置订阅监听
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                System.out.println("connection Lost");

            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                String payload =  new String(mqttMessage.getPayload());
                System.out.println(" 接收消息:");
                System.out.println("Topic : " + s);
                System.out.println(payload); //打印输出消息payLoad           
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });

    }

    /**
     * 初始化 Client 对象
     */
    private static void initAliyunIoTClient() {

        try {
            // 构造连接需要的参数
            String clientId = "java" + System.currentTimeMillis();
            Map<String, String> params = new HashMap<String, String>(16);
            params.put("productKey", productKey);
            params.put("deviceName", deviceName);
            params.put("clientId", clientId);
            String timestamp = String.valueOf(System.currentTimeMillis());
            params.put("timestamp", timestamp);
            // cn-shanghai
            String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";

            String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
            String mqttUsername = deviceName + "&" + productKey;
            String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");

            connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

        } catch (Exception e) {
            System.out.println("initAliyunIoTClient error " + e.getMessage());
        }
    }

    public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

        MemoryPersistence persistence = new MemoryPersistence();
        mqttClient = new MqttClient(url, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // MQTT 3.1.1
        connOpts.setMqttVersion(4);
        connOpts.setAutomaticReconnect(false);
        connOpts.setCleanSession(false);
        //connOpts.setCleanSession(true);

        connOpts.setUserName(mqttUsername);
        connOpts.setPassword(mqttPassword.toCharArray());
        connOpts.setKeepAliveInterval(60);

        mqttClient.connect(connOpts);
    }

    /**
     * 汇报属性
     */
    private static void postDeviceProperties(String payloadJson) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("上报属性值:");
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(pubTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * 服务返回
     */
    private static void postServiceReply(String payloadJson,String relpyTopic) {

        try {
            //上报数据
            //高级版 物模型-属性上报payload
            System.out.println("服务调用返回:");
            //String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
            System.out.println("Topic:");
            System.out.println(relpyTopic);
            System.out.println(payloadJson);
            MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
            message.setQos(0);
            mqttClient.publish(relpyTopic, message);
            System.out.println("=================================================================");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

}


测试结果

1、运行B程序、再运行A程序
A程序上报消息
image.png

B程序收到云平台转发消息
image.png

2、控制台日志
2020/12/20 14:16:28.304
设备A上报消息:
image.png

2020/12/20 14:16:28.334
平台触发规则引擎转发消息
image.png

2020/12/20 14:16:28.336
设备B收到转发的消息
image.png

总览:
image.png

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
1天前
|
安全 物联网 定位技术
2G网络和基站的撤销对物联网设备的影响
2G网络和基站的撤销对物联网设备的影响是多方面的,以下是对这一影响的详细分析:
2G网络和基站的撤销对物联网设备的影响
|
8天前
|
存储 安全 物联网
智能家居安全:物联网设备的风险与防护
在智能家居的浪潮中,物联网技术让生活更加便捷。然而,随之而来的安全问题也不容忽视。本文将揭示智能家居设备可能面临的安全风险,并提供实用的防护措施,帮助用户构建一个更安全的智能生活环境。
|
12天前
|
传感器 监控 物联网
物联网卡在不同应用设备中的基本操作
物联网卡(IoT SIM卡)在物联网(IoT)设备中扮演着至关重要的角色,它们为设备提供网络连接能力,使得设备能够远程交换数据。物联网卡的应用设备广泛,涵盖了从智能家居、智能城市、工业自动化到远程监控等多个领域。以下是物联网卡在不同应用设备中的基本操作流程概述:
|
11天前
|
监控 安全 物联网
智能家居安全:物联网设备的风险与防护
在智能家居的便捷背后,潜藏着不容忽视的安全风险。本文旨在揭示物联网设备可能遭遇的网络攻击类型,并探讨如何通过合理的预防措施来加固我们的智能家园。从技术角度出发,我们将深入分析黑客入侵的途径,并提出有效的防御策略,以期为打造一个更安全的智能家居环境提供指导。
25 1
|
23天前
|
机器学习/深度学习 安全 物联网
智能家居安全:物联网设备的双刃剑
【8月更文挑战第28天】 随着物联网技术的飞速发展,智能家居已成为现代生活的一部分。然而,随之而来的安全问题也日益凸显。本文将深入探讨智能家居中的安全挑战,分析物联网设备如何成为一把双刃剑,既带来便利也可能引发风险。通过案例分析和专家建议,为读者提供实用的防护措施和未来趋势的展望。
|
16天前
|
机器学习/深度学习 人工智能 算法
物联网(IoT)就像是一个大型派对,无数的设备都在欢快地交流着信息
【9月更文挑战第4天】在这个万物互联的时代,物联网(IoT)犹如一场盛大的派对,各类设备欢聚一堂。然而,如何让这些设备互相理解并协同工作呢?这就需要机器学习与人工智能的助力。例如,智能空调通过学习你的使用习惯来调节温度,使你更加舒适;智能安防系统则能识别异常行为并及时报警,保障家庭安全。此外,智能农业、交通等领域也因机器学习和人工智能的应用变得更加高效。下面通过一个简单的温度预测代码示例,展示机器学习在物联网中的实际应用,让我们一起感受其强大潜力。
20 0
|
27天前
|
存储 安全 物联网
物联网中的通信模型
【8月更文挑战第23天】
25 0
|
28天前
|
物联网 C语言
C语言与物联网:设备间的通信与控制
C语言与物联网:设备间的通信与控制
36 0
|
12月前
|
机器学习/深度学习 Kubernetes Cloud Native
SAP 云平台 (Cloud Platform) 架构概述
SAP 云平台 (Cloud Platform) 架构概述
|
12月前
|
移动开发 IDE Java
SAP 云平台从 Neo 到 Multi-Cloud 的演化历史
SAP 云平台从 Neo 到 Multi-Cloud 的演化历史

相关产品

  • 物联网平台