1.SleepUtil线程睡眠工具类
package com.hong.utils;/** * @Description: 线程睡眠工具类 * @Author: hong * @Date: 2023-12-16 23:10 * @Version: 1.0 **/public class SleepUtil {public static void sleep(int second) {try {Thread.sleep(1000*second);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
2.消息生产者
package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;import com.rabbitmq.client.Channel;import java.util.Scanner;/** * @Description: 消息手动应答时不丢失,放回队列重新消费 * @Author: hong * @Date: 2023-12-16 22:33 * @Version: 1.0 **/public class Task3 {public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}}
3.两个消费者
模拟一个处理速度快(Worker3),另一个处理速度慢(Worker4)
3.1.处理时间短
package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;import com.hong.utils.SleepUtil;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;/** * @Description: 消息手动应答时不丢失,放回队列重新消费 * @Author: hong * @Date: 2023-12-16 23:05 * @Version: 1.0 **/public class Worker3 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//手动应答/** * 第一个参数:消息标识 * 第二个参数是否批量:true批量 */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");//手动应答false channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}}
3.2.处理时间长
package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;import com.hong.utils.SleepUtil;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;/** * @Description: 消息手动应答时不丢失, 放回队列重新消费 * @Author: hong * @Date: 2023-12-16 23:05 * @Version: 1.0 **/public class Worker4 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//手动应答/** * 第一个参数:消息标识 * 第二个参数是否批量:true批量 */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");//手动应答false channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}}
4.结果
启动生产者后启动2个消费者,等消息bb接收到后,发送cc和dd
等Worker4接收到消息bb后将其关闭,发现原本该Worker4消费的消息dd并未丢失,重回队列被Worker3消费
5.持久化
5.1.队列持久化
package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;import com.rabbitmq.client.Channel;import java.util.Scanner;/** * @Description: 队列持久化 * @Author: hong * @Date: 2023-12-17 22:52 * @Version: 1.0 **/public class Task4 {public static final String TASK_QUEUE_NAME = "persist_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();//true持久化channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}}
5.2.消息持久化
package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import java.util.Scanner;/** * @Description: 队列持久化与消息持久化 * @Author: hong * @Date: 2023-12-17 22:52 * @Version: 1.0 **/public class Task4 {public static final String TASK_QUEUE_NAME = "persist_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();//队列持久化 true持久化channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();//消息持久化MessageProperties.PERSISTENT_TEXT_PLAINchannel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}}
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END