/** * 通配符模式;消费者接收消息 */ public class Consumer2 { public static void main(String[] args) throws Exception { //1. 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); //2. 创建频道; Channel channel = connection.createChannel(); //3. 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //接收到的消息 System.out.println("消费者1 --- 接收到的消息为:" + new String(body, "utf-8")); } }; //6. 监听队列 /** * 参数1:队列名 * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除; * 如果设置为false则需要手动确认 * 参数3:消费者 */ channel.basicConsume(Producer.TOPIC_QUEUE_2, true, defaultConsumer); } }
④. 测试:
在执行完成测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击topix_exchange的交换机,可以查看到如下的绑定:
⑤. Work模式 - 轮询模式(Round-Robin)
- ①. 特点:该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成(轮询的方式)
- ②. 生产者代码展示:
/** * 轮询模式:发送消息 */ public class Producer { static final String QUEUE_NAME = "work_queue_round-robin"; public static void main(String[] args) throws Exception { //2. 创建连接; Connection connection = ConnectionUtil.getConnection(); //3. 创建频道; Channel channel = connection.createChannel(); //4. 声明队列; /** * 参数1:队列名称 * 参数2:是否定义持久化队列(消息会持久化保存在服务器上) * 参数3:是否独占本连接 * 参数4:是否在不使用的时候队列自动删除 * 参数5:其它参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); for(int i=0;i<10;i++) { //5. 发送消息; String message = "你好!小兔纸work-----" + i; /** * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机) * 参数2:路由key,简单模式中可以使用队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已发送消息:" + message); } //6. 关闭资源 channel.close(); connection.close(); } }