基本使用姿势
公共代码封装
封装工厂类:
public class RabbitUtil { public static ConnectionFactory getConnectionFactory() { //创建连接工程,下面给出的是默认的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); return factory; } }
封装生成者:
public class MsgProducer { public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //创建连接 Connection connection = factory.newConnection(); //创建消息通道 Channel channel = connection.createChannel(); // 声明exchange中的消息为可持久化,不自动删除 channel.exchangeDeclare(exchange, exchangeType, true, false, null); // 发布消息 channel.basicPublish(exchange, toutingKey, null, message.getBytes()); System.out.println("Sent '" + message + "'"); channel.close(); connection.close(); } }
封装消费者:
public class MsgConsumer { public static void consumerMsg(String exchange, String queue, String routingKey) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //创建连接 Connection connection = factory.newConnection(); //创建消息信道 final Channel channel = connection.createChannel(); //消息队列 channel.queueDeclare(queue, true, false, false, null); //绑定队列到交换机 channel.queueBind(queue, exchange, routingKey); System.out.println("[*] Waiting for message. To exist press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [x] Received '" + message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 取消自动ack channel.basicConsume(queue, false, consumer); } }
Direct方式
Direct示例
生产者:
public class DirectConsumer { private static final String exchangeName = "direct.exchange"; public void msgConsumer(String queueName, String routingKey) { try { MsgConsumer.consumerMsg(exchangeName, queueName, routingKey); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DirectConsumer consumer = new DirectConsumer(); String[] routingKey = new String[]{"aaa", "bbb", "ccc"}; String[] queueNames = new String[]{"qa", "qb", "qc"}; for (int i = 0; i < 3; i++) { consumer.msgConsumer(queueNames[i], routingKey[i]); } Thread.sleep(1000 * 60 * 100); } }
执行生产者,往消息队列中放入10条消息,其中key分别为“aaa”、“bbb”和“ccc”,分别放入qa、qb、qc三个队列:
下面是qa队列的信息:
消费者:
public class DirectProducer { private static final String EXCHANGE_NAME = "direct.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DirectProducer directProducer = new DirectProducer(); String[] routingKey = new String[]{"aaa", "bbb", "ccc"}; String msg = "hello >>> "; for (int i = 0; i < 10; i++) { directProducer.publishMsg(routingKey[i % 3], msg + i); } System.out.println("----over-------"); Thread.sleep(1000 * 60 * 100); } }
执行后的输出:
[*] Waiting for message. To exist press CTRL+C [x] Received 'hello >>> 0 [x] Done [x] Received 'hello >>> 3 [x] Done [x] Received 'hello >>> 6 [x] Done [x] Received 'hello >>> 9 [x] Done [*] Waiting for message. To exist press CTRL+C [x] Received 'hello >>> 1 [x] Done [x] Received 'hello >>> 4 [x] Done [x] Received 'hello >>> 7 [x] Done [*] Waiting for message. To exist press CTRL+C [x] Received 'hello >>> 2 [x] Done [x] Received 'hello >>> 5 [x] Done [x] Received 'hello >>> 8 [x] Done
可以看到,分别从qa、qb、qc中将不同的key的数据消费掉。
问题探讨
有个疑问:这个队列的名称qa、qb和qc是RabbitMQ自动生成的么,我们可以指定队列名称么?
我做了个简单的实验,我把消费者代码修改了一下:
public static void main(String[] args) throws InterruptedException { DirectConsumer consumer = new DirectConsumer(); String[] routingKey = new String[]{"aaa", "bbb", "ccc"}; String[] queueNames = new String[]{"qa", "qb", "qc1"}; // 将qc修改为qc1 for (int i = 0; i < 3; i++) { consumer.msgConsumer(queueNames[i], routingKey[i]); } Thread.sleep(1000 * 60 * 100); }
执行后如下图所示:
我们可以发现,多了一个qc1,所以可以判断这个界面中的queues,是消费者执行时,会将消费者指定的队列名称和direct.exchange绑定,绑定的依据就是key。
当我们把队列中的数据全部消费掉,然后重新执行生成者后,会发现qc和qc1中都有3条待消费的数据,因为绑定的key都是“ccc”,所以两者的数据是一样的:
绑定关系如下:
注意:当没有Queue绑定到Exchange时,往Exchange中写入的消息也不会重新分发到之后绑定的queue上。
思考:不执行消费者,看不到这个Queres中信息,我其实可以把这个界面理解为消费者信息界面。不过感觉还是怪怪的,这个queues如果是消费者信息,就不应该叫queues,我理解queues应该是RabbitMQ中实际存放数据的queues,难道是我理解错了?
Fanout方式(指定队列)
生产者封装:
public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String msg = "hello >>> "; for (int i = 0; i < 10; i++) { directProducer.publishMsg("", msg + i); } } }
消费者:
public class FanoutConsumer { private static final String EXCHANGE_NAME = "fanout.exchange"; public void msgConsumer(String queueName, String routingKey) { try { MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutConsumer consumer = new FanoutConsumer(); String[] queueNames = new String[]{"qa-2", "qb-2", "qc-2"}; for (int i = 0; i < 3; i++) { consumer.msgConsumer(queueNames[i], ""); } } }
执行生成者,结果如下:
我们发现,生产者生产的10条数据,在每个消费者中都可以消费,这个是和Direct不同的地方,但是使用Fanout方式时,有几个点需要注意一下:
- 生产者的routkey可以为空,因为生产者的所有数据,会下放到每一个队列,所以不会通过routkey去路由;
- 消费者需要指定queues,因为消费者需要绑定到指定的queues才能消费。
这幅图就画出了Fanout的精髓之处,exchange会和所有的queue进行绑定,不区分路由,消费者需要绑定指定的queue才能发起消费。
注意:往队列塞数据时,可能通过界面看不到消息个数的增加,可能是你之前已经开启了消费进程,导致增加的消息马上被消费了。
Fanout方式(随机获取队列)
上面我们是指定了队列,这个方式其实很不友好,比如对于Fanout,我其实根本无需关心队列的名字,如果还指定对应队列进行消费,感觉这个很冗余,所以我们这里就采用随机获取队列名字的方式,下面代码直接Copy官网。
生成者封装:
public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //创建连接 Connection connection = factory.newConnection(); //创建消息通道 Channel channel = connection.createChannel(); // 声明exchange中的消息 channel.exchangeDeclare(exchange, exchangeType); // 发布消息 channel.basicPublish(exchange, "", null, message.getBytes("UTF-8")); System.out.println("Sent '" + message + "'"); channel.close(); connection.close(); }
消费者封装:
public static void consumerMsgV2(String exchange) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(exchange, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchange, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }
生产者:
public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout.exchange.v2"; public void publishMsg(String msg) { try { MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String msg = "hello >>> "; for (int i = 0; i < 10000; i++) { directProducer.publishMsg(msg + i); } } }
消费者:
public class FanoutConsumer { private static final String EXCHANGE_NAME = "fanout.exchange.v2"; public void msgConsumer() { try { MsgConsumer.consumerMsgV2(EXCHANGE_NAME); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutConsumer consumer = new FanoutConsumer(); for (int i = 0; i < 3; i++) { consumer.msgConsumer(); } } }
执行后,管理界面如下:
Topic方式
代码详见官网:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
更多方式,请直接查看官网:https://www.rabbitmq.com/getstarted.html
RabbitMQ进阶
durable和autoDeleted
在定义Queue时,可以指定这两个参数:
/** * Declare an exchange. * @see com.rabbitmq.client.AMQP.Exchange.Declare * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) * @param autoDelete true if the server should delete the exchange when it is no longer in use * @param arguments other properties (construction arguments) for the exchange * @return a declaration-confirm method to indicate the exchange was successfully declared * @throws java.io.IOException if an error is encountered */ Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
durable
持久化,保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。
若是将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,会重新读取之前被持久化的queue。
虽然队列可以被持久化,但是里面的消息是否为持久化,还要看消息的持久化设置。即重启queue,但是queue里面还没有发出去的消息,那队列里面还存在该消息么?这个取决于该消息的设置。
autoDeleted
自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
当一个Queue被设置为自动删除时,当消费者断掉之后,queue会被删除,这个主要针对的是一些不是特别重要的数据,不希望出现消息积累的情况。
小节
- 当一个Queue已经声明好了之后,不能更新durable或者autoDelted值;当需要修改时,需要先删除再重新声明
- 消费的Queue声明应该和投递的Queue声明的 durable,autoDelted属性一致,否则会报错
- 对于重要的数据,一般设置 durable=true, autoDeleted=false
- 对于设置 autoDeleted=true 的队列,当没有消费者之后,队列会自动被删除
ACK
执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
因此手动ACK的常见手段:
// 接收消息之后,主动ack/nak Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [ " + queue + " ] Received '" + message); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(envelope.getDeliveryTag(), false, true); } } }; // 取消自动ack channel.basicConsume(queue, false, consumer);