- 在实际分布式项目中延迟任务一般不会使用JDK自带的延迟队列,因为它是基于JVM内存存储,没有持久化操作,所以当服务重启后就会丢失任务。
- 在项目中可以使用MQ死信队列或redisson延迟队列进行处理延迟任务,本篇文章将讲述redisson延迟队列的使用demo和其执行源码。
demo示例
- 通过脚手架创建一个简易springboot项目,引入redisson的maven依赖,并简单配置redisson连接属性。
org.redisson redisson 3.16.6 @Configurationpublic class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; /** * 获取redissonClient实例 * * @return * @throws Exception */ @Bean public RedissonClient getRedisson() { Config config = new Config(); String address = "redis://" + host + ":" + port; config.useSingleServer().setAddress(address); return Redisson.create(config); }}复制代码
- 定义一个redisson延迟队列插入和获取任务处理类RedissonQueueHandle,通过控制spring的bean加载周期开启独立线程获取延迟任务。
- 这里获取延迟任务使用了三种方法,除了第一种阻塞式获取任务方法外,其他两种方法都不是百分比按照延迟参数获取到任务,因为是时间间隔定时循环获取延迟任务。
/** * redisson延迟队列处理器 * * @author zrh */@Slf4j@Componentpublic class RedissonQueueHandle implements InitializingBean { private final RBlockingQueue<RedisDataEntity> queue; private final RDelayedQueue<RedisDataEntity> delayedQueue; public RedissonQueueHandle (RedissonClient client) { this.queue = client.getBlockingQueue("redisson:queue"); this.delayedQueue = client.getDelayedQueue(queue); } @Override public void afterPropertiesSet () { // 开一个线程阻塞式获取任务 thread(); // 使用netty时间轮循环获取任务// watchDog(new HashedWheelTimer()); // 使用线程池定时获取任务// schedule(); } private void thread () { new Thread(() -> { while (true) { try { RedisDataEntity entity = queue.take(); log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - entity.getTime()); } catch (Exception e) { } } }, "zrh").start(); } private void watchDog (final HashedWheelTimer timer) { timer.newTimeout(timeout -> { RedisDataEntity entity = queue.poll(); if (null != entity) { log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - entity.getTime()); } watchDog(timer); }, 3, TimeUnit.SECONDS); } private void schedule () { Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(() -> { RedisDataEntity entity = queue.poll(); if (null != entity) { log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - entity.getTime()); } }, 5, 5, TimeUnit.SECONDS); } /** * 放入redis,定时过期 * * @param entity */ public void offer (RedisDataEntity entity) { try { delayedQueue.offer(entity, entity.getExpire(), TimeUnit.MILLISECONDS); } catch (Exception e) { log.error("放入redis延迟队列异常", e); } }}复制代码
- 放入redisson延迟队列可以是字符串也可以是对象RedisDataEntity,因为有进行IO磁盘存储操作,所以必须实现Serializable序列化接口。
/** * @Author: ZRH * @Date: 2022/1/10 11:54 */@Datapublic class RedisDataEntity implements Serializable { /** * 数据 */ private final T data; /** * 过期时间(单位:毫秒) */ private final Long expire; /** * 添加时间 */ private final Long time; public RedisDataEntity (T data, Long expire, Long time) { this.data = data; this.expire = expire; this.time = time; }}复制代码
- 然后开一个插入数据接口:
/** * @Author: ZRH * @Date: 2022/1/10 11:45 */@Slf4j@RestControllerpublic class IndexController { private final RedissonQueueHandle redisHandle; public IndexController (RedissonQueueHandle redisHandle) { this.redisHandle = redisHandle; } @PostMapping("redissonQueue") public String redissonQueue (@RequestParam String data, @RequestParam Long expire) { RedisDataEntity entity = new RedisDataEntity(data, expire, System.currentTimeMillis()); log.info("本次添加数据:{}", entity); redisHandle.offer(entity); return "ok"; }}访问接口设置延迟30秒:http://localhost:8802/redissonQueue?data=a&expire=30000,打印结果如下2022-01-14 14:21:52.140 INFO 10808 --- [nio-8802-exec-1] c.r.web.controller.IndexController : 本次添加数据:RedisDataEntity(data=a, expire=30000, time=1642141312135)2022-01-14 14:21:52.887 INFO 10808 --- [nio-8802-exec-2] c.r.web.controller.IndexController : 本次添加数据:RedisDataEntity(data=a, expire=30000, time=1642141312887)2022-01-14 14:22:22.240 INFO 10808 --- [ zrh] c.r.web.redis.RedissonQueueHandle : 本次获取数据:RedisDataEntity(data=a, expire=30000, time=1642141312135),耗时:301052022-01-14 14:22:22.914 INFO 10808 --- [ zrh] c.r.web.redis.RedissonQueueHandle : 本次获取数据:RedisDataEntity(data=a, expire=30000, time=1642141312887),耗时:30027复制代码
初始执行流程源码解析
- redisson延迟队列最终都是和redis服务进行交互的,那可以使用monitor命令查看redis中执行了哪些命令,这样对了解其执行流程有很大帮助。
zrangebyscore和zrange指令
- 订阅指令SUBSCRIBE发出后,在QueueTransferTask.start()方法里添加的监听器触发了,就会执行pushTask()
- pushTaskAsync()方法执行完(lua脚本执行完),就会开启一个定时任务scheduleTask()
...... protected abstract RTopic getTopic(); protected abstract RFuture pushTaskAsync(); private void pushTask() { // 这个抽象方法在之前构建RedissonDelayedQueue对象的构造函数里有实现,最后返回元素过期时间 RFuture startTimeFuture = pushTaskAsync(); startTimeFuture.onComplete((res, e) -> { if (e != null) { if (e instanceof RedissonShutdownException) { return; } log.error(e.getMessage(), e); scheduleTask(System.currentTimeMillis() + 5 * 1000L); return; } if (res != null) { scheduleTask(res); } }); }复制代码
BLPOP指令
- 当RedissonDelayedQueue延迟队列构造完成后,会调用延迟队列的take()方法获取延迟任务,然后会进入RedissonBlockingQueue.takeAsync()方法:
...... @Override public RFuture takeAsync() { return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0); } /* * (non-Javadoc) * @see java.util.concurrent.BlockingQueue#take() */ @Override public V take() throws InterruptedException { return commandExecutor.getInterrupted(takeAsync()); } ......复制代码
- 注意这里的参数其值为 BLPOP,很明显这里就是和我们要找的BLPOP指令有关,所以这里其实就是客户端通过BLPOP指令阻塞式获取值。在客户端开个线程一直循环阻塞获取元素即可;
- 看下源码继续向下进入CommandAsyncService.writeAsync(…)方法,然后继续向下进入RedisExecutor.execute()方法:
...... public void execute() { if (mainPromise.isCancelled()) {...} if (!connectionManager.getShutdownLatch().acquire()) {...} codec = getCodec(codec); // 获取连接 RFuture connectionFuture = getConnection(); RPromise attemptPromise = new RedissonPromise(); mainPromiseListener = (r, e) -> {...}; if (attempt == 0) {...} scheduleRetryTimeout(connectionFuture, attemptPromise); connectionFuture.onComplete((connection, e) -> { if (connectionFuture.isCancelled()) {...} if (!connectionFuture.isSuccess()) {...} // 连接获取成功就执行当前方法 sendCommand(attemptPromise, connection); writeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { checkWriteFuture(writeFuture, attemptPromise, connection); } }); }); attemptPromise.onComplete((r, e) -> {...}); }复制代码
- 该方法里一些支线方法按下不表。中间有个超时重试机制,使用netty的时间轮,不是重点也就不表述了。
- 先获取写入操作连接对象任务,然后进入方法sendCommand(attemptPromise, connection)发送指令
- 指令:”BLPOP”,参数:”redisson:queue” “0”
- 然后跟进插入元素offer方法,进入RedissonDelayedQueue.offerAsync()方法内,如下所示:
...... @Override public void offer(V e, long delay, TimeUnit timeUnit) { get(offerAsync(e, delay, timeUnit)); } @Override public RFuture offerAsync(V e, long delay, TimeUnit timeUnit) { if (delay < 0) { throw new IllegalArgumentException("Delay can't be negative"); } long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; long randomId = ThreadLocalRandom.current().nextLong(); return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" + "redis.call('rpush', KEYS[3], value);" // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); " + "if v[1] == value then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;", Arrays.
- 其中很明显一长串的脚本命令就是在redis中执行的指令,基本流程比较简单:
- “zadd”:这是向zset集合”redisson_delay_queue_timeout:{redisson:queue}”里添加元素数据(此数据被处理过,不用管其结构),排序值为当前时间戳+延迟时间
- “rpush”:把元素数据推送到list队列”redisson:queue”
- “zrange”:获取zset集合”redisson_delay_queue_timeout:{redisson:queue}”中排好序的第一个元素
- “publish”:如果上述获取的元素是本次插入的元素,那就发布通知队列”redisson_delay_queue_channel:{redisson:queue}”,内容为当前元素的过期时间,这样做是为了减少本次元素到期的时间差。
最后定时器源码解析
- 定时器任务主要是通过监听器监听到了有新的客户端订阅或元素通知发布出来时,就会执行pushTask()和scheduleTask(…)方法:
...... private int messageListenerId; private int statusListenerId; public void start() { RTopic schedulerTopic = getTopic(); // 当有新的客户端订阅schedulerTopic,就是触发执行pushTask()方法 statusListenerId = schedulerTopic.addListener(new BaseStatusListener() { @Override public void onSubscribe(String channel) { pushTask(); } }); // 当redis有新的消息通知,就会触发scheduleTask(...)方法,startTime为上述中publish通知的元素过期时间 messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener() { @Override public void onMessage(CharSequence channel, Long startTime) { scheduleTask(startTime); } }); }复制代码
- pushTask()方法是对redis延迟队列进行操作的方法,scheduleTask(…)是netty时间轮来控制调用pushTask()方法,所以pushTask()和scheduleTask()互相调用。
...... private void scheduleTask(final Long startTime) { TimeoutTask oldTimeout = lastTimeout.get(); if (startTime == null) {...} if (oldTimeout != null) {...} long delay = startTime - System.currentTimeMillis(); if (delay > 10) { Timeout timeout = connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { pushTask(); TimeoutTask currentTimeout = lastTimeout.get(); if (currentTimeout.getTask() == timeout) { lastTimeout.compareAndSet(currentTimeout, null); } } }, delay, TimeUnit.MILLISECONDS); if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) { timeout.cancel(); } } else { pushTask(); } } protected abstract RTopic getTopic(); protected abstract RFuture pushTaskAsync(); private void pushTask() { RFuture startTimeFuture = pushTaskAsync(); startTimeFuture.onComplete((res, e) -> { if (e != null) { if (e instanceof RedissonShutdownException) { return; } log.error(e.getMessage(), e); scheduleTask(System.currentTimeMillis() + 5 * 1000L); return; } if (res != null) { scheduleTask(res); } }); }复制代码
- 总结:
- 当有新的客户端进行订阅,就调用pushTask()方法拉取数据放入阻塞队列。
- 当有信的消息进行发布,就调用scheduleTask(…)方法,并根据其过期时间判断是通过时间轮延迟调用还是立即调用pushTask()方法。
最后
- redisson延迟队列的源码相对而言其实是比较抽象复杂的,感觉没有其分布式锁这块源码容易解析。
- 但仔细用心去看,跟着主要方法走还是可以了解其执行流程。虚心学习,共同进步 -_-