【RocketMQ系列六】RocketMQ事务消息

简介: 【RocketMQ系列六】RocketMQ事务消息

1. 事务消息的定义

事务消息可以认为是一个两阶段的提交消息实现,以确保分布式事务的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子执行。

两阶段提交主要保证了分布式事务的原子性:即所有结点要么全做要么全不做,所谓的两个阶段是指:第一阶段:准备阶段;第二阶段:提交阶段。

事务消息有三种状态:

  1. TransactionStatus.CommitTransaction: 提交事务,表示允许消费者消费该消息。
  2. TransactionStatus.RollbackTransaction: 回滚事务,表示该消息将被删除,不允许消费。
  3. TransactionStatus.Unknow: 中间状态,表示需要MQ回查才能确定状态。

2.事务消息的实现流程

  1. 生产者发送half消息,broker接收到half消息并回复half消息。
  2. 生产者调用 TransactionListener.executeTransaction() 方法执行本地事务。
  3. 生产者获得本地事务执行状态,提交给broker。如果状态是COMMIT_MESSAGE状态的话则broker会将消息推送给消费者。如果状态是ROLLBACK_MESSAGE状态的话则broker会丢弃此消息。如果状态是中间状态UNKNOW状态则broker会回查本地事务状态。
  4. 生产者调用 TransactionListener.checkLocalTransaction() 方法回查本地事务执行状态,并再次执行5,6,7三步骤,若回查次数超过15次则丢弃。

使用限制:

  1. 事务性消息没有调度和批处理支持。
  2. 为避免单条消息被检查次数过多,导致半队列消息堆积,我们默认单条消息的检查次数限制为15次,但用户可以通过更改 transactionCheckMax 来更改此限制,如果一条消息的检查次数超过 transactionCheckMax 次,broker默认会丢弃这条消息,同时打印错误日志。用户可以重写 AbstractTransactionCheckListener 类来改变这种行为。
  3. 事务消息将一定时间后检查,该时间由代理配置中的参数 transactionTimeout 确定。并且用户也可以在发送事务消息时通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,这个参数优先于 transactionMsgTimeout 参数。
  4. 一个事务性消息会被检查或消费不止一次。
  5. 事务性消息的生产者ID不能与其他类型消息的生产者ID共享,与其他类型的消息不同,事务性消息允许向后查询。MQ服务器通过其生产者ID查询客户端。
  6. 提交给用户目标主题的消息reput可能会失败,目前它取决于日志记录,高可用是由RocketMQ本身的高可用机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使用同步双写机制。

3. 事务消息的实现示例

3.1. 事务消息的消费者

事务消息的消费者与普通消息的消费者基本相同,也就是说事务消息是控制生产者端和broker端。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_transaction_consumer");
    consumer.setNamesrvAddr("172.31.184.89:9876");
    consumer.subscribe("TransactionTopic", "*");
    // 4.创建一个回调函数
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      // 5.处理消息
      for (MessageExt msg : msgs) {
        System.out.println(msg);
        System.out.println("收到的消息内容:" + new String(msg.getBody()));
      }
      // 返回消费成功的对象
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    // 6.启动消费者
    consumer.start();
    System.out.println("消费者已经启动");

3.2. 本地事务的实现

事务消息最关键的地方是生产者本地事务的实现,生产者本地事务实现 TransactionListener 接口,并实现该接口中的executeLocalTransaction方法和checkLocalTransaction方法。

其中,executeLocalTransaction 方法的作用是执行本地事务。它在生产者每次发送half消息的时候被调用,

  1. 如果调用此方法返回LocalTransactionState.COMMIT_MESSAGE状态,则此消息会被消费者消费到。
  2. 如果返回 LocalTransactionState.ROLLBACK_MESSAGE 状态,则此消息会被broker丢弃
  3. 如果返回 LocalTransactionState.UNKNOW 状态,即中间状态,则broker会调用checkLocalTransaction方法进行回查,最多回查15次。

checkLocalTransaction方法的作用是检查本地事务, 它是生产者发送完所有消息的时候调用,主要是针对的是中间状态的消息进行调用。

同样的如果调用此方法返回前面提到的三种状态,broker也会做出相同的处理。

public class TransactionListenerImpl implements TransactionListener {
  /**
   * 执行本地事务
   * 当事务half消息发送成功,这个方法将被执行
   * 事务的half消息是发到 RMQ_SYS_TRANS_OP_HALF_TOPIC 的topic中
   *
   * @param msg 消息
   * @param arg arg 自定义业务参数
   * @return {@link LocalTransactionState}
   */
  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    String tags = msg.getTags();
    System.out.println("============执行executeLocalTransaction方法,;消息内容是="+new String(msg.getBody()));
    if (StringUtils.contains(tags, "tagA")) {
      return LocalTransactionState.COMMIT_MESSAGE;
    } else if (StringUtils.contains(tags, "tagB")) {
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
  }
  /**
   * 检查本地事务
   * 回查本地事务状态,当half消息没响应时调用。
   *
   * @param msg 消息
   * @return {@link LocalTransactionState}
   */
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    String tags = msg.getTags();
    System.out.println("============执行checkLocalTransaction方法,;消息内容是="+new String(msg.getBody()));
    if (StringUtils.contains(tags, "tagC")) {
      return LocalTransactionState.COMMIT_MESSAGE;
    } else if (StringUtils.contains(tags, "tagD")) {
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
  }
}

3.3. 事务消息的生产者

事务消息的生产者与普通消息的生产者最核心的区别是事务消息的生产者需要事务监听器,并且是调用sendMessageInTransaction 方法发送 half 消息。

//1.定义事务监听器
    TransactionListener transactionListener = new TransactionListenerImpl();
    //2.定义生产者
    TransactionMQProducer producer = new TransactionMQProducer("transaction_produce_group");
    producer.setNamesrvAddr("172.31.184.89:9876");
    //3.定义线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 10, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(100), (runnable, executor) -> {
      BlockingQueue<Runnable> queue = executor.getQueue();
      try {
        queue.put(runnable);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    });
    //4.设置线程池
    producer.setExecutorService(threadPoolExecutor);
    //5.设置事务监听器
    producer.setTransactionListener(transactionListener);
    // 启动生产者
    producer.start();
    String[] tags = {"tagA", "tagB", "tagC", "tagD","tagE"};
    //发送10条half消息,消费者是收不到half消息的
    for (int i = 0; i < 10; i++) {
      Message message = new Message("TransactionTopic", tags[i % tags.length],
          "key" + i, ("飞哥测试事务消息" + tags[i % tags.length]+"_"+i).getBytes(StandardCharsets.UTF_8));
      TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
      System.out.println("本次发送的消息是=" + new String(message.getBody()));
      System.out.printf("%s%n", transactionSendResult);
      Thread.sleep(10);
    }
    System.out.println("==========所有消息发送完成======");
运行结果:

生产者:

从运行结果可以看出中间状态的消息最多回查15次,就像图中的消息 执行checkLocalTransaction方法,;消息内容是=飞哥测试事务消息tagE_9 broker调用checkLocalTransaction 方法回查了15次。

消费者:

我们可以看到最终消费者消费到的是消费的tags是tagA以及tagC的四条消息。

那么,为啥生产者发送的half消息,消费者不会里面收到呢?这是因为half消息会被放到 RMQ_SYS_TRANS_OP_HALF_TOPIC 的topic中,直到本地事务返回 COMMIT_MESSAGE 状态时,消费者才能消费到此消息 。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
2月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
121 2
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
4月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
5月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
359 1
|
6月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
6月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
7月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
645 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
7月前
|
消息中间件 RocketMQ
MQ与本地事务一致性问题---RocketMQ事务型消息
MQ与本地事务一致性问题---RocketMQ事务型消息
79 2