根据amqp协议、rabbitmq入门、springboot集成rabbitmq 可知,rabbitmq的广播模式关键是使用fanout类型的exchange,fanout exchange会忽略message中的routing-key、queue中的binding-key,发给绑定exchange的全部queue。
创建fanout类型的exchange
import org.springframework.amqp.core.*;@Configurationpublic class MqConfig {/** * 定义广播交换机 * @return */@Beanpublic FanoutExchange fanoutExchange() {final FanoutExchange fanoutExchange = new FanoutExchange("自定义广播类型的交换机名称");return fanoutExchange;}}
发送
@Autowired private AmqpTemplate amqpTemplate; //发送到订阅数据的exchange中 amqpTemplate.convertAndSend("自定义广播类型的交换机名称", //fanout类型的exchange会忽略routing-key,所以这里的binding key传空字符串"",message);
消费
import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;/*** 将数据发送给队列1* @param message*/ @RabbitListener(bindings = @QueueBinding(value = @Queue(“自定义队列1”), exchange = @Exchange(value = "自定义广播类型的交换机名称", type = ExchangeTypes.FANOUT), //fanout类型exchange会忽略binding-key key = "")) public void doSynAddDataToJD(String message) { log.info("广播模式,同步数据给订阅方"); }/*** 将数据发送给队列2 * @param message */@RabbitListener(bindings = @QueueBinding(value = @Queue(“自定义队列2”),exchange = @Exchange(value = "自定义广播类型的交换机名称",type = ExchangeTypes.FANOUT),key = ""))public void doSynAddDataToJD(String message) {log.info("广播模式,同步数据给订阅方");}
总结
实现发布订阅(广播模式)的关键在于对exchange类型的理解,可参考amqp协议、rabbitmq入门、springboot集成rabbitmq 、AMQP 0-9-1 Model Explained,源码中的类型有如下几种
package org.springframework.amqp.core;/** * Constants for the standard Exchange type names. * * @author Mark Fisher * @author Gary Russell */public abstract class ExchangeTypes {/** * Direct exchange. * routing key和binding key完全匹配 */public static final String DIRECT = "direct";/** * Topic exchange. * binding key可使用通配符来匹配routing key */public static final String TOPIC = "topic";/** * Fanout exchange. * 会忽略routing key、binding key,消息发送到绑定exchange的全部queue */public static final String FANOUT = "fanout";/** * Headers exchange. * 使用headers中的属性来匹配,有只匹配一项或者全部匹配可选 */public static final String HEADERS = "headers";/** * System exchange. * 这个类型,暂时缺乏相关资料。 */public static final String SYSTEM = "system";}