前言
又到了金三银四的找工作阶段,你一定被问过MQ是如何保证消息可靠性的或者MQ是如何保证消息不丢失的。我们都知道MQ发送消息一般分为三个阶段分别是生产者发送消息到MQ、MQ存储消息到内存或者硬盘,消费者消费消息。但是这三个过程都有可能因为种种原因导致消息丢失。例如在生产者发送阶段,这个阶段可能由于网络延迟导致mq消息丢失;存储阶段,Broker将消息先放到内存,然后再根据刷盘策略持久化到硬盘上,但是刚收到消息,还没持久化到硬盘服务器宕机了,那么消息就会丢失。在消费端消费时,mq由于网络原因在传输过程中把消息传丢了,而此时MQ也从队列中把消息删除了,或者消费者消费失败消息丢失了等等。那么MQ是如何保证消息不丢失的呢,下面总结一下rocketmq、rabbitmq、kafka是如何保证消息不丢失的。
正文
RocketMQ
Producer保证消息不丢失
1、RocketMQ发送消息有三种模式,即同步发送,异步发送、单向发送。
同步发送消息时会同步阻塞等待Broker返回发送结果,如果发送失败不会收到发送结果SendResult,这种是最可靠的发送方式。
异步发送消息可以在回调方法中得知发送结果。
单向发送是消息发送完之后就不管了,不管发送成功没成功,是最不可靠的一种方式。
/** * @description: 单向发送 * 这种方式主要用在不特别关心发送结果的场景,例如日志发送。 * @param: * @return: void * @author xiaojie * @date: 2021/11/9 23:39 */ public void sendMq() { for (int i = 0; i < 10; i++) { rocketMQTemplate.convertAndSend("xiaojie-test", "测试发送消息》》》》》》》》》" + i); } } /***********************************************************************************/ /** * @description: 同步发送 * 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。 * @param: * @return: void * @author xiaojie * @date: 2021/11/10 22:25 */ public void sync() { SendResult sendResult = rocketMQTemplate.syncSend("xiaojie-test", "sync发送消息。。。。。。。。。。"); log.info("发送结果{}", sendResult); } /***********************************************************************************/ /** * @description: 异步发送 * 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。 * @param: * @return: void * @author xiaojie * @date: 2021/11/10 22:29 */ public void async() { String msg = "异步发送消息。。。。。。。。。。"; log.info(">msg:<<" + msg); rocketMQTemplate.asyncSend("xiaojie-test", msg, new SendCallback() { @Override public void onSuccess(SendResult var1) { log.info("异步发送成功{}", var1); } @Override public void onException(Throwable var1) { //发送失败可以执行重试 log.info("异步发送失败{}", var1); } }); }
2、生产者的重试机制
mq为生产者提供了失败重试机制,同步发送和异步发送默认都是失败重试两次当然可以修改重试次数,如果多次还是失败,那么可以采取记录这条信息,然后人工采取补偿机制。
Broker保证消息不丢失
1、刷盘策略
RocketMq持久化消息有两种策略即同步刷盘和异步刷盘。默认情况下是异步刷盘,此模式下当生产者把消息发送到broker,消息存到内存之后就认为消息发送成功了,就会返回给生产者消息发送成功的结果。但是如果消息还没持久化到硬盘,服务器宕机了,那么消息就会丢失。同步刷盘是当Broker接收到消息并且持久化到硬盘之后才会返回消息发送成功的结果,这样就会保证消息不会丢失,但是同步刷盘相对于异步刷盘来说效率上有所降低,大概降低10%,具体情况根据业务需求设定吧。
修改配置文件中刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
2、集群模式
#主从复制方式ASYNC_MASTER异步复制,SYNC_MASTER同步复制 brokerRole=SYNC_MASTER #刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘 flushDiskType=SYNC_FLUSH
此模式是broker保证消息不丢失的配置,主从复制同步复制,刷盘模式同步刷盘,但是这种模式下性能会有所降低。
Consumer保证消息不丢失
1、手动ack
/** * @author xiaojie * @version 1.0 * @description: 消费端确认消息消费成功的消费者 * @date 2022/3/8 23:23 */ @Component @Slf4j public class MqConsumerAck implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg:msgs){ log.info("接收到的消息是>>>>>>>{}",new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
2、消费者消费失败重试机制
消费者消费失败会自动重试,如果消费失败没有手动ack则会自动重试15次。
RabbitMQ
Producer保证消息不丢失
1、rabbitMQ引入了事务机制和确认机制(confirm)
事务机制开启之后,相当于同步执行,必然会降低系统的性能,因此一般我们不采用这种方式。
确实机制,是当mq收到生产者发送的消息时,会返回一个ack告知生产者,收到了这条消息,如果没有收到,那就采取重试机制后者其他方式补偿。
事务模式
public static void main(String[] args) { try { System.out.println("生产者启动成功.."); // 1.创建连接 connection = MyConnection.getConnection(); // 2.创建通道 channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "测试事务机制保证消息发送可靠性。。。。"; channel.txSelect(); //开启事务 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8)); //发生异常时,mq中并没有新的消息入队列 //int i=1/0; //没有发生异常,提交事务 channel.txCommit(); System.out.println("生产者发送消息成功:" + msg); } catch (Exception e) { e.printStackTrace(); //发生异常则回滚事务 try { if (channel != null) { channel.txRollback(); } } catch (IOException ioException) { ioException.printStackTrace(); } } finally { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
comfirm模式
#开启生产者确认模式 publisher-confirm-type: correlated # 打开消息返回,如果投递失败,会返回消息 publisher-returns: true #publisher-confirm-type有3种取值 #NONE值是禁用发布确认模式,是默认值 #CORRELATED值是发布消息成功到交换器后会触发回调方法 #SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法
回调函数方法类
@Component @Slf4j public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { //指定 ConfirmCallback rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("correlation>>>>>>>{},ack>>>>>>>>>{},cause>>>>>>>>{}", correlationData, ack, cause); if (ack) { //确认收到消息 } else { //收到消息失败,可以自定义重试机制,或者将失败的存起来,进行补偿 } } /* * * @param returnedMessage * 消息是否从Exchange路由到Queue, 只有消息从Exchange路由到Queue失败才会回调这个方法 * @author xiaojie * @date 2021/9/29 13:53 * @return void */ @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.info("被退回信息是》》》》》》{}", returnedMessage.getMessage()); log.info("replyCode》》》》》》{}", returnedMessage.getReplyCode()); log.info("replyText》》》》》》{}", returnedMessage.getReplyText()); log.info("exchange》》》》》》{}", returnedMessage.getExchange()); log.info("routingKey>>>>>>>{}", returnedMessage.getRoutingKey()); } }
2、重试机制
rabbitmq同样为生产者设置了重试机制默认是3次,同样可以修改重试次数,超过了最大重试次数限制采取人工补偿机制。
Broker保证消息不丢失
1、rabbitMq持久化机制
消息到达mq之后,mq宕机了,然后消息又没有进行持久化,这时消息就会丢失。开启mq的持久化机制,消息队列,交换机、消息都要开启持久化。
开启持久化操作请参考 RabbitMq确认机制&SpringBoot整合RabbitMQ_熟透的蜗牛的博客-CSDN博客
2、使用镜像集群
3、如果队列满了,多余的消息发送到Broker时可以使用死信队列保证消息不会被丢弃
Consumer保证消息不丢失
1、开启消费端的手动ack
manual-手动ack
auto 自动
none 不使用ack
手动ack代码
@Component @Slf4j public class SnailConsumer { @RabbitListener(queues = "snail_direct_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { // 获取消息Id String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); log.info("获取到的消息>>>>>>>{},消息id>>>>>>{}", msg, messageId); try { int result = 1 / 0; System.out.println("result" + result); // // 手动ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收 channel.basicAck(deliveryTag, false); } catch (Exception e) { //拒绝消费消息(丢失消息) 给死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }
2、同样可以使用消费者的重试机制,重试超过最大次数还没成功则采取人工补偿机制。
Kafka
Producer保证消息不丢失
1、producer的ack机制
kafka的生产者确认机制有三种取值分别为0、1、-1(all)
acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障)。
acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有follwer服务器的完全确认即可做出回应,在这种情况下,当leader还没有将数据同步到Follwer宕机,存在丢失数据的可能性。
acks = -1代表所有的所有的分区副本备份完成,不会丢失数据这是最强有力的保证。但是这种模式往往效率相对较低。
2、producer重试机制
Broker保证消息不丢失
kafka的broker使用副本机制保证数据的可靠性。每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。
Consumer保证消息不丢失
1、手动ack
/* * * @param message * @param ack * @手动提交ack * containerFactory 手动提交消息ack * errorHandler 消费端异常处理器 * @author xiaojie * @date 2021/10/14 * @return void */ @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic", errorHandler = "consumerAwareListenerErrorHandler" ) public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) { for (int i=0;i<record.size();i++){ System.out.println(record.get(i).value()); } ack.acknowledge();//直接提交offset }
2、offset commit
消费者通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。kafka并不像其他消息队列,消费完消息之后,会将数据从队列中删除,而是维护了一个日志文件,通过时间和储存大小进行日志删除策略。如果offset没有提交,程序启动之后,会从上次消费的位置继续消费,有可能存在重复消费的情况。
Offset Reset 三种模式
earliest(最早):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest(最新的):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
none(没有):topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
以上有完整的代码可以自行取用 请点我!走你