开发者社区> 问答> 正文

rocketmq:当同一消费组的消费方式被修改后,消费组将按什么方式消费。

已解决

rocketmq:当同一消费组group1, 被第一个程序置为:BROADCASTING 消费。被第二个程序置为:CLUSTERING消费,再被第三个程序置为BROADCASTING 消费。此时三个程序是否都是按指定消费方式消费

展开
收起
sun丶baobao 2018-05-22 15:34:20 4281 0
2 条回答
写回答
取消 提交回答
  • 采纳回答

    一、Consumer 批量消费

    可以通过

    [java] view plain copy
    consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
    这里需要分为2种情况1、Consumer端先启动 2、Consumer端后启动. 正常情况下:应该是Consumer需要先启动
    1、Consumer端先启动

    Consumer代码如下

    [java] view plain copy
    package quickstart;

    import java.util.List;

    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;

    /**

    • Consumer,订阅消息
      */

    public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        /** 
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
         * 如果非第一次启动,那么按照上次消费的位置继续消费 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
    
        consumer.subscribe("TopicTest", "*");  
    
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
    
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
                  
                try {  
                    System.out.println("msgs的长度" + msgs.size());  
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
                } catch (Exception e) {  
                    e.printStackTrace();  
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
                }  
                  
                  
                  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  
    
        consumer.start();  
    
        System.out.println("Consumer Started.");  
    }  

    }

    由于这里是Consumer先启动,所以他会去轮询MQ上是否有订阅队列的消息,由于每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1):

    2、Consumer端后启动,也就是Producer先启动

    由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer的

    consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条

    所以这段代码就生效了测试结果如下(每次size最多是10):

    二、消息重试机制:消息重试分为2种1、Producer端重试 2、Consumer端重试

    1、Producer端重试

    也就是Producer往MQ上发消息没有发送成功,我们可以设置发送失败重试的次数

    [java] view plain copy
    package quickstart;

    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;

    /**

    • Producer,发送消息
    • */

    public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {  
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");  
        producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
        producer.setRetryTimesWhenSendFailed(10);//失败的 情况发送10次  
        producer.start();  
    
        for (int i = 0; i < 1000; i++) {  
            try {  
                Message msg = new Message("TopicTest",// topic  
                        "TagA",// tag  
                        ("Hello RocketMQ " + i).getBytes()// body  
                );  
                SendResult sendResult = producer.send(msg);  
                System.out.println(sendResult);  
            } catch (Exception e) {  
                e.printStackTrace();  
                Thread.sleep(1000);  
            }  
        }  
    
        producer.shutdown();  
    }  

    }

    2、Consumer端重试

    2.1、exception的情况,一般重复16次 10s、30s、1分钟、2分钟、3分钟等等

    上面的代码中消费异常的情况返回

    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试

    正常则返回:

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

    [java] view plain copy
    package quickstart;

    import java.util.List;

    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;

    /**

    • Consumer,订阅消息
      */

    public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        /** 
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
         * 如果非第一次启动,那么按照上次消费的位置继续消费 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
    
        consumer.subscribe("TopicTest", "*");  
    
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
    
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
    
                try {  
                    // System.out.println("msgs的长度" + msgs.size());  
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
                    for (MessageExt msg : msgs) {  
                        String msgbody = new String(msg.getBody(), "utf-8");  
                        if (msgbody.equals("Hello RocketMQ 4")) {  
                            System.out.println("======错误=======");  
                            int a = 1 / 0;  
                        }  
                    }  
    
                } catch (Exception e) {  
                    e.printStackTrace();  
                    if(msgs.get(0).getReconsumeTimes()==3){  
                        //记录日志  
                          
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
                    }else{  
                          
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
                    }  
                }  
    
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
            }  
        });  
    
        consumer.start();  
    
        System.out.println("Consumer Started.");  
    }  

    }
    打印结果:

    假如超过了多少次之后我们可以让他不再重试记录 日志。

    if(msgs.get(0).getReconsumeTimes()==3){
    //记录日志
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
    }

    2.2超时的情况,这种情况MQ会无限制的发送给消费端。

    就是由于网络的情况,MQ发送数据之后,Consumer端并没有收到导致超时。也就是消费端没有给我返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;这样的就认为没有到达Consumer端。

    这里模拟Producer只发送一条数据。consumer端暂停1分钟并且不发送接收状态给MQ

    [java] view plain copy
    package model;

    import java.util.List;

    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;

    /**

    • Consumer,订阅消息
      */

    public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        /** 
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
         * 如果非第一次启动,那么按照上次消费的位置继续消费 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
    
        consumer.subscribe("TopicTest", "*");  
    
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
    
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
    
                try {  
    
                    // 表示业务处理时间  
                    System.out.println("=========开始暂停===============");  
                    Thread.sleep(60000);  
    
                    for (MessageExt msg : msgs) {  
                        System.out.println(" Receive New Messages: " + msg);  
                    }  
    
                } catch (Exception e) {  
                    e.printStackTrace();  
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
                }  
    
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
            }  
        });  
    
        consumer.start();  
    
        System.out.println("Consumer Started.");  
    }  

    }

    三、消费模式

    广播消费:rocketMQ默认是集群消费,我们可以通过在Consumer来支持广播消费

    consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费

    [java] view plain copy
    package model;

    import java.util.List;

    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

    /**

    • Consumer,订阅消息
      */

    public class Consumer2 {

    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费  
      
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
    
        consumer.subscribe("TopicTest", "*");  
    
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
    
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
    
                try {  
    
                    for (MessageExt msg : msgs) {  
                        System.out.println(" Receive New Messages: " + msg);  
                    }  
    
                } catch (Exception e) {  
                    e.printStackTrace();  
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
                }  
    
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
            }  
        });  
    
        consumer.start();  
    
        System.out.println("Consumer Started.");  
    }  

    }
    如果我们有2台节点(非主关系),2个节点物理上是分开的,Producer往MQ上写入20条数据 其中broker1中拉取了12条 。broker2中拉取了8 条,这种情况下,假如broker1宕机,那么我们消费数据的时候,只能消费到broker2中的8条,broker1中的12条已经持久化到中。需要broker1回复之后这12条数据才能继续被消费。

    异步复制和同步双写主要是主和从的关系。消息需要实时消费的,就需要采用主从模式部署

    异步复制:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就算从producer端发送成功了,然后通过异步复制的方法将数据复制到从节点

    同步双写:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就并不算从producer端发送成功了,需要通过同步双写的方法将数据同步到从节点后, 才算数据发送成功。

    四、刷盘方式

    同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。

    异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。

    commitlog:

    commitlog就是来存储所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。

    consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据

    2019-07-17 22:27:01
    赞同 展开评论 打赏
  • 不能,这样会错乱的。出现订阅关系不一致的情况

    2020-03-12 19:41:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
基于RocketMQ Connect 构建全新数据流转处理平 立即下载
万亿级数据洪峰下的消息引擎——Apache RocketMQ 立即下载
万亿级数据洪峰下的消息引擎-Apache RocketMQ 立即下载