开发者学堂课程【高校精品课-厦门大学 -JavaEE 平台技术:消息的和发送和接收】学习笔记,与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/80/detail/15976
消息的和发送和接收
消息的发送和接收,Rocket MQ 中间有三种发送消息的方式,同步的、异步的和单向的,同步的这种发送的方式 发送出这个消息以后 他会处于等待状态要,等待这个接收方发回确认的消息,响应以后才会发送下一个这个消息,那这个一般我们用的比较少,因为我们使用消息服务器的话,主要是要用他的这个异步的这样的一种
方式,那对于某些特别重要的通知,我们可能会采用这种同步的方式。
第一步发送的方式是我们主要会使用的一种方式,那当消息的发送方他在发送完消息以后,它其实是不用等待这个接收方的这个结果的,他会给接收方提供一个这个回调函数,当这个接收方有成功的消费的消息以后,他会发挥一个成功的响应,一个这个失败的这样的一个响应,所以它是基于一种回调的方式来处理这个消息的消费结果的,所以他不需要等不需要再发送完了以后,来等待他的这个执行的结果,那第三种就是这个单项的方式,单项发送的方式其实它就不需要回调。对于这种这个发送的方式以后,它基本上就是发送以后不管的,也不用关心他到底有没有处理
成功。
对于一些这种比较简单的业务,特别是我们相信这个消息服务器这个一定会把这种消息处理完成的,这样的一种业务来说,我们可以用单向发送的方式,比如说像写日志,从消费的这个角度来看这个消息服务器有提供了两种方式来这个获取这个消息,让消费者可以用拉取的方式或者用 pet的方式,那拉取的方式是消费者主动从服务器来获取这个消息,当获取到消息以后,然后他会再启动它的这个消费的过程,所以说我们称这个破的这种方式叫做主动的消费的这种方式,推送的这种方式都是用服务器去主动推送消息到这个消费者,这样的一种方式来说,由于是服务器主动推送过来的,所以说他不会考虑到这个消费者的处理能力,就把这个消息会推送过来,这种方式其实我们用的比较少,那在我们的这个例子中间,我们可以看到是可以定义这个消费方式的,但实际上来说啊,在我们的程序中间,虽然我们定义了消费方式,在整个的内部,它其实还是用拉取的方式来实现,他也是定时的到服
务器上去拉取这个消息,然后去调用我们的这个接口。来消费这样的一个消息。
从这个消息的类型上来说,它又被分为这个顺序消费和这个并行消费这两种,顺序消费是一种很特定的这种消费的方式,他要求说这个消息在消费的时候必须按照发送的顺序逐一的来做消费,那这样的一种消费方式,会使得说我们在整个消息里头,只会维持一个消息队列,这个消息队列里头只有一个消费者来消费,所以他会
对性能有非常大的影响。
我们比较常用的是这种并行消费的方式,就是发送到这个消息队列中间的这个发生的这个话题,Topic 中间的这个消息,其实是由多个消息的消费者来进行并行消费,所以他并不去保证说先发过来的消息一定会被先消费。这是两种这个消息消费的这样的一种方式我们在发送我们前面提到的这些消息都是普通的消息那在消息服务器中间 rocket mq 中间有两种很特殊的消息,一个是延时消息,一个是事物消息,延时消息是指生产者在发送这个消息到消息服务器 rockman q 的时候,他不希望消费者立马去消费这条消息,而是可以指定一个延时,就是在一个特定的时间点之后,再来消费这一条消息,延时的这个消息使用的场景很多,我们最常用的场景就是在电商中间的这个订单的支付问题,我们知道在系统里面订单的支付是由第三方的支付平台来完成的,所以我们的系统并不知道说这个订单是否有支付成功,就是当时并不知道订单是否支付成功,要等这个第三方的付平台回调这个接口,那这个因为会受到这个第三方的这个系统的回调时间,当然也可能是因为用户自己的原因,他在下一个订单以后没有立马支付,所以我们一般会定一个这个时间窗口,就支付的时间窗口,一般是30分钟,就是认为下了订单以后,30分钟他没有支付的
话,这条订单就会被取消掉,把这个库存给释放出来,供其他的人去买这样的一种实现的方式,利用消息服务器的延迟消息其实是非常容易。
那就是当我们产生一个订单的时候,我们会发一个延迟30分钟以后消费的这样的一个延迟消息,那当这个消费者在30分钟以后收到这条消息以后,再去检查这个订单,如果这个订单依然是一个未支付的状态,就是或者说是没有收到第三方的回调
函数啊,或者说是因为客户的原因,他没有去支付,那则会取消这个订单,把这个库存给它啊,把它扣回去,把它释放出来,这是这个延时消费的这样延时消息的这种方式。
那第二个我们要提到的是这个延时,第二个我们要提到的是这个延时消费其实他并不是说可以指定任何的时间点来消费的,在 rocket mq 中间这个延迟的消费是有一个固定的时间点的,分别是一秒五秒十秒三十秒一分钟两分钟三分钟四分钟五分钟六分钟七分钟八分钟到十分钟二十分钟三十分钟一个小时两个小时这些间隔,所以说当我们只指定这个延时发送的时候我们只能指定一个整数,整数就代表是我们刚刚这些间隔里头的第几集,这是因为说在 rom 空间这个延时消费其实他并不是单独去实现的一个功能,它利用的是我们在这个消息消费中间的重发机制,也就是当我们的服务消费者端那边程序,当消费一个数据不成功,要处理的数据他是不成功的时候,这种消息服务器发一个让他在同发的这样的一个消息,那消息服务器会把就是没有处理成功或者没有发送成功的这个消息,把它放到这个重发队列里头来,就是我们刚刚的这个间隔,间隔时间越来越长,然后再去把它重发给这个消费者去处理,延迟消息就是利用这个重发间隔来实现的,所以说当我们发到去到一个消息以后他不会立马在当前这个队列里头,而是放到那个重发队列里头,在我们指定的这个间隔之后,才会放到这个消费者的队列里头来,这是延时消息,失误消息是
rocket mq 中间跟我们数据库的事务关联在一起的这个消息,它可以在一个事物中间发送消息的时候。
他的分为几个阶段,第一个阶段发送 prepare的消息,那这时候其实消费者并不会去看得到这个状态的消息,虽然这个消息已经发送到消息服务器。那消息发送完以后,它就会回到这个生产者的这个里头,来继续执行,那生产者如果说执行了相应的这个数据库的事务,而且这个事物成功以后,那这个消息去返回一个 commit的这样的一个动作,那当这个消息服务器收到了这个 commit的这样的一个动作的时
候,它就会把这个成为 commit 状态的消息放到消费者的这个对中间,消费者就可以看到。
所以说这个事务型的消息,其实是当我们把消息发送到服务器的时候,他不会立马放到消费者的队列中间,而是通过一个回调函数来查询说在服务器在生产这边的事
务执行状况是什么样的,如果生产者的事务执行状况是成功的,消费者才能看到这个消息,去消费这个消息。