RocketMQ【RocketMQ(安装与配置、管理命令、整合springboot、架构)】(二)-全面详解(学习总结—从入门到深化)

图片[1] - RocketMQ【RocketMQ(安装与配置、管理命令、整合springboot、架构)】(二)-全面详解(学习总结—从入门到深化) - MaxSSL

作者简介:大家好,我是小童,Java开发工程师,CSDN博客博主,Java领域新星创作者
系列专栏:前端、Java、Java中间件大全、微信小程序、微信支付、若依框架、Spring全家桶
如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步
如果感觉博主的文章还不错的话,请三连支持一下博主哦
博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人

目录

RocketMQ安装与配置

RocketMQ管理命令

RocketMQ整合springboot

RocketMQ架构


RocketMQ安装与配置

图片[2] - RocketMQ【RocketMQ(安装与配置、管理命令、整合springboot、架构)】(二)-全面详解(学习总结—从入门到深化) - MaxSSL

环境搭建与测试

RocketMQ最新版本:5.0

下载地址:https://rocketmq.apache.org/zh/release-notes/

图片[3] - RocketMQ【RocketMQ(安装与配置、管理命令、整合springboot、架构)】(二)-全面详解(学习总结—从入门到深化) - MaxSSL

1、普通安装

修改环境变量

export JAVA_HOME=/usr/local/jdk-11.0.11/export PATH=$PATH:$JAVA_HOME/binexport ROCKETMQ_HOME=/usr/local/rocketmq-5.0.0export PATH=$PATH:$ROCKETMQ_HOME/bin

解压文件

 unzip rocketmq-all-5.0.0-bin-release.zip

启动NameServer

nohup sh mqnamesrv &

启动broker

nohup sh mqbroker -n localhost:9876

RocketMQ管理命令

mqadmin help 命令名称

1、启动namesrv和broker

./mqnamesrv #启动nameserver./mqbroker -n localhost:9876 -c/opt/alibaba-rocketmq/conf/broker.conf  #启动broker

2、查看日志

tail -f ~/logs/rocketmqlogs/namesrv.log  #查看日志tail -f ~/logs/rocketmqlogs/broker.log  #查看日志

3、新增topic

mqadmin updateTopic -n localhost:9876 -cDefaultCluster -t topicWarning

4、查看某个topic的状态

mqadmin topicStatus -n localhost:9876 -ttopicWarning

5、查看所有消费组group

mqadmin consumerProgress -n localhost:9876

6、查看所有topic

mqadmin topicList -n localhost:9876

7、删除topic

mqadmin deleteTopic -n localhost:9876 -cDefaultCluster -t topicWarning

8、关闭namesrv和broker服务

mqshutdown namesrvmqshutdown broker

RocketMQ整合springboot

创建springboot-rocketmq-producer工程

1、在pom.xml文件中添加依赖

                       org.apache.rocketmq            rocketmq-spring-boot-starter            ${rocketmq-spring-boot-starter-version}                            org.projectlombok            lombok            1.18.6                            org.springframework.boot            spring-boot-starter-test            test            

2、配置文件

# application.propertiesspring.application.name=springboot_rocketmq_producer# nameserver的地址rocketmq.name-server=192.168.139.128:9876#指定生产组名称rocketmq.producer.group=my-group

3、测试类

import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTest(classes = {MyRocketProducerApplication.class})public class MyRocketProducerApplicationTest{    @Autowired    private RocketMQTemplate rocketMQTemplate;    @Test    public void testSendMessage() {        // 用于向broker发送消息        // 第一个参数是topic名称        // 第二个参数是消息内容        this.rocketMQTemplate.convertAndSend(                "topic_springboot_01",                "springboot: hello rocketmq..."       );   }}

创建springboot-rocketmq-consumer工程

pom.xml文件同producer工程

application.properties配置

spring.application.name=springboot-rocketmq-consumerrocketmq.name-server=192.168.139.128:9876

启动类

import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class SpringbootRocketmqConsumerApplication {    public static void main(String[] args) {        SpringApplication.run(SpringbootRocketmqConsumerApplication.class, args);   }}

消息监听器

@Slf4j@Component@RocketMQMessageListener(topic="topic_springboot_01",consumerGroup="springboot-rocketmq-consumer-01")public class Consumer implements RocketMQlistener{        @override    public void onMessage(String message){      log.info("Received messsge:" + message);     }}

启动consumer工程,观察控制台窗口打印内容

springboot: hello rocketmq…

RocketMQ架构

技术架构

图片[4] - RocketMQ【RocketMQ(安装与配置、管理命令、整合springboot、架构)】(二)-全面详解(学习总结—从入门到深化) - MaxSSL

图片[5] - RocketMQ【RocketMQ(安装与配置、管理命令、整合springboot、架构)】(二)-全面详解(学习总结—从入门到深化) - MaxSSL

实时效果反馈

1.Broker承担的工作不包括哪个?

A 接收生产者发来的消息

B 存储消息

C 为其它Broker提供路由信息

D 处理来自消费方的请求

2.NameServer承担的工作不包括哪个?

A 定时与broker通信,保存活跃状态的broker列表

B 负责存储转发消息

C 维护所有broker中的topic列表

D 为生产者、消费者提供最新的路由信息

部署架构

图片[6] - RocketMQ【RocketMQ(安装与配置、管理命令、整合springboot、架构)】(二)-全面详解(学习总结—从入门到深化) - MaxSSL

图片[7] - RocketMQ【RocketMQ(安装与配置、管理命令、整合springboot、架构)】(二)-全面详解(学习总结—从入门到深化) - MaxSSL

结合部署架构图,集群工作流程可作如下描述:

1、启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

2、Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic 跟Broker的映射关系。

3、收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

4、Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择 一个队列,与队列所在的Broker建立长连接再向Broker发消息。

5、Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

实时效果反馈

1.消息的订阅规则由谁制定?

A 消息的生产者

B 消息的消费者

C NameServer

D broker

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享