我要实现一个推单功能了

简介: 以前都是作为消息接收方,接收消息。记得当时做支付的时候,接收第三方支付公司的各种消息,如支付成功、支付失败、退款成功、退款失败。

以前都是作为消息接收方,接收消息。记得当时做支付的时候,接收第三方支付公司的各种消息,如支付成功、支付失败、退款成功、退款失败。

有的公司在消息推送的准确性与及时性方面做的很好,有的就很差,尤其是几家欧洲公司,如果我方消费失败,该消息会阻塞在队头,导致整个消费队列卡住,无法消费后面消息,引起我们的各种吐槽。

天道好轮回,苍天饶过了谁,终于轮到我当发送方了。虽然这个项目难度不是很高,但这是自己第一个推送相关的项目,所以记录一下。

原因

本来提供了拉单功能,消费方可定时拉单。

但定时拉单有几个问题,一是时效性问题,如消费方每N分钟拉取一次数据,则最大延迟为N分钟,时效性不好;二是拉取到的订单信息可能不全,对于信息不全的订单,消费方需要定时重新获取,增加了复杂性。

推单方案则能解决以上问题,首先只有信息完整才推单,其次是信息完整后立即推单。这既能保证时效性,同时也能降低对接复杂度。

方案调研

以前没做过推单功能,仔细想了一下,需要先调研三方面内容:推送什么内容、如何推送、推送失败如何处理。

推送内容

如何确定给消费方推送的内容呢?

通过与消费方沟通、咨询公司业务方、调查竞品,确定好推送内容。

不过仍有未考虑到的地方,在与接收方联调推单模式时,有的接收方想要批量推送功能。

目前设计为一单一单推送,基于现状也不会做批量推送功能,还是因为时效性问题,有消息就尽快推送给接收方,接收方收到后尽快流转。批量推送会让整体时效降低,而且实现上也会更加复杂。

但这个示例说明调研上有缺漏,今后可多调研几家,有些需求可以选择不做,但要知晓需求。

如何推送

如何将数据推送出去呢?

当然得有个平台!因为推送要考虑很多细节,如数据安全、异常处理、接口管理等,好在有部门已经做了推送的管理平台。提供出两种方案,同步方案和异步方案。

同步方案

同步方式,是消息发送方,直接调用接收方接口,能够立即感知结果

异步方案

异步方式,接收方将消息推送到消息队列,接收方按需进行消费

区别

同步和异步方式有如下区别

  1. 同步方式可直接知道接收方处理结果;异步方式无法知道处理结果
  2. 同步方式由发送方保证消息推送成功;异步方式由接收方自行保证

异步方式在设计上,只支持同类型的消息推送给同一个接收方,但商家订单属于不同接收方,使用异步方式会导致信息泄露。而且异步方式也无法知道消息接收情况,所以最终选择同步方案。

异常处理

推送流程比较简单,消费mq消息,给接收者推送订单信息。我们永远不要相信网络和接收方,总是出现各种各样的问题,当消息消费失败后如何处理呢?

当然希望Consumer在消费消息包的时候,如果出现一些异常,希望消息包不被直接丢弃,而是可以过段时间继续消费,同时不产生阻塞。简单来说就是重试。

rabbitmq

以前用的rabbitmq,消费异常后,可以将消息包重新放入主队列的队尾,有两种方案:

死信队列
  • basic_consume设置ack模式
  • 声明死信队列,设置x-message-ttl=30000x-dead-letter-routing-key=主队列名
  • 声明主队列,设置x-dead-letter-routing-key=死信队列名
  • 主队列通过RoutingKey绑定到Exchange

如果消费逻辑出现异常,消费脚本会调用basic\_reject(),消息包会被RabbitMQ Requeue到死信队列中。30s超时后,消息包会重新进入主队列的队尾。

重新投递

通过chanel.basicAck(tagId, false)与chanel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), true, MessageProperties.PERSISTENT\_TEXT\_PLAIN, body.getbytes()); 搭配可将消息放回消息队尾,这两个函数一个不再将未被确认的消息发送回队列,一个用于重新投递消息。

rocketmq

现在公司使用的rocketmq,基于我对rocketmq的理解,它也是可以进行无阻塞重试的,因为有重试队列嘛:

重试队列

消费失败后,消息会进入到 RocketMQ 的重试队列中。

  • 比如说消费者所属的消息组名称为AAAConsumerGroup
  • 其重试队列名称就叫做%RETRY%AAAConsumerGroup
  • 重试队列中的消息过一段时间会再次发送给消费者,如果还是无法正常执行会再次进入重试队列
  • 默认重试16次,还是无法执行,消息就会从重试队列进入到死信队列

因为公司对rocketmq做了一些更改,所以找同学确认重试机制。问了很多同学,大家都说是阻塞性重试,消息阻塞在队头,直到重试成功或达到重试上限。没办法,只能再找对应的研发同学进行确认。

原来他们设计了两种配置,消息有序和消息无序。在有序情况下,消费失败后会阻塞在队头,直到重试成功;无序情况下,会进入重试队列,根据设置的重试间隔和重试次数进行重试,不会阻塞。如此一来,重试问题也解决了。

之所以关注这一点,是因为阻塞性重试会导致后面的消息无法推送,对功能产生影响。

而且即使是非阻塞性重试,也最好设置重试上限,如果异常太多,容易导致消息生产方压力过大,产生崩溃。需要明白多次投递失败的责任方在接收者。

监控

项目上线后,需要进行监控,否则无法感知运行情况。数据团队同学给力,很快整理好报表,能够实时查看推单成功数量、推单失败数量、订单信息完整时间、推单失败细节。

通过这些数据,能快速发现隐藏问题,也能分析出各接收方的情况,有的接收方确实是一言难尽。

发展

项目上线后,能够确定订单信息完整时间,并能在订单信息完整后,将数据推送给相关方。

后续会优化拉单功能,保证拉到的订单都是完整订单,同时会更改系统中判断信息完整的逻辑,将判断模块做简化和收敛,为今后的系统更新做好准备。

资料

  1. 消息队列RocketMQ版消费消息失败是否会重新消费
  2. 消息队列中消息消费失败后的处理机制
  3. rabbitmq消息重回队列
  4. rabbitmq重试机制
  5. RabbitMQ的消息确认机制
  6. 团队使用RabbitMQ几个场景
  7. https://www.rabbitmq.com/documentation.html
  8. rabbitmq死信队列
  9. RabbitMQ 死信队列 + TTL介绍
  10. RocketMQ 死信队列 | 消费者出现异常如何处理?
  11. RocketMQ消费消息失败的处理办法

最后

大家如果喜欢我的文章,可以关注我的公众号(程序员麻辣烫)

我的个人博客为:https://shidawuhen.github.io/

往期文章回顾:

  1. 设计模式
  2. 招聘
  3. 思考
  4. 存储
  5. 算法系列
  6. 读书笔记
  7. 小工具
  8. 架构
  9. 网络
  10. Go语言
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 监控
自顶向下学习 RocketMQ(十):消息重投和消息重试
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。
自顶向下学习 RocketMQ(十):消息重投和消息重试
|
缓存 监控 网络协议
微服务系列:服务注册与发现原理详解
本文详细解析了微服务架构中的服务注册与发现原理,大厂面试高频,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
微服务系列:服务注册与发现原理详解
|
缓存 小程序 数据库
小程序页面之间(传值)传递数据的方法
小程序页面之间(传值)传递数据的方法
471 63
|
消息中间件 存储 监控
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
798 0
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
343 0
|
消息中间件 Kafka Apache
RocketMQ - 生产者原理
RocketMQ - 生产者原理
239 0
|
负载均衡 网络协议 数据管理
深入解析Nacos:服务发现、配置管理与更多特性解析
深入解析Nacos:服务发现、配置管理与更多特性解析
1276 0
|
消息中间件 存储 负载均衡
【Alibaba中间件技术系列】「RocketMQ技术专题」帮你梳理RocketMQ或Kafka的选择理由以及二者PK
【Alibaba中间件技术系列】「RocketMQ技术专题」帮你梳理RocketMQ或Kafka的选择理由以及二者PK
560 78
【Alibaba中间件技术系列】「RocketMQ技术专题」帮你梳理RocketMQ或Kafka的选择理由以及二者PK
|
消息中间件 存储 缓存
消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?(上)
|
消息中间件 存储 对象存储
消息队列 MQ产品使用合集之对于RocketMQ Proxy GRPC消息重发,该怎么解决
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
260 0