在开发环境下,基于Springboot的RocketMQ示例(含安装步骤、错误分析)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 在看这文章之前建议先看看先前架构原理介绍文章

在看这文章之前建议先看看先前架构原理介绍文章:



RocketMQ服务器启动


linux环境


  1. 下载编译源码


# 下载$ 
  > wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source- > 
  # 解压$
  >unzip rocketmq-all-4.7.0-source-release.zip
  > cd rocketmq-all-4.7.0/
  # 编译$
  > mvn -Prelease-all -DskipTests clean install -U
  > cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0
复制代码


  1. 启动 Name Server


# 启动 Name Server 服务
 > nohup sh bin/mqnamesrv &
 # 启动完成后,查看日志$
 > tail -f ~/logs/rocketmqlogs/namesrv.log
  The Name Server boot success...
复制代码


  1. 启动 Brokerconf 目录下,RocketMQ 提供了多种 Broker 的配置文件:


  • broker.conf :单主,异步刷盘。
  • 2m/ :双主,异步刷盘。
  • 2m-2s-async/ :两主两从,异步复制,异步刷盘。
  • 2m-2s-sync/ :两主两从,同步复制,异步刷盘。
  • dledger/ :Dledger 集群,至少三节点


# 启动 Broker服务
 > nohup sh bin/mqbroker -n localhost:9876 &
 # 启动完成后,查看日志$
 > tail -f ~/logs/rocketmqlogs/broker.log 
  The broker[%s, 172.30.30.233:10911] boot success...
复制代码


其中,参数:


  • 通过 -c 参数,配置读取的主 Broker 配置
  • 通过 -n 参数,设置 RocketMQ Namesrv 地址


  1. 关闭服务器


> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664
复制代码


windows环境


  1. 首先去官网下载编译之后的版本,然后解压到本地目录


官网链接:rocketmq.apache.org/dowloading/…


下载目标:Binary: [rocketmq-all-4.7.0-bin-release.zip

  1. 配置ROCKETMQ_HOME到系统环境变量中,启动脚本将读取ROCKETMQ_HOME变量


  1. 分别进入bin目录下 启动如下脚本(需要设置内存参数,防止内存过大,启动失败,具体看<常出现的错误>小节):


3.1 启动namesrv


运行命令:


mqnamesrv.cmd
复制代码


log : The Name Server boot success. serializeType=JSON


3.2 启动brokerserver


运行命令:


mqbroker.cmd -n localhost:9876
复制代码


log : The broker[IQSZ-L01898, 10.111.45.111:10911] boot success. serializeType=JSON and name server is localhost:9876

  1. 关闭服务器


mqshutdown.cmd  broker

log:killing name server

mqshutdown.cmd  namesrv

log:killing broker


RocketMQ发送消息和消费消息


RocketMQ发送消息和消费消息,先启动消费者,然后再启动生产者


添加依赖


<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>
复制代码


发送消息


发送消息--同步


public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("test-group");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}
复制代码


发送消息--异步


public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("test—group");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 100; i++) {
                final int index = i;
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
复制代码


发送消息--单向


public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}
复制代码


消费消息


public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
复制代码


常出现的错误


安装中出现的错误


防止内存设置过大


修改runbroker.cmd配置文件

set "JAVA_OPT=%JAVA_OPT% -server -Xms500m -Xmx500m -Xmn500m"

set "JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=1g"


31.png


修改runserver.cmd配置文件

set "JAVA_OPT=%JAVA_OPT% -server -Xms500m -Xmx500m -Xmn500m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"


32.png


启动NAMESERVER报错


unrecognized vm option 'MetasoaceSize=128m'


解决方法:更换jdk版本为1.8即可


启动BROKER报错


错误: 找不到或无法加载主类 xxxxxx’


解决方法:打开runbroker.cmd(windows),然后将‘%CLASSPATH%’加上英文双引


使用过程中出现的错误


No route info of this topic


  1. Broker禁止自动创建Topic,且用户没有通过手工方式创建Topic


查看是否允许自动创建topic


命令:mqbroker.cmd -n localhost:9876 -p


mq开启自动创建topic参数


命令:mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true


  1. Broker 没有正确连接到 Name Server


查看broker.log日志


位置: /安装目录/conf/logback_broker.xml中日志位置


日志信息:broker.log


日志信息:namesrv.log


  1. Producer 没有正确连接到 Name Server


linux环境:查询防火墙是否通


错误分析方法


日志分析法:


  1. 查看broker日志


  • 关注broker是否有注册到nameserver


register broker to name server localhost:9876 OK


  • 关注生产者是否连接到broker


new producer connected, group: test_group channel: ClientChannelInfo ...
复制代码


  • 查看已经创建的topic是否包含自己想要的topic


2020-04-21 15:58:22 INFO main - load exist local topic, TopicConfig [topicName=test_group, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false] ...


  • 查看消费者是否连接到broker


new consumer connected, group: test_group CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo ...

  1. 查看nameserver日志


  • 关注broker是否注册到nameserver


new broker registered, localhost:10911


  • 查看topic消息


2020-04-21 17:03:32 INFO RemotingExecutorThread_1 - new topic registered, test_topic QueueData [brokerName=broker-a, readQueueNums=8, writeQueueNums=8, perm=6, topicSynFlag=0]

...


各位看官还可以吗?喜欢的话,动动手指点个💗,点个关注呗!!谢谢支持!



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
23天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
90 2
|
1月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
24天前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
42 1
|
1月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
232 2
|
1月前
|
消息中间件 Java Maven
|
2月前
|
Java Spring 容器
Spring Boot 启动源码解析结合Spring Bean生命周期分析
Spring Boot 启动源码解析结合Spring Bean生命周期分析
80 11
|
1月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
120 0
RocketMQ—一次连接namesvr失败的案例分析
|
2月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
55 1
|
2月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
144 1