RabbitMQ特殊应用

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: RabbitMQ特殊应用

一、简介

按照现有rabbitMQ的相关知识,⽣产者会发送消息到达消息服务器。但是在实际⽣产环境下,消息⽣产者发送的消息很有可能当到达了消息服务器之后,由于消息服务器的问题导致消息丢失,如宕机。因为消息服务器默认会将消息存储在内存中。⼀旦消息服务器宕机,则消息会产⽣丢失。因此要保证⽣产者的消息不丢失,要开始持久化策略。

rabbitMQ持久化:
 1. 交换机持久化
 2. 队列持久化
 3. 消息持久化

RabbitMQ数据保护机制:

事务机制

事务机制采⽤类数据库的事务机制进⾏数据保护,当消息到达消息服务器,⾸先会开启⼀个事务,接着进⾏数据磁盘持久化,只有持久化成功才会进⾏事务提交,向消息⽣产者返回成功通知,消息⽣产者⼀旦接收成功通知则不会再发送此条消息。当出现异常,则返回失败通知.消息⽣产者⼀旦接收失败通知,则继续发送该条消息。 事务机制虽然能够保证数据安全,但是此机制采⽤的是同步机制,会产⽣系统间消息阻塞,影响整个系统的消息吞吐量。从⽽导致整个系统的性能下降,因此不建议使⽤。

confirm机制

confirm模式需要基于channel进⾏设置, ⼀旦某条消息被投递到队列之后,消息队列就会发送⼀个确认信息给⽣产者,如果队列与消息是可持久化的, 那么确认消息会等到消息成功写⼊到磁盘之后发出。 confirm的性能⾼,主要得益于它是异步的.⽣产者在将第⼀条消息发出之后等待确认消息的同时也可以继续发送后续的消息.当确认消息到达之后,就可以通过回调⽅法处理这条确认消息. 如果MQ服务宕机了,则会返回nack消息. ⽣产者同样在回调⽅法中进⾏后续处理。

二、必达消息(confirm)

1、原理

基于实现的ConfirmCallback接口,假如RabbitMQ收到消息后,会回调实现这个接口的类。

@FunctionalInterface
public interface ConfirmCallback {
    void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);
}

2、pom.xml

# 开启confirm机制
spring.rabbitmq.publisher-returns=true

3、配置类

@Configuration
public class RabbitMQConfig {
    //声明队列,并开启持久化
    @Bean
    public Queue queue() {
        /**
         * 第⼀个参数:队列名称
         * 第⼆个参数:是否开启队列持久化
         */
        return new Queue("seckill_order", true);
    }
}

4、业务实现

@Override
public boolean add(Long id, String time, String username) {
    //发送消息(消息必达)
    customMessageSender.sendMessage("", "seckill_order", JSON.toJSONString(seckillOrder));
}

5、必达工具类

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * ⾃定义消息发送类
 * 增强RabbitTemplate
 */
@Component
public class CustomMessageSender implements RabbitTemplate.ConfirmCallback {
    static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class);
    private static final String MESSAGE_CONFIRM_ = "message_confirm_";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RedisTemplate redisTemplate;
    /**
     * 构造⽅法
     *
     * @param rabbitTemplate
     */
    public CustomMessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }
    /**
     * ⽣产者通知回调⽅法
     *
     * @param correlationData 唯⼀标识
     * @param ack             成功/失败
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            //返回成功通知
            //删除redis中的相关数据
            redisTemplate.delete(correlationData.getId());
            redisTemplate.delete(MESSAGE_CONFIRM_ + correlationData.getId());
        } else {
            //返回失败通知
            Map<String, String> map = (Map<String, String>) redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_ + correlationData.getId());
            String exchange = map.get("exchange");
            String routingKey = map.get("routingKey");
            String sendMessage = map.get("sendMessage");
            //重新发送
            rabbitTemplate.convertAndSend(exchange, routingKey, JSON.toJSONString(sendMessage));
        }
    }
    /**
     * ⾃定义发送⽅法
     *
     * @param exchange   交换器
     * @param routingKey 路由键
     * @param message    消息内容
     */
    public void sendMessage(String exchange, String routingKey, String message) {
        //设置消息唯⼀标识并存⼊缓存
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        redisTemplate.opsForValue().set(correlationData.getId(), message);
        //本次发送到相关元信息存⼊缓存
        Map<String, String> map = new HashMap<>();
        map.put("exchange", exchange);
        map.put("routingKey", routingKey);
        map.put("sendMessage", message);
        redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_ + correlationData.getId(),
                map);
        //携带唯⼀标识发送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
}

三、成功后回执

1、原理

⾃动应答机制: 消息消费者成功接收到消息后,会进⾏消费并⾃动通知消息服务器将该条消息删除。

手动应答机制: 只有在消息消费者将消息处理完,才会通知消息服务器将该条消息删除

消费者发起成功通知

  • DeliveryTag: 消息的唯⼀标识 channel+消息编号
  • 第⼆个参数:是否开启批量处理。false:不开启批量
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

返回失败通知

  • 第⼀个booleantrue 所有消费者都会拒绝这个消息。false代表只有当前消费者拒绝。
  • 第⼆个booleantrue当前消息会进⼊到死信队列。false重新回到原有队列中,默认回到头部。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

2、pom.xml

# 关闭自动提交
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、成功与失败处理机制

package com.lydms.demorabbitmq.client;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class SecKillOrderListener {
    @RabbitListener(queues = "cancel_order_queue")
    public void receiveSecKillOrderMessage(Channel channel, Message message) {
        boolean result = true;
        if (result) {
            /**
             * 更新数据库操作成功
             * 消费者发起成功通知
             * DeliveryTag: 消息的唯⼀标识 channel+消息编号
             * 第⼆个参数:是否开启批量处理 false:不开启批量
             */
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            /**
             * 返回失败通知
             * 第⼀个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝
             * 第⼆个boolean true当前消息会进⼊到死信队列,false重新回到原有队列中,默认回到头部
             */
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

四、流量削峰

在秒杀这种⾼并发的场景下,每秒都有可能产⽣⼏万甚⾄⼗⼏万条消息,如果没有对消息处理量进⾏任何限制的话,很有可能因为过多的消息堆积从⽽导致消费者宕机的情况。因此官⽹建议对每⼀个消息消费者都设置处理消息总数(消息抓取总数)。

消息抓取总数的值,设置过⼤或者过⼩都不好,过⼩的话,会导致整个系统消息吞吐能⼒下降,造成性能浪费。过⼤的话,则很有可能导致消息过多,导致整个系统OOM(out of memory)内存溢出。因此官⽹建议每⼀个消费者将该值设置在100-300之间。

@RabbitListener(queues = "cancel_order_queue")
public void receiveSecKillOrderMessage(Channel channel, Message message) {
    // 预抓取总数
    try {
        channel.basicQos(300);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
传感器 监控 物联网
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
360 3
|
8月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
79 1
|
8月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
250 2
|
20天前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
60 6
|
20天前
|
消息中间件 存储 中间件
说说MQ在你项目中的应用(二)商品支付
本文总结了消息队列(MQ)在支付订单业务中的应用,重点分析了RabbitMQ的优势。通过异步处理、系统解耦和流量削峰等功能,RabbitMQ确保了支付流程的高效与稳定。具体场景包括用户下单、支付请求、商品生产和物流配送等环节。相比Kafka,RabbitMQ在低吞吐量、高实时性需求下表现更优,提供了更低延迟和更高的可靠性。
33 0
|
2月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
123 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
8月前
|
网络协议 Go 数据安全/隐私保护
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
453 2
|
7月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
5月前
|
消息中间件 开发工具
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?