就是来了波狸猫换太子,其实延时消息也是这么实现的,最终将换了皮的消息入盘。
Broker 处理提交或者回滚消息的处理方法是 EndTransactionProcessor#processRequest
,我们来看一看它做了什么操作。
可以看到,如果是提交事务就是把皮再换回来写入真正的topic所属的队列中,供消费者消费,如果是回滚则是将半消息记录到一个 half_op 主题下,到时候后台服务扫描半消息的时候就依据其来判断这个消息已经处理过了。
那个后台服务就是 TransactionalMessageCheckService
服务,它会定时的扫描半消息队列,去请求反查接口看看事务成功了没,具体执行的就是TransactionalMessageServiceImpl#check
方法。
我大致说一下流程,这一步骤其实涉及到的代码很多,我就不贴代码了,有兴趣的同学自行了解。不过我相信用语言也是能说清楚的。
首先取半消息 topic 即RMQ_SYS_TRANS_HALF_TOPIC
下的所有队列,如果还记得上面内容的话,就知道半消息写入的队列是 id 是 0 的这个队列,然后取出这个队列对应的 half_op 主题下的队列,即 RMQ_SYS_TRANS_OP_HALF_TOPIC
主题下的队列。
这个 half_op 主要是为了记录这个事务消息已经被处理过,也就是说已经得知此事务消息是提交的还是回滚的消息会被记录在 half_op 中。
然后调用 fillOpRemoveMap
方法,从 half_op 取一批已经处理过的消息来去重,将那些没有记录在 half_op 里面的半消息调用 putBackHalfMsgQueue
又写入了 commitlog 中,然后发送事务反查请求,这个反查请求也是 oneWay,即不会等待响应。当然此时的半消息队列的消费 offset 也会推进。
然后producer中的 ClientRemotingProcessor#processRequest 会处理这个请求,会把任务扔到 TransactionMQProducer 的线程池中进行,最终会调用上面我们发消息时候定义的 checkLocalTransactionState
方法,然后将事务状态发送给 Broker,也是用 oneWay 的方式。
看到这里相信大家会有一些疑问,比如为什么要有个 half_op ,为什么半消息处理了还要再写入 commitlog 中别急听我一一道来。
首先 RocketMQ 的设计就是顺序追加写入,所以说不会更改已经入盘的消息,那事务消息又需要更新反查的次数,超过一定反查失败就判定事务回滚。
因此每一次要反查的时候就将以前的半消息再入盘一次,并且往前推进消费进度。而 half_op 又会记录每一次反查的结果,不论是提交还是回滚都会记录,因此下一次还循环到处理此半消息的时候,可以从 half_op 得知此事务已经结束了,因此就被过滤掉不需要处理了。
如果得到的反查的结果是 UNKNOW,那 half_op 中也不会记录此结果,因此还能再次反查,并且更新反查次数。
到现在整个流程已经清晰了,我再画个图总结一下 Broker 的事务处理流程。
Kafka 事务消息
Kafka 的事务消息和 RocketMQ 的事务消息又不一样了,RocketMQ 解决的是本地事务的执行和发消息这两个动作满足事务的约束。
而 Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败,就像下面代码所演示的。
Kafka 的事务基本上是配合其幂等机制来实现 Exactly Once 语义的,所以说 Kafka 的事务消息不是我们想的那种事务消息,RocketMQ 的才是。
讲到这我就想扯一下了,说到这个 Exactly Once 其实不太清楚的同学很容易会误解。
我们知道消息可靠性有三种,分别是最多一次、恰好一次、最少一次,之前在消息队列连环问的文章我已经提到了基本上我们都是用最少一次然后配合消费者端的幂等来实现恰好一次。
消息恰好被消费一次当然我们所有人追求的,但是之前文章我已经从各方面已经分析过了,基本上难以达到。
而 Kafka 竟说它能实现 Exactly Once?这么牛啤吗?这其实是 Kafka 的一个噱头,你要说他错,他还真没错,你要说他对但是他实现的 Exactly Once 不是你心中想的那个 Exactly Once。
它的恰好一次只能存在一种场景,就是从 Kafka 作为消息源,然后做了一番操作之后,再写入 Kafka 中。
那他是如何实现恰好一次的?就是通过幂等,和我们在业务上实现的一样通过一个唯一 Id, 然后记录下来,如果已经记录过了就不写入,这样来保证恰好一次。
所以说 Kafka 实现的是在特定场景下的恰好一次,不是我们所想的利用 Kafka 来发送消息,那么这条消息只会恰巧被消费一次。
这其实和 Redis 说他实现事务了一样,也不是我们心想的事务。
所以开源软件说啥啥特性开发出来了,我们一味的相信,因此其往往都是残血的或者在特殊的场景下才能满足,不要被误导了,不能相信表面上的描述,还得详细的看看文档或者源码。
不过从另一个角度看也无可厚非,作为一个开源软件肯定是想更多的人用,我也没说谎呀,我文档上写的很清楚的,这标题也没骗人吧?
确实,比如你点进震惊xxxx标题的文章,人家也没骗你啥,他自己确实震惊的呢。
再回来谈 Kafka 的事务消息,所以说这个事务消息不是我们想要的那个事务消息,其实不是今天的主题了,不过我还是简单的说一下。
Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分。
在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息。
然后发送完毕之后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了。
最后协调者会向事务日志中再记一条事务结束信息,至此 Kafka 事务就完成了,我拿 confluent.io 上的图来总结一下这个流程。
最后
至此我们已经知道了 RocketMQ 和 Kakfa 的事务消息全流程,可以看到 RocketMQ 的事务消息才是我们想要的,当然你要是用的流式计算那么 Kakfa 的事务消息也是你想要的。
需要贴代码的文章其实很难受,这贴的多不好,贴的少又怕不清晰,真的难,如果觉得文章不错记得点个在看哟。