Rabbitmq怎么用java代码控制对列大小,当对列满了停止生产,当对列小于对列存放的最大值,则继续生产
RabbitMQ有两种对队列长度的限制方式
对队列中消息的条数进行限制 x-max-length
对队列中消息的总量进行限制 x-max-length-bytes
RabbitMQ有两种对队列长度的限制方式
对队列中消息的条数进行限制 x-max-length
对队列中消息的总量进行限制 x-max-length-bytes
对消息总条数进行限制(总条数包括未被消费的消息+被消费但未被确认的消息):
public class QueueLengthLimit {
private static final String QUEUE_NAME = "queueLengthLimit";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel senderChannel = connection.createChannel();
Channel consumerChannel = connection.createChannel();
// 设置队列最大消息数量为5
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 5);
args.put("x-dead-letter-exchange","normal_exchange");
args.put("x-dead-letter-routing-key","normal");
senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
// 发布6个消息
for (int i = 0; i < 6;) {
String message = "NO. " + ++i;
senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
// 获取的消息为 NO. 2,说明队列头部第一条消息被抛弃
Thread.sleep(1000);
GetResponse resp = consumerChannel.basicGet(QUEUE_NAME, false);
String message = new String(resp.getBody(), "UTF-8");
System.out.printf("consume: %s\n", message);
System.out.printf("queue size: %d\n", resp.getMessageCount());
// 现在队列中有4个 Ready消息,1个 Unacked消息。此时再发布两条消息,应该只有 NO. 3 被抛弃。
senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 7".getBytes("UTF-8"));
senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 8".getBytes("UTF-8"));
Thread.sleep(100);
GetResponse resp2 = consumerChannel.basicGet(QUEUE_NAME, false);
message = new String(resp2.getBody(), "UTF-8");
System.out.printf("consume: %s\n\n", message);
// 现在队列中有4个 Ready消息,2个 Unacked消息。
// 此时Nack,消息2、4取消退回队列头导致队列消息数量超过设定值,谁能留下?
consumerChannel.basicNack(resp2.getEnvelope().getDeliveryTag(), true, true);
Thread.sleep(5000);
System.out.println("======================================");
while (true) {
resp = consumerChannel.basicGet(QUEUE_NAME, true);
if (resp == null) {
break;
} else {
message = new String(resp.getBody(), "UTF-8");
System.out.printf("consume: %s\n", message);
}
}
}
}
GetResponse resp.getMessageCount() 队列中未被消费的消息的数量,其中不包含被消费未确认的消息。
当队列中的消息要超过队列限制时,将失效队首元素,
这是接收死信的队列,可知被失效的消息是NO.1(队首) 试验验证结果真是上面结论。
第二种是对队列中消息总字节数进行限制:
Map args = new HashMap();
args.put("x-max-length-bytes ", 1000);
senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
只计算消息体的字节数,不算消息投,消息属性等字节数。
RabbitMQ可以设置队列的最大优先级,也可以设置消息的优先级,优先级高的队列中的消息具有更高的被优先消费的权限。
可以通过如下参数:
队列的最大优先级:x-max-priority
消息的优先级:priority
队列优先级设置方式:
也可以通过代码去实现:
Map queueParam = new HashMap<>();
queueParam.put("x-max-priority",10);
channel.queueDeclare("queue_priority",true,false,false,queueParam);
配置了队列优先级之后,会在管理后台界面看到如下Pri的标记:
上面我们设置了队列的最大优先级,之后我们发送消息的时候便可以设置消息自身的优先级别,来调整消息被消费的优先级顺序。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties build = builder.build();
channel.basicPublish("exchange_priority","rk_priority",build,("message-"+i).getBytes());
接下来我们看一个实现;
public class PriorityQueue {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchange_priority","direct",true);
Map<String,Object> queueParam = new HashMap<>();
queueParam.put("x-max-priority",10);
channel.queueDeclare("queue_priority",true,false,false,queueParam);
channel.queueBind("queue_priority","exchange_priority","rk_priority");
for(int i = 0 ; i < 10 ; i++){
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
if(i % 2 == 0){
builder.priority(5);
}
AMQP.BasicProperties build = builder.build();
channel.basicPublish("exchange_priority","rk_priority",build,("message-"+i).getBytes());
}
channel.close();
connection.close();
}
}
消费者代码:
public class PriorityConsumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
while(true) {
GetResponse response = channel.basicGet("queue_priority", false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
TimeUnit.MILLISECONDS.sleep(1000);
}
}
}
==》
message-0
message-2
message-4
message-6
message-8
message-1
message-3
message-5
message-7
message-9
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。