RocketMQ事务消息实战

简介: 本文主要是考虑在使用消息中间件时,如果保证不丢消息的一些实践思考。

若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群


我们以一个订单流转流程来举例,例如订单子系统创建订单,需要将订单数据下发到其他子系统(与第三方系统对接)这个场景,我们通常会将两个系统进行解耦,不直接使用服务调用的方式进行交互。其业务实现步骤通常为:

  1. A系统创建订单并入库。
  2. 发送消息到MQ。
  3. MQ消费者消费消息,发送远程RPC服务调用,完成订单数据的同步。

1、方案一
伪代码如下:
clipboard
方案弊端:

  1. 如果消息发送成功,在提交事务的时候JVM突然挂掉,事务没有成功提交,导致两个系统之间数据不一致。
  2. 由于消息是在事务提交之前提交,发送的消息内容是订单实体的内容,会造成在消费端进行消费时如果需要去验证订单是否存在时可能出现订单不存在。

方案二:
伪代码如下:
clipboard
然后在控制器层,使用异步发送,将消息发送,并在消息发送成功后,更新待发送状态为已发送。
然后通过定时任务,扫描待发送,结合创建时间的记录(小于当前时间5分钟的消息待发送记录),进行消息发送。
方案弊端:
1、消息有可能重复发送,但在消费端可以通过唯一业务编号来进行去重设计。
2、实现过于复杂,为了避免极端情况下的消息丢失,需要使用定时任务。

方案三:基于RocketMQ4.3版本事务消息
clipboard
额外需要实现事务会查监听器:TransactionListener,其实例代码:

package org.apache.rocketmq.example.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;

@SuppressWarnings("unused")
public class OrderTransactionListenerImpl implements TransactionListener {
    
    private ConcurrentHashMap<String, Integer> countHashMap = new ConcurrentHashMap<>();
    
    private final static int MAX_COUNT = 5;


    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 
        String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务唯一ID。
        // 将bizUniNo入库,表名:t_message_transaction,表结构  bizUniNo(主键),业务类型。
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = 0;
        // 从数据库查查询t_message_transaction表,如果该表中存在记录,则提交,
        String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务唯一ID。
        // 然后t_message_transaction 表,是否存在bizUniNo,如果存在,则返回COMMIT_MESSAGE,
        // 不存在,则记录查询次数,未超过次数,返回UNKNOW,超过次数,返回ROLLBACK_MESSAGE
        
        if(query(bizUniNo) > 0 ) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        
        return rollBackOrUnown(bizUniNo);
    }
    
    public int query(String bizUniNo) {
        return 1; //select count(1) from t_message_transaction a where a.biz_uni_no=#{bizUniNo}
    }
    
    public LocalTransactionState rollBackOrUnown(String bizUniNo) {
        Integer num = countHashMap.get(bizUniNo);
        
        if(num != null &&  ++num > MAX_COUNT) {
            countHashMap.remove(bizUniNo);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        
        if(num == null) {
            num = new Integer(1);
        }
        
        countHashMap.put(bizUniNo, num);
        return LocalTransactionState.UNKNOW;
        
    }
  
} 

TransactionListener 实现要点:

  • executeLocalTransaction:
    该方法,主要是设置本地事务状态,该方法与业务方代码在一个事务中,例如OrderServer#createMap中,只要本地事务提交成功,该方法也会提交成功。

在这里主要是t_message_transaction添加一条记录,在事务会查时,如果存在记录,就认为是该消息需要提交。

  • checkLocalTransaction:
    该方法主要是告知RocketMQ消息是否需要提交还是回滚,如果本地事务表(t_message_transaction)存在记录,则认为提交,如果不存在,可以设置会查次数,如果指定次数内还是未查到消息,则回滚,否则返回未知,rocketmq会按一定的频率回查事务,当然回查次数也有限制,默认为5次,可配置。

本节的分享就到此结束了,本文主要是考虑在使用消息中间件时,如果保证不丢消息的一些实践思考。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
7月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
2月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
100 2
|
2月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
87 0
|
7月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
3月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
179 0
|
5月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
171 15
|
4月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
5月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
346 1