RocketMQ中使用Java客户端发送消息和消费的应用

简介: 本教程将总结使用java客户端消息发送和消费各种场景, 并Demo演示

RocketMQ中使用Java客户端发送消息和消费的应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。


2. 启动RocketMQ集群

本步骤将带您启动RocketMQ集群。

说明:当前实验环境已经为您下载、编译RocketMQ源码,您只需要启动RocketMQ集群即可。

  1. 执行如下命令,进入namesrv目录,并启动namesrv。
cd /usr/local/services/5-rocketmq/namesrv-01
./restart.sh

返回结果如下,当您观察到启动成功的日志后, 按Ctrl+C键,终止日志输出。

  1. 启动broker。

2.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

2.2执行如下命令,修改broker配置项。

vim ./conf/broker.conf

2.3按i键,进入编辑模式,将brokerIP1参数改为实验室云服务器ECS的弹性IP。修改完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

说明:您可在云产品资源列表中查看到实验室云服务器ECS的弹性IP。

2.4执行如下命令,启动broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入dashboard目录,并启动dashboard。
cd /usr/local/services/7-rocketmq-datashboard
./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 验证集群启动情况。

在您的本机浏览器中,打开新页签,访问http://实验室云服务器ECS的弹性IP:30904#/cluster

返回如下页面,您可以查看到集群节点信息,表示集群已正常启动。


3. 如何发送和消费并发消息

并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。

本步骤将指导您如何使用纯java client发送和消费消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 执行如下命令,进入/data/demos目录,并下载全部java代码Demo。

说明:后续步骤也将使用java代码Demo,只需要您下载一次即可。

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可看到生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

并发消息,意思是生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者Demo代码如下:最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?


4. 如何发送和消费顺序消息

顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的topic中queue个数可以是任意有效值,全局有序的topic要求queue的个数为1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。

什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?

分区有序消息表示在一个queue中的消息是有序的,发送消息时设置设置了相同key的消息会被发送到同一个queue中。

本步骤将指导您如何使用纯java client发送和消费顺序消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 消费输出时,请您注意看相同queueId的消息输出内容中的数字,是按照从小到大的。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者会根据设置的keys做hash,相同hash值的消息会发送到相同的queue中。所以相同hash值的消息需要保证在同一个线程中顺序的发送。

  • 消费者说明

消费者使用相对比较简单, 消息监听类实现org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly接口即可。相同queue的消息需要串行处理,这样救保证消费的顺序性


5. 如何发送和消费延迟消息

延迟消息,对于一些特殊场景比如订票后30分钟不支付自动取消等类似场景比较有用。

本步骤将指导您如何使用纯java client发送和消费延迟消息。当前环境已经安装了一个1 Namesrv+1 Broker的集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 目前RocketMQ支持多种延迟级别, 不过每种延迟级别都是基于RocketMQ自身,实际延迟时间会加上Broker-Client端的网络情况不同而略有差异。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

  • 生产者说明

生产者在发送消息的时候需要设置延迟级别,RocketMQ支持多种延迟级别。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标index+1。更多知识,详情请参考RocketMQ如何解析延迟级别和延迟时间映射关系。

  • 消费者说明: 消费者按照并发消息消费即可。


6. 如何发送和消费事务消息

事务消息,是RocketMQ解决分布式事务的一种实现,极其简单好用。

一个事物消息大致的生命周期如下图。

概括为如下几个重要点:

  1. 生产者发送half消息(事物消息)
  2. Broker存储half消息
  3. 生产者处理本地事物,处理成功后commit事物
  4. 消费者消费到事物消息

本步骤将指导您如何使用纯java client发送和消费事物消息。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

在事物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。

生产者端的主要代码包含3个步骤:

6.1初始化生产者,设置回调线程池、设置本地事物处理监听类。

这里注意事物消息的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。

6.2事物监听类需要实现2个方法,这里的逻辑都是mock的,实际使用的时候需要根据实际修改。

6.3发送事物消息。调用sendMessageInTransaction()方法发送事物消息, 而不是以前的send()方法。


7. 生产者消费者如何同步发送、消费消息(Request-Reply)

request-reply模式,可以满足目前类似RPC同步调用的场景。

本步骤将指导您如何使用该模式。当前环境已经安装了一个1 Namesrv+1 Broker的RocketMQ集群。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行Demo代码。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,通过代码结果和代码比较, 我们得知request-reply类似RPC同步调用的效果。按Ctrl+C键终止日志输出。

建议:需要同步调用就用RPC, 不要走RocketMQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

request-reply模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下demo代码。

生产者demo主要代码, 主要区别在于调用request(),而不是send()方法。

消费者demo主要代码: 消费代码主要增加了“回复”逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中broker回查生产者。

一个小问题:事物消息和request-reply消息时,生产者的生产者组名有什么要求嘛?


8. 如何有选择性的消费消息

有时候我们只想消费部分消息,当然全部消费,在代码中过滤。 假如消息海量时,会有很多资源浪费,比如浪费不必要的带宽。我们可以通过tag,sql92表达式来选择性的消费.

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置,支持消息和属性过滤。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加两个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

#是否支持重试消息也过滤
filterSupportRetry=true
#支持属性过滤
enablePropertyFilter=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行tag过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. 执行如下命令,执行sql过滤代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

以下分别介绍生产者和消费者主要demo代码。

  • 生产者

在生产tag消息的时候, 消息中需要加上发送tag;sql92过滤的时候,加上自定义k-v。

  • 消费者

tag过滤消费时,在订阅topic时, 也添加上tag订阅

SQL过滤时,添加上SQL过滤订阅。至于SQL除了等号,还是支持什么,大家可以自行自行查看或者到群里问。


9. 如何使用ACL客户端生产消费消息

ACL,全称是Access Control List,是RocketMQ设计来做访问和权限控制的。更多内容,详情请参见github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL

本步骤将指导您如何使用ACL客户端生产消费消息。

  1. 启动一个RocketMQ集群。

说明:已启动一个RocketMQ集群,本操作请您忽略。

  1. 进入/data/demos目录,并下载全部java代码Demo。

说明:已下载java代码Demo,本操作请您忽略。

  1. 修改broker配置。

3.1执行如下命令,进入broker目录。

cd /usr/local/services/5-rocketmq/broker-01

3.2执行如下命令,编辑配置文件broker.conf。

vim conf/broker.conf

3.3按i键,进入编辑模式,在文件尾行添加一个如下内容的broker配置项添加完成后,按ECS键输入退出编辑模式,输入:wq,按回车键保存。

aclEnable=true

添加完成后的文件内容如下。

3.4执行如下命令,重启broker。

./restart.sh

返回结果如下,当您观察到启动成功的日志后,按Ctrl+C键,终止日志输出。

  1. 执行如下命令,进入Demo代码目录。
cd /data/demos/06-all-java-demos/
  1. 执行如下命令,打包Demo代码。
mvn clean package
  1. 执行如下命令,执行代码Demo。
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime

返回结果如下,您可以看到正常生产和消费输出。 Demo代码使用的admin权限发送和消费,实际使用需要对于每个topic,消费者组授权,才能正常生产消费。按Ctrl+C键,终止日志输出。

  1. Demo代码说明。

如果您想查看全部代码,您可以查看实验本地或者访问Demo代码

带ACL的生产者和消费者在初始化的时候,都必须给一个hook实例,构建方法如下:

static RPCHook getAclRPCHook(String accessKey, String secretKey) {
      return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

在broker端secret key用来校验信息的完整性, access key用来校验用户权限。二者缺一不可。

实验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
人工智能 安全 Java
Java和Python在企业中的应用情况
Java和Python在企业中的应用情况
70 7
|
2月前
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
187 3
|
19天前
|
消息中间件 存储 监控
说说MQ在你项目中的应用(一)
本文总结了消息队列(MQ)在项目中的应用,主要围绕异步处理、系统解耦和流量削峰三大功能展开。通过分析短信通知和业务日志两个典型场景,介绍了MQ的实现方式及其优势。短信通知中,MQ用于异步发送短信并处理状态更新;业务日志中,Kafka作为高吞吐量的消息系统,负责收集和传输系统及用户行为日志,确保数据的可靠性和高效处理。MQ不仅提高了系统的灵活性和响应速度,还提供了重试机制和状态追踪等功能,保障了业务的稳定运行。
59 6
|
16天前
|
安全 算法 Java
Java CAS原理和应用场景大揭秘:你掌握了吗?
CAS(Compare and Swap)是一种乐观锁机制,通过硬件指令实现原子操作,确保多线程环境下对共享变量的安全访问。它避免了传统互斥锁的性能开销和线程阻塞问题。CAS操作包含三个步骤:获取期望值、比较当前值与期望值是否相等、若相等则更新为新值。CAS广泛应用于高并发场景,如数据库事务、分布式锁、无锁数据结构等,但需注意ABA问题。Java中常用`java.util.concurrent.atomic`包下的类支持CAS操作。
46 2
|
2月前
|
人工智能 前端开发 Java
基于开源框架Spring AI Alibaba快速构建Java应用
本文旨在帮助开发者快速掌握并应用 Spring AI Alibaba,提升基于 Java 的大模型应用开发效率和安全性。
252 12
基于开源框架Spring AI Alibaba快速构建Java应用
|
2月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
180 6
|
1月前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
40 2
|
19天前
|
消息中间件 存储 中间件
说说MQ在你项目中的应用(二)商品支付
本文总结了消息队列(MQ)在支付订单业务中的应用,重点分析了RabbitMQ的优势。通过异步处理、系统解耦和流量削峰等功能,RabbitMQ确保了支付流程的高效与稳定。具体场景包括用户下单、支付请求、商品生产和物流配送等环节。相比Kafka,RabbitMQ在低吞吐量、高实时性需求下表现更优,提供了更低延迟和更高的可靠性。
31 0
|
2月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
71 6
|
2月前
|
关系型数据库 MySQL Java
MySQL索引优化与Java应用实践
【11月更文挑战第25天】在大数据量和高并发的业务场景下,MySQL数据库的索引优化是提升查询性能的关键。本文将深入探讨MySQL索引的多种类型、优化策略及其在Java应用中的实践,通过历史背景、业务场景、底层原理的介绍,并结合Java示例代码,帮助Java架构师更好地理解并应用这些技术。
61 2

相关产品

  • 云消息队列 MQ