Rabbitmq怎么用java代码控制对列大小
RabbitMQ有两种对队列长度的限制方式
对队列中消息的条数进行限制 x-max-length对队列中消息的总量进行限制 x-max-length-bytes对消息总条数进行限制(总条数包括未被消费的消息+被消费但未被确认的消息):
public class QueueLengthLimit {
private static final String QUEUE_NAME = 'queueLengthLimit';
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('127.0.0.1');
factory.setPort(5672);
factory.setUsername('guest');
factory.setPassword('guest');
Connection connection = factory.newConnection();
Channel senderChannel = connection.createChannel();
Channel consumerChannel = connection.createChannel();
// 设置队列最大消息数量为5
Map args = new HashMap();
args.put('x-max-length', 5);
args.put('x-dead-letter-exchange','normal_exchange');
args.put('x-dead-letter-routing-key','normal');
senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
// 发布6个消息
for (int i = 0; i
}GetResponse resp.getMessageCount() 队列中未被消费的消息的数量,其中不包含被消费未确认的消息。
当队列中的消息要超过队列限制时,将失效队首元素,
这是接收死信的队列,可知被失效的消息是NO.1(队首) 试验验证结果真是上面结论。
第二种是对队列中消息总字节数进行限制:
Map args = new HashMap();args.put('x-max-length-bytes ', 1000);senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);只计算消息体的字节数,不算消息投,消息属性等字节数。
RabbitMQ可以设置队列的最大优先级,也可以设置消息的优先级,优先级高的队列中的消息具有更高的被优先消费的权限。
可以通过如下参数:
队列的最大优先级:x-max-priority消息的优先级:priority队列优先级设置方式:
也可以通过代码去实现:
Map queueParam = new HashMap<>();queueParam.put('x-max-priority',10);channel.queueDeclare('queue_priority',true,false,false,queueParam);配置了队列优先级之后,会在管理后台界面看到如下Pri的标记:
上面我们设置了队列的最大优先级,之后我们发送消息的时候便可以设置消息自身的优先级别,来调整消息被消费的优先级顺序。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.priority(5);AMQP.BasicProperties build = builder.build();channel.basicPublish('exchange_priority','rk_priority',build,('message-'+i).getBytes());接下来我们看一个实现;
public class PriorityQueue {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('127.0.0.1');
factory.setPort(5672);
factory.setUsername('guest');
factory.setPassword('guest');
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare('exchange_priority','direct',true);
Map queueParam = new HashMap<>();
queueParam.put('x-max-priority',10);
channel.queueDeclare('queue_priority',true,false,false,queueParam);
channel.queueBind('queue_priority','exchange_priority','rk_priority');
for(int i = 0 ; i
}消费者代码:
public class PriorityConsumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('127.0.0.1');
factory.setPort(5672);
factory.setUsername('guest');
factory.setPassword('guest');
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
while(true) {
GetResponse response = channel.basicGet('queue_priority', false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
TimeUnit.MILLISECONDS.sleep(1000);
}
}
}==》
message-0message-2message-4message-6message-8message-1message-3message-5message-7message-9
赞1
踩0