RocketMQ快速入门
RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,在阿里内部,RocketMQ承接了例如“双11”等高并发场景的消息流转,能够处理万亿级别的消息。
2.1 准备工作
2.1.1 下载RocketMQ
RocketMQ最新版本:4.5.1
下载地址
2.2.2 环境要求
- Linux64位系统
- JDK1.8(64位)
- 源码安装需要安装Maven 3.2.x
2.2 安装RocketMQ
2.2.1 安装步骤
本教程以二进制包方式安装
- 解压安装包
- 进入安装目录
2.2.2 目录介绍
- bin:启动脚本,包括shell脚本和CMD脚本
- conf:实例配置文件 ,包括broker配置文件、logback配置文件等
- lib:依赖jar包,包括Netty、commons-lang、FastJSON等
2.3 启动RocketMQ
- 启动NameServer
# 1.启动NameServernohup sh bin/mqnamesrv rocketmq-nameserver2:9876在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
Broker 对外服务的监听端口 同一台机器的端口号要保证不一样
listenPort=10911
删除文件时间点,默认凌晨 4点
deleteWhen=04
文件保留时间,默认 48 小时
fileReservedTime=120
commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
存储路径
storePathRootDir=/usr/local/rocketmq/store
commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
Broker 的角色
- ASYNC_MASTER 异步复制Master
- SYNC_MASTER 同步双写Master
- SLAVE
brokerRole=SYNC_MASTER
刷盘方式
- ASYNC_FLUSH 异步刷盘
- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
checkTransactionMessageEnable=false
发消息线程池数量
sendMessageThreadPoolNums=128
拉消息线程池数量
pullMessageThreadPoolNums=128
```
2)slave2
服务器:192.168.25.135
cmd vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties
修改配置如下:
```properties
所属集群名字
brokerClusterName=rocketmq-cluster
broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
0 表示 Master,>0 表示 Slave
brokerId=1
nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
Broker 对外服务的监听端口
listenPort=11011 同一台机器的端口号要保证不一样
删除文件时间点,默认凌晨 4点
deleteWhen=04
文件保留时间,默认 48 小时
fileReservedTime=120
commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
存储路径
storePathRootDir=/usr/local/rocketmq/store
commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
Broker 的角色
- ASYNC_MASTER 异步复制Master
- SYNC_MASTER 同步双写Master
- SLAVE
brokerRole=SLAVE
刷盘方式
- ASYNC_FLUSH 异步刷盘
- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
checkTransactionMessageEnable=false
发消息线程池数量
sendMessageThreadPoolNums=128
拉消息线程池数量
pullMessageThreadPoolNums=128
```
3)master2
服务器:192.168.25.138
properties vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties
修改配置如下:
```properties
所属集群名字
brokerClusterName=rocketmq-cluster
broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
0 表示 Master,>0 表示 Slave
brokerId=0
nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
Broker 对外服务的监听端口 同一台机器的端口号要保证不一样
listenPort=10911
删除文件时间点,默认凌晨 4点
deleteWhen=04
文件保留时间,默认 48 小时
fileReservedTime=120
commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
存储路径
storePathRootDir=/usr/local/rocketmq/store
commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
Broker 的角色
- ASYNC_MASTER 异步复制Master
- SYNC_MASTER 同步双写Master
- SLAVE
brokerRole=SYNC_MASTER
刷盘方式
- ASYNC_FLUSH 异步刷盘
- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
checkTransactionMessageEnable=false
发消息线程池数量
sendMessageThreadPoolNums=128
拉消息线程池数量
pullMessageThreadPoolNums=128
```
4)slave1
服务器:192.168.25.138
cmd vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties
修改配置如下:
```properties
所属集群名字
brokerClusterName=rocketmq-cluster
broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
0 表示 Master,>0 表示 Slave
brokerId=1
nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
Broker 对外服务的监听端口 同一台机器的端口号要保证不一样
listenPort=11011
删除文件时间点,默认凌晨 4点
deleteWhen=04
文件保留时间,默认 48 小时
fileReservedTime=120
commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
存储路径
storePathRootDir=/usr/local/rocketmq/store
commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
Broker 的角色
- ASYNC_MASTER 异步复制Master
- SYNC_MASTER 同步双写Master
- SLAVE
brokerRole=SLAVE
刷盘方式
- ASYNC_FLUSH 异步刷盘
- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
checkTransactionMessageEnable=false
发消息线程池数量
sendMessageThreadPoolNums=128
拉消息线程池数量
pullMessageThreadPoolNums=128
```
配置详解
brokerRole分为两种
SYNC_MASTER:如果是同步模式,master和slave之间的数据同步要求较为严格,保证尽量不丢消息,性能会有损耗
ASYNC_MASTER:如果是异步模式,master和slave之间的数据同步要求较为宽松,极端情况下可能会丢消息,但是性能较好
flushDiskType也分两种:
SYNC_FLUSH:同步刷盘模式,当消息来了之后,尽可能快地从内存持久化到磁盘上,保证尽量不丢消息,性能会有损耗
ASYNC_FLUSH:异步刷盘模式,消息到了内存之后,不急于马上落盘,极端情况可能会丢消息,但是性能较好。
配置优化 之前我搭建的broker的配置中,配置的是同步复制和同步刷盘:
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
然后我修改为:
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
修改完之后,依次重启broker(注意时间重启多个broker之间要保留一定的间隔时间)
1)异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
2)同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态
此处配置的是主同步刷盘方式,保证主节点写入磁盘成功后,才向从节点写入,从节点使用的是异步写盘方式,从节点可能没有入盘,只是写入内存就返回,当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
3.3.9 修改启动脚本文件
1)runbroker.sh
sh vi /usr/local/rocketmq/bin/runbroker.sh
需要根据内存大小进行适当的对JVM参数进行调整:
```cmd
===================================================
开发环境配置 JVM Configuration
JAVAOPT="${JAVAOPT} -server -Xms256m -Xmx256m -Xmn128m" ```
2)runserver.sh
sh vim /usr/local/rocketmq/bin/runserver.sh
cmd JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
3.3.10 服务启动
1)启动NameServe集群
分别在192.168.25.135和192.168.25.138启动NameServer
cmd cd /usr/local/rocketmq/bin nohup sh mqnamesrv &
2)启动Broker集群
- 在192.168.25.135上启动master1和slave2
master1:
cmd cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties &
slave2:
cmd cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
- 在192.168.25.138上启动master2和slave1
master2
cmd cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties &
slave1
cmd cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
3.3.11 查看进程状态
启动后通过JPS查看启动进程
3.3.12 查看日志
查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log
3.4 mqadmin管理工具
3.4.1 使用方式
进入RocketMQ安装位置,在bin目录下执行./mqadmin {command} {args}
3.4.2 命令介绍
1)Topic相关
名称 含义 命令选项 说明 updateTopic 创建更新Topic配置 -b Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port -c cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询) -h- 打印帮助 -n NameServer服务地址,格式 ip:port -p 指定新topic的读写权限( W=2|R=4|WR=6 ) -r 可读队列数(默认为 8) -w 可写队列数(默认为 8) -t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) deleteTopic 删除Topic -c cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询) -h 打印帮助 -n NameServer 服务地址,格式 ip:port -t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) topicList 查看 Topic 列表信息 -h 打印帮助 -c 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数 -n NameServer 服务地址,格式 ip:port topicRoute 查看 Topic 路由信息 -t topic 名称 -h 打印帮助 -n NameServer 服务地址,格式 ip:port topicStatus 查看 Topic 消息队列offset -t topic 名称 -h 打印帮助 -n NameServer 服务地址,格式 ip:port topicClusterList 查看 Topic 所在集群列表 -t topic 名称 -h 打印帮助 -n NameServer 服务地址,格式 ip:port updateTopicPerm 更新 Topic 读写权限 -t topic 名称 -h 打印帮助 -n NameServer 服务地址,格式 ip:port -b Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port -p 指定新 topic 的读写权限( W=2|R=4|WR=6 ) -c cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令 updateOrderConf 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 -h 打印帮助 -n NameServer 服务地址,格式 ip:port -t topic,键 -v orderConf,值 -m method,可选get、put、delete allocateMQ 以平均负载算法计算消费者列表负载消息队列的负载结果 -t topic 名称 -h 打印帮助 -n NameServer 服务地址,格式 ip:port -i ipList,用逗号分隔,计算这些ip去负载Topic的消息队列 statsAll 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 -h 打印帮助 -n NameServer 服务地址,格式 ip:port -a 是否只打印活跃topic -t 指定topic
2)集群相关
名称 含义 命令选项 说明 clusterList 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 -m 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday) -h 打印帮助 -n NameServer 服务地址,格式 ip:port -i 打印间隔,单位秒 clusterRT 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 -a amount,每次探测的总数,RT = 总时间 / amount -s 消息大小,单位B -c 探测哪个集群 -p 是否打印格式化日志,以|分割,默认不打印 -h 打印帮助 -m 所属机房,打印使用 -i 发送间隔,单位秒 -n NameServer 服务地址,格式 ip:port
3)Broker相关
名称 含义 命令选项 说明 updateBrokerConfig 更新 Broker 配置文件,会修改Broker.conf -b Broker 地址,格式为ip:port -c cluster 名称 -k key 值 -v value 值 -h 打印帮助 -n NameServer 服务地址,格式 ip:port brokerStatus 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) -b Broker 地址,地址为ip:port -h 打印帮助 -n NameServer 服务地址,格式 ip:port brokerConsumeStats Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 -b Broker 地址,地址为ip:port -t 请求超时时间 -l diff阈值,超过阈值才打印 -o 是否为顺序topic,一般为false -h 打印帮助 -n NameServer 服务地址,格式 ip:port getBrokerConfig 获取Broker配置 -b Broker 地址,地址为ip:port -n NameServer 服务地址,格式 ip:port wipeWritePerm 从NameServer上清除 Broker写权限 -b Broker 地址,地址为ip:port -n NameServer 服务地址,格式 ip:port -h 打印帮助 cleanExpiredCQ 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 -n NameServer 服务地址,格式 ip:port -h 打印帮助 -b Broker 地址,地址为ip:port -c 集群名称 cleanUnusedTopic 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic -n NameServer 服务地址,格式 ip:port -h 打印帮助 -b Broker 地址,地址为ip:port -c 集群名称 sendMsgStatus 向Broker发消息,返回发送状态和RT -n NameServer 服务地址,格式 ip:port -h 打印帮助 -b BrokerName,注意不同于Broker地址 -s 消息大小,单位B -c 发送次数
4)消息相关
名称 含义 命令选项 说明 queryMsgById 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 -i msgId -h 打印帮助 -n NameServer 服务地址,格式 ip:port queryMsgByKey 根据消息 Key 查询消息 -k msgKey -t Topic 名称 -h 打印帮助 -n NameServer 服务地址,格式 ip:port queryMsgByOffset 根据 Offset 查询消息 -b Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到) -i query 队列 id -o offset 值 -t topic 名称 -h 打印帮助 -n NameServer 服务地址,格式 ip:port queryMsgByUniqueKey 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 -h 打印帮助 -n NameServer 服务地址,格式 ip:port -i uniqe msg id -g consumerGroup -d clientId -t topic名称 checkMsgSendRT 检测向topic发消息的RT,功能类似clusterRT -h 打印帮助 -n NameServer 服务地址,格式 ip:port -t topic名称 -a 探测次数 -s 消息大小 sendMessage 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 -h 打印帮助 -n NameServer 服务地址,格式 ip:port -t topic名称 -p body,消息体 -k keys -c tags -b BrokerName -i queueId consumeMessage 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 -h 打印帮助 -n NameServer 服务地址,格式 ip:port -t topic名称 -b BrokerName -o 从offset开始消费 -i queueId -g 消费者分组 -s 开始时间戳,格式详见-h -d 结束时间戳 -c 消费多少条消息 printMsg 从Broker消费消息并打印,可选时间段 -h 打印帮助 -n NameServer 服务地址,格式 ip:port -t topic名称 -c 字符集,例如UTF-8 -s subExpress,过滤表达式 -b 开始时间戳,格式参见-h -e 结束时间戳 -d 是否打印消息体 printMsgByQueue 类似printMsg,但指定Message Queue -h 打印帮助 -n NameServer 服务地址,格式 ip:port -t topic名称 -i queueId -a BrokerName -c 字符集,例如UTF-8 -s subExpress,过滤表达式 -b 开始时间戳,格式参见-h -e 结束时间戳 -p 是否打印消息 -d 是否打印消息体 -f 是否统计tag数量并打印 resetOffsetByTime 按时间戳重置offset,Broker和consumer都会重置 -h 打印帮助 -n NameServer 服务地址,格式 ip:port -g 消费者分组 -t topic名称 -s 重置为此时间戳对应的offset -f 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系 -c 是否重置c++客户端offset
5)消费者、消费组相关
名称 含义 命令选项 说明 consumerProgress 查看订阅组消费状态,可以查看具体的client IP的消息积累量 -g 消费者所属组名 -s 是否打印client IP -h 打印帮助 -n NameServer 服务地址,格式 ip:port consumerStatus 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand -h 打印帮助 -n NameServer 服务地址,格式 ip:port -g consumer group -i clientId -s 是否执行jstack getConsumerStatus 获取 Consumer 消费进度 -g 消费者所属组名 -t 查询主题 -i Consumer 客户端 ip -n NameServer 服务地址,格式 ip:port -h 打印帮助 updateSubGroup 更新或创建订阅关系 -n NameServer 服务地址,格式 ip:port -h 打印帮助 -b Broker地址 -c 集群名称 -g 消费者分组名称 -s 分组是否允许消费 -m 是否从最小offset开始消费 -d 是否是广播模式 -q 重试队列数量 -r 最大重试次数 -i 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费 -w 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1 -a 当消费者数量变化时是否通知其他消费者负载均衡 deleteSubGroup 从Broker删除订阅关系 -n NameServer 服务地址,格式 ip:port -h 打印帮助 -b Broker地址 -c 集群名称 -g 消费者分组名称 cloneGroupOffset 在目标群组中使用源群组的offset -n NameServer 服务地址,格式 ip:port -h 打印帮助 -s 源消费者组 -d 目标消费者组 -t topic名称 -o 暂未使用
6)连接相关
名称 含义 命令选项 说明 consumerConnec tion 查询 Consumer 的网络连接 -g 消费者所属组名 -n NameServer 服务地址,格式 ip:port -h 打印帮助 producerConnec tion 查询 Producer 的网络连接 -g 生产者所属组名 -t 主题名称 -n NameServer 服务地址,格式 ip:port -h 打印帮助
7)NameServer相关
名称 含义 命令选项 说明 updateKvConfig 更新NameServer的kv配置,目前还未使用 -s 命名空间 -k key -v value -n NameServer 服务地址,格式 ip:port -h 打印帮助 deleteKvConfig 删除NameServer的kv配置 -s 命名空间 -k key -n NameServer 服务地址,格式 ip:port -h 打印帮助 getNamesrvConfig 获取NameServer配置 -n NameServer 服务地址,格式 ip:port -h 打印帮助 updateNamesrvConfig 修改NameServer配置 -n NameServer 服务地址,格式 ip:port -h 打印帮助 -k key -v value
8)其他
名称 含义 命令选项 说明 startMonitoring 开启监控进程,监控消息误删、重试队列消息数等 -n NameServer 服务地址,格式 ip:port -h 打印帮助
3.4.3 注意事项
- 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port
- 几乎所有命令都可以通过-h获取帮助
- 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行命令;如果不配置Broker地址,则对集群中所有主机执行命令
3.5 集群监控平台搭建
3.5.1 概述
RocketMQ
有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一个子模块叫rocketmq-console
,这个便是管理控制台项目了,先将incubator-rocketmq-externals拉到本地,因为我们需要自己对rocketmq-console
进行编译打包运行。
3.5.2 下载并编译打包
sh git clone https://github.com/apache/rocketmq-externals cd rocketmq-console mvn clean package -Dmaven.test.skip=true
注意:打包前在rocketmq-console
中配置namesrv
集群地址:
sh rocketmq.config.namesrvAddr=192.168.25.135:9876;192.168.25.138:9876
启动rocketmq-console:
sh java -jar rocketmq-console-ng-1.0.0.jar
启动成功后,我们就可以通过浏览器访问http://localhost:8080
进入控制台界面了,如下图:
集群状态: