我们在高并发的项目中基本上都离不开缓存,那么既然引入缓存,那就会有一个缓存与数据库数据一致性的问题。

首先,我们先来看看高并发项目里面Redis常见的三种缓存读写模式。

Cache Aside

读写分离模式,是最常见的Redis缓存模式,多数采用。读写数据时需要先查找缓存,如果缓存中没有,则从数据库中查找数据。如果查询到数据,需要将数据放到缓存中,下次访问再直接从缓存中获取数据,以提高访问效率。写操作通常不会直接更新缓存,而是删除缓存,因为存储结构是hash、list,则更新数据需要遍历。
  • 优点
    • 读取效率高,缓存命中率高,写操作与数据库同步,数据一致性较高,实现较为简单。
  • 缺点
    • 数据库和缓存之间存在数据不一致的问题,需要考虑缓存失效和数据库更新操作带来的缓存不一致问题。
  • 应用场景
    • 适用于读操作非常频繁而写操作相对比较少的情况,如电商网站的商品详情页,读取次数远高于更新和添加操作。

Read/Write Through

读写穿透模式,读写操作会直接修改缓存,然后再同步更新数据库中的数据,开发较为复杂,一般少用。在Read/WriteThrough模式下,每次数据的读写操作都会操作缓存,再同步到数据库,以保证缓存和数据库数据的一致性。应用程序将缓存作为主要的数据源,数据库对于应用程序是透明的,更新数据库和从数据库的读取的任务都交给缓存来实现。
  • 优点
    • 写操作速度较快,一致性较高,缓存与数据库的数据保持一致,缓存命中率较高。
  • 缺点
    • 读操作较慢,如果缓存没有可用数据,每次都会进行数据库查询,数据量较大时会对性能带来较大的影响。
  • 应用场景
    • 系统处理写操作频繁且读操作不频繁的场景,如云存储Ceph

Write Behind

被称为Write Back模式或异步写入模式,一般较少使用。如果有写操作,缓存会记录修改了缓存的数据,但是并不会立即同步到数据库中。一般会把缓存中的数据更新到磁盘中,等到后续有查询数据操作时,再异步批量更新数据库中的数据。该模式的优点就是写操作速度很快,不会对性能产生影响,同时也避免了频繁更新数据库的情况,提升了数据库性能。
  • 优点
    • 写操作速度快,性能较高,数据一致性一般较高。
  • 缺点
    • 读操作较慢,由于异步方式更新数据库,可能会存在数据的延迟。
  • 应用场景:
    • 用于较数据读写比重较高的场景,如游戏中的用户活动积分等信息,刚开始对写操作性能要求很高,后续查询比较少。

业务开发里面,基本都是从数据库读取到缓存里面,那缓存和数据库的读写顺序是怎样的?

场景一:先更新数据库,再更新缓存

  • 线程A 更新数据库,更新完数据库,线程A 更新缓存,缓存更新成功,但是线程A 数据库事务commit失败,或者方法体发生异常,进行rollback 。会导致缓存和数据库不一致 。

场景二:先删缓存,再更新数据库

  • 线程A 删除缓存,更新数据库,但是还没commit。这时线程B访问缓存,发现没数据,去数据库读取未commit的放到缓存,也就是老数据。线程A 进行了commit操作。这也会导致缓存是老数据和数据库是新数据,不一致。

场景三:先删除缓存,再更新数据库,再删除缓存

  • 线程A 删除缓存,更新数据库,但是还没commit。线程B访问缓存,发现没数据,去数据库读取未commit的放到缓存(老数据 )。线程A 进行了commit操作。线程A 再次删除缓存数据(这时缓存为空,后续读取就是最新的数据)。保证了数据的一致性,但是浪费了多次IO,相当于每次都要多删一次Redis。

OK,我们回归正题,什么是多级缓存架构?

多级缓存架构是一种通常用于优化应用程序性能且高可用的缓存技术,通常由多层缓存组成,其中每一层缓存可以根据其不同的特征和作用进行选择和调整。通过缓存数据的多份副本,最大化应用程序性能和可用性,避免缓存穿透、缓存击穿、以及数据不一致的问题。

这里我们采用Nginx+Lua+Canal+Redis+Mysql架构,也就是说读操作通过Lua查询Nginx的缓存,如果Nginx缓存没有数据,则查询Redis缓存,如果Redis缓存也没有数据,直接查询mysql写操作时,Canal监听数据库指定表的增量变化,Java程序消费Canal监听到的增量变化写到Redis中。Java-canal程序操作Redis缓存,Nginx本地缓存是否应用和失效,取决项目类型。

OK,什么是Canal呢?

阿里巴巴基于 MySQL 的增量日志解析和订阅发布系统,主要用于解决数据订阅与消费问题。Canal主要支持了MySQL的binlog解析,解析完成利用canal client 用来处理获得的相关数据。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

  • 基于日志增量订阅和消费的业务包括

    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护(拆分异构索引、倒排索引等)
    • 业务 cache 刷新
    • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

  • Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )。
  • Canal 解析 binary log 对象(原始为 byte 流) ,并解析成独立的数据操作事件,如插入、更新、删除等。

环境准备

OK,前面主要是给大家介绍了下多级缓存的一个架构,接下来我们先做一些环境的准备。

首先部署MySQL,这里我们采用docker容器化的方式部署MySQL。部署完成后一定要开启MySQL的binlog日志。

#创建目录mkdir -p /home/data/mysql/#部署docker run \-p 3306:3306 \-e MYSQL_ROOT_PASSWORD=123456 \-v /home/data/mysql/conf:/etc/mysql/conf.d \-v /home/data/mysql/data:/var/lib/mysql:rw \--name mysql_test \--restart=always \-d mysql:8.0

编辑配置文件my.cnf,在mysqld模块下,编辑完成后重启mysql服务。

# 开启 binlog, 可以不加,默认开启log-bin=mysql-bin# 选择 ROW 模式binlog_format=row#server_id不要和canal的slaveId重复server-id=1

以下是MySQL三种binlog模式的简介

STATEMENT 格式

Statement-Based Replication,SBR,每一条会修改数据的 SQL 都会记录在 binlog 中。每一条会修改数据 SQL 都会记录在 binlog 中,性能高,发生的变更操作只记录所执行的 SQL 语句,而不记录具体变更的值。不需要记录每一行数据的变化,极大的减少了 binlog 的日志量,避免了大量的 IO 操作,提升了系统的性能。由于 Statement 模式只记录 SQL,而如果一些 SQL 中 包含了函数,那么可能会出现执行结果不一致的情况。缺点:uuid() 函数,每次执行都会生成随机字符串,在 master 中记录了 uuid,当同步到 slave 后再次执行,结果不一样,now()之类的函数以及获取系统参数的操作, 都会出现主从数据不同步的问题。

ROW 格式(默认)

Row-Based Replication,RBR,不记录 SQL 语句上下文信息,仅保存哪条记录被修改。Row 格式不记录 SQL 语句上下文相关信息,仅记录某一条记录被修改成什么样子。清楚地记录下每一行数据修改的细节,不会出现 Statement 中存在的那种数据无法被正常复制的情况,保证最高精度和粒度。缺点:Row 格式存在问题,就是日志量太大,批量 update、整表 delete、alter 表等操作,要记录每一行数据的变化,此时会产生大量的日志,大量的日志也会带来 IO 性能问题。

MIXED 格式

在 STATEMENT 和 ROW 之间自动进行切换的模式。在没有大量变更时使用 STATEMENT 格式。而在发生大量变更时使用 ROW 格式,以确保日志具有高精度和粒度,同时保证存储空间的有效使用。

运行以下命令检查是否开启了binlog:SHOW VARIABLES LIKE 'log_bin'

数据库授权

-- 创建同步用户CREATE USER 'canal'@'%';-- 设置密码ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456';-- 授予复制权限GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- 刷新权限FLUSH PRIVILEGES;

然后我们部署一下Redis,也是采用docker部署,很简单。

docker run -itd --name redis -p 6379:6379 \--privileged=true \-v /redis/data:/data --restart always redis \--requirepass "psd"

然后,我们来部署下canal-server,我们也是采用docker部署。

 docker run -p 11111:11111 --name canal -d canal/canal-server:v1.1.4

进入容器内部,修改配置文件

docker exec -it canal /bin/bash# 修改配置文件vi canal-server/conf/example/instance.properties################################################### mysql serverId , 修改id,不要和mysql 主节点一致即可----------canal.instance.mysql.slaveId=2canal.instance.gtidon=false# 修改 mysql 主节点的ip----------canal.instance.master.address=ip:3306canal.instance.tsdb.enable=true# username/password 授权的数据库账号密码----------canal.instance.dbUsername=canalcanal.instance.dbPassword=123456canal.instance.connectionCharset = UTF-8canal.instance.enableDruid=false# mysql 数据解析关注的表,正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠 \\,所有表:.* 或 .*\\..*canal.instance.filter.regex=.*\\..*canal.instance.filter.black.regex=

重启容器,进入容器查看日志。

docker restart canal

docker exec -it canal /bin/bashtail -100f canal-server/logs/example/example.log

接下来我们来部署一下最后一项Nginx。这里我们直接部署OpenResty,这里面涵盖Nginx和Lua。

依次执行以下命令。

# add the yum repo:wget https://openresty.org/package/centos/openresty.reposudo mv openresty.repo /etc/yum.repos.d/# update the yum index:sudo yum check-updatesudo yum install openresty#安装命令行工具sudo yum install openresty-resty# 列出所有 openresty 仓库里的软件包sudo yum --disablerepo="*" --enablerepo="openresty" list available#查看版本resty -V

OK,前置的服务器环境部署完成啦,下面我们开始编码环节。

首先我们要先创建一个数据库表格。然后对于这个表写一套增删改查逻辑。这快我们创建一个商品表。

CREATE TABLE `product` (`id` bigint NOT NULL AUTO_INCREMENT,`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,`cover_img` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '封面图',`amount` decimal(10,2) DEFAULT NULL COMMENT '现价',`summary` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '概要',`detail` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_bin COMMENT '详情',`gmt_modified` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,`gmt_create` datetime DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

创建SpringBoot项目,加入redis、mysql、mybatis、canal依赖,并且配置yml文件。

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>3.0.6</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency></dependencies>

编写商品的增删改查逻辑。具体的编码这里就不做展示啦,就是一些基础的增删改查。大家在操作时可以去下载源码包,我会上传到CSDN上。

/** * @author lixiang * @date 2023/6/25 17:20 */public interface ProductService {/** * 新增商品 * @param product */void addProduct(ProductDO product);/** * 修改商品 * @param product */void updateProduct(ProductDO product);/** * 删除商品 * @param id */void deleteProductById(Long id);/** * 根据ID查询商品 * @param id * @return */ProductDO selectProductById(Long id);/** * 分页查询商品信息 * @param current * @param size * @return */Map<String,Object> selectProductList(int current, int size);}

启动项目验证没有问题,我们开始下一步的操作。在开发canal监听时,我们先了解一下什么是ApplicationRunner

ApplicationRunnerSpring Boot框架提供的一个接口,它用于在Spring Boot应用程序启动后运行一些任务或代码。当应用程序启动后,如果想在启动后自动执行一些任务或初始化操作,可以使用这个接口。

使用步骤:创建一个类,并实现该接口,并在该类上添加@Component注解,以确保Spring Boot能够扫描该类并执行它的run方法。

/** * @author lixiang * @date 2023/6/29 23:08 */@Component@Slf4jpublic class CanalRedisConsumer implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("CanalRedisConsumer执行");}}

Ok,接下来我们主要关注核心逻辑的实现就可以啦。直接上代码。

/** * 这里我们直接操作redis的String类型 * @author lixiang * @date 2023/6/29 23:08 */@Component@Slf4jpublic class CanalRedisConsumer implements ApplicationRunner {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void run(ApplicationArguments args) throws Exception {// 创建一个 CanalConnector 连接器CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("payne.f3322.net", 11111),"example", "", "");try {// 连接 Canal Server,尝试多次重连while (true) {try {canalConnector.connect();break;} catch (CanalClientException e) {log.info("Connect to Canal Server failed, retrying...");}}log.info("Connect to Canal Server success");//订阅数据库表,默认监听所有的数据库、表,等同于:.*\\..*canalConnector.subscribe(".*\\..*");// 回滚到上一次的 batchId,取消已经消费过的日志canalConnector.rollback();// 持续监听 Canal Server 推送的数据,并将数据写入 Redis 中while (true) {Message message = canalConnector.getWithoutAck(100);long batchId = message.getId();// 如果没有新数据,则暂停固定时间后继续获取if (batchId == -1 || message.getEntries().isEmpty()) {try {Thread.sleep(1000);continue;} catch (InterruptedException e) {e.printStackTrace();}}//处理数据for (CanalEntry.Entry entry : message.getEntries()) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange = null;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("Error parsing Canal Entry.", e);}String table = entry.getHeader().getTableName();CanalEntry.EventType eventType = rowChange.getEventType();log.info("Canal监听数据变化,DB:{},Table:{},Type:{}",entry.getHeader().getSchemaName(),table,eventType);// 变更后的新数据for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {deleteData(table, rowData);} else {insertOrUpdateData(table, rowData);}}}try {canalConnector.ack(batchId);} catch (Exception e) {// 回滚所有未确认的 BatchcanalConnector.rollback(batchId);}}} finally {canalConnector.disconnect();}}/** * 删除行数据 */private void deleteData(String table, CanalEntry.RowData rowData) {List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();JSONObject json = new JSONObject();columns.forEach(column->json.put(column.getName(), column.getValue()));String key = table + ":" + columns.get(0).getValue();log.info("Redis中删除Key为: {} 的数据",key);redisTemplate.delete(key);}/** * 新增或者修改数据 */private void insertOrUpdateData(String table, CanalEntry.RowData rowData) {List<CanalEntry.Column> columns = rowData.getAfterColumnsList();JSONObject json = new JSONObject();columns.forEach(column->json.put(column.getName(), column.getValue()));String key = table + ":" + columns.get(0).getValue();log.info("Redis中新增或修改Key为: {} 的数据",key);redisTemplate.opsForValue().set(key, json);}}

然后,我们开发商品增删改查的接口。

/** * @author lixiang * @date 2023/6/30 17:48 */@RestController@RequestMapping("/api/v1/product")public class ProductController {@Autowiredprivate ProductService productService;/** * 新增 * @param product * @return */@PostMapping("/save")public String save(@RequestBody ProductDO product){int flag = productService.addProduct(product);return flag==1" />"SUCCESS":"FAIL";}/** * 修改 * @param product * @return */@PostMapping("/update")public String update(@RequestBody ProductDO product){int flag = productService.updateProduct(product);return flag==1?"SUCCESS":"FAIL";}/** * 根据ID查询 * @param id * @return */@GetMapping("/findById")public ProductDO update(@RequestParam("id") Long id){ProductDO productDO = productService.selectProductById(id);return productDO;}/** * 分页查询 * @param current * @param size * @return */@GetMapping("/page")public Map<String, Object> update(@RequestParam("current") int current,@RequestParam("size") int size){Map<String, Object> stringObjectMap = productService.selectProductList(current, size);return stringObjectMap;}}

新增一个商品进行验证。


接下来我们将SpringBoot程序打包放到服务器上运行。

mvn clean package

守护进程启动nohup java -jar multi-level-cache-1.0-SNAPSHOT.jar& 

我们验证了DB变化同步缓存,接下来我们来开发,通过Nginx直接读取Redis部分。

首先,我们先了解下什么是OpenResty, 为什么要用OpenResty?

OpenResty由章亦春发起,是基于Ngnix和Lua的高性能web平台,内部集成精良的LUa库、第三方模块、依赖, 开发者可以方便搭建能够处理高并发、扩展性极高的动态web应用、web服务、动态网关。 OpenResty将Nginx核心、LuaJIT、许多有用的Lua库和Nginx第三方模块打包在一起。Nginx是C语言开发,如果要二次扩展是很麻烦的,而基于OpenResty,开发人员可以使用 Lua 编程语言对 Nginx 核心模块进行二次开发拓展。性能强大,OpenResty可以快速构造出1万以上并发连接响应的超高性能Web应用系统。
  • 对于一些高性能的服务来说,可以直接使用 OpenResty 访问 Mysql或Redis等,

  • 不需要通过第三方语言(PHP、Python、Ruby)等来访问数据库再返回,这大大提高了应用的性能

那么什么是Lua脚本呢?

Lua 由标准 C 编写而成,没有提供强大的库,但可以很容易的被 C/C++ 代码调用,也可以反过来调用 C/C++ 的函数。 在应用程序中可以被广泛应用,不过Lua是一种脚本/动态语言,不适合业务逻辑比较重的场景,适合小巧的应用场景

Ok,接下来我们就开始开发Nginx通过Lua脚本直接读取Redis部分。

-- 引入需要使用到的库local redis = require "resty.redis"local redis_server = "ip地址"local redis_port = 6379local redis_pwd = "123456"-- 获取 Redis 中存储的数据local function get_from_redis(key)local red = redis:new()local ok, err = red:connect(redis_server, redis_port)red:auth(redis_pwd)if not ok then-- 如果从 Redis 中获取数据失败,将错误信息写入 Nginx 的错误日志中ngx.log(ngx.ERR, "failed to connect to Redis: ", err)return ""endlocal result, err = red:get(key)if not result thenngx.log(ngx.ERR, "failed to get ", key, " from Redis: ", err)return ""end-- 将 Redis 连接放回连接池中red:set_keepalive(10000, 100)return resultend-- 获取缓存数据local function get_cache_data()-- 获取当前请求的 URIlocal uri = ngx.var.uri-- 获取当前请求的 id 参数local id = ngx.var.arg_id-- 将 URI 写入 Nginx 的错误日志中ngx.log(ngx.ERR, "URI: ", uri) -- 将当前请求的所有参数写入 Nginx 的错误日志中ngx.log(ngx.ERR, "Args: ", ngx.var.args)local start_pos = string.find(uri, "/", 6) + 1local end_pos = string.find(uri, "/", start_pos)-- 截取第三个和第四个斜杠之间的子串local cache_prefix = string.sub(uri, start_pos, end_pos - 1) -- Redis 中键的名称由子串和 id 组成local key = cache_prefix .. ":" .. idlocal result = get_from_redis(key)if result == nil or result == ngx.null or result == "" then-- Redis 中未命中,需要到服务器后端获取数据ngx.log(ngx.ERR, "not hit cache, key = ", key)else-- Redis 命中,返回结果ngx.log(ngx.ERR, "hit cache, key = ", key)-- 直接将 Redis 中存储的结果返回给客户端ngx.say(result)-- 结束请求,客户端无需再等待响应ngx.exit(ngx.HTTP_OK)endend-- 执行获取缓存数据的功能get_cache_data()

将Lua脚本放到服务器制定的目录下。

新建lua目录,创建cache.lua文件。

然后,我们进行配置nginx。

  • nginx配置反向代理,结合lua脚本读取redis。
  • 如果redis缓存命中,则读取缓存数据直接返回。
  • 如果缓存未命中,则反向代理请求后端接口获取数据返回。
#usernobody;worker_processes1;#error_loglogs/error.log;#error_loglogs/error.lognotice;#error_loglogs/error.loginfo;#pidlogs/nginx.pid;events {worker_connections1024;}http {include mime.types;default_typeapplication/octet-stream;#配置下编码,不然浏览器会乱码charset utf-8;#log_formatmain'$remote_addr - $remote_user [$time_local] "$request" '#'$status $body_bytes_sent "$http_referer" '#'"$http_user_agent" "$http_x_forwarded_for"';#access_loglogs/access.logmain;sendfileon;#tcp_nopush on;#keepalive_timeout0;keepalive_timeout65;#gzipon;# 这里设置为 off,是为了避免每次修改之后都要重新 reload 的麻烦。# 在生产环境上需要 lua_code_cache 设置成 on。lua_code_cache off; # 虚拟机主机块,还需要配置lua文件扫描路径lua_package_path "$prefix/lualib/" />
./nginx -c /usr/local/openresty/nginx/conf/nginx.conf -s reload

nginx启动成功。

OK,我们接下来进行测试验证,首先先访问Redis中有缓存的数据。通过Nginx直接访问Redis。

http://payne.f3322.net:8888/api/v1/product/findById" />=3

然后我们在访问Redis中没有缓存的数据。没有命中到缓存,直接穿透到SpringBoot程序。

http://payne.f3322.net:8888/api/v1/product/findById" />=2

新增数据时,会往Redis中进行同步。

http://payne.f3322.net:8888/api/v1/product/save{"title":"Mac Pro 13","coverImg":"/group/4.png","amount":"19999.00","summary":"Mac Pro 13","detail":"Mac Pro 13"}

OK,至此多级缓存案例就完成啦。记得给博主一个三连哦!