作者简介:大家好,我是小童,Java开发工程师,CSDN博客博主,Java领域新星创作者
系列专栏:前端、Java、Java中间件大全、微信小程序、微信支付、若依框架、Spring全家桶
如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步
如果感觉博主的文章还不错的话,请三连支持一下博主哦
博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人
目录
RocketMQ安装与配置
RocketMQ管理命令
RocketMQ整合springboot
RocketMQ架构
RocketMQ安装与配置
环境搭建与测试
RocketMQ最新版本:5.0
下载地址:https://rocketmq.apache.org/zh/release-notes/
1、普通安装
修改环境变量
export JAVA_HOME=/usr/local/jdk-11.0.11/export PATH=$PATH:$JAVA_HOME/binexport ROCKETMQ_HOME=/usr/local/rocketmq-5.0.0export PATH=$PATH:$ROCKETMQ_HOME/bin
解压文件
unzip rocketmq-all-5.0.0-bin-release.zip
启动NameServer
nohup sh mqnamesrv &
启动broker
nohup sh mqbroker -n localhost:9876
RocketMQ管理命令
mqadmin help 命令名称
1、启动namesrv和broker
./mqnamesrv #启动nameserver./mqbroker -n localhost:9876 -c/opt/alibaba-rocketmq/conf/broker.conf #启动broker
2、查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log #查看日志tail -f ~/logs/rocketmqlogs/broker.log #查看日志
3、新增topic
mqadmin updateTopic -n localhost:9876 -cDefaultCluster -t topicWarning
4、查看某个topic的状态
mqadmin topicStatus -n localhost:9876 -ttopicWarning
5、查看所有消费组group
mqadmin consumerProgress -n localhost:9876
6、查看所有topic
mqadmin topicList -n localhost:9876
7、删除topic
mqadmin deleteTopic -n localhost:9876 -cDefaultCluster -t topicWarning
8、关闭namesrv和broker服务
mqshutdown namesrvmqshutdown broker
RocketMQ整合springboot
创建springboot-rocketmq-producer工程
1、在pom.xml文件中添加依赖
org.apache.rocketmq rocketmq-spring-boot-starter ${rocketmq-spring-boot-starter-version} org.projectlombok lombok 1.18.6 org.springframework.boot spring-boot-starter-test test
2、配置文件
# application.propertiesspring.application.name=springboot_rocketmq_producer# nameserver的地址rocketmq.name-server=192.168.139.128:9876#指定生产组名称rocketmq.producer.group=my-group
3、测试类
import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTest(classes = {MyRocketProducerApplication.class})public class MyRocketProducerApplicationTest{ @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void testSendMessage() { // 用于向broker发送消息 // 第一个参数是topic名称 // 第二个参数是消息内容 this.rocketMQTemplate.convertAndSend( "topic_springboot_01", "springboot: hello rocketmq..." ); }}
创建springboot-rocketmq-consumer工程
pom.xml文件同producer工程
application.properties配置
spring.application.name=springboot-rocketmq-consumerrocketmq.name-server=192.168.139.128:9876
启动类
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class SpringbootRocketmqConsumerApplication { public static void main(String[] args) { SpringApplication.run(SpringbootRocketmqConsumerApplication.class, args); }}
消息监听器
@Slf4j@Component@RocketMQMessageListener(topic="topic_springboot_01",consumerGroup="springboot-rocketmq-consumer-01")public class Consumer implements RocketMQlistener{ @override public void onMessage(String message){ log.info("Received messsge:" + message); }}
启动consumer工程,观察控制台窗口打印内容
springboot: hello rocketmq…
RocketMQ架构
技术架构
实时效果反馈
1.Broker承担的工作不包括哪个?
A 接收生产者发来的消息
B 存储消息
C 为其它Broker提供路由信息
D 处理来自消费方的请求
2.NameServer承担的工作不包括哪个?
A 定时与broker通信,保存活跃状态的broker列表
B 负责存储转发消息
C 维护所有broker中的topic列表
D 为生产者、消费者提供最新的路由信息
部署架构
结合部署架构图,集群工作流程可作如下描述:
1、启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
2、Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic 跟Broker的映射关系。
3、收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
4、Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择 一个队列,与队列所在的Broker建立长连接再向Broker发消息。
5、Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
实时效果反馈
1.消息的订阅规则由谁制定?
A 消息的生产者
B 消息的消费者
C NameServer
D broker