RocketMQ系列(七)事务消息

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 终于到了今天了,终于要讲RocketMQ最牛X的功能了,那就是**事务消息**。为什么事务消息被吹的比较热呢?近几年微服务大行其道,整个系统被切成了多个服务,每个服务掌管着一个数据库。那么多个数据库之间的数据一致性就成了问题,虽然有像XA这种强一致性事务的支持,但是这种强一致性在互联网的应用中并不适合,人们还是更倾向于使用最终一致性的解决方案,在最终一致性的解决方案中,使用MQ保证各个系统之间的数据一致性又是首选。

终于到了今天了,终于要讲RocketMQ最牛X的功能了,那就是事务消息。为什么事务消息被吹的比较热呢?近几年微服务大行其道,整个系统被切成了多个服务,每个服务掌管着一个数据库。那么多个数据库之间的数据一致性就成了问题,虽然有像XA这种强一致性事务的支持,但是这种强一致性在互联网的应用中并不适合,人们还是更倾向于使用最终一致性的解决方案,在最终一致性的解决方案中,使用MQ保证各个系统之间的数据一致性又是首选。

RocketMQ为我们提供了事务消息的功能,它使得我们投放消息和其他的一些操作保持一个整体的原子性。比如:向数据库中插入数据,再向MQ中投放消息,把这两个动作作为一个原子性的操作。貌似其他的MQ是没有这种功能的。

但是,纵观全网,讲RocketMQ事务消息的博文中,几乎没有结合数据库的,都是直接投放消息,然后讲解事务消息的几个状态,虽然讲的也没毛病,但是和项目中事务最终一致性的落地方案还相距甚远。包括我自己在内,在项目中,服务化以后,用MQ保证事务的最终一致性,在网上一搜,根本没有落地的方案,都是侃侃而谈。于是,我写下这篇博文,结合数据库,来谈一谈RocketMQ的事务消息到底怎么用。

基础概念

要使用RocketMQ的事务消息,要实现一个TransactionListener的接口,这个接口中有两个方法,如下:

/**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

/**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
LocalTransactionState checkLocalTransaction(final MessageExt msg);

RocketMQ的事务消息是基于两阶段提交实现的,也就是说消息有两个状态,prepared和commited。当消息执行完send方法后,进入的prepared状态,进入prepared状态以后,就要执行executeLocalTransaction方法,这个方法的返回值有3个,也决定着这个消息的命运,

  • COMMIT_MESSAGE:提交消息,这个消息由prepared状态进入到commited状态,消费者可以消费这个消息;
  • ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;
  • UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,而最终决定这个消息命运的,是checkLocalTransaction这个方法。

当executeLocalTransaction方法返回UNKNOW以后,RocketMQ会每隔一段时间调用一次checkLocalTransaction,这个方法的返回值决定着这个消息的最终归宿。那么checkLocalTransaction这个方法多长时间调用一次呢?我们在BrokerConfig类中可以找到,

 /**
  * Transaction message check interval.
  */
@ImportantField
private long transactionCheckInterval = 60 * 1000;

这个值是在brokder.conf中配置的,默认值是60*1000,也就是1分钟。那么会检查多少次呢?如果每次都返回UNKNOW,也不能无休止的检查吧,

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 5;

这个是检查的最大次数,超过这个次数,如果还返回UNKNOW,这个消息将被删除。

事务消息中,TransactionListener这个最核心的概念介绍完后,我们看看代码如何写吧。

落地案例

我们在数据库中有一张表,具体如下:

CREATE TABLE `s_term` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `term_year` year(4) NOT NULL ,
  `type` int(1) NOT NULL DEFAULT '1' ,
  PRIMARY KEY (`id`)
) 

字段的具体含义大家不用管,一会我们将向这张表中插入一条数据,并且向MQ中投放消息,这两个动作是一个原子性的操作,要么全成功,要么全失败。

我们先来看看事务消息的客户端的配置,如下:

@Bean(name = "transactionProducer",initMethod = "start",destroyMethod = "shutdown")
public TransactionMQProducer transactionProducer() {
    TransactionMQProducer producer = new
        TransactionMQProducer("TransactionMQProducer");
    producer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    producer.setTransactionListener(transactionListener());
    return producer;
}

@Bean
public TransactionListener transactionListener() {
    return new TransactionListenerImpl();
}

我们使用TransactionMQProducer生命生产者的客户端,并且生产者组的名字叫做TransactionMQProducer,后面NameServer的地址没有变化。最后就是设置了一个TransactionListener监听器,这个监听器的实现我们也定义了一个Bean,返回的是我们自定义的TransactionListenerImpl,我们看看里边怎么写的吧。

public class TransactionListenerImpl implements TransactionListener {
    @Autowired
    private TermMapper termMapper;

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        Integer termId = (Integer)arg;
        Term term = termMapper.selectById(termId);
        System.out.println("executeLocalTransaction termId="+termId+" term:"+term);
        if (term != null) return COMMIT_MESSAGE;

        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String termId = msg.getKeys();
        Term term = termMapper.selectById(Integer.parseInt(termId));
        System.out.println("checkLocalTransaction termId="+termId+" term:"+term);
        if (term != null) {
            System.out.println("checkLocalTransaction:COMMIT_MESSAGE");
            return COMMIT_MESSAGE;
        }
        System.out.println("checkLocalTransaction:ROLLBACK_MESSAGE");
        return ROLLBACK_MESSAGE;
    }
}

在这个类中,我们要实现executeLocalTransaction和checkLocalTransaction两个方法,其中executeLocalTransaction是在执行完send方法后立刻执行的,里边我们根据term表的id去查询,如果能够查询出结果,就commit,消费端可以消费这个消息,如果查询不到,就返回一个UNKNOW,说明过一会会调用checkLocalTransaction再次检查。在checkLocalTransaction方法中,我们同样用termId去查询,这次如果再查询不到就直接回滚了。

好了,事务消息中最重要的两个方法都已经实现了,我们再来看看service怎么写吧,

@Autowired
private TermMapper termMapper;
@Autowired
@Qualifier("transactionProducer")
private TransactionMQProducer producer;

@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
    Term term = new Term();
    term.setTermYear(2020);
    term.setType(1);
    int insert = termMapper.insert(term);

    Message message = new Message();
    message.setTopic("cluster-topic");
    message.setKeys(term.getId()+"");
    message.setBody(new String("this is transaction mq "+new Date()).getBytes());

    TransactionSendResult sendResult = producer
        .sendMessageInTransaction(message, term.getId());
    System.out.println("sendResult:"+sendResult.getLocalTransactionState() 
                       +" 时间:"+new Date());
}
  • 在sendTransactionMQ方法上,我们使用了@Transactional注解,那么在这个方法中,发生任何的异常,数据库事务都会回滚;
  • 然后,我们创建Term对象,向数据库中插入Term;
  • 构建Mesaage的信息,将termId作为message的key;
  • 使用sendMessageInTransaction发送消息,传入message和termId,这两个参数和executeLocalTransaction方法的入参是对应的。

最后,我们在test方法中,调用sendTransactionMQ方法,如下:

@Test
public void sendTransactionMQ() throws InterruptedException {
    try {
        transactionService.sendTransactionMQ();
    } catch (Exception e) {
        e.printStackTrace();
    }

    Thread.sleep(600000);
}

整个生产端的代码就是这些了,消费端的代码没有什么变化,就不给大家贴出来了。接下来,我们把消费端的应用启动起来,消费端的应用最好不要包含生产端的代码,因为TransactionListener实例化以后,就会进行监听,而我们在消费者端是不希望看到TransactionListener中的日志的。

我们运行一下生产端的代码,看看是什么情况,日志如下:

executeLocalTransaction termId=15 term:com.example.rocketmqdemo.entity.Term@4a3509b0
sendResult:COMMIT_MESSAGE 时间:Wed Jun 17 08:56:49 CST 2020
  • 我们看到,先执行的是executeLocalTransaction这个方法,termId打印出来了,发送的结果也出来了,是COMMIT_MESSAGE,那么消费端是可以消费这个消息的;
  • 注意一下两个日志的顺序,先执行的executeLocalTransaction,说明在执行sendMessageInTransaction时,就会调用监听器中的executeLocalTransaction,它的返回值决定着这个消息是否真正的投放到队列中;

再看看消费端的日志,

msgs.size():1
this is transaction mq Wed Jun 17 08:56:49 CST 2020

消息被正常消费,没有问题。那么数据库中有没有termId=15的数据呢?我们看看吧,

image-20200617090229595.png

数据是有的,插入数据也是成功的。

这样使用就真的正确的吗?我们改一下代码看看,在service方法中抛个异常,让数据库的事务回滚,看看是什么效果。改动代码如下:

@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
    ……
    throw new Exception("数据库事务异常");
}

抛出异常后,数据库的事务会回滚,那么MQ呢?我们再发送一个消息看看,

生产端的日志如下:

executeLocalTransaction termId=16 term:com.example.rocketmqdemo.entity.Term@5d6b5d3d
sendResult:COMMIT_MESSAGE 时间:Wed Jun 17 09:07:15 CST 2020

java.lang.Exception: 数据库事务异常
  • 从日志中,我们可以看到,消息是投放成功的,termId=16,事务的返回状态是COMMIT_MESSAGE;
  • 最后抛出了我们定义的异常,那么数据库中应该是不存在这条消息的啊;

我们先看看数据库吧,

image-20200617091042921.png

数据库中并没有termId=16的数据,那么数据库的事务是回滚了,而消息是投放成功的,并没有保持原子性啊。那么为什么在执行executeLocalTransaction方法时,能够查询到termId=16的数据呢?还记得MySQL的事务隔离级别吗?忘了的赶快复习一下吧。在事务提交前,我们是可以查询到termId=16的数据的,所以消息提交了,看看消费端的情况,

msgs.size():1
this is transaction mq Wed Jun 17 09:07:15 CST 2020

消息也正常消费了,这明显不符合我们的要求,我们如果在微服务之间使用这种方式保证数据的最终一致性,肯定会有大麻烦的。那我们该怎么使用s呢?我们可以在executeLocalTransaction方法中,固定返回UNKNOW,数据插入数据库成功也好,失败也罢,我们都返回UNKNOW。那么这个消息是否投放到队列中,就由checkLocalTransaction决定了。checkLocalTransaction肯定在sendTransactionMQ后执行,而且和sendTransactionMQ不在同一事务中。我们改一下程序吧,

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    return LocalTransactionState.UNKNOW;
}

其他的地方不用改,我们再发送一下消息,

sendResult:UNKNOW 时间:Wed Jun 17 09:56:59 CST 2020
java.lang.Exception: 数据库事务异常

checkLocalTransaction termId=18 term:null
checkLocalTransaction:ROLLBACK_MESSAGE
  • 事务消息发送的结果是UNKNOW,然后抛出异常,事务回滚;
  • checkLocalTransaction方法,查询termId=18的数据,为null,消息再回滚;

又看了一下消费端,没有日志。数据库中也没有termId=18的数据,这才符合我们的预期,数据库插入不成功,消息投放不成功。我们再把抛出异常的代码注释掉,看看能不能都成功。

@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
    ……
    //throw new Exception("数据库事务异常");
}

再执行一下发送端程序,日志如下:

sendResult:UNKNOW 时间:Wed Jun 17 10:02:57 CST 2020
checkLocalTransaction termId=19 term:com.example.rocketmqdemo.entity.Term@3b643475
checkLocalTransaction:COMMIT_MESSAGE
  • 发送结果返回UNKNOW;
  • checkLocalTransaction方法查询termId=19的数据,能够查到;
  • 返回COMMIT_MESSAGE,消息提交到队列中;

先看看数据库中的数据吧,

image-20200617100601213.png

termId=19的数据入库成功了,再看看消费端的日志,

msgs.size():1
this is transaction mq Wed Jun 17 10:02:56 CST 2020

消费成功,这才符合我们的预期。数据插入数据库成功,消息投放队列成功,消费消息成功。

总结

事务消息最重要的就是TransactionListener接口的实现,我们要理解executeLocalTransaction和checkLocalTransaction这两个方法是干什么用的,以及它们的执行时间。再一个就是和数据库事务的结合,数据库事务的隔离级别大家要知道。把上面这几点掌握了,就可以灵活的使用RocketMQ的事务消息了。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
3月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
3月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
193 2
|
8月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
5月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
439 1
|
6月前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
1090 1
|
7月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
7月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
8月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
662 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景