我用ChatGPT,给RabbitMQ加了个连接池

简介: 上次我把 RabbitMQ 集成到项目中,但是每次使用 RabbitMQ 时都去 New 一个连接,导致并发起不来,所以这次我们就给 RabbitMQ 加一个连接池。为了偷个懒,我直接用 ChatGPT 教我加。

上次我把 RabbitMQ 集成到项目中,但是每次使用 RabbitMQ 时都去 New 一个连接,导致并发起不来,所以这次我们就给 RabbitMQ 加一个连接池。

为了偷个懒,我直接用 ChatGPT 教我加。

01 ChatGPT 生成的代码

我输入的关键字如下:

rabbitmq 实现一个连接池 java

下面是 GhatGPT 4.0 给我的回复:

以下是在Java中使用连接池创建RabbitMQ连接的示例:
首先,需要添加pika客户端依赖项,例如:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>
创建一个RabbitMQ连接类:
import com.rabbitmq.client.Connection;
public class RabbitMQConnection {
    private Connection connection;
    public RabbitMQConnection(String host, int port, String username, String password) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        try {
            connection = factory.newConnection();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
    public Connection getConnection() {
        return connection;
    }
    public void close() {
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
创建一个RabbitMQ连接池类:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class RabbitMQConnectionPool {
    private final BlockingQueue<RabbitMQConnection> pool;
    public RabbitMQConnectionPool(String host, int port, String username, String password, int poolSize) {
        pool = new LinkedBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new RabbitMQConnection(host, port, username, password));
        }
    }
    public RabbitMQConnection getConnection() throws InterruptedException {
        return pool.take();
    }
    public void returnConnection(RabbitMQConnection connection) {
        pool.add(connection);
    }
    public void close() {
        pool.forEach(RabbitMQConnection::close);
    }
}
创建一个消费者类:
import com.rabbitmq.client.*;
public class RabbitMQConsumer implements Runnable {
    private final RabbitMQConnectionPool connectionPool;
    private final String queueName;
    public RabbitMQConsumer(RabbitMQConnectionPool connectionPool, String queueName) {
        this.connectionPool = connectionPool;
        this.queueName = queueName;
    }
    @Override
    public void run() {
        try {
            Connection connection = connectionPool.getConnection().getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, false, false, false, null);
            channel.basicConsume(queueName, true, (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Received message: " + message);
            }, consumerTag -> {});
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }
}
创建一个生产者类:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
public class RabbitMQProducer {
    private final RabbitMQConnectionPool connectionPool;
    private final String queueName;
    public RabbitMQProducer(RabbitMQConnectionPool connectionPool, String queueName) {
        this.connectionPool = connectionPool;
        this.queueName = queueName;
    }
    public void send(String message) {
        try {
            Connection connection = connectionPool.getConnection().getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, false, false, false, null);
            channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("Sent message: " + message);
            channel.close();
            connection.close();
        } catch (InterruptedException|IOException e) {
            e.printStackTrace();
        }
    }
}
创建一个RabbitMQ连接池和多个消费者来消费队列:
public class Main {
    public static void main(String[] args) throws InterruptedException {
        RabbitMQConnectionPool connectionPool = new RabbitMQConnectionPool("localhost", 5672, "guest", "guest", 10);
        RabbitMQProducer producer = new RabbitMQProducer(connectionPool, "hello");
        producer.send("Hello, world!");
        RabbitMQConsumer consumer1 = new RabbitMQConsumer(connectionPool, "hello");
        Thread thread1 = new Thread(consumer1);
        thread1.start();
        RabbitMQConsumer consumer2 = new RabbitMQConsumer(connectionPool, "hello");
        Thread thread2 = new Thread(consumer2);
        thread2.start();
        thread1.join();
        thread2.join();
        connectionPool.close();
    }
}
该示例代码创建了大小为10的连接池,并创建了两个消费者线程来消费队列,并使用一个生产者向队列发送消息。每个消费者从连接池获取连接,并消费队列中的消息,当完成任务后,将连接还回连接池。在实际部署中,您需要根据您的应用程序需求选择合适大小的池,并调整代码以使用正确的参数。

我靠,这代码写的真牛逼,我需要做的事情,就是把里面连接池的部分,集成到我的项目中。

02 给 RabbitMQ 加连接池

2.1 加入连接池

之前我们给 ConnectionFactory 加了个单例工厂,具体可以参考我上一篇文章,由于我们有连接池,所以这个单例工厂就直接废弃了。

/**
 * @author Louzai
 * @date 2023/5/10
 */
public class RabbitmqConnection {
    private Connection connection;
    public RabbitmqConnection(String host, int port, String userName, String password, String virtualhost) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
    /**
     * 获取链接
     *
     * @return
     */
    public Connection getConnection() {
        return connection;
    }
    /**
     * 关闭链接
     *
     */
    public void close() {
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

这个代码就是 ChatGPT 给我写的,我是直接 Copy 过来,然后稍微改动了一下。

public class RabbitmqConnectionPool {
    private static BlockingQueue<RabbitmqConnection> pool;
    public static void initRabbitmqConnectionPool(String host, int port, String userName, String password,
                                             String virtualhost,
                                           Integer poolSize) {
        pool = new LinkedBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new RabbitmqConnection(host, port, userName, password, virtualhost));
        }
    }
    public static RabbitmqConnection getConnection() throws InterruptedException {
        return pool.take();
    }
    public static void returnConnection(RabbitmqConnection connection) {
        pool.add(connection);
    }
    public static void close() {
        pool.forEach(RabbitmqConnection::close);
    }
}

2.2 RabbitMQ 发送消费消息

RabbitMQ 发送消息:从连接池拿到连接 -> 创建通道 -> 声明交换机 -> 发送消息 -> 将连接归还连接池。

这里的逻辑基本和之前的一样,只是之前是 New 一个连接,现在是直接从连接池拿到连接,然后最后多了一步归还连接的操作。

@Override
public void publishMsg(String exchange,
                       BuiltinExchangeType exchangeType,
                       String toutingKey,
                       String message) {
    try {
        //创建连接
        RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection();
        Connection connection = rabbitmqConnection.getConnection();
        //创建消息通道
        Channel channel = connection.createChannel();
        // 声明exchange中的消息为可持久化,不自动删除
        channel.exchangeDeclare(exchange, exchangeType, true, false, null);
        // 发布消息
        channel.basicPublish(exchange, toutingKey, null, message.getBytes());
        System.out.println("Publish msg:" + message);
        channel.close();
        RabbitmqConnectionPool.returnConnection(rabbitmqConnection);
    } catch (InterruptedException | IOException | TimeoutException e) {
        e.printStackTrace();
    }
}

RabbitMQ 消费消息:从连接池拿到连接 -> 创建通道 -> 确定消息队列 -> 绑定队列到交换机 -> 接受并消费消息 -> 将连接归还连接池。

同上,这里的逻辑基本和之前的一样,只是多了一个拿连接和归还连接的过程。

@Override
public void consumerMsg(String exchange,
                        String queueName,
                        String routingKey) {
    try {
        //创建连接
        RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection();
        Connection connection = rabbitmqConnection.getConnection();
        //创建消息信道
        final Channel channel = connection.createChannel();
        //消息队列
        channel.queueDeclare(queueName, true, false, false, null);
        //绑定队列到交换机
        channel.queueBind(queueName, exchange, routingKey);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Consumer msg:" + message);
                // 获取Rabbitmq消息,并保存到DB
                // 说明:这里仅作为示例,如果有多种类型的消息,可以根据消息判定,简单的用 if...else 处理,复杂的用工厂 + 策略模式
                notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 取消自动ack
        channel.basicConsume(queueName, false, consumer);
        RabbitmqConnectionPool.returnConnection(rabbitmqConnection);
    } catch (InterruptedException | IOException e) {
        e.printStackTrace();
    }
}

这个代码,其实 ChatGPT 写的有问题,你再回过头去看 ChatGPT 写的代码,发现连接取出,但是没有归还,那会出现什么问题呢?

这里给大家分析一下,由于我们的连接池用的是 BlockingQueue,连接池大小是 5,如果连接全部取出并都不归还,当第 6 个请求过来后,请求就卡住了,导致界面操作会被阻塞,请求完全没有反应。

不要问我怎么知道,因为我是踩坑过来的。

2.3 代码仓库

为了方便大家学习功能演变的过程,每个模块都会单独开个分支,连接池的分支和仓库如下:

如果需要运行 RabbitMQ,下面的配置需要改成 true,因为代码默认是 false。

3 实际效果

我们是把技术派的“点赞”功能消息,通过 RabbitMQ 方式处理,我们多次点击“点赞”按钮,触发 RammitMQ 消息发送。

可以通过日志,也可以看到发送和消费过的消息。

最后就是大家关心的连接池个数,打开 RabbitMQ 后台,发现永远只有 5 个连接,和我们的连接池大小一致,符合预期。

再看看打开的 Channel,由于每次都关闭,所以也没有了,也符合预期。

这里抛个疑问,每次新开一个 Channel,用完后关闭,是不是也很耗时?是否需要给 Channel 也搞一个连接池呢?可以评论区告诉我哈~~

4 后记

如果用 ChatGPT 3.5,给的结果就不一样,需要加入更多关键字,如果需要达到 GhatGPT 4.0 给的结果,你需要给 ChatGPT 3.5 以下关键字。

rabbitmq 用BlockingQueue实现一个连接池  java

再回到 RabbitMQ 本身,上一篇文章只是告诉大家 RabbitMQ 的基本原理,以及如何集成 RabbitMQ,这篇文章主要教大家加连接池,其实这个示例,里面可以加的东西还很多。

学习嘛,就是边学边玩,后面有空,我想再加点东西,主要是想把 RabbitMQ 的消费方式,由阻塞改成非阻塞方式,可能会在下一篇文章给出,敬请期待!

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
XML 缓存 前端开发
Thymeleaf一篇就够了
Thymeleaf是Springboot官方支持的模板引擎,有着动静分离等独有特点,通过本文简单学习下吧!
63461 24
Thymeleaf一篇就够了
|
前端开发 网络协议 Dubbo
超详细Netty入门,看这篇就够了!
本文主要讲述Netty框架的一些特性以及重要组件,希望看完之后能对Netty框架有一个比较直观的感受,希望能帮助读者快速入门Netty,减少一些弯路。
93633 33
超详细Netty入门,看这篇就够了!
|
消息中间件 SQL 存储
超详细的RabbitMQ入门,看这篇就够了!
RabbitMQ入门,看这篇就够了
219899 69
|
消息中间件 存储 网络协议
从原理到实战,手把手教你在项目中使用RabbitMQ
RabbitMQ 的文章之前写过,但是当时给的示例是 Demo 版的,这篇文章主要是结合之前写的理论知识,将 RabbitMQ 集成到技术派项目中。 话不多说,上文章目录: 下面我们先回顾一下理论知识,如果对这块知识已经清楚的同学,可以直接跳到实战部分。 1. 消息队列 1.1 消息队列模式 消息队列目前主要 2 种模式,分别为“点对点模式”和“发布/订阅模式”。 点对点模式 一个具体的消息只能由一个消费者消费,多个生产者可以向同一个消息队列发送消息,但是一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。 需要额外注意的是,如果消费者
1318 5
|
8月前
|
文字识别 网络协议 开发工具
GitHub封锁?推荐5个国产的Git仓库替代平台
近日,GitHub对中国区IP的部分限制引发了广泛关注。未登录用户被拒,已登录用户功能受限,南北网络环境差异更显“内卷”。为应对这一挑战,本文推荐了多个国产Git平台:Gitee(码云)、GitCode(CSDN旗下)、CODING(腾讯系)、CodeUP(阿里云支持)及微信代码管理工具。这些平台功能全面、稳定性强,是开发者迁移项目的理想选择。通过同步代码、配置CI/CD流水线等简单步骤,可确保项目平稳过渡。此次事件提醒我们,掌握核心技能与支持国产平台同样重要!
6225 11
|
9月前
|
网络安全
Xshell7连接Debian12系统,中文显示乱码,解决办法一览!
在使用Xshell 7连接Debian 12时,中文乱码通常由字符编码或字体设置不当引起。解决方法包括:1) 设置Xshell编码为UTF-8;2) 配置支持中文字体(如Microsoft YaHei);3) 调整Debian 12的Locale配置,确保支持zh_CN.UTF-8;4) 检查SSH服务端配置。完成设置后,重新连接并验证中文显示是否正常。注意字体优先级及系统兼容性,必要时调整环境变量或权限设置。
573 3
|
缓存 监控 安全
Spring AOP 详细深入讲解+代码示例
Spring AOP(Aspect-Oriented Programming)是Spring框架提供的一种面向切面编程的技术。它通过将横切关注点(例如日志记录、事务管理、安全性检查等)从主业务逻辑代码中分离出来,以模块化的方式实现对这些关注点的管理和重用。 在Spring AOP中,切面(Aspect)是一个模块化的关注点,它可以跨越多个对象,例如日志记录、事务管理等。切面通过定义切点(Pointcut)和增强(Advice)来介入目标对象的方法执行过程。 切点是一个表达式,用于匹配目标对象的一组方法,在这些方法执行时切面会被触发。增强则定义了切面在目标对象方法执行前、执行后或抛出异常时所
17370 4
|
Java 数据库连接 Maven
mybatis使用一:springboot整合mybatis、mybatis generator,使用逆向工程生成java代码。
这篇文章介绍了如何在Spring Boot项目中整合MyBatis和MyBatis Generator,使用逆向工程来自动生成Java代码,包括实体类、Mapper文件和Example文件,以提高开发效率。
632 2
mybatis使用一:springboot整合mybatis、mybatis generator,使用逆向工程生成java代码。
|
机器学习/深度学习 数据采集 算法
大数据中缺失值处理使用算法处理
【10月更文挑战第21天】
803 3