RocketMq消息队列使用

简介: 最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性,目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景比kafka还是有过之无不及,其实kafka文档很丰富但RocketMQ网上的...

最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性,

目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景

比kafka还是有过之无不及,其实kafka文档很丰富

但RocketMQ网上的文章太少,找不到相关的操作教程

于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究

下载源码的地址 https://github.com/alibaba/RocketMQ/releases

  • 首选通过在java项目里面Maven依赖方式引用RocketMQ Java SDK

    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>3.2.6</version>
    </dependency>

Downloads

在linux 下用wget 下载源码然后解压出来

在runserver.sh里面可以配置 jvm启动的参数 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"  

可以 vi runserver.sh

分别给 mqnamesrv mqbroker play.sh 执行的权限

chmod +x  mqnamersrv 

chmod +x  mqbroker 

chmod +x  play.sh 

下面红线框的这段 命令输入错误了,忽略不用看

通过 nohup sh mqnamesrv& 启动 RocketMq

目前没看到结束的命令,也没找到相关的介绍,

我这里用的 ps -ef|grep rocketmq  查到进程pid

然后kill pid号

或则pkill -9 java [慎用]

用jps -v 查看下java进程的参数

 rocketmq启动后监听 9876端口,这里还是在看源码里面看到的,资料实在是太少了

在防火墙配置里面加上 9876端口,设置iptables对外开放

部署Broker 

nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties & 

这里ip换成本机的就是单机实例,如果配置主从 这里可以配其他的ip

 Master和Slave的配置文件参考conf目录下的配置文件

 Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数

 一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分

 部署一Master一Slave,集群采用异步复制方式:

 Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &  

Slave:   nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &  

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package  com.pgsqlmybatis.common.rocketmq; /*
***************************************************************
* 公司名称    :
* 系统名称    :信用管家专业版
* 类 名 称    :Ios渠道idfa统计,推广统计用
* 功能描述    :
* 业务描述    :
* 作 者 名    :@Author Royal
* 开发日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改内容    :
***************************************************************
*/
 
import  com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import  com.alibaba.rocketmq.client.producer.SendResult;
import  com.alibaba.rocketmq.common.message.Message;
 
public  class  Producer {
     public  static  void  main(String[] args) {
         DefaultMQProducer producer =  new  DefaultMQProducer( "Producer" );
         producer.setNamesrvAddr( "xxxxxxxxxx:9876" );
         try  {
             producer.start();
 
             String pushMsg= "kafka activeMq rocketMq 消息队列使用1" ;
             Message msg =  new  Message( "PushTopic" , "push" , "1" ,
                     pushMsg.getBytes( "UTF-8" ));
 
             SendResult result = producer.send(msg);
             System.out.println( "id:"  + result.getMsgId() +
                     " result:"  + result.getSendStatus());
 
             String pushMsg2= "海量级消息记录单机测试2" ;
             msg =  new  Message( "PushTopic" , "push" , "2" ,pushMsg2.getBytes( "UTF-8" ));
 
             result = producer.send(msg);
             System.out.println( "id:"  + result.getMsgId() +
                     " result:"  + result.getSendStatus());
 
             String pushMsg3= "海量级消息记录单机测试3" ;
             msg =  new  Message( "PullTopic" , "pull" , "1" ,pushMsg3.getBytes());
 
             result = producer.send(msg);
             System.out.println( "id:"  + result.getMsgId() +
                     " result:"  + result.getSendStatus());
         catch  (Exception e) {
             e.printStackTrace();
         finally  {
             producer.shutdown();
         }
     }
}

  

启动生成者

 

启动消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package  com.pgsqlmybatis.common.rocketmq; /*
***************************************************************
* 公司名称    :
* 系统名称    :信用管家专业版
* 类 名 称    :Ios渠道idfa统计,推广统计用
* 功能描述    :
* 业务描述    :
* 作 者 名    :@Author Royal
* 开发日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改内容    :
***************************************************************
*/
 
import  java.io.UnsupportedEncodingException;
import  java.util.List;
 
import  com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import  com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import  com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import  com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import  com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import  com.alibaba.rocketmq.common.message.Message;
import  com.alibaba.rocketmq.common.message.MessageExt;
 
public  class  Consumer {
     public  static  void  main(String[] args){
         DefaultMQPushConsumer consumer =
                 new  DefaultMQPushConsumer( "PushConsumer" );
         consumer.setNamesrvAddr( "xxxxxxxxxxxx:9876" );
         try  {
             consumer.subscribe( "PushTopic" "push" );
             /**
              * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
              * 如果非第一次启动,那么按照上次消费的位置继续消费
              */
             consumer.setConsumeFromWhere(
                     ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
             consumer.registerMessageListener(
                     new  MessageListenerConcurrently() {
                         public  ConsumeConcurrentlyStatus consumeMessage(
                                 List<MessageExt> list,
                                 ConsumeConcurrentlyContext Context) {
                             Message msg = list.get( 0 );
                             System.out.println(msg.toString());
                             String recString=  null ;
                             try  {
                                 recString =  new  String(msg.getBody() , "UTF-8" );
                             catch  (UnsupportedEncodingException e) {
                                 e.printStackTrace();
                             }
                             System.out.println(recString);
                             return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                         }
                     }
             );
             consumer.start();
         catch  (Exception e) {
             e.printStackTrace();
         }
     }
}

   

 

以上为单机实例配置

如果你遇到什么问题可以私信我,如果觉得此文对你很有帮助,点下赞推荐下额^_^ 

参考:http://blog.csdn.net/a19881029/article/details/34446629

        http://sofar.blog.51cto.com/353572/1540874

        http://blog.csdn.net/loongshawn/article/details/51086876

        RocketMq最佳实践

       《RocketMQ原理简介》

       分布式开放消息系统(RocketMQ)的原理与实践      

       《RocketMQ用户指南》

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 缓存
分布式中间件核心原理与RocketMQ最佳实践
随着互联网业务的不断扩展和复杂化,分布式系统的需求也越来越迫切。为了满足这一需求,分布式中间件应运而生。在分布式系统中,中间件的角色是协调和管理各个节点之间的通信和数据交换,它起到了桥梁的作用。本文将介绍分布式中间件的核心原理和RocketMQ最佳实践,帮助读者更好地理解和应用分布式中间件。
876 105
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch 深度学习实用指南:1~5
PyTorch 深度学习实用指南:1~5
417 0
|
安全 API 开发工具
阿里云如何开通子账号
阿里云如何开通子账号
8393 1
|
数据采集 自然语言处理 安全
控制电脑手机的智能体人人都能造,微软开源OmniParser
微软研究团队推出OmniParser,旨在提升GPT-4V等多模态模型在用户界面操作方面的性能。通过解析用户界面截图为结构化元素,OmniParser显著增强了模型的交互能力,使其在多种基准测试中表现出色。该技术开源,促进了社区合作与技术创新,但同时也面临数据质量、计算资源及安全隐私等挑战。
465 14
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
8378 109
|
9月前
|
人工智能 自然语言处理 安全
Anus:公开整活!完全用 Manus 复刻 Manus 功能的开源 AI 智能体项目
Anus 是一个开源 AI 智能体项目,复刻了 Manus 的部分功能,支持自然语言指令执行、多代理协作、多模态输入处理等功能,旨在为开发者提供强大且灵活的工具。
1113 1
Anus:公开整活!完全用 Manus 复刻 Manus 功能的开源 AI 智能体项目
|
8月前
|
SQL 人工智能 自然语言处理
Text2SQL圣经:从0到1精通Text2Sql(Chat2Sql)的原理,以及Text2Sql开源项目的使用
Text2SQL圣经:从0到1精通Text2Sql(Chat2Sql)的原理,以及Text2Sql开源项目的使用
Text2SQL圣经:从0到1精通Text2Sql(Chat2Sql)的原理,以及Text2Sql开源项目的使用
|
11月前
|
人工智能 关系型数据库 分布式数据库
PolarDB-PG AI最佳实践3 :PolarDB AI多模态相似性搜索最佳实践
本文介绍了如何利用PolarDB结合多模态大模型(如CLIP)实现数据库内的多模态数据分析和查询。通过POLAR_AI插件,可以直接在数据库中调用AI模型服务,无需移动数据或额外的工具,简化了多模态数据的处理流程。具体应用场景包括图像识别与分类、图像到文本检索和基于文本的图像检索。文章详细说明了技术实现、配置建议、实战步骤及多模态检索示例,展示了如何在PolarDB中创建模型、生成embedding并进行相似性检索
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
消息中间件 存储 监控
RocketMQ的性能优势?
【8月更文挑战第29天】RocketMQ的性能优势?
465 2

相关产品

  • 云消息队列 MQ