【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)

个人简介:

> 个人主页:赵四司机
> 学习方向:JAVA后端开发
> 种一棵树最好的时间是十年前,其次是现在!
> ⏰往期文章:SpringBoot项目整合微信支付
> 喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。

前言:

最近在做一个基于SpringCloud+Springboot+Docker的新闻头条微服务项目,用的是黑马的教程,现在项目开发进入了尾声,我打算通过写文章的形式进行梳理一遍,并且会将梳理过程中发现的Bug进行修复,有需要改进的地方我也会继续做出改进。这一系列的文章我将会放入微服务项目专栏中,这个项目适合刚接触微服务的人作为练手项目,假如你对这个项目感兴趣你可以订阅我的专栏进行查看,需要资料可以私信我,当然要是能给我点个小小的关注就更好了,你们的支持是我最大的动力。

目录

一:未来数据定时刷新

1.redis key值匹配

方案一:keys模糊匹配

方案二:scan

2.redis管道

3.定时刷新功能实现

二:分布式锁解决集群下的方法抢占执行

1.问题描述

2.分布式锁

3.redis分布式锁

4.实现

(1)方法添加

(2) 代码修改

5.数据库同步

三:延迟队列实现定时发布

1.提供对外接口

2.具体实现

(1)前期准备

(2)添加任务到延迟队列

(3)修改发布文章代码

(4)消费任务进行文章审核


一:未来数据定时刷新

1.redis key值匹配

将未来5分钟之内要发布的文章加入到redis之后,我们需要定时对这部分数据(也就是zset中的数据)进行扫描,以便将zset中时间到了文章存入list中准备发布,但是这时候扫描zset中的数据有两种选择,见下面分析:

方案一:keys模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

图片[1] - 【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下) - MaxSSL

方案二:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

图片[2] - 【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下) - MaxSSL

2.redis管道

普通redis客户端和服务器交互模式

图片[3] - 【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下) - MaxSSL

Pipeline请求模型

图片[4] - 【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下) - MaxSSL

两者的区别从图中就可以看出来,第一种方式对每一个命令都需要向服务端发送一次请求,假如命令过多会不断创建连接,降低执行效率;而第二种方式则是将一批命令积攒到一起再开启通道一次性执行,大大减少了连接数。

3.定时刷新功能实现

在taskserviceImpl中添加如下方法,并且引导类中开启任务调度注解@EnableScheduling

@Scheduled(cron = "0 */1 * * * " />

3.redis分布式锁

setnx (Setif Not Exists) 命令在指定的 key 不存在时,为 key 设置指定的值。

图片[5] - 【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下) - MaxSSL

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功

  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败

  • 客户端A执行代码完成,删除锁

  • 客户端B在等待一段时间后再去请求设置key的值,设置成功

  • 客户端B执行代码完成,删除锁

4.实现

(1)方法添加

在工具类CacheService中添加如下方法:

/** * 加锁 * * @param name * @param expire * @return */public String tryLock(String name, long expire) {    name = name + "_lock";    String token = UUID.randomUUID().toString();    RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();    RedisConnection conn = factory.getConnection();    try {        //参考redis命令:        //set key value [EX seconds] [PX milliseconds] [NX|XX]        Boolean result = conn.set(                name.getBytes(),                token.getBytes(),                Expiration.from(expire, TimeUnit.MILLISECONDS),                RedisStringCommands.SetOption.SET_IF_ABSENT //NX        );        if (result != null && result)            return token;    } finally {        RedisConnectionUtils.releaseConnection(conn, factory,false);    }    return null;}

参数name表示锁的名称,expire表示锁的过期时间,最重要的是set方法中最后一个参数RedisStringCommands.SetOption.SET_IF_ABSENT,这表示当有一个请求过来之后就会设置key值进行加锁,这样再有请求过来就获取不到了。

(2) 代码修改

/** * 定时器任务,每分钟扫描redis一次 */@Scheduled(cron = "0 */1 * * * " />

/** * 数据库同步任务,每五分钟执行一次 */@PostConstruct  //表示服务一启动便执行一次@Scheduled(cron = "0 */5 * * * ?")public void reloadData() {    log.info("开始同步数据库任务到redis...");    //1.清理缓存任务,避免重复    clearCache();    //2.获取5分钟之后的时间    Calendar calendar = Calendar.getInstance();    calendar.add(Calendar.MINUTE,5);        //3.查看未来所有小于5分钟的任务    List tasks = taskInfoMapper.selectList            (Wrappers.lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));    if(tasks != null && tasks.size() > 0) {        for (Taskinfo taskinfo : tasks) {            Task task = new Task();            BeanUtils.copyProperties(taskinfo,task);            task.setExecuteTime(taskinfo.getExecuteTime().getTime());            addTaskToCache(task);        }    }}/** * 清理缓存任务 */private void clearCache() {    log.info("开始清理缓存任务...");    //获取任务集    Set futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");    Set topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*");    cacheService.delete(futureKeys);    cacheService.delete(topicKeys);}

三:延迟队列实现定时发布

1.提供对外接口

提供远程的feign接口,在tbug-headlines-feign-api编写类如下:

package com.my.apis.schedule;import com.my.model.common.dtos.ResponseResult;import com.my.model.schedule.dtos.Task;import org.springframework.cloud.openfeign.FeignClient;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;@FeignClient(value = "headlines-schedule")public interface IScheduleClient {    /**     * 添加任务     * @param task   任务对象     * @return       任务id     */    @PostMapping("/api/v1/task/add")    ResponseResult addTask(@RequestBody Task task);    /**     * 取消任务     * @param taskId        任务id     * @return              取消结果     */    @GetMapping("/api/v1/task/cancel/{taskId}")    ResponseResult cancelTask(@PathVariable("taskId") long taskId);    /**     * 按照类型和优先级来拉取任务     * @param type     * @param priority     * @return     */    @GetMapping("/api/v1/task/poll/{type}/{priority}")    ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority")  int priority);}

在tbug-headlines-schedule微服务下提供对应的实现

package com.my.schedule.feign;import com.my.apis.schedule.IScheduleClient;import com.my.model.common.dtos.ResponseResult;import com.my.model.schedule.dtos.Task;import com.my.schedule.service.TaskService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;@RestControllerpublic class ScheduleClient implements IScheduleClient {    @Autowired    private TaskService taskService;    /**     * 添加任务     * @param task   任务对象     * @return       任务id     */    @Override    @PostMapping("/api/v1/task/add")    public ResponseResult addTask(@RequestBody Task task) {        return ResponseResult.okResult(taskService.addTask(task));    }    /**     * 取消任务     * @param taskId        任务id     * @return              取消结果     */    @Override    @GetMapping("/api/v1/task/cancel/{taskId}")    public ResponseResult cancelTask(@PathVariable long taskId) {        return ResponseResult.okResult(taskService.cancelTask(taskId));    }    /**     * 按照类型和优先级来拉取任务     * @param type     * @param priority     * @return     */    @Override    @GetMapping("/api/v1/task/poll/{type}/{priority}")    public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority) {        return ResponseResult.okResult(taskService.poll(type,priority));    }}

2.具体实现

(1)前期准备

①枚举类

package com.my.model.common.enums;import lombok.AllArgsConstructor;import lombok.Getter;@Getter@AllArgsConstructorpublic enum TaskTypeEnum {    NEWS_SCAN_TIME(1001, 1,"文章定时审核"),    REMOTEERROR(1002, 2,"第三方接口调用失败,重试");    private final int taskType; //对应具体业务    private final int priority; //业务不同级别    private final String desc; //描述信息}

②序列化工具

在添加任务到延迟队列的方法中,我们需要用到序列化工具进行序列化操作,而在任务消费时候又需要进行反序列化操作。java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化,但是这里我选用的是效率更高的Protostuff。Protostuff是google开源的,其采用更为紧凑的二进制数组,表现更加优异。

将ProtostuffUtil拷贝到tbug-headlines-utils中,然后导入如下依赖

    io.protostuff    protostuff-core    1.6.0    io.protostuff    protostuff-runtime    1.6.0

(2)添加任务到延迟队列

创建WmNewsTaskService

package com.my.wemedia.service;import java.util.Date;public interface WmNewsTaskService {    /**     * 添加任务到延迟队列中     * @param id 文章id     * @param publishTime  文章发布时间     */    void addNewsToTask(Integer id, Date publishTime);    /**     * 消费延迟队列数据     */    void scanNewsByTask();}

实现类

package com.my.wemedia.service.impl;import com.alibaba.fastjson.JSON;import com.my.apis.schedule.IScheduleClient;import com.my.common.constans.ScheduleConstants;import com.my.common.redis.CacheService;import com.my.model.common.dtos.ResponseResult;import com.my.model.common.enums.TaskTypeEnum;import com.my.model.schedule.dtos.Task;import com.my.model.wemedia.pojos.WmNews;import com.my.utils.common.ProtostuffUtil;import com.my.wemedia.service.WmAutoScanService;import com.my.wemedia.service.WmNewsTaskService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Async;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Service;import java.util.Calendar;import java.util.Date;@Slf4j@Servicepublic class WmNewsTaskServiceImpl implements WmNewsTaskService {    @Autowired    private IScheduleClient scheduleClient;    /**     * 添加任务到延迟队列     * @param id 文章id     * @param publishTime  文章发布时间     */    @Override    @Async    public void addNewsToTask(Integer id, Date publishTime) {        log.info("添加任务到延迟服务中---begin");        Task task = new Task();        task.setExecuteTime(publishTime.getTime());        task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());        task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());        WmNews wmNews = new WmNews();        wmNews.setId(id);        task.setParameters(ProtostuffUtil.serialize(wmNews));        scheduleClient.addTask(task);        log.info("添加任务到延迟服务中---end");    }}

(3)修改发布文章代码

将之前的异步调用审核文章修改为将文章数据加入延迟队列

    @Autowired    private WmAutoScanService wmAutoScanService;    @Autowired    private WmNewsTaskService wmNewsTaskService;    /**     * 提交文章     * @param dto     * @return     */    @Override    public ResponseResult submitNews(WmNewsDto dto) throws Exception {        //1.参数校验        if(dto == null || dto.getContent().length() == 0) {            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);        }        //2.保存或修改文章        //2.1属性拷贝        WmNews wmNews = new WmNews();        BeanUtils.copyProperties(dto,wmNews);        //2.2设置封面图片        if(dto.getImages() != null && dto.getImages().size() != 0) {            String images = StringUtils.join(dto.getImages(), ",");            wmNews.setImages(images);        }        //2.3封面类型为自动        if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) {            wmNews.setType(null);        }        saveOrUpdateWmNews(wmNews);        //3.判断是否为草稿        if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) {            //直接保存结束            return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);        }        //4.不是草稿        //4.1保存文章图片素材与文章关系        //4.1.1提取图片素材列表        List imagesList = getImagesList(dto);        //4.1.2保存        saveRelatedImages(imagesList,wmNews.getId(),WemediaConstants.WM_CONTENT_REFERENCE);        //4.2保存封面图片和文章关系        saveRelatedCover(dto,imagesList,wmNews);        //5.审核文章(异步调用)        // wmAutoScanService.AutoScanTextAndImage(wmNews.getId());        //5.将任务添加到延迟服务        wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);    }

(4)消费任务进行文章审核

在WmNewsTaskServiceImpl中添加如下方法:

@Autowiredprivate WmNewsAutoScanServiceImpl wmNewsAutoScanService;/**     * 消费延迟队列数据     */@Scheduled(fixedRate = 1000)@Override@SneakyThrowspublic void scanNewsByTask() {    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());    if(responseResult.getCode().equals(200) && responseResult.getData() != null){        log.info("文章审核---消费任务执行---begin---");        String json_str = JSON.toJSONString(responseResult.getData());        Task task = JSON.parseObject(json_str, Task.class);        byte[] parameters = task.getParameters();        WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);        System.out.println(wmNews.getId()+"-----------");        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());        log.info("文章审核---消费任务执行---end---");    }    }

下篇预告:定时发布文章优化策略

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享