RabbitMQ手动应答与持久化


1.SleepUtil线程睡眠工具类

package com.hong.utils;/** * @Description: 线程睡眠工具类 * @Author: hong * @Date: 2023-12-16 23:10 * @Version: 1.0 **/public class SleepUtil {public static void sleep(int second) {try {Thread.sleep(1000*second);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}

2.消息生产者

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;import com.rabbitmq.client.Channel;import java.util.Scanner;/** * @Description: 消息手动应答时不丢失,放回队列重新消费 * @Author: hong * @Date: 2023-12-16 22:33 * @Version: 1.0 **/public class Task3 {public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}}

3.两个消费者

模拟一个处理速度快(Worker3),另一个处理速度慢(Worker4)

3.1.处理时间短

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;import com.hong.utils.SleepUtil;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;/** * @Description: 消息手动应答时不丢失,放回队列重新消费 * @Author: hong * @Date: 2023-12-16 23:05 * @Version: 1.0 **/public class Worker3 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//手动应答/** * 第一个参数:消息标识 * 第二个参数是否批量:true批量 */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");//手动应答false channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}}

3.2.处理时间长

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;import com.hong.utils.SleepUtil;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;/** * @Description: 消息手动应答时不丢失, 放回队列重新消费 * @Author: hong * @Date: 2023-12-16 23:05 * @Version: 1.0 **/public class Worker4 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//手动应答/** * 第一个参数:消息标识 * 第二个参数是否批量:true批量 */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");//手动应答false channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}}

4.结果

启动生产者后启动2个消费者,等消息bb接收到后,发送cc和dd
图片[1] - RabbitMQ手动应答与持久化 - MaxSSL
图片[2] - RabbitMQ手动应答与持久化 - MaxSSL
等Worker4接收到消息bb后将其关闭,发现原本该Worker4消费的消息dd并未丢失,重回队列被Worker3消费
图片[3] - RabbitMQ手动应答与持久化 - MaxSSL

5.持久化

5.1.队列持久化

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;import com.rabbitmq.client.Channel;import java.util.Scanner;/** * @Description: 队列持久化 * @Author: hong * @Date: 2023-12-17 22:52 * @Version: 1.0 **/public class Task4 {public static final String TASK_QUEUE_NAME = "persist_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();//true持久化channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}}

5.2.消息持久化

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import java.util.Scanner;/** * @Description: 队列持久化与消息持久化 * @Author: hong * @Date: 2023-12-17 22:52 * @Version: 1.0 **/public class Task4 {public static final String TASK_QUEUE_NAME = "persist_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();//队列持久化 true持久化channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();//消息持久化MessageProperties.PERSISTENT_TEXT_PLAINchannel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}}

图片[4] - RabbitMQ手动应答与持久化 - MaxSSL

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享