JAVA MQTT Client如何连接阿里云IoT?

简介: 在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布。

概述



在使用阿里云官方IoT JAVA Device SDK连接云端测试的时候,发现日志总是会打印一些莫名其妙Topic消息的订阅和发布,但是用户并没有操作这些Topic,这是因为SDK底层默认做了很多系统Topic的订阅和发布设置,且无法关闭,导致很多测试不能满足预期的测试期望。如果不希望一些系统Topic的默认订阅和发布,建议可以使用开源MQTT Client进行Topic消息的订阅和发布



操作步骤


1、创建产品和设备



参考:阿里云物联网平台Qucik Start 创建产品和设备部分。



2、pom.xml


   <dependencies>
    &lt;dependency&gt;
        &lt;groupId&gt;org.eclipse.paho&lt;/groupId&gt;
        &lt;artifactId&gt;org.eclipse.paho.client.mqttv3&lt;/artifactId&gt;
        &lt;version&gt;1.1.0&lt;/version&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
        &lt;groupId&gt;com.google.guava&lt;/groupId&gt;
        &lt;artifactId&gt;guava&lt;/artifactId&gt;
        &lt;version&gt;23.0&lt;/version&gt;
    &lt;/dependency&gt;
&lt;/dependencies&gt;</code></pre> 

3、工具类 AliyunIoTSignUtil


import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;

/**

AliyunIoTSignUtil
*/

public class AliyunIoTSignUtil {

public static String sign(Map&lt;String, String&gt; params, String deviceSecret, String signMethod) {
    //将参数Key按字典顺序排序
    String[] sortedKeys &#61; params.keySet().toArray(new String[] {});
    Arrays.sort(sortedKeys);

    //生成规范化请求字符串
    StringBuilder canonicalizedQueryString &#61; new StringBuilder();
    for (String key : sortedKeys) {
        if (&#34;sign&#34;.equalsIgnoreCase(key)) {
            continue;
        }
        canonicalizedQueryString.append(key).append(params.get(key));
    }

    try {
        String key &#61; deviceSecret;
        return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

/**
 * HMACSHA1加密
 *
 */
public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
    SecretKey secretKey &#61; new SecretKeySpec(key.getBytes(&#34;utf-8&#34;), signMethod);
    Mac mac &#61; Mac.getInstance(secretKey.getAlgorithm());
    mac.init(secretKey);
    byte[] data &#61; mac.doFinal(content.getBytes(&#34;utf-8&#34;));
    return bytesToHexString(data);
}

public static final String bytesToHexString(byte[] bArray) {

    StringBuffer sb &#61; new StringBuffer(bArray.length);
    String sTemp;
    for (int i &#61; 0; i &lt; bArray.length; i&#43;&#43;) {
        sTemp &#61; Integer.toHexString(0xFF &amp; bArray[i]);
        if (sTemp.length() &lt; 2) {
            sb.append(0);
        }
        sb.append(sTemp.toUpperCase());
    }
    return sb.toString();
}

}


4、main方法


import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;

public class IoTDemoPubSubDemo {

public static String productKey &#61; &#34;********&#34;;
public static String deviceName &#61; &#34;OpenMQTTDevice&#34;;
public static String deviceSecret &#61; &#34;********&#34;;
public static String regionId &#61; &#34;cn-shanghai&#34;;

// 物模型-属性上报topic
private static String pubTopic &#61; &#34;/sys/&#34; &#43; productKey &#43; &#34;/&#34; &#43; deviceName &#43; &#34;/thing/event/property/post&#34;;
// 自定义topic&#xff0c;在产品Topic列表位置定义
private static String subTopic &#61; &#34;/&#34;&#43;productKey &#43; &#34;/&#34; &#43; deviceName&#43;&#34;/user/newdatademo&#34;;

private static MqttClient mqttClient;

public static void main(String [] args){

    initAliyunIoTClient();

// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);

    // 汇报属性
    postDeviceProperties();
    try {
        mqttClient.subscribe(subTopic); // 订阅Topic
    } catch (MqttException e) {
        System.out.println(&#34;error:&#34; &#43; e.getMessage());
        e.printStackTrace();
    }

    // 设置订阅监听
    mqttClient.setCallback(new MqttCallback() {
        &#64;Override
        public void connectionLost(Throwable throwable) {
            System.out.println(&#34;connection Lost&#34;);

        }

        &#64;Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            System.out.println(&#34;Sub message&#34;);
            System.out.println(&#34;Topic : &#34; &#43; s);
            System.out.println(new String(mqttMessage.getPayload())); //打印输出消息payLoad
        }

        &#64;Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    });

}

/**
 * 初始化 Client 对象
 */
private static void initAliyunIoTClient() {

    try {
        // 构造连接需要的参数
        String clientId &#61; &#34;java&#34; &#43; System.currentTimeMillis();
        Map&lt;String, String&gt; params &#61; new HashMap&lt;&gt;(16);
        params.put(&#34;productKey&#34;, productKey);
        params.put(&#34;deviceName&#34;, deviceName);
        params.put(&#34;clientId&#34;, clientId);
        String timestamp &#61; String.valueOf(System.currentTimeMillis());
        params.put(&#34;timestamp&#34;, timestamp);
        // cn-shanghai
        String targetServer &#61; &#34;tcp://&#34; &#43; productKey &#43; &#34;.iot-as-mqtt.&#34;&#43;regionId&#43;&#34;.aliyuncs.com:1883&#34;;

        String mqttclientId &#61; clientId &#43; &#34;|securemode&#61;3,signmethod&#61;hmacsha1,timestamp&#61;&#34; &#43; timestamp &#43; &#34;|&#34;;
        String mqttUsername &#61; deviceName &#43; &#34;&amp;&#34; &#43; productKey;
        String mqttPassword &#61; AliyunIoTSignUtil.sign(params, deviceSecret, &#34;hmacsha1&#34;);

        connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

    } catch (Exception e) {
        System.out.println(&#34;initAliyunIoTClient error &#34; &#43; e.getMessage());
    }
}

public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {

    MemoryPersistence persistence &#61; new MemoryPersistence();
    mqttClient &#61; new MqttClient(url, clientId, persistence);
    MqttConnectOptions connOpts &#61; new MqttConnectOptions();
    // MQTT 3.1.1
    connOpts.setMqttVersion(4);
    connOpts.setAutomaticReconnect(false);
    connOpts.setCleanSession(true);

    connOpts.setUserName(mqttUsername);
    connOpts.setPassword(mqttPassword.toCharArray());
    connOpts.setKeepAliveInterval(60);

    mqttClient.connect(connOpts);
}

/**
 * 汇报属性
 */
private static void postDeviceProperties() {

    try {
        //上报数据
        //高级版 物模型-属性上报payload
        System.out.println(&#34;上报属性值&#34;);
        String payloadJson &#61; &#34;{\&#34;params\&#34;:{\&#34;Status\&#34;:0,\&#34;Data\&#34;:\&#34;15\&#34;}}&#34;;
        MqttMessage message &#61; new MqttMessage(payloadJson.getBytes(&#34;utf-8&#34;));
        message.setQos(1);
        mqttClient.publish(pubTopic, message);
    } catch (Exception e) {
        System.out.println(e.getMessage());
    }
}

}


5、运行测试情况







参考链接


基于开源MQTT自主接入阿里云IoT平台(Java)

MQTT-TCP连接通信


云服务器ECS地址:阿里云·云小站

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 Serverless
【实践】快速学会使用阿里云消息队列RabbitMQ版
云消息队列 RabbitMQ 版是一款基于高可用分布式存储架构实现的 AMQP 0-9-1协议的消息产品。云消息队列 RabbitMQ 版兼容开源 RabbitMQ 客户端,解决开源各种稳定性痛点(例如消息堆积、脑裂等问题),同时具备高并发、分布式、灵活扩缩容等云消息服务优势。
106 2
|
21天前
|
Arthas 监控 Java
拥抱 OpenTelemetry:阿里云 Java Agent 演进实践
本文介绍了阿里云 Java Agent 4.x 版本在基于 OTel Java Agent 二次开发过程中的实践与思考,并重点从功能、性能、稳定性、兼容性四个方面介绍了所做的工作。同时也介绍了阿里云可观测团队积极参与开源建设取得的丰厚成果。
153 5
拥抱 OpenTelemetry:阿里云 Java Agent 演进实践
|
6月前
|
消息中间件 安全 API
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
304 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(1)
|
6月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(4)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
186 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(4)
|
6月前
|
消息中间件 安全 Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
252 0
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(2)
|
3月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
77 9
|
3月前
|
存储 SQL 分布式计算
Java连接阿里云MaxCompute例
要使用Java连接阿里云MaxCompute数据库,首先需在项目中添加MaxCompute JDBC驱动依赖,推荐通过Maven管理。避免在代码中直接写入AccessKey,应使用环境变量或配置文件安全存储。示例代码展示了如何注册驱动、建立连接及执行SQL查询。建议使用RAM用户提升安全性,并根据需要配置时区和公网访问权限。具体步骤和注意事项请参考阿里云官方文档。
369 10
|
4月前
|
Java 开发工具
通过Java SDK调用阿里云模型服务
在阿里云平台上,可以通过创建应用并使用模型服务完成特定任务,如生成文章内容。本示例展示了一段简化的Java代码,演示了如何调用阿里云模型服务生成关于“春秋战国经济与文化”的简短文章。示例代码通过设置系统角色为历史学家,并提出文章生成需求,最终处理并输出生成的文章内容。在实际部署前,请确保正确配置环境变量中的密钥和ID,并根据需要调整SDK导入语句及类名。更多详情和示例,请参考相关链接。
|
6月前
|
消息中间件 Cloud Native Apache
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(6)
阿里云瑶池数据库云原生化和一体化产品能力升级,多款产品更新迭代
123 1
《阿里云产品四月刊》—Apache RocketMQ ACL 2.0 全新升级(6)
|
4月前
|
机器学习/深度学习 存储 缓存
Java本地高性能缓存实践问题之阿里云机器学习团队开源社区的问题如何解决
Java本地高性能缓存实践问题之阿里云机器学习团队开源社区的问题如何解决