实体类

为了方便测试,直接在测试类中的写内部类:

    @Data    @AllArgsConstructor    @NoArgsConstructor    public class OrderInfo {        /**         * 订单id         */        private Integer id;        /**         * 描述:用来记录关闭时间,可以在测试时用来验证。关闭时间是否跟 expireTime相等         */        private String description;        /**         * 创建时间         */        private LocalDateTime createTime;        /**         * 过期时间:关闭时间         */        private LocalDateTime expireTime;    }

生成订单

模拟生成订单并设置过期时间。
执行时会在redis创建2个key:

  • redisson_delay_queue:{ } :订单数据
  • redisson_delay_queue_timeout:{ } :zset类型,按时间戳排序
    /**     * 创建订单,并设置过期时间     *     * @throws IOException     */    @Test    void createOrder() {        RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(closeKey);        RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque);        // 100条订单        int n = 100;        Random random = new Random();        for (int i = 0; i < n; i++) {            // 1~100之间的正整数            int i1 = random.nextInt(100) + 1;            LocalDateTime now = LocalDateTime.now();            delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, now, now.plusSeconds(i1)), i1, TimeUnit.SECONDS);        }    }

关闭订单

关闭订单,这里会产生订阅。redis会出现redisson_delay_queue_channel

    /**     * 关闭订单     *     * @throws IOException     */    @Test    void closeOrder() {        ReentrantLock lock = new ReentrantLock();        // 5个线程        int poolSize = 5;        List<CompletableFuture> futureList = new ArrayList();        for (int i = 0; i  {                RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(closeKey);                // 加入监听                redissonClient.getDelayedQueue(blockingDeque);                while (true) {                    OrderInfo take;                    try {                        take = blockingDeque.take();                    } catch (Exception e) {                        continue;                    }                    if (take == null) {                        continue;                    }                    // 验证多次是否会重复关闭。正常里不会近,只是验证下。正式环境,可以删除                    try {                        lock.lock();                        if(closed.contains(take.getId())){                            log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId());                        }                        closed.add(take.getId());                    }finally {                        lock.unlock();                    }                    // 处理订单关闭逻辑                    log.info("订单[{}]关闭中。。。", take.getId());                    log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take));                }            }));        }        // 模拟正式环境中进程一直在运行,因为test时,没有join则会只执行一次出现消费完数据后进程就关闭了        CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();    }

完整测试类:

package cn.skyjilygao.demo;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.junit.jupiter.api.Test;import org.redisson.api.RBlockingDeque;import org.redisson.api.RDelayedQueue;import org.redisson.api.RedissonClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import java.io.IOException;import java.time.LocalDateTime;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.Set;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ConcurrentSkipListSet;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantLock;import static cn.skyjilygao.util.EntityUtil.toJsonString;@Slf4j@SpringBootTestpublic class CloseOrderTests {    @Autowired    private RedissonClient redissonClient;    public static String closeKey = "order_close_test";    public volatile static Set closed = new ConcurrentSkipListSet();    /**     * 创建订单,并设置过期时间     *     * @throws IOException     */    @Test    void createOrder() {        RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(closeKey);        RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque);        int a = 100;        Random random = new Random(100);        for (int i = 0; i < a; i++) {            int i1 = random.nextInt(1 + i) + 1;            delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, LocalDateTime.now(), LocalDateTime.now().plusSeconds(i1)), i1, TimeUnit.SECONDS);        }    }    /**     * 关闭订单     *     * @throws IOException     */    @Test    void closeOrder() {        ReentrantLock lock = new ReentrantLock();        // 5个线程        int poolSize = 5;        List<CompletableFuture> futureList = new ArrayList();        for (int i = 0; i  {                RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(closeKey);                // 加入监听                redissonClient.getDelayedQueue(blockingDeque);                while (true) {                    OrderInfo take;                    try {                        take = blockingDeque.take();                    } catch (Exception e) {                        continue;                    }                    if (take == null) {                        continue;                    }                    try {                        lock.lock();                        if(closed.contains(take.getId())){                            log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId());                        }                        closed.add(take.getId());                    }finally {                        lock.unlock();                    }                    log.info("订单[{}]关闭中。。。", take.getId());                    log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take));                }            }));        }        // 模拟正式环境中进程一直在运行,因为test时,没有join则会只执行一次出现消费完数据后进程就关闭了        CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();    }    @Data    @AllArgsConstructor    @NoArgsConstructor    public class OrderInfo {        private Integer id;        private String description;        private LocalDateTime createTime;        private LocalDateTime expireTime;    }}