MQ 事务消息
本文档主要介绍 MQ 事务的概念、适用场景以及使用过程中的注意事项。
概念介绍
- 事务消息:MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。
- 半消息:暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
- 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
适用场景
MQ 事务消息适用于如下场景:
- 帮助用户实现类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。
使用方式
交互流程
MQ 事务消息交互流程如下所示:
其中:
- 发送方向 MQ 服务端发送消息;
- MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。
示例代码
注意事项
事务消息的 Producer ID 不能与其他类型消息的 Producer ID 共用。
通过 ONSFactory.createTransactionProducer 创建事务消息的 Producer 时必须指定 LocalTransactionChecker 的实现类,处理异常情况下事务消息的回查。
事务消息发送完成本地事务后,可在 execute 方法中返回如下三种状态:
- TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
- TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
- TransactionStatus.Unknow 暂时无法判断状态,期待固定时间以后 MQ Server 向发送方进行消息回查。
可通过如下方式给每条消息设定第一次消息回查的最快时间: Message message = new Message();
// 在消息属性中添加第一次消息回查的最快时间,单位秒,如下设置实际第一次回查时间为 120 ~ 125 秒之间
message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
// 以上方式只确定事务消息的第一次回查的最快时间,实际回查时间向后浮动0~5秒;如第一次回查后事务仍未提交,后续每隔5秒回查一次。