1.RabbitMQ 架构原理
由于 RabbitMQ 实现了 AMQP 协议,所以 RabbitMQ 的工作模型也是基于
AMQP 的。理解这张图片至关重要。
1.1 Broker 中介
我们要使用 RabbitMQ 来收发消息,必须要安装一个 RabbitMQ 的服务,可以安
装在 Windows 上面也可以安装在 Linux 上面,默认是 5672 的端口。这台 RabbitMQ
的服务器我们把它叫做 Broker,中文翻译是代理/中介,因为 MQ 服务器帮助我们做
的事情就是存储、转发消息。
1.2 Connection 连接
无论是生产者发送消息,还是消费者接收消息,都必须要跟 Broker 之间建立一个
连接,这个连接是一个 TCP 的长连接。
1.3 Channel 通道
https://www.rabbitmq.com/api-guide.html#concurrency
https://www.rabbitmq.com/channels.html
https://stackoverflow.com/questions/18418936/rabbitmq-and-relationshi
p-between-channel-and-connection
如果所有的生产者发送消息和消费者接收消息,都直接创建和释放 TCP 长连接的
话,对于 Broker 来说肯定会造成很大的性能损耗,也会浪费时间。
所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接。我们把它翻
译成通道,或者消息信道。这样我们就可以在保持的 TCP 长连接里面去创建和释放
Channel,大大了减少了资源消耗。
不同的 Channel 是相互隔离的,每个 Channel 都有自己的编号。对于每个客户端
线程来说,Channel 就没必要共享了,各自用自己的 Channel。
另外一个需要注意的是,Channel 是 RabbitMQ 原生 API 里面的最重要的编程接
口,也就是说我们定义交换机、队列、绑定关系,发送消息,消费消息,调用的都是
Channel 接口上的方法。
1.4 Queue 队列
连接到 Broker 以后,就可以收发消息了。
在 Broker 上有一个对象用来存储消息,在 RabbitMQ 里面这个对象叫做 Queue。
实际上 RabbitMQ 是用数据库来存储消息的,这个数据库跟 RabbitMQ 一样是用
Erlang 开发的,名字叫 Mnesia。我们可以在磁盘上找到 Mnesia 的存储路径。 Windows 系统保存在用户目录下:
C:\Users\用户名\AppData\Roaming\RabbitMQ\db\rabbit@用户名-mnesia
CentOS 保存在/var/lib 目录下:
/var/lib/rabbitmq/mnesia
队列也是生产者和消费者的纽带,生产者发送的消息到达队列,在队列中存储。
消费者从队列消费消息。
1.5 Consumer 消费者
消息到底是 Broker 推送给消费者的?还是消费者主动获取的?消费者消费消息
有两种模式。
一种是 Pull 模式,对应的方法是 basicGet。消息存放在服务端,只有消费者主动
获取才能拿到消息。如果每隔一段时间获取一次消息,消息的实时性会降低。但是好
处是可以根据自己的消费能力决定获取消息的频率。
另一种是 Push 模式,对应的方法是 basicConsume,只要生产者发消息到服务
器,就马上推送给消费者,消息保存在客户端,实时性很高,如果消费不过来有可能
会造成消息积压。Spring AMQP 是 push 方式,通过事件机制对队列进行监听,只要
有消息到达队列,就会触发消费消息的方法。
RabbitMQ 中 pull 和 push 都有实现。而 kafka 和 RocketMQ 只有 pull。
https://www.rabbitmq.com/api-guide.html#consuming
由于队列有 FIFO 的特性,只有确定前一条消息被消费者接收之后,Broker 才会
把这条消息从数据库删除,继续投递下一条消息。
一个消费者是可以监听多个队列的,一个队列也可以被多个消费者监听。
但是在生产环境中,我们一般是建议一个消费者只处理一个队列的消息。如果需
要提升处理消息的能力,可以增加多个消费者。这个时候消息会在多个消费者之间轮
询。
1.6 Exchange 交换机
现在我们来思考一个问题,如果要把一条消息发送给多个队列,被多个消费者消
费,应该怎么做?生产者是不是必须要调用多次 basicPublish 的方法,依次发送给多
个队列?就像消息推送的这种场景,有成千上万个队列的时候,对生产者来说压力太
大了。
有没有更好的办法呢?其实,RabbitMQ 已经为我们考虑到了这一点,它设计了
一个帮我们路由消息的组件,叫做 Exchange。
也就是说,不管有多少个队列需要接收消息,我都只需要发送到 Exchange 就 OK
了,由它帮我来分发。Exchange 是不会存储消息的,它只做一件事情,根据规则分发
消息。
那么,Exchange 和这些需要接收消息的队列必须建立一个绑定关系,并且为每个
队列指定一个特殊的标识。
Exchange 和队列是多对多的绑定关系,也就说,一个交换机的消息一个路由给多
个队列,一个队列也可以接收来自多个交换机的消息。
绑定关系建立好之后,生产者发送消息到 Exchange,也会携带一个特殊的标识。
当这个标识跟绑定的标识匹配的时候,消息就会发给一个或者多个符合规则的队列。
那 Exchange 有哪些可设置的字段呢?
这个特殊的标识是怎么运作的,后面再详细展开来讲。
1.7 Vhost 虚拟机
我们每个需要实现基于 RabbitMQ 的异步通信的系统,都需要在 Broker 上创建
自己要用到的交换机、队列和它们的绑定关系。如果某个业务系统不想跟别人混用一
个 Broker,怎么办?再采购一台硬件服务器单独安装一个 RabbitMQ 服务?这种方式
成本太高了。在同一个硬件服务器上安装多个 RabbitMQ 的服务呢?比如再运行一个
5673 的端口?
没有必要这样做,因为 RabbitMQ 也考虑到了这一点,设计了虚拟主机 VHOST。
VHOST 除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控
制。它的作用类似于其他编程语言中的 namespace 和 package,不同的 VHOST 中
可以有同名的 Exchange 和 Queue,它们是完全透明的。
这个时候,我们可以为不同的业务系统创建专属于他们自己的 VHOST,然后再为
他们创建专属的用户,给用户分配对应的 VHOST 的权限。比如给风控系统的用户分
配风控系统的 VHOST 的权限,这个用户可以访问里面的交换机和队列。给超级管理
员分配所有 VHOST 的权限。
我们安装 RabbitMQ 的时候会自带一个默认的 VHOST,名字是“/”。
2、消息分发机制
我们说到 RabbitMQ 引入 Exchange 是为了实现消息的灵活路由,到底有哪些路
由方式?
RabbitMQ 中一共有四种类型的交换机,Direct、Topic、Fanout、Headers。其
中 Headers 不常用。交换机的类型可以在创建的时候指定,网页或者代码中。
direct: 直连 fanout: 广播 topic: 根据key匹配 headers: 根据消息头匹配
topic: # 代表的所有的 * 代表只能匹配一个节点
改了配置要重启才能生效
3、RabbitMQ 持久化与内存管理
3.1 持久化机制
RabbitMQ 的持久化分为消息持久化、队列持久化、交换器持久化。无论是持久
化消息还是非持久化消息都可以被写入磁盘。
当 RabbitMQ 收到消息时,如果是持久化消息,则会储存在内存中,同时也会写
入磁盘;如果是非持久化消息,则只会存在内存中。当内存使用达到 RabbitMQ 的临
界值时,内存中的数据会被交换到磁盘,持久化消息由于本就存在于磁盘中,不会被
重复写入。
消息的持久化是在发消息时,通过 deliveryMode 设置,队列、交换器也可以通
过参数持久化,非持久化的消息、队列、交换器在服务重启后会消失,即使已经被写
入磁盘。
3.2 内存控制
RabbitMQ 中通过内存阈值参数控制内存的使用量,当内存使用超过配置的阈值
时,RabbitMQ 会阻塞客户端的连接并停止接收从客户端发来的消息,以免服务崩溃,
同时,会发出内存告警,此时客户端于与服务端的心跳检测也会失效。
当出现内存告警时,可以通过管理命令临时调整。
rabbitmqctl set
_vm
_memory_high
_watermark
fraction 为内存阈值,默认是 0.4,表示 RabbitMQ 使用的内存超过系统内存的
40%时,会产生内存告警,通过此命令修改的阈值在重启后会失效。可以通过修改配
置文件的方式,使之永久生效,但是需要重启服务。
3.4 内存换页
在 RabbitMQ 达到内存阈值并阻塞生产者之前,会尝试将内存中的消息换页到磁
盘,以释放内存空间。内存换页由换页参数控制,默认为 0.5,表示当内存使用量达到
内存阈值的 50%时会进行换页,也就是 0.4*0.5=0.2。
vm
_memory_high
_watermark
_paging_ratio=0.5
当换页阈值大于 1 时,相当于禁用了换页功能
3.5 磁盘控制
RabbitMQ 通过磁盘阈值参数控制磁盘的使用量,当磁盘剩余空间小于磁盘阈值
时,RabbitMQ 同样会阻塞生产者,避免磁盘空间耗尽。
磁盘阈值默认 50M,由于是定时检测磁盘空间,不能完全消除因磁盘耗尽而导致
崩溃的可能性,比如在两次检测之间,磁盘空间从大于 50M 变为 0M。
一种相对谨慎的做法是将磁盘阈值大小设置与内存相等
rabbitmqctl set_disk_free_limit rabbitmqctl set_disk_free_limit mem_relative # limit 为绝对值,KB、MB、GB# fraction 为相对值,建议 1.0~2.0 之间# rabbitmq.confdisk_free_limit.relative=1.5# disk_free_limit.absolute=50MB
4、RabbiMQ 插件管理
RabbitMQ 插件机制的设计,主要是用于面对特殊场景的需求,可以实现任意扩
展。
4.1 插件列表管理命令
rabbitmq-plugins list
此命令,列出当前可以安装使用的插件。插件前面[ ] 为空说明,没有安装。有 e*
说明插件是安装了的。
4.2 插件安装使用,
我们以 rabbitmq_management 为例。
rabbitmq-plugins enable rabbitmq_management
此命令,就是安装启用管理。
4.3 插件卸载
同上,插件卸载,只需要将我们的命令,enable 改为,disbale 即可。
rabbitmq-plugins disable rabbitmq_management
5、RabbitMQ 配置备查
RabbitMQ 常用的三种自定义服务器的通用方法:
配置文件 rabbitmq.conf
环境变量文件 rabbitmq-env.conf
补充配置文件 advanced.config
rabbitmq.conf 和 rabbitmq-env.conf 的位置
在二进制安装中路径是在 :安装目录下的/etc/rabbitmq/
rpm 安装: /etc/rabbitmq/
如果 rabbitmq.conf 和 rabbitmq-env.conf 的两个文件不存在,那么我们可以创建该文
件,然后我们可以通过环境变量
指定该文件的位置。
补充 :
rabbitmqctl rabbitmqctl 是管理虚拟主机和用户权限的工具
rabbitmq-plugins 是管理插件的工具
5.1 rabbitmq.conf
在 rabbitmq 3.7.0 之前,rabbitmq.conf 使用了 Erlang 语法配置格式,新的版本使用
了 sysctl 格式. sysctl 语法:
单个信息都在一行里面
配置信息以 key value 的形式保存。
‘#’开头表示注释。
配置属性和描述(官网链接:https://www.rabbitmq.com/configure.html)