实体类
为了方便测试,直接在测试类中的写内部类:
@Data @AllArgsConstructor @NoArgsConstructor public class OrderInfo { /** * 订单id */ private Integer id; /** * 描述:用来记录关闭时间,可以在测试时用来验证。关闭时间是否跟 expireTime相等 */ private String description; /** * 创建时间 */ private LocalDateTime createTime; /** * 过期时间:关闭时间 */ private LocalDateTime expireTime; }
生成订单
模拟生成订单并设置过期时间。
执行时会在redis创建2个key:
- redisson_delay_queue:{
} :订单数据
- redisson_delay_queue_timeout:{
} :zset类型,按时间戳排序
/** * 创建订单,并设置过期时间 * * @throws IOException */ @Test void createOrder() { RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(closeKey); RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque); // 100条订单 int n = 100; Random random = new Random(); for (int i = 0; i < n; i++) { // 1~100之间的正整数 int i1 = random.nextInt(100) + 1; LocalDateTime now = LocalDateTime.now(); delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, now, now.plusSeconds(i1)), i1, TimeUnit.SECONDS); } }
关闭订单
关闭订单,这里会产生订阅。redis会出现redisson_delay_queue_channel
。
/** * 关闭订单 * * @throws IOException */ @Test void closeOrder() { ReentrantLock lock = new ReentrantLock(); // 5个线程 int poolSize = 5; List<CompletableFuture> futureList = new ArrayList(); for (int i = 0; i { RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(closeKey); // 加入监听 redissonClient.getDelayedQueue(blockingDeque); while (true) { OrderInfo take; try { take = blockingDeque.take(); } catch (Exception e) { continue; } if (take == null) { continue; } // 验证多次是否会重复关闭。正常里不会近,只是验证下。正式环境,可以删除 try { lock.lock(); if(closed.contains(take.getId())){ log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId()); } closed.add(take.getId()); }finally { lock.unlock(); } // 处理订单关闭逻辑 log.info("订单[{}]关闭中。。。", take.getId()); log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take)); } })); } // 模拟正式环境中进程一直在运行,因为test时,没有join则会只执行一次出现消费完数据后进程就关闭了 CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); }
完整测试类:
package cn.skyjilygao.demo;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.junit.jupiter.api.Test;import org.redisson.api.RBlockingDeque;import org.redisson.api.RDelayedQueue;import org.redisson.api.RedissonClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import java.io.IOException;import java.time.LocalDateTime;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.Set;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ConcurrentSkipListSet;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantLock;import static cn.skyjilygao.util.EntityUtil.toJsonString;@Slf4j@SpringBootTestpublic class CloseOrderTests { @Autowired private RedissonClient redissonClient; public static String closeKey = "order_close_test"; public volatile static Set closed = new ConcurrentSkipListSet(); /** * 创建订单,并设置过期时间 * * @throws IOException */ @Test void createOrder() { RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(closeKey); RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque); int a = 100; Random random = new Random(100); for (int i = 0; i < a; i++) { int i1 = random.nextInt(1 + i) + 1; delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, LocalDateTime.now(), LocalDateTime.now().plusSeconds(i1)), i1, TimeUnit.SECONDS); } } /** * 关闭订单 * * @throws IOException */ @Test void closeOrder() { ReentrantLock lock = new ReentrantLock(); // 5个线程 int poolSize = 5; List<CompletableFuture> futureList = new ArrayList(); for (int i = 0; i { RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(closeKey); // 加入监听 redissonClient.getDelayedQueue(blockingDeque); while (true) { OrderInfo take; try { take = blockingDeque.take(); } catch (Exception e) { continue; } if (take == null) { continue; } try { lock.lock(); if(closed.contains(take.getId())){ log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId()); } closed.add(take.getId()); }finally { lock.unlock(); } log.info("订单[{}]关闭中。。。", take.getId()); log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take)); } })); } // 模拟正式环境中进程一直在运行,因为test时,没有join则会只执行一次出现消费完数据后进程就关闭了 CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); } @Data @AllArgsConstructor @NoArgsConstructor public class OrderInfo { private Integer id; private String description; private LocalDateTime createTime; private LocalDateTime expireTime; }}