⑥. Work模式 - 公平分发(Fair-Dispatch)
- ①. 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配
- ②. 生产者代码展示
/** * 轮询模式:公平的方式 */ public class Producer { static final String QUEUE_NAME = "work_queue_fair"; public static void main(String[] args) throws Exception { //2. 创建连接; Connection connection = ConnectionUtil.getConnection(); //3. 创建频道; Channel channel = connection.createChannel(); //4. 声明队列; /** * 参数1:队列名称 * 参数2:是否定义持久化队列(消息会持久化保存在服务器上) * 参数3:是否独占本连接 * 参数4:是否在不使用的时候队列自动删除 * 参数5:其它参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); for(int i=0;i<10;i++) { //5. 发送消息; String message = "你好!小兔纸work-----" + i; /** * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机) * 参数2:路由key,简单模式中可以使用队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已发送消息:" + message); } //6. 关闭资源 channel.close(); connection.close(); } }
③. 消费者代码
public class Consumer1 { public static void main(String[] args) throws Exception { //1. 创建连接工厂; //2. 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); //3. 创建频道; final Channel channel = connection.createChannel(); channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); System.out.println("fair的方式consumer1开始消费"); //每次可以预期多少个消息 channel.basicQos(1); //4. 创建消费者(接收消息并处理消息); //fair的方式一定要将应答方式改成手动应答 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //接收到的消息 System.out.println("消费者1----接收到的消息为:" + new String(body, "utf-8")); try { Thread.sleep(200); //确认消失 /* * 参数1:消息id * 参数2:false表示只有当前这条被处理 * */ channel.basicAck(envelope.getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }; //6. 监听队列 /** * 参数1:队列名 * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除; * 如果设置为false则需要手动确认 * 参数3:消费者 */ channel.basicConsume(Producer.QUEUE_NAME, false, defaultConsumer); } }