RocketMQ的常规运维实践应用

简介: RocketMQ的常规运维实践应用

RocketMQ的常规运维实践应用


1. 实验环境说明

实验环境

  1. 体验手册。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验操作手册。

  1. 云产品资源。

a.体验环境的阿里云子账号信息(目前RocketMQ系列实验中没有使用)。

b.实验环境使用的云服务器(ECS实例),并挂载弹性IP, 可通过公网在本地访问。弹性IP需要大家记住如何查看到,后续的实验会用到。

  1. 体验报告。

小伟老师希望大家多多填写,我们多多优化,帮助大家快速方便的通过实验了解RocketMQ。

  1. 实验环境。

实验室为您提供一个云服务器ECS实例,操作系统为Alibaba Cloud Linux 2.1903 64位版本。

  1. 实验体验时间。

体验时间一般为一个小时。

  1. 实验环境功能栏。

功能栏一般包括全屏、切换至Web Terminal、FAQ、热门问题、主题色、钉钉交流群二维码和问题反馈等七个功能。

  1. 编写实验报告。

填写实验报告,帮助大家快速通过实验了解RocketMQ。

实验帮助

如果您在使用RocketMQ实验时有需要咨询的问题,可以扫描二维码加入钉钉钉钉群。

启动一个集群

  1. 启动namesrv。观察到启动成功的日志后, ctrl + c,终止日志输出。
cd /usr/local/services/5-rocketmq/namesrv-01
restart.sh
  1. 启动broker

修改broker配置项brokerIP1为实验公网IP,启动broker。观察到启动成功的日志后, ctrl + c,终止日志输出。

操作命令如下:

cd /usr/local/services/5-rocketmq/broker-01
vim ./conf/broker.conf 
restart.sh


2. 如何编译、部署RocketMQ dashboard

如何编译、部署RocketMQ dashboard。

本教程会以如何利用源码编译并打包RocketMQ为例, 演示如何下载、编译任意版本的RocketMQ.

0. 启动一个集群 (已启动则不用操作)

1. 安装git,jdk, maven等工具(当前实验环境已经安装好。参考或者baidu/google)

  • jdk安装
  • maven安装

2. 下载最新release代码(这里以git为例,如果没有安装git直接从github release页面下载)

在自己电脑上, 进入命令行, 选择一个保存源码的目录, 这里我把源码保存到 /tiger/tmp为例

2.1 创建代码保存目录(已创建则不操作)并进入代码保存目录:

mkdir -p /tiger/tmp
cd /tiger/tmp

2.2 克隆master代码

git clone https://github.com/apache/rocketmq-dashboard.git
or 
git clone https://gitee.com/mirrors_apache/rocketmq-dashboard.git

2.3 进入源码根目录:

cd rocketmq-dashboard

可以看到如下代码信息:

3. 编译和打包源码(这里面包含前后端一起打包,前端yarn, node, npm都已经在实验环境准备好了,及时下载大概率会失败)

  • 修改dashboard配置的namesrv地址
vim src/main/resources/application.yml

  • 执行打包命令
mvn clean package -Dmaven.test.skip=true

看到如下结果就是成功了

编译打包成功后, 我们执行:

cd target
ls -l

查看打包结果

  • 运行dashboard
java -Xms400m -Xmx400m -Xmn400m  -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar


3. 如何查看和扩容Topic

扩容Topic,有多种方式:

  1. 在topic所在broker节点上增加queue数量
  2. 在topic所在broker节点外,已存在的broker节点上增加queue数量。
  3. 新添加broker节点,将增加的queue放到新broker节点。

本节主要讲解第一种, 一般的,增加queue数量可以增大生产和消费吞吐量。

0. 启动一个集群和安装dashboard (已完成则不用操作)

创建topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。创建一个topic, 配置如下

查看topic的队列数

这里显示2个队列都在broker-a上。

增加topic队列数

增加队列和创建一样, 可以选择选择新增加的队列分布在哪些集群或者哪些broker上。 这里我们只有一个broker,

保持集群、broker不变。预期增加的队列都在broker-a上。

增加队列成功后,我们再次查看topic状态, 发现队列已经变化。

一般的, 增加队列数,可以增大消费效率。 同一个消费者组中的消费者个数最大为队列数,一般消费能力随着队列数增大而增加。


4. 如何重置消费位点

重置消费位点,是消费者想重新消费部分消息,此时可以通过社区的dashboard直接操作。

前提是消费者已经做好消费幂等。

下面简单总结了重置消费位点的流程:

通过上图可以看出重置消费位点需要broker,client都支持才能进行,下面我们分享下操作过程。

扩容Topic,这里是狭义的扩容, 指增加topic的queue数量,以此来对生产和消费效率产生影响。

0. 启动一个集群和安装dashboard (已完成则不用操作)

查找到需要重置位点的topic

通过访问实验环境http://公网IP:30904, 可以访问到实验环境部署的dashboard。查找到需要重置位点的topic,点击重置消费位点

选择需要重置消费位点的消费者组和位点起始时间。

注意:

2.1 消费者有消费幂等实现,如果没有的话,位点重置后,可能带来数据重复等问题。

2.2 一个topic可能被多个消费者组订阅,需要准确找到你需要重置的那个消费者组名字

2.3 每一个消息有一个位点对应,每一个位点有一个距离其最近的时间。所以这里的时间并不能完全对应位点值,只是一个预估。

观察重置后旧的消息是否被消费者重新消费。这个过程可能需要一点时间, 耐心等待观察结果

一般的,重置位点一般有2种场景可以用到

  1. 业务消费逻辑有bug,修复后需要重新处理之前业务消息
  2. 流计算中,重放数据。


5. 如何动态修改Broker配置

动态修改Broker配置可以动态生效,并且可以帮你快速了解Broker的全部配置项。rocketmq发布的工具包中有单独的命令可以支持。

下面以maxMsgsNumBatch配置项为例,我们修改下这个配置值。

0. 启动一个集群 (已完成则不用操作)

1. 进入一个broker的根目录,可以看到如下目录结构

2. 查看一个broker全部配置值。

bin/mqadmin getBrokerConfig -b 127.0.0.1:10911

-b : 后面跟broker ip:port

查出的结果会类似:

3. 更新maxMsgsNumBatch的值为64

bin/mqadmin updateBrokerConfig -b 127.0.0.1:10911 -k maxMsgsNumBatch -v 64

更新成功后会打印:

4. 再通过第2步的命令查看,值是否被改变了。

一般的, 只要调用过更新broker配置接口, broker都会持久化一份全部配置到broker.conf(配置文件名以启动命令参数为准),我们登录broker可以看到。

以下是我本地的一份配置demo,大家参考

serverSelectorThreads=3
brokerRole=ASYNC_MASTER
serverSocketRcvBufSize=131072
osPageCacheBusyTimeOutMills=1000
shortPollingTimeMills=1000
clientSocketRcvBufSize=131072
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateTopicEnable=true
maxErrorRateOfBloomFilter=20
maxMsgsNumBatch=64
cleanResourceInterval=10000
commercialBaseCount=1
maxTransferCountOnMessageInMemory=32
brokerFastFailureEnable=true
brokerClusterName=DefaultCluster
flushDiskType=ASYNC_FLUSH
commercialBigCount=1
mappedFileSizeConsumeQueue=6000000
consumerFallbehindThreshold=17179869184
autoCreateSubscriptionGroup=true
transientStorePoolEnable=false
flushConsumerOffsetInterval=5000
waitTimeMillsInHeartbeatQueue=31000
diskMaxUsedSpaceRatio=75
flushCommitLogLeastPages=4
cleanFileForciblyEnable=true
slaveReadEnable=false
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
expectConsumerNumUseFilter=32
traceTopicEnable=false
useEpollNativeSelector=false
enablePropertyFilter=true
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
deleteCommitLogFilesInterval=100
brokerName=broker-a
maxTransferBytesOnMessageInDisk=65536
listenPort=10911
flushConsumeQueueLeastPages=2
pullMessageThreadPoolNums=20
useReentrantLockWhenPutMessage=false
flushIntervalConsumeQueue=1000
sendThreadPoolQueueCapacity=10000
debugLockEnable=false
haHousekeepingInterval=20000
diskFallRecorded=true
messageIndexEnable=true
clientAsyncSemaphoreValue=65535
clientCallbackExecutorThreads=2
putMsgIndexHightWater=600000
sendMessageThreadPoolNums=1
clientManagerThreadPoolQueueCapacity=1000000
serverSocketSndBufSize=131072
maxDelayTime=40
clientSocketSndBufSize=131072
namesrvAddr=localhost:9876
commercialEnable=true
maxHashSlotNum=5000000
heartbeatThreadPoolNums=2
transactionTimeOut=6000
maxMessageSize=4194304
adminBrokerThreadPoolNums=16
defaultQueryMaxNum=32
maxTransferBytesOnMessageInMemory=262144
forceRegister=true
enableConsumeQueueExt=false
longPollingEnable=true
serverWorkerThreads=8
messageIndexSafe=false
deleteConsumeQueueFilesInterval=100
haSlaveFallbehindMax=268435456
serverCallbackExecutorThreads=0
flushCommitLogThoroughInterval=10000
isEnableBatchPush=false
commercialTimerCount=1
enableDLegerCommitLog=false
useTLS=false
redeleteHangedFileInterval=120000
flushIntervalCommitLog=500
rocketmqHome=/opt/rocketmq
queryMessageThreadPoolNums=10
messageStorePlugIn=
serverChannelMaxIdleTimeSeconds=120
maxIndexNum=20000000
filterDataCleanTimeSpan=86400000
filterServerNums=0
commitCommitLogLeastPages=4
waitTimeMillsInPullQueue=5000
haSendHeartbeatInterval=5000
processReplyMessageThreadPoolNums=20
clientChannelMaxIdleTimeSeconds=120
filterSupportRetry=false
flushDelayOffsetInterval=10000
duplicationEnable=false
replyThreadPoolQueueCapacity=10000
offsetCheckInSlave=false
clientCloseSocketIfTimeout=false
transientStorePoolSize=5
waitTimeMillsInSendQueue=200
warmMapedFileEnable=false
endTransactionThreadPoolNums=12
flushCommitLogTimed=false
flushLeastPagesWhenWarmMapedFile=4096
clientWorkerThreads=4
endTransactionPoolQueueCapacity=100000
registerNameServerPeriod=30000
registerBrokerTimeoutMills=6000
accessMessageInMemoryMaxRatio=40
highSpeedMode=false
transactionCheckMax=15
checkCRCOnRecover=true
destroyMapedFileIntervalForcibly=120000
brokerIP2=xxx.xxx.xxx.xxx
brokerIP1=xxx.xxx.xxx.xxx
commitIntervalCommitLog=200
clientOnewaySemaphoreValue=65535
storeReplyMessageEnable=true
traceOn=true
clientManageThreadPoolNums=32
channelNotActiveInterval=60000
mappedFileSizeConsumeQueueExt=50331648
consumerManagerThreadPoolQueueCapacity=1000000
serverOnewaySemaphoreValue=256
haListenPort=10912
enableCalcFilterBitMap=false
clientPooledByteBufAllocatorEnable=false
aclEnable=false
storePathRootDir=/root/store
syncFlushTimeout=5000
rejectTransactionMessage=false
commitCommitLogThoroughInterval=200
connectTimeoutMillis=3000
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
consumerManageThreadPoolNums=32
disableConsumeIfConsumerReadSlowly=false
flushConsumerOffsetHistoryInterval=60000
fetchNamesrvAddrByAddressServer=false
haTransferBatchSize=32768
compressedRegister=false
storePathCommitLog=/root/store/commitlog
commercialTransCount=1
transactionCheckInterval=60000
mappedFileSizeCommitLog=1073741824
startAcceptSendRequestTimeStamp=0
serverPooledByteBufAllocatorEnable=true
serverAsyncSemaphoreValue=64
autoDeleteUnusedStats=false
waitTimeMillsInTransactionQueue=3000
heartbeatThreadPoolQueueCapacity=50000
deleteWhen=05
bitMapLengthConsumeQueueExt=112
fastFailIfNoBufferInStorePool=false
defaultTopicQueueNums=8
flushConsumeQueueThoroughInterval=60000
notifyConsumerIdsChangedEnable=true
brokerPermission=6
fileReservedTime=48
transferMsgByHeap=true
pullThreadPoolQueueCapacity=100000
brokerId=0
maxTransferCountOnMessageInDisk=8

实验链接:https://developer.aliyun.com/adc/scenario/14828d336c1547b0ab3f7514ca229ea2


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2天前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
17 3
|
2天前
|
运维 监控 安全
构建高效运维体系:从监控到自动化的全方位实践
本文深入探讨了构建高效运维体系的关键要素,从监控、日志管理、自动化工具、容器化与微服务架构、持续集成与持续部署(CI/CD)、虚拟化与云计算以及安全与合规等方面进行了全面阐述。通过引入先进的技术和方法,结合实际案例和项目经验,为读者提供了一套完整的运维解决方案,旨在帮助企业提升运维效率,降低运营成本,确保业务稳定运行。
|
2天前
|
存储 运维 网络安全
自动化运维工具:Ansible入门与实践
【9月更文挑战第17天】本文将介绍Ansible的基本概念、安装和简单使用,以及如何编写一个简单的Ansible playbook。通过本文,您可以了解到Ansible的基本原理和使用方法,以及如何在实际工作中应用Ansible进行自动化运维。
|
4天前
|
运维 应用服务中间件 网络安全
自动化运维之路:Ansible在配置管理中的应用
【9月更文挑战第15天】本文深入探讨了自动化运维工具Ansible的基本原理和实际应用,通过实例演示如何利用Ansible进行高效的配置管理和批量部署。文章不仅涵盖了Ansible的安装、配置以及基础使用,还详细介绍了如何编写有效的Playbook来自动化日常任务,并讨论了Ansible的最佳实践和常见问题的解决策略,为读者提供了一套完整的解决方案。
|
6天前
|
运维 应用服务中间件 Linux
自动化运维的利剑——Ansible在配置管理中的应用
【9月更文挑战第13天】 随着IT基础设施的日益复杂,手动进行系统配置和管理变得越来越低效且容易出错。Ansible,一个开源的IT自动化工具,因其简单易用和高效的特性成为运维工程师的新宠。本文将通过浅显易懂的语言和具体案例,带你了解如何利用Ansible简化日常的运维任务,实现快速、一致的配置部署与管理。
|
7天前
|
运维 Cloud Native Devops
云原生架构的崛起与实践云原生架构是一种通过容器化、微服务和DevOps等技术手段,帮助应用系统实现敏捷部署、弹性扩展和高效运维的技术理念。本文将探讨云原生的概念、核心技术以及其在企业中的应用实践,揭示云原生如何成为现代软件开发和运营的主流方式。##
云原生架构是现代IT领域的一场革命,它依托于容器化、微服务和DevOps等核心技术,旨在解决传统架构在应对复杂业务需求时的不足。通过采用云原生方法,企业可以实现敏捷部署、弹性扩展和高效运维,从而大幅提升开发效率和系统可靠性。本文详细阐述了云原生的核心概念、主要技术和实际应用案例,并探讨了企业在实施云原生过程中的挑战与解决方案。无论是正在转型的传统企业,还是寻求创新的互联网企业,云原生都提供了一条实现高效能、高灵活性和高可靠性的技术路径。 ##
18 3
|
6天前
|
运维 应用服务中间件 nginx
自动化运维的利器:Ansible入门与实践
【9月更文挑战第13天】在这个快速发展的IT时代,自动化运维已成为提升效率、减少失误的关键。本文将带你了解Ansible,一个强大的自动化工具,它简化了配置管理、应用部署和任务自动化。通过实际案例,我们将探索Ansible的基本概念、安装步骤、关键组件以及如何编写Playbook来自动化日常任务。无论你是新手还是有经验的运维专家,这篇文章都将为你提供宝贵的见解和技巧,让你在自动化运维的道路上更进一步。
|
5天前
|
存储 运维 监控
构建高效运维体系:从监控到自动化的全方位实践
在当今信息技术飞速发展的时代,运维作为保障信息系统稳定运行的关键环节,其重要性不言而喻。本文将围绕如何构建一个高效的运维体系进行深入探讨,内容涵盖从监控、日志分析到自动化运维工具的选择与应用,以及在实际工作中的经验和案例分享。通过本文的介绍,读者将能够了解到如何在复杂多变的技术环境中,确保系统的高可用性、高性能和安全性,为业务连续性提供坚实保障。
|
6天前
|
运维 监控 数据可视化
高效运维的秘密武器:自动化工具链的构建与实践在当今数字化时代,IT系统的复杂性和规模不断增加,使得传统的手动运维方式难以应对日益增长的业务需求。因此,构建一套高效的自动化工具链成为现代运维的重要任务。本文将深入探讨如何通过自动化工具链提升IT运维效率,确保系统稳定运行,并实现快速响应和故障恢复。
随着企业IT架构的不断扩展和复杂化,传统的手动运维已无法满足业务需求。自动化工具链的构建成为解决这一问题的关键。本文介绍了自动化工具链的核心概念、常用工具及其选择依据,并通过实际案例展示了自动化工具链在提升运维效率、减少人为错误、优化资源配置等方面的显著效果。从监控系统到自动化运维平台,再到持续集成/持续部署(CI/CD)的流程,我们将一步步揭示如何成功实施自动化工具链,助力企业实现高效、稳定、可靠的IT运维管理。
|
11天前
|
存储 弹性计算 运维
自动化监控和响应ECS系统事件
阿里云提供的ECS系统事件用于记录云资源信息,如实例启停、到期通知等。为实现自动化运维,如故障处理与动态调度,可使用云助手插件`ecs-tool-event`。该插件定时获取并转化ECS事件为日志存储,便于监控与响应,无需额外开发,适用于大规模集群管理。详情及示例可见链接文档。

相关产品

  • 云消息队列 MQ