RabbitMQ一、MQ1.同步调用的优缺点

同步调用的优点:

  • 时效性较强,可以立即得到结果

同步调用的问题:

  • 耦合度高
  • 性能和吞吐能力下降
  • 有额外的资源消耗
  • 有级联失败问题

2.异步调用

异步调用常见实现就是事件驱动模式

好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题

  • 调用间没有阻塞,不会造成无效的资源占用

  • 耦合度极低,每个服务都可以灵活插拔,可替换

  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于Broker的可靠、安全、性能

3.MQ实现

MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

二、Linux安装RabbitMQ1.1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:

上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

1.2.安装MQ

执行下面的命令来运行MQ容器:

docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management

注意5672是MQ消息通信的端口,15672是ui端口

三、RabbitMQ1.种类

大致分为两种

1.**无交换机的 **基本消息队列 和 工作消息队列

2.有交换机的发布订阅,路由,主题

2.基于SpringAMQP使用2.1基础消息队列

  1. 先使用测试类生成队列

    public class ConsumerTest {    public static void main(String[] args) throws IOException, TimeoutException {        // 1.建立连接        ConnectionFactory factory = new ConnectionFactory();        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码        factory.setHost("116.62.32.68");        factory.setPort(5672);        factory.setVirtualHost("/");        factory.setUsername("itcast");        factory.setPassword("123321");        // 1.2.建立连接        Connection connection = factory.newConnection();        // 2.创建通道Channel        Channel channel = connection.createChannel();        // 3.创建队列        String queueName = "simple.queue";        channel.queueDeclare(queueName, false, false, false, null);        // 4.订阅消息        channel.basicConsume(queueName, true, new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body) throws IOException {                // 5.处理消息                String message = new String(body);                System.out.println("接收到消息:【" + message + "】");            }        });        System.out.println("等待接收消息。。。。");    }}
  2. 引入依赖

        org.springframework.boot    spring-boot-starter-amqp
  3. 生产者编写yaml文件的MQ地址,端口号,用户名密码

    spring:  rabbitmq:    host: 192.168.150.101 # 主机名    port: 5672 # 端口    virtual-host: / # 虚拟主机    username: itcast # 用户名    password: 123321 # 密码
  4. 编写生产者代码

    @RunWith(SpringRunner.class)@SpringBootTestpublic class SpringAmqpTest {    //类似于redisTemplate    @Autowired    private RabbitTemplate rabbitTemplate;    @Test    public void testSimpleQueue(){        //队列名        String queueName = "simple.queue";        //消息        String message = "hello,SpringAmqp";        //发送消息到队列        rabbitTemplate.convertAndSend(queueName,message);    }}
  5. 消费者编写yaml文件的MQ地址,端口号,用户名密码

    同上

  6. 编写消费者代码

    @Componentpublic class SpringRabbitListener {    @RabbitListener(queues = "simple.queue")    public void listenSimpleQueueMessage(String msg) throws InterruptedException {        System.out.println("spring 消费者接收到消息:【" + msg + "】");    }}

2.2工作消息队列

消费者:

//工作队列@RabbitListener(queues = "simple.queue")public void listenWorkQueue1Message(String msg) throws InterruptedException {    System.out.println("spring 消费者1接收到消息:【" + msg + "】");}@RabbitListener(queues = "simple.queue")public void listenSimpleQueue2Message(String msg) throws InterruptedException {    System.out.println("spring 消费者2接收到消息:【" + msg + "】");}
logging:  pattern:    dateformat: MM-dd HH:mm:ss:SSSspring:  rabbitmq:    host:  # 主机名    port: 5672 # 端口    virtual-host: / # 虚拟主机    username: itcast # 用户名    password: 123321 # 密码    listener:      direct:        prefetch: 1 # 消息预期,之前把所有消息预期导致平分消息,现在是处理一个取一个

2.3广播Fanout

多了一个交换机,交换机决定将生产者的消息发送给哪个队列,这决定就是交换机种类不同规则不同,因此分为三种,广播、路由和主题。

广播:只要队列绑定了交换机,就发送消息到队列

1.编写消费者:使用注解声明队列和交换机(同时绑定消费者的队列和交换机)

//3.广播(绑定队列,同时绑定交换机)@RabbitListener(bindings = @QueueBinding(        value = @Queue(name = "fanout.queue1"),        exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)))public void listenFanoutQueue1(String message){    System.out.println("广播方式队列1接受消息:"+message);}@RabbitListener(bindings = @QueueBinding(        value = @Queue(name = "fanout.queue2"),        exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)))public void listenFanoutQueue2(String message){    System.out.println("广播方式队列2接受消息:"+message);}

2.生产者

//广播类型交换机@Testpublic void testFanoutExchangeQueue(){    //交换机名称    String exchangeName = "fanout.exange";    //消息    String message = "广播消息";    //发送,第二个参数是队列的key,在路由类型中可以使用到    rabbitTemplate.convertAndSend(exchangeName,"",message);}

2.4路由Route

和广播相比,在队列中多了一个key属性。

交换机通过消息的key来对应路由到相同key的队列上。

1.消费者:

/**    4.路由 */@RabbitListener(bindings = @QueueBinding(        value = @Queue(name = "direct.queue1"),        exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),        key = {"red","blue"}))public void listenDirectQueue1(String message){    System.out.println("路由红、蓝队列接受消息:"+message);}@RabbitListener(bindings = @QueueBinding(        value = @Queue(name = "direct.queue2"),        exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),        key = {"red","green"}))public void listenDirectQueue2(String message){    System.out.println("路由红、绿队列接受消息:"+message);}

2.生产者

//路由类型交换机@Testpublic void testDirectExchangeQueue(){    //交换机名称    String exchangeName = "direct.exchange";    //消息    String message = "路由消息";    //发送,第二个参数是队列的key,在路由类型中可以使用到    rabbitTemplate.convertAndSend(exchangeName,"red",message);}

2.5主题Topic

key不再是一组单词,而是一个规则

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

1.消费者

/** * 主题 */@RabbitListener(bindings = @QueueBinding(        value = @Queue(name = "topic.queue1"),        exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),        key = "china.#"))public void listenTopicQueue1(String message){    System.out.println("中国队列接受消息:"+message);}@RabbitListener(bindings = @QueueBinding(        value = @Queue(name = "topic.queue2"),        exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),        key = "#.news"))public void listenTopicQueue2(String message){    System.out.println("新闻队列接受消息:"+message);}

2.生产者

//主题类型交换机@Testpublic void testTopicExchangeQueue(){    //交换机名称    String exchangeName = "topic.exchange";    //消息    String message = "china.lala消息";    //发送,第二个参数是队列的key,在路由类型中可以使用到    rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}

3.消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

因此需要配置一个Json转换器

步骤

  1. 在publisher和consumer两个服务中都引入依赖:
    com.fasterxml.jackson.dataformat    jackson-dataformat-xml    2.9.10
  1. 配置消息转换器。在启动类中添加一个Bean即可:
@Beanpublic MessageConverter jsonMessageConverter(){    return new Jackson2JsonMessageConverter();}