个人简介:
> 个人主页:赵四司机
> 学习方向:JAVA后端开发
> 种一棵树最好的时间是十年前,其次是现在!
> ⏰往期文章:SpringBoot项目整合微信支付
> 喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。
前言:
最近在做一个基于SpringCloud+Springboot+Docker的新闻头条微服务项目,用的是黑马的教程,现在项目开发进入了尾声,我打算通过写文章的形式进行梳理一遍,并且会将梳理过程中发现的Bug进行修复,有需要改进的地方我也会继续做出改进。这一系列的文章我将会放入微服务项目专栏中,这个项目适合刚接触微服务的人作为练手项目,假如你对这个项目感兴趣你可以订阅我的专栏进行查看,需要资料可以私信我,当然要是能给我点个小小的关注就更好了,你们的支持是我最大的动力。
目录
一:未来数据定时刷新
1.redis key值匹配
方案一:keys模糊匹配
方案二:scan
2.redis管道
3.定时刷新功能实现
二:分布式锁解决集群下的方法抢占执行
1.问题描述
2.分布式锁
3.redis分布式锁
4.实现
(1)方法添加
(2) 代码修改
5.数据库同步
三:延迟队列实现定时发布
1.提供对外接口
2.具体实现
(1)前期准备
(2)添加任务到延迟队列
(3)修改发布文章代码
(4)消费任务进行文章审核
一:未来数据定时刷新
1.redis key值匹配
将未来5分钟之内要发布的文章加入到redis之后,我们需要定时对这部分数据(也就是zset中的数据)进行扫描,以便将zset中时间到了文章存入list中准备发布,但是这时候扫描zset中的数据有两种选择,见下面分析:
方案一:keys模糊匹配
keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞
方案二:scan
SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
2.redis管道
普通redis客户端和服务器交互模式
Pipeline请求模型
两者的区别从图中就可以看出来,第一种方式对每一个命令都需要向服务端发送一次请求,假如命令过多会不断创建连接,降低执行效率;而第二种方式则是将一批命令积攒到一起再开启通道一次性执行,大大减少了连接数。
3.定时刷新功能实现
在taskserviceImpl中添加如下方法,并且引导类中开启任务调度注解@EnableScheduling
@Scheduled(cron = "0 */1 * * * " /> 3.redis分布式锁
setnx (Setif Not Exists) 命令在指定的 key 不存在时,为 key 设置指定的值。
这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作
-
客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
-
客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
-
客户端A执行代码完成,删除锁
-
客户端B在等待一段时间后再去请求设置key的值,设置成功
-
客户端B执行代码完成,删除锁
4.实现
(1)方法添加
在工具类CacheService中添加如下方法:
/** * 加锁 * * @param name * @param expire * @return */public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try { //参考redis命令: //set key value [EX seconds] [PX milliseconds] [NX|XX] Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT //NX ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null;}
参数name表示锁的名称,expire表示锁的过期时间,最重要的是set方法中最后一个参数RedisStringCommands.SetOption.SET_IF_ABSENT,这表示当有一个请求过来之后就会设置key值进行加锁,这样再有请求过来就获取不到了。
(2) 代码修改
/** * 定时器任务,每分钟扫描redis一次 */@Scheduled(cron = "0 */1 * * * " /> /** * 数据库同步任务,每五分钟执行一次 */@PostConstruct //表示服务一启动便执行一次@Scheduled(cron = "0 */5 * * * ?")public void reloadData() { log.info("开始同步数据库任务到redis..."); //1.清理缓存任务,避免重复 clearCache(); //2.获取5分钟之后的时间 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE,5); //3.查看未来所有小于5分钟的任务 List tasks = taskInfoMapper.selectList (Wrappers.lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime())); if(tasks != null && tasks.size() > 0) { for (Taskinfo taskinfo : tasks) { Task task = new Task(); BeanUtils.copyProperties(taskinfo,task); task.setExecuteTime(taskinfo.getExecuteTime().getTime()); addTaskToCache(task); } }}/** * 清理缓存任务 */private void clearCache() { log.info("开始清理缓存任务..."); //获取任务集 Set futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); Set topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*"); cacheService.delete(futureKeys); cacheService.delete(topicKeys);}
三:延迟队列实现定时发布
1.提供对外接口
提供远程的feign接口,在tbug-headlines-feign-api编写类如下:
package com.my.apis.schedule;import com.my.model.common.dtos.ResponseResult;import com.my.model.schedule.dtos.Task;import org.springframework.cloud.openfeign.FeignClient;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;@FeignClient(value = "headlines-schedule")public interface IScheduleClient { /** * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") ResponseResult addTask(@RequestBody Task task); /** * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") ResponseResult cancelTask(@PathVariable("taskId") long taskId); /** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority);}
在tbug-headlines-schedule微服务下提供对应的实现
package com.my.schedule.feign;import com.my.apis.schedule.IScheduleClient;import com.my.model.common.dtos.ResponseResult;import com.my.model.schedule.dtos.Task;import com.my.schedule.service.TaskService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;@RestControllerpublic class ScheduleClient implements IScheduleClient { @Autowired private TaskService taskService; /** * 添加任务 * @param task 任务对象 * @return 任务id */ @Override @PostMapping("/api/v1/task/add") public ResponseResult addTask(@RequestBody Task task) { return ResponseResult.okResult(taskService.addTask(task)); } /** * 取消任务 * @param taskId 任务id * @return 取消结果 */ @Override @GetMapping("/api/v1/task/cancel/{taskId}") public ResponseResult cancelTask(@PathVariable long taskId) { return ResponseResult.okResult(taskService.cancelTask(taskId)); } /** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @Override @GetMapping("/api/v1/task/poll/{type}/{priority}") public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority) { return ResponseResult.okResult(taskService.poll(type,priority)); }}
2.具体实现
(1)前期准备
①枚举类
package com.my.model.common.enums;import lombok.AllArgsConstructor;import lombok.Getter;@Getter@AllArgsConstructorpublic enum TaskTypeEnum { NEWS_SCAN_TIME(1001, 1,"文章定时审核"), REMOTEERROR(1002, 2,"第三方接口调用失败,重试"); private final int taskType; //对应具体业务 private final int priority; //业务不同级别 private final String desc; //描述信息}
②序列化工具
在添加任务到延迟队列的方法中,我们需要用到序列化工具进行序列化操作,而在任务消费时候又需要进行反序列化操作。java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化,但是这里我选用的是效率更高的Protostuff。Protostuff是google开源的,其采用更为紧凑的二进制数组,表现更加优异。
将ProtostuffUtil拷贝到tbug-headlines-utils中,然后导入如下依赖
io.protostuff protostuff-core 1.6.0 io.protostuff protostuff-runtime 1.6.0
(2)添加任务到延迟队列
创建WmNewsTaskService
package com.my.wemedia.service;import java.util.Date;public interface WmNewsTaskService { /** * 添加任务到延迟队列中 * @param id 文章id * @param publishTime 文章发布时间 */ void addNewsToTask(Integer id, Date publishTime); /** * 消费延迟队列数据 */ void scanNewsByTask();}
实现类
package com.my.wemedia.service.impl;import com.alibaba.fastjson.JSON;import com.my.apis.schedule.IScheduleClient;import com.my.common.constans.ScheduleConstants;import com.my.common.redis.CacheService;import com.my.model.common.dtos.ResponseResult;import com.my.model.common.enums.TaskTypeEnum;import com.my.model.schedule.dtos.Task;import com.my.model.wemedia.pojos.WmNews;import com.my.utils.common.ProtostuffUtil;import com.my.wemedia.service.WmAutoScanService;import com.my.wemedia.service.WmNewsTaskService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Async;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Service;import java.util.Calendar;import java.util.Date;@Slf4j@Servicepublic class WmNewsTaskServiceImpl implements WmNewsTaskService { @Autowired private IScheduleClient scheduleClient; /** * 添加任务到延迟队列 * @param id 文章id * @param publishTime 文章发布时间 */ @Override @Async public void addNewsToTask(Integer id, Date publishTime) { log.info("添加任务到延迟服务中---begin"); Task task = new Task(); task.setExecuteTime(publishTime.getTime()); task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType()); task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); WmNews wmNews = new WmNews(); wmNews.setId(id); task.setParameters(ProtostuffUtil.serialize(wmNews)); scheduleClient.addTask(task); log.info("添加任务到延迟服务中---end"); }}
(3)修改发布文章代码
将之前的异步调用审核文章修改为将文章数据加入延迟队列
@Autowired private WmAutoScanService wmAutoScanService; @Autowired private WmNewsTaskService wmNewsTaskService; /** * 提交文章 * @param dto * @return */ @Override public ResponseResult submitNews(WmNewsDto dto) throws Exception { //1.参数校验 if(dto == null || dto.getContent().length() == 0) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //2.保存或修改文章 //2.1属性拷贝 WmNews wmNews = new WmNews(); BeanUtils.copyProperties(dto,wmNews); //2.2设置封面图片 if(dto.getImages() != null && dto.getImages().size() != 0) { String images = StringUtils.join(dto.getImages(), ","); wmNews.setImages(images); } //2.3封面类型为自动 if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) { wmNews.setType(null); } saveOrUpdateWmNews(wmNews); //3.判断是否为草稿 if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) { //直接保存结束 return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } //4.不是草稿 //4.1保存文章图片素材与文章关系 //4.1.1提取图片素材列表 List imagesList = getImagesList(dto); //4.1.2保存 saveRelatedImages(imagesList,wmNews.getId(),WemediaConstants.WM_CONTENT_REFERENCE); //4.2保存封面图片和文章关系 saveRelatedCover(dto,imagesList,wmNews); //5.审核文章(异步调用) // wmAutoScanService.AutoScanTextAndImage(wmNews.getId()); //5.将任务添加到延迟服务 wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime()); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
(4)消费任务进行文章审核
在WmNewsTaskServiceImpl中添加如下方法:
@Autowiredprivate WmNewsAutoScanServiceImpl wmNewsAutoScanService;/** * 消费延迟队列数据 */@Scheduled(fixedRate = 1000)@Override@SneakyThrowspublic void scanNewsByTask() { ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); if(responseResult.getCode().equals(200) && responseResult.getData() != null){ log.info("文章审核---消费任务执行---begin---"); String json_str = JSON.toJSONString(responseResult.getData()); Task task = JSON.parseObject(json_str, Task.class); byte[] parameters = task.getParameters(); WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class); System.out.println(wmNews.getId()+"-----------"); wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); log.info("文章审核---消费任务执行---end---"); } }
下篇预告:定时发布文章优化策略