MQTT获取离线消息小议

简介: 本文主要介绍延迟消息的发送与接收环节需要注意的问题。

作者:俏巴

概述

微消息队列MQ for IoT在处理离线消息时,为了简化离线消息获取机制,微消息队列系统在客户端成功建立连接并通过权限校验后,会自动加载离线消息并下发到客户端,但是实际在使用过程中会出现消费端启动后迟迟无法获取离线消息的问题,本文主要介绍延迟消息的发送与接收环节需要注意的问题。

协议相关

注意在使用SDK进行离线消息的发送过程中需要特别注意QoS和cleanSession两个参数。

  • QoS 指代消息传输的服务质量(主要针对发送端)
取值 1 2 3
意义 最多分发一次 最多分发一次 仅分发一次
  • cleanSession 建立 TCP 连接后是否关心之前状态(主要针对接收端)
true false
客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息 客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效

为了处理的方便,对于处理离线消息的情况,建议不论是发送端还是接收端,参数都设置为:

QoS = 1

cleanSession = false

Java示例代码

Send Code

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.IOException;
import java.util.Date;

import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;

public class MQTTSendMsg1 {

    public static void main(String[] args) throws IOException {

        final String broker ="tcp://******.mqtt.aliyuncs.com:1883";
        final String acessKey ="******";
        final String secretKey ="******";
        final String topic ="******";
        final String clientId ="GID_******@@@ClientID_device1";
        String sign;
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            final MqttConnectOptions connOpts = new MqttConnectOptions();
            System.out.println("Connecting to broker: " + broker);
            sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
            connOpts.setUserName(acessKey);
            connOpts.setServerURIs(new String[] { broker });
            connOpts.setPassword(sign.toCharArray());
            connOpts.setCleanSession(false);
            connOpts.setKeepAliveInterval(90);
            connOpts.setAutomaticReconnect(true);
            connOpts.setMqttVersion(MQTT_VERSION_3_1_1);
            sampleClient.setCallback(new MqttCallbackExtended() {
                public void connectComplete(boolean reconnect, String serverURI) {
                    System.out.println("connect success");
                    //连接成功,需要上传客户端所有的订阅关系
                }
                public void connectionLost(Throwable throwable) {
                    System.out.println("mqtt connection lost");
                }
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
                }
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                }
            });
            sampleClient.connect(connOpts);
            for (int i = 0; i < 5; i++) {
                try {
                    String scontent = new Date()+"MQTT Test body" + i;
                    //此处消息体只需要传入 byte 数组即可,对于其他类型的消息,请自行完成二进制数据的转换
                    final MqttMessage message = new MqttMessage(scontent.getBytes());
                    message.setQos(1);//设置离线消息的情况
                    System.out.println(i+" pushed at "+new Date()+" "+ scontent);
                    sampleClient.publish(topic+"/notice/", message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception me) {
            me.printStackTrace();
        }
    }
}

Receive Code

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MQTTRecvMsg {
        public static void main(String[] args) {

            final String broker ="tcp://******.mqtt.aliyuncs.com:1883";
            final String acessKey ="******";
            final String secretKey ="******";
            final String topic ="******";
            final String clientId ="GID_******@@@ClientID_device2";
            String sign;
            MemoryPersistence persistence = new MemoryPersistence();
            try {
                final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
                final MqttConnectOptions connOpts = new MqttConnectOptions();
                System.out.println("Connecting to broker: " + broker);

                sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
                final String[] topicFilters=new String[]{topic+"/notice/"};
                final int[]qos={1};
                connOpts.setUserName(acessKey);
                connOpts.setServerURIs(new String[] { broker });
                connOpts.setPassword(sign.toCharArray());
                connOpts.setCleanSession(false);//设置确定是否继续接受离线消息
                connOpts.setKeepAliveInterval(90);
                connOpts.setAutomaticReconnect(true);
                final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>());
                sampleClient.setCallback(new MqttCallbackExtended() {
                    public void connectComplete(boolean reconnect, String serverURI) {
                        System.out.println("connect success");
                        //连接成功,需要上传客户端所有的订阅关系
                        executorService.submit(new Runnable()
                        {
                            public void run()
                            {
                                try
                                {
                                    sampleClient.subscribe(topicFilters, qos);
                                } catch(Exception me)
                                {
                                    me.printStackTrace();
                                }
                            }
                        });
                    }
                    public void connectionLost(Throwable throwable) {
                        System.out.println("mqtt connection lost");
                    }
                    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                        System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
                    }
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                    }
                });
                //客户端每次上线都必须上传自己所有涉及的订阅关系,否则可能会导致消息接收延迟
                sampleClient.connect(connOpts);
                //每个客户端最多允许存在30个订阅关系,超出限制可能会丢弃导致收不到部分消息
                sampleClient.subscribe(topicFilters,qos);
                Thread.sleep(Integer.MAX_VALUE);
            } catch (Exception me) {
                me.printStackTrace();
            }
        }
}

特别注意:

离线消息生成需要一定的时间,因为推送的消息需要等待客户端的 ack 超时才会被判成离线消息,所以获取离线消息一般也需要订阅端等待一定的时间。

参考链接

微消息队列名词解释

MQTT 获取离线消息

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 网络性能优化
消息队列 MQ产品使用合集之一个设备的离线消息的数量限制是多少
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
247 1
|
消息中间件 API RocketMQ
你的RocketMQ消费者组(Consumer Group)在查看时显示为离线,这可能是由于消费者组的状态没有被正确更新
你的RocketMQ消费者组(Consumer Group)在查看时显示为离线,这可能是由于消费者组的状态没有被正确更新【1月更文挑战第10天】【1月更文挑战第49篇】
2070 5
|
网络性能优化 网络架构
我的mqtt协议和emqttd开源项目个人理解(4) - 客户端CleanSession=0时,上线接收离线消息,源码分析
我的mqtt协议和emqttd开源项目个人理解(4) - 客户端CleanSession=0时,上线接收离线消息,源码分析
380 0
|
消息中间件 网络协议 大数据
MQTT获取离线消息小议
微消息队列MQ for IoT在处理离线消息时,为了简化离线消息获取机制,微消息队列系统在客户端成功建立连接并通过权限校验后,会自动加载离线消息并下发到客户端,但是实际在使用过程中会出现消费端启动后迟迟无法获取离线消息的问题,本文主要介绍延迟消息的发送与接收缓解需要注意的问题。
3795 0
|
5月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
3月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
253 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
889 93
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
397 94
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
205 1