分布式锁之基于redis实现分布式锁(二)

2. 基于redis实现分布式锁

2.1. 基本实现

借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。

图片[1] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

  1. 多个客户端同时获取锁(setnx)

  2. 获取成功,执行业务逻辑,执行完成释放锁(del)

  3. 其他客户端等待重试

改造StockService方法:

@Servicepublic class StockService {@Autowiredprivate StockMapper stockMapper;@Autowiredprivate StringRedisTemplate redisTemplate;public void deduct() {// 加锁setnxBoolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "111");// 重试:递归调用if (!lock){try {Thread.sleep(50);this.deduct();} catch (InterruptedException e) {e.printStackTrace();}} else {try {// 1. 查询库存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判断库存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣减库存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {// 解锁this.redisTemplate.delete("lock");}}}}

其中,加锁也可以使用循环:

// 加锁,获取锁失败重试while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "111")){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}

解锁:

// 释放锁this.redisTemplate.delete("lock");

使用Jmeter压力测试如下:

图片[2] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

2.2. 防死锁

图片[3] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

问题:setnx刚刚获取到锁,当前服务器宕机,导致del释放锁无法执行,进而导致锁无法锁无法释放(死锁)

解决:给锁设置过期时间,自动释放锁。

设置过期时间两种方式:

  1. 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

  2. 使用set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)

图片[4] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

压力测试肯定也没有问题。

2.3. 防误删

问题:可能会释放其他服务器的锁。

场景:如果业务逻辑的执行时间是7s。执行流程如下

  1. index1业务逻辑没执行完,3秒后锁被自动释放。

  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

  3. index3获取到锁,执行业务逻辑

  4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。

    最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁

图片[5] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

实现如下:

图片[6] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

问题:删除操作缺乏原子性。

场景

  1. index1执行删除时,查询到的lock值确实和uuid相等

  2. index1执行删除前,lock刚好过期时间已到,被redis自动释放

  3. index2获取了lock

  4. index1执行删除,此时会把index2的lock删除

解决方案:没有一个命令可以同时做到判断 + 删除,所有只能通过其他方式实现(LUA脚本

2.4. redis中的lua脚本

2.4.1. 现实问题

redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。例如:

图片[7] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

在串行场景下:A和B的值肯定都是3

在并发场景下:A和B的值可能在0-6之间。

极限情况下1:

图片[8] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

则A的结果是0,B的结果是3

极限情况下2:

图片[9] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

则A和B的结果都是6

如果redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。 这和使用 MULTI/ EXEC 包围的事务很类似。

但是MULTI/ EXEC方法来使用事务功能,将一组命令打包执行,无法进行业务逻辑的操作。这期间有某一条命令执行报错(例如给字符串自增),其他的命令还是会执行,并不会回滚。

2.4.2. lua介绍

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

设计目的

其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Lua 特性

  • 轻量级:它用标准C语言编写并以源代码形式开放,编译后仅仅一百余K,可以很方便的嵌入别的程序里。

  • 可扩展:Lua提供了非常易于使用的扩展接口和机制:由宿主语言(通常是C或C++)提供这些功能,Lua可以使用它们,就像是本来就内置的功能一样。

  • 其它特性:

    • 支持面向过程(procedure-oriented)编程和函数式编程(functional programming);

    • 自动内存管理;只提供了一种通用类型的表(table),用它可以实现数组,哈希表,集合,对象;

    • 语言内置模式匹配;闭包(closure);函数也可以看做一个值;提供多线程(协同进程,并非操作系统所支持的线程)支持;

    • 通过闭包和table可以很方便地支持面向对象编程所需要的一些关键机制,比如数据抽象,虚函数,继承和重载等。

2.4.3. lua基本语法

对lua脚本感兴趣的同学,请移步到官方教程或者《菜鸟教程》。这里仅以redis中可能会用到的部分语法作介绍。

a = 5 -- 全局变量local b = 5 -- 局部变量, redis只支持局部变量a, b = 10, 2*x-- 等价于 a=10; b=2*x

流程控制:

if( 布尔表达式 1)then --[ 在布尔表达式 1 为 true 时执行该语句块 --]elseif( 布尔表达式 2)then --[ 在布尔表达式 2 为 true 时执行该语句块 --]else--[ 如果以上布尔表达式都不为 true 则执行该语句块 --]end

2.4.4. redis执行lua脚本 – EVAL指令

在redis中需要通过eval命令执行lua脚本。

格式:

EVAL script numkeys key [key ...] arg [arg ...]script:lua脚本字符串,这段Lua脚本不需要(也不应该)定义函数。numkeys:lua脚本中KEYS数组的大小key [key ...]:KEYS数组中的元素arg [arg ...]:ARGV数组中的元素

案例1:基本案例

EVAL "return 10" 0

输出:(integer) 10

案例2:动态传参

EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 10 20 30 40 50 60 70 80 90# 输出:10 20 60 70EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 10 20# 输出:0EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 20 10# 输出:1

传入了两个参数10和20,KEYS的长度是1,所以KEYS中有一个元素10,剩余的一个20就是ARGV数组的元素。

redis.call()中的redis是redis中提供的lua脚本类库,仅在redis环境中可以使用该类库。

案例3:执行redis类库方法

set aaa 10-- 设置一个aaa值为10EVAL "return redis.call('get', 'aaa')" 0# 通过return把call方法返回给redis客户端,打印:"10"

注意:脚本里使用的所有键都应该由 KEYS 数组来传递。但并不是强制性的,代价是这样写出的脚本不能被 Redis 集群所兼容。

案例4:给redis类库方法动态传参

EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 bbb 20

图片[10] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

学到这里基本可以应付redis分布式锁所需要的脚本知识了。

案例5:pcall函数的使用(了解)

-- 当call() 在执行命令的过程中发生错误时,脚本会停止执行,并返回一个脚本错误,输出错误信息EVAL "return redis.call('sets', KEYS[1], ARGV[1]), redis.call('set', KEYS[2], ARGV[2])" 2 bbb ccc 20 30-- pcall函数不影响后续指令的执行EVAL "return redis.pcall('sets', KEYS[1], ARGV[1]), redis.pcall('set', KEYS[2], ARGV[2])" 2 bbb ccc 20 30

注意:set方法写成了sets,肯定会报错。

图片[11] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

2.5. 使用lua保证删除原子性

删除LUA脚本:

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end

代码实现:

public void deduct() {String uuid = UUID.randomUUID().toString();// 加锁setnxwhile (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS)) {// 重试:循环try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}try {// this.redisTemplate.expire("lock", 3, TimeUnit.SECONDS);// 1. 查询库存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判断库存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣减库存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {// 先判断是否自己的锁,再解锁String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";this.redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList("lock"), uuid);}}

压力测试,库存量也没有问题,截图略过。。。

2.6. 可重入锁

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。

用一段 Java 代码解释可重入:

public synchronized void a() {b();}public synchronized void b() {// pass}

假设 X 线程在 a 方法获取锁之后,继续执行 b 方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。

锁明明是被 X 线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己~

可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加 1,然后再执行方法逻辑。退出加锁方法之后,加锁次数再减 1,当加锁次数为 0 时,锁才被真正的释放。

可以看到可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。

解决方案:redis + Hash

2.6.1. 加锁脚本

Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑。

if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) thenredis.call('hincrby', KEYS[1], ARGV[1], 1);redis.call('expire', KEYS[1], ARGV[2]);return 1;elsereturn 0;end

假设值为:KEYS:[lock], ARGV[uuid, expire]

如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1。

2.6.2. 解锁脚本

-- 判断 hash set 可重入 key 的值是否等于 0-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败-- 如果为 0 代表 可重入次数被减 1-- 如果为 1 代表 该可重入 key 解锁成功if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return nil; elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then return 0; else redis.call('del', KEYS[1]); return 1; end;

2.6.3. 代码实现

由于加解锁代码量相对较多,这里可以封装成一个工具类:

图片[12] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

DistributedLockClient工厂类具体实现:

@Componentpublic class DistributedLockClient {@Autowiredprivate StringRedisTemplate redisTemplate;private String uuid;public DistributedLockClient() {this.uuid = UUID.randomUUID().toString();}public DistributedRedisLock getRedisLock(String lockName){return new DistributedRedisLock(redisTemplate, lockName, uuid);}}

DistributedRedisLock实现如下:

public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid;}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/** * 加锁方法 * @param time * @param unit * @return * @throws InterruptedException */@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1){this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +" redis.call('expire', KEYS[1], ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))){Thread.sleep(50);}return true;}/** * 解锁方法 */@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +" return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(lockName), getId());if (flag == null){throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}/** * 给线程拼接唯一标识 * @return */String getId(){return uuid + ":" + Thread.currentThread().getId();}}

2.6.4. 使用及测试

在业务代码中使用:

public void deduct() {DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock");redisLock.lock();try {// 1. 查询库存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判断库存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣减库存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {redisLock.unlock();}}

测试:

图片[13] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

测试可重入性:

图片[14] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

2.7. 自动续期

lua脚本:

if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1; else return 0; end

在RedisDistributeLock中添加renewExpire方法:

public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid + ":" + Thread.currentThread().getId();}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/** * 加锁方法 * @param time * @param unit * @return * @throws InterruptedException */@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1){this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +" redis.call('expire', KEYS[1], ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))){Thread.sleep(50);}// 加锁成功,返回之前,开启定时器自动续期this.renewExpire();return true;}/** * 解锁方法 */@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +" return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(lockName), uuid);if (flag == null){throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}// String getId(){// return this.uuid + ":" + Thread.currentThread().getId();// }private void renewExpire(){String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" return redis.call('expire', KEYS[1], ARGV[2]) " +"else " +" return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {renewExpire();}}}, this.expire * 1000 / 3);}}

在tryLock方法中使用:

图片[15] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

构造方法作如下修改:

图片[16] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

解锁方法作如下修改:

图片[17] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

2.8. 手写分步式锁小结

特征

  1. 独占排他:setnx

  2. 防死锁:

    redis客户端程序获取到锁之后,立马宕机。给锁添加过期时间

    不可重入:可重入

  3. 防误删:

    先判断是否自己的锁才能删除

  4. 原子性:

    加锁和过期时间之间:set k v ex 3 nx

    判断和释放锁之间:lua脚本

  5. 可重入性:hash(key field value) + lua脚本

  6. 自动续期:Timer定时器 + lua脚本

  7. 在集群情况下,导致锁机制失效:

    1. 客户端程序10010,从主中获取锁

    2. 从还没来得及同步数据,主挂了

    3. 于是从升级为主

    4. 客户端程序10086就从新主中获取到锁,导致锁机制失效

锁操作

加锁:

  1. setnx:独占排他 死锁、不可重入、原子性

  2. set k v ex 30 nx:独占排他、死锁 不可重入

  3. hash + lua脚本:可重入锁

    1. 判断锁是否被占用(exists),如果没有被占用则直接获取锁(hset/hincrby)并设置过期时间(expire)

    2. 如果锁被占用,则判断是否当前线程占用的(hexists),如果是则重入(hincrby)并重置过期时间(expire)

    3. 否则获取锁失败,将来代码中重试

  4. Timer定时器 + lua脚本:实现锁的自动续期

    判断锁是否自己的锁(hexists == 1),如果是自己的锁则执行expire重置过期时间

解锁

  1. del:导致误删

  2. 先判断再删除同时保证原子性:lua脚本

  3. hash + lua脚本:可重入 1. 判断当前线程的锁是否存在,不存在则返回nil,将来抛出异常

    1. 存在则直接减1(hincrby -1),判断减1后的值是否为0,为0则释放锁(del),并返回1

    2. 不为0,则返回0

重试:递归 循环

2.9. 红锁算法

redis集群状态下的问题:

  1. 客户端A从master获取到锁

  2. 在master将锁同步到slave之前,master宕掉了。

  3. slave节点被晋级为master节点

  4. 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。

安全失效

解决集群下锁失效,参照redis官方网站针对redlock文档:https://redis.io/docs/manual/patterns/distributed-locks/

在算法的分布式版本中,我们假设有N个Redis服务器。这些节点是完全独立的,因此我们不使用复制或任何其他隐式协调系统。前几节已经描述了如何在单个实例中安全地获取和释放锁,在分布式锁算法中,将使用相同的方法在单个实例中获取和释放锁。将N设置为5是一个合理的值,因此需要在不同的计算机或虚拟机上运行5个Redis主服务器,确保它们以独立的方式发生故障。

为了获取锁,客户端执行以下操作:

  1. 客户端以毫秒为单位获取当前时间的时间戳,作为起始时间

  2. 客户端尝试在所有N个实例中顺序使用相同的键名、相同的随机值来获取锁定。每个实例尝试获取锁都需要时间,客户端应该设置一个远小于总锁定时间的超时时间。例如,如果自动释放时间为10秒,则尝试获取锁的超时时间可能在5到50毫秒之间。这样可以防止客户端长时间与处于故障状态的Redis节点进行通信:如果某个实例不可用,尽快尝试与下一个实例进行通信。

  3. 客户端获取当前时间 减去在步骤1中获得的起始时间,来计算获取锁所花费的时间。当且仅当客户端能够在大多数实例(至少3个)中获取锁时,并且获取锁所花费的总时间小于锁有效时间,则认为已获取锁。

  4. 如果获取了锁,则将锁有效时间减去 获取锁所花费的时间,如步骤3中所计算。

  5. 如果客户端由于某种原因(无法锁定N / 2 + 1个实例或有效时间为负)而未能获得该锁,它将尝试解锁所有实例(即使没有锁定成功的实例)。

每台计算机都有一个本地时钟,我们通常可以依靠不同的计算机来产生很小的时钟漂移。只有在拥有锁的客户端将在锁有效时间内(如步骤3中获得的)减去一段时间(仅几毫秒)的情况下终止工作,才能保证这一点。以补偿进程之间的时钟漂移

当客户端无法获取锁时,它应该在随机延迟后重试,以避免同时获取同一资源的多个客户端之间不同步(这可能会导致脑裂的情况:没人胜)。同样,客户端在大多数Redis实例中尝试获取锁的速度越快,出现裂脑情况(以及需要重试)的窗口就越小,因此理想情况下,客户端应尝试将SET命令发送到N个实例同时使用多路复用。

值得强调的是,对于未能获得大多数锁的客户端,尽快释放(部分)获得的锁有多么重要,这样就不必等待锁定期满才能再次获得锁(但是,如果发生了网络分区,并且客户端不再能够与Redis实例进行通信,则在等待密钥到期时需要付出可用性损失)。

2.10. redisson中的分布式锁

图片[18] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

图片[19] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

官方文档地址:Home · redisson/redisson Wiki · GitHub

2.10.1. 可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

RLock lock = redisson.getLock("anyLock");// 最常见的使用方法lock.lock();// 加锁以后10秒钟自动解锁// 无需调用unlock方法手动解锁lock.lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);if (res) { try { ... } finally { lock.unlock(); }}
  1. 引入依赖

org.redissonredisson3.11.2
  1. 添加配置

@Configurationpublic class RedissonConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();// 可以用"rediss://"来启用SSL连接config.useSingleServer().setAddress("redis://172.16.116.100:6379");return Redisson.create(config);}}
  1. 代码中使用

@Autowiredprivate RedissonClient redissonClient;public void checkAndLock() {// 加锁,获取锁失败重试RLock lock = this.redissonClient.getLock("lock");lock.lock();// 先查询库存是否充足Stock stock = this.stockMapper.selectById(1L);// 再减库存if (stock != null && stock.getCount() > 0){stock.setCount(stock.getCount() - 1);this.stockMapper.updateById(stock);}// 释放锁lock.unlock();}
  1. 压力测试

性能跟我们手写的区别不大。

图片[20] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

数据库也没有问题

2.10.2. 公平锁(Fair Lock)

基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

RLock fairLock = redisson.getFairLock("anyLock");// 最常见的使用方法fairLock.lock();// 10秒钟以后自动解锁// 无需调用unlock方法手动解锁fairLock.lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);fairLock.unlock();

2.10.3. 联锁(MultiLock)

基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");RLock lock2 = redissonInstance2.getLock("lock2");RLock lock3 = redissonInstance3.getLock("lock3");RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);// 同时加锁:lock1 lock2 lock3// 所有的锁都上锁成功才算成功。lock.lock();...lock.unlock();

2.10.4. 红锁(RedLock)

基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");RLock lock2 = redissonInstance2.getLock("lock2");RLock lock3 = redissonInstance3.getLock("lock3");RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);// 同时加锁:lock1 lock2 lock3// 红锁在大部分节点上加锁成功就算成功。lock.lock();...lock.unlock();

2.10.5. 读写锁(ReadWriteLock)

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。

分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");// 最常见的使用方法rwlock.readLock().lock();// 或rwlock.writeLock().lock();// 10秒钟以后自动解锁// 无需调用unlock方法手动解锁rwlock.readLock().lock(10, TimeUnit.SECONDS);// 或rwlock.writeLock().lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);// 或boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);...lock.unlock();

添加StockController方法:

@GetMapping("test/read")public String testRead(){String msg = stockService.testRead();return "测试读";}@GetMapping("test/write")public String testWrite(){String msg = stockService.testWrite();return "测试写";}

添加StockService方法:

public String testRead() {RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");rwLock.readLock().lock(10, TimeUnit.SECONDS);System.out.println("测试读锁。。。。");// rwLock.readLock().unlock();return null;}public String testWrite() {RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");rwLock.writeLock().lock(10, TimeUnit.SECONDS);System.out.println("测试写锁。。。。");// rwLock.writeLock().unlock();return null;}

打开开两个浏览器窗口测试:

  • 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始

  • 同时访问读:不用等待

  • 先写后读:读要等待(约10s)写完成

  • 先读后写:写要等待(约10s)读完成

2.10.6. 信号量(Semaphore)

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

RSemaphore semaphore = redisson.getSemaphore("semaphore");semaphore.trySetPermits(3);semaphore.acquire();semaphore.release();

在StockController添加方法:

@GetMapping("test/semaphore")public String testSemaphore(){this.stockService.testSemaphore();return "测试信号量";}

在StockService添加方法:

public void testSemaphore() {RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");semaphore.trySetPermits(3);try {semaphore.acquire();TimeUnit.SECONDS.sleep(5);System.out.println(System.currentTimeMillis());semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}}

添加测试用例:并发10次,循环一次

图片[21] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

控制台效果:

控制台1:1606960790234160696080033716069608004431606960805248控制台2:160696079032816069607953321606960800245控制台3:160696079043316069607952381606960795437

由此可知:

1606960790秒有3次请求进来:每个控制台各1次

1606960795秒有3次请求进来:控制台2有1次,控制台3有2次

1606960800秒有3次请求进来:控制台1有2次,控制台2有1次

1606960805秒有1次请求进来:控制台1有1次

2.10.7. 闭锁(CountDownLatch)

基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");latch.trySetCount(1);latch.await();// 在其他线程或其他JVM里RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");latch.countDown();

需要两个方法:一个等待,一个计数countDown

给StockController添加测试方法:

@GetMapping("test/latch")public String testLatch(){this.stockService.testLatch();return "班长锁门。。。";}@GetMapping("test/countdown")public String testCountDown(){this.stockService.testCountDown();return "出来了一位同学";}

给StockService添加测试方法:

public void testLatch() {RCountDownLatch latch = this.redissonClient.getCountDownLatch("latch");latch.trySetCount(6);try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}}public void testCountDown() {RCountDownLatch latch = this.redissonClient.getCountDownLatch("latch");latch.trySetCount(6);latch.countDown();}

重启测试,打开两个页面:当第二个请求执行6次之后,第一个请求才会执行。

图片[22] - 分布式锁之基于redis实现分布式锁(二) - MaxSSL

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享