苍穹之边,浩瀚之挚,眰恦之美; 悟心悟性,善始善终,惟善惟道! —— 朝槿《朝槿兮年说》
写在开头
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。
主要原因是,对于多线程实现实现并发,一直以来,多线程都存在2个问题:
- 线程之间内存共享,需要通过加锁进行控制,但是加锁会导致性能下降,同时复杂的加锁机制也会增加编程编码难度
- 过多线程造成线程之间的上下文切换,导致效率低下
因此,在并发编程领域中,一直有一个很重要的设计原则: “ 不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”
简单来说,就是尽可能通过消息通信,而不是内存共享来实现进程或者线程之间的同步。
关健术语
本文用到的一些关键词语以及常用术语,主要如下:
- 并发(Concurrent): 在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行。
- 并行(Parallel): 当系统有一个以上CPU时,当一个CPU执行一个进程时,另一个CPU可以执行另一个进程,两个进程互不抢占CPU资源,可以同时进行。
- 信号量(Semaphore): 是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用,也是作系统用来解决并发中的互斥和同步问题的一种方法。
- 信号量机制(Semaphores): 用来解决同步/互斥的问题的,它是1965年,荷兰学者 Dijkstra提出了一种卓有成效的实现进程互斥与同步的方法。
- 管程(Monitor) : 一般是指管理共享变量以及对共享变量的操作过程,让它们支持并发的一种机制。
- 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个进程或线程使用,多个进程或线程不能同时使用公共资源。即就是同一时刻只允许一个线程访问共享资源的问题。
- 同步(Synchronization):两个或两个以上的进程或线程在运行过程中协同步调,按预定的先后次序运行。即就是线程之间如何通信、协作的问题。
- 对象池(Object Pool): 指的是一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的, 一般指保存实例对象的容器。
基本概述
在Java领域中,我们可以将锁大致分为基于Java语法层面(关键词)实现的锁和基于JDK层面实现的锁。
在Java领域中, 尤其是在并发编程领域,对于多线程并发执行一直有两大核心问题:同步和互斥。其中:
- 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个进程或线程使用,多个进程或线程不能同时使用公共资源。即就是同一时刻只允许一个线程访问共享资源的问题。
- 同步(Synchronization):两个或两个以上的进程或线程在运行过程中协同步调,按预定的先后次序运行。即就是线程之间如何通信、协作的问题。
针对对于这两大核心问题,利用管程是能够解决和实现的,因此可以说,管程是并发编程的万能钥匙。
虽然,Java在基于语法层面(synchronized 关键字)实现了对管程技术,但是从使用方式和性能上来说,内置锁(synchronized 关键字)的粒度相对过大,不支持超时和中断等问题。
为了弥补这些问题,从JDK层面对其“重复造轮子”,在JDK内部对其重新设计和定义,甚至实现了新的特性。
在Java领域中,从JDK源码分析来看,基于JDK层面实现的锁大致主要可以分为以下4种方式:
- 基于Lock接口实现的锁:JDK1.5版本提供的ReentrantLock类
- 基于ReadWriteLock接口实现的锁:JDK1.5版本提供的ReentrantReadWriteLock类
- 基于AQS基础同步器实现的锁:JDK1.5版本提供的并发相关的同步器Semaphore,CyclicBarrier以及CountDownLatch等
- 基于自定义API操作实现的锁:JDK1.8版本中提供的StampedLock类
从阅读源码不难发现,在Java SDK 并发包主要通过AbstractQueuedSynchronizer(AQS)实现多线程同步机制的封装与定义,而通过Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
一.AQS基础同步器基本理论
在Java领域中,同步器是专门为多线程并发设计的同步机制,主要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。
一个标准的AQS同步器主要有同步状态机制,等待队列,条件队列,独占模式,共享模式等五大核心要素组成。
在Java领域中,JDK的JUC(java.util.concurrent.)包中提供了各种并发工具,但是大部分同步工具的实现基于AbstractQueuedSynchronizer类实现,其内部结构主要如下:
- 同步状态机制(Synchronization Status):主要用于实现锁(Lock)机制,是指同步状态,其要求对于状态的更新必须原子性的
- 等待队列(Wait Queue):主要用于存放等待线程获取到的锁资源,并且把线程维护到一个Node(节点)里面和维护一个非阻塞的CHL Node FIFO(先进先出)队列,主要是采用自旋锁+CAS操作来保证节点插入和移除的原子性操作。
- 条件队列(Condition Queue):用于实现锁的条件机制,一般主要是指替换“等待-通知”工作机制,主要是通过ConditionObject对象实现Condition接口提供的方法实现。
- 独占模式(Exclusive Mode):主要用于实现独占锁,主要是基于静态内部类Node的常量标志EXCLUSIVE来标识该节点是独占模式
- 共享模式(Shared Mode):主要用于实现共享锁,主要是基于静态内部类Node的常量标志SHARED来标识该节点是共享模式
我们可以得到一个比较通用的并发同步工具基础模型,大致包含如下几个内容,其中:
- 条件变量(Conditional Variable): 利用线程间共享的变量进行同步的一种工作机制
- 共享变量((Shared Variable)):一般指对象实体对象的成员变量和属性
- 阻塞队列(Blocking Queue):共享变量(Shared Variable)及其对共享变量的操作统一封装
- 等待队列(Wait Queue):每个条件变量都对应有一个等待队列(Wait Queue),内部需要实现入队操作(Enqueue)和出队操作(Dequeue)方法
- 变量状态描述机(Synchronization Status):描述条件变量和共享变量之间状态变化,又可以称其为同步状态
- 工作模式(Operation Mode): 线程资源具有排他性,因此定义独占模式和共享模式两种工作模式
综上所述,条件变量和等待队列的作用是解决线程之间的同步问题;共享变量与阻塞队列的作用是解决线程之间的互斥问题。
二. JDK显式锁统一概念模型
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。
综合Java领域中的并发锁的各种实现与应用分析来看,一把锁或者一种锁,基本上都会包含以下几个方面:
- 锁的同步器工作机制:主要是考虑共享模式还是独享模式,是否支持超时机制,以及是否支持超时机制?
- 锁的同步器工作模式:主要是基于AQS基础同步器封装内部同步器,是否考虑公平/非公平模式?
- 锁的状态变量机制: 主要锁的状态设置,是否共享状态变量?
- 锁的队列封装定义:主要是指等待队列和条件队列,是否需要条件队列或者等待队列定义?
- 锁的底层实现操作: 主要是指底层CL锁和CAS操作,是否需要考虑自旋锁或者CAS操作实例对象方法?
- 锁的组合实现新锁: 主要是基于独占锁和共享锁,是否考虑对应API自定义操作实现?
综上所述,大致可以根据上述这些方向,我们便可以清楚?️知道Java领域中各种锁实现的基本理论时和实现思想。
六.CountDownLatch(闭锁)的设计与实现
在Java领域中,CountDownLatch(闭锁)是针对于Java多线程并发控制中倒计数器的具体数量,主要是采用递减计数方式的倒计数器思想和基于AQS基础同步器来实现的一种同步器工具类。
CountDownLatch(闭锁)是Java多线程并发中最常见的一种同步器,从锁的性质上来看,属于共享锁,其功能相当于一个多线程环境下的倒数门闩。
CountDownLatch通过定义一个倒计数器,在并发环境下由线程进行递减1操作,当计数值变为0之后,被await方法阻塞的线程将会唤醒。
通过CountDownLatch可以实现线程间的计数同步。
1. 设计思想
一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上等待N个条件都满足后,才让所有的线程继续往下执行,其中倒计数器的数量则为N,每满足一个条件,倒计数器就依次逐渐递减1,直到N-1=0的时,所有等待的线程才往下继续执行。
CountDownLatch类最早是在JDK1.5版本提供的,从设计思想上来看,主要包括倒计数器的同步器,控制阻塞等待的方法,倒计数器的递减操作方法等3个核心要素。其中:
- 倒计数器的同步器:基于AQS基础抽象队列同步器封装内置实现一个静态的内置同步类,主要用于设置倒计数器的初始值以及定制AQS基础同步器的获取和释放共享锁。
- 倒计数器的初始值: 一般在构建CountDownLatch类时指定,表示的是需要等待条件的个数,即就是倒计数器的具体的资源数量Source(N)。
- 控制线程阻塞等待的方法:定义一个控制线程阻塞等待的方法,当倒计数器的具体的资源数量 Source(N)>0时,调用方法使其线程进入阻塞等待状态。
- 倒计数器的递减操作方法:定义一个倒计数器的递减操作方法,调用方法就会把倒计数器递减1,当倒计数器的具体的资源数量 Source(N)-1=0时,所有等待的线程才往下继续执行。
简单来说,CountDownLatch主要是让某个线程或者多个线程,等待其他线程完成某件事情或者某个任务结束之后才能继续执行。
2. 基本实现
在CountDownLatch类的JDK1.8版本中,对于CountDownLatch的基本实现如下:
public class CountDownLatch { private final Sync sync; /** * CountDownLatch锁-构造一个倒计数器 */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } /** * CountDownLatch锁-基于AQS定义支持同步器实现 */ private static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860 L; //......其他方法代码 } /** * CountDownLatch锁-线程等待方法 */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * CountDownLatch锁-倒计数器递减操作 */ public void countDown() { sync.releaseShared(1); } //... 其他代码}
- 倒计数器同步器:基于AQS基础定义支持同步器实现一个静态私有化的同步器Sync类,其中定义了获取和释放共享锁的两个方法
- 线程等待方法:主要是提供了一个await()方法,其本质是调用的是AQS基础同步器中的acquireSharedInterruptibly(int arg)方法,否则throws InterruptedException异常
- 倒计数器递减操作方法: 主要是提供了一个countDown()方法,其本质是调用的是AQS基础同步器中的releaseShared(int arg) 方法
2.1 基于AQS同步器封装静态内部Sync抽象类
/** * CountDownLatch锁-基于AQS同步器封装一个内部的同步器 */private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); }/** * CountDownLatch锁-获取共享锁方法 */ protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }/** * CountDownLatch锁-释放共享锁方法 */ protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
- 实现方式: 主要基于AQS封装的内部静态抽象Sync同步类实现,使用的AQS的共享模式
- 主要方法: 主要定制适配提供了tryAcquireShared()和tryReleaseShared()方法,即就是tryAcquireShared()用于获取共享锁,tryReleaseShared()方法用于释放共享锁,其中:
- 获取共享锁tryAcquireShared()方法:首先获取状态变量status,这里是指倒计数器中的数量,当status=0时,返回值=1,表示获取锁成功;否则,status !=0 时,返回值=-1,表示获取共享锁失败进行入队。
- 释放共享锁tryReleaseShared()方法: 通过自旋来实现递减操作,其中会获取状态变量status,将其递减1后使用compareAndSetState(c, nextc)方法通过CAS修改状态值
- 锁获取方式: 主要是利用getCount()来获取倒计数器中的数量,同时还可以利用构造方法指导一个倒计数器中的数量。
3. 具体实现
public class CountDownLatch { private final Sync sync; /*** CountDownLatch锁-基于AQS基础同步器实现一个内部同步器*/ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374 L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } /*** CountDownLatch锁-构造一个倒计数器*/ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } /*** CountDownLatch锁-基于AQS定义支持同步器实现*/ private static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860 L; //......其他方法代码 } /*** CountDownLatch锁-线程等待方法*/ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /*** CountDownLatch锁-返回当前计数器*/ public long getCount() { return sync.getCount(); } /*** CountDownLatch锁-线程等待方法(支持超时机制)*/ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /*** CountDownLatch锁-倒计数器递减操作*/ public void countDown() { sync.releaseShared(1); }}
- 倒计数初始值:通过构造方法CountDownLatch(int count)指定一个倒计数器的初始值,其必须大于0,否则会throw new IllegalArgumentException(“count < 0")
- 线程等待方法: 主要提供了await() 方法和await(long timeout, TimeUnit unit)方法,其中:
- 无参数await() 方法: 一般默认的方法,其本质是调用AQS同步器中的acquireSharedInterruptibly()方法,主要表示支持中断机制
- 有参数await(long timeout, TimeUnit unit)方法: 是用于实现超时机制,其本质是调用AQS同步器中的tryAcquireSharedNanos(int arg, long nanosTimeout)方法
- 倒计数递减操作方法:主要是countDown() 方法, 其本质是调用AQS同步器中的releaseShared(int arg) 方法,核心实现是AQS基础同步器的doReleaseShared方法。
- 其他方法: 主要是getCount() 方法,用来获取倒计数个数,其本质是调用AQS同步器中getCount()方法,来获取状态变量
综上所述,从一定意义上讲,CountDownLatch是一种共享锁,属于AQS基础抽象队列同步器中共享模式孵化的产物,没有支持公平模式与非公平模式的实现。
七.CyclicBarrier(循环屏障)的设计与实现
在Java领域中,CyclicBarrier(循环屏障)是针对于Java多线程并发控制中倒计数器的线程数量,主要是采用递减计数方式的倒计数器思想和基于AQS基础同步器实现的ReentrantLock锁来实现的一种同步器工具类。
CyclicBarrier(循环屏障)是Java中通过对线程预定义设置一个屏障,只有当到达屏障的线程数量到达指定的最大屏障时,屏障才会让这些线程通过执行。
从一定意义上来讲,这里的屏障本质上还是一个倒计数器,倒计数器的最大限度支持的数量就是我们为线程设置屏障大小,其工作原理与CountDownLatch(闭锁)类似,都是通过让线程阻塞等待时,倒计数器执行递减1运算。
但是与CountDownLatch不同是,CyclicBarrier(循环屏障)是基于ReentrantLock(可重入锁)来实现的,更准确的说,CyclicBarrier是对ReentrantLock的应用实例。
1. 设计思想
一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上约束N个线程,需要让指定数量的线程共同到达某一个节点之后,这些线程才会一起被执行。
CyclicBarrier(循环屏障)最早是在JDK1.5版本中提供的,从设计思想上来看,主要包括倒计数器的最大屏障,控制阻塞等待的方法,倒计数器的递减操作方法,和触发点线程任务等4个核心要素。其中:
- 倒计数器的同步器: 主要基于ReentrantLock来实现控制线程对象,其本质还是基于AQS基础同步器实现。
- 倒计数器的最大屏障数量:一般是在构建CyclicBarrier(循环屏障)对象是预定义设置,表示需要在某个运行节点上约束的线程数量。
- 控制线程阻塞等待的方法:定义一个方法,使得实现阻塞线程让其进入等待状态。
- 倒计数器的递减操作方法:定义一个方法,使得让倒计数器进行递减1运算,直到达到屏障时,等待的线程才继续执行。
- 触发点线程任务:一般指的是当指定数量的线程达到设置的屏障时,才会去触发执行的任务。
简单来说,CyclicBarrier(循环屏障)是让多个线程互相等待,直到达到一个同步的运行节点。再继续一起执行。
2. 基本实现
在CyclicBarrier类的JDK1.8版本中,对于CountDownLatch的基本实现如下:
public class CyclicBarrier {/** CyclicBarrier锁—屏障lock实体 */private final ReentrantLock lock = new ReentrantLock();/** CyclicBarrier锁—屏障条件队列 */private final Condition trip = lock.newCondition();/** CyclicBarrier锁—屏障最大值 */private final int parties;/** CyclicBarrier锁—屏障触发线程任务目标 */private final Runnable barrierCommand;/** CyclicBarrier锁—当前计数器的最大值屏障实例 */private Generation generation = new Generation();/** CyclicBarrier锁—当前计数器的最大值屏障实例 */private int count;/** CyclicBarrier锁—屏障实例 */private static class Generation {boolean broken = false;}/** CyclicBarrier锁—构造一个屏障实例(不带触发任务的) */public CyclicBarrier(int parties) {this(parties, null);}/** CyclicBarrier锁—构造一个屏障实例(带触发任务的) */public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}/** CyclicBarrier锁—无参数构造一个等待方法(默认模式) */public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}/** CyclicBarrier锁—有参数构造一个等待方法(支持超时机制) */public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));}/** CyclicBarrier锁—更新状态变量 */private void nextGeneration() {// signal completion of last generationtrip.signalAll();// set up next generationcount = parties;generation = new Generation();}/** CyclicBarrier锁—阻塞屏障 */private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();}//...其他代码}
- 预定义设置屏障最大值: 主要是通过变量parties来实现预定义设置屏障最大值
- 设置当前屏障数量:主要是通过变量count来实现
- 控制线程的对象实例: 主要是通过ReentrantLock和Condition来控制线程间通信
- 触发目标任务对象: 主要是通过Runable来定义barrierCommand变量
- 提供了两个构造方法:都需要预定义指定屏障最大值parties,其中一个需要传入barrierAction触发点任务
- 线程阻塞等待方法:主要提供了2个await()方法,其中:
- 无参数await()方法:默认处理方式,不支持超时机制,其核心处理逻辑在dowait(boolean timed, long nanos)方法中实现
- 有参数await()方法:指定参数处理,支持超时机制,其核心处理逻辑在dowait(boolean timed, long nanos)方法中实现
- 屏障设置关健方法:主要是breakBarrier() 来实现,其中:
- 通知到达屏障的所有线程:主要是通过Condition中的signalAll()来通知屏障中所有线程已经满足条件
- 屏障设置:默认预定义设置屏障最大值与设置当前屏障数相同,主要设置count = parties
- 更新屏障状态:主要是通过generation.broken = true来实现
- 更新屏障的状态:主要是提供了nextGeneration() 方法,表示已经到达预定义设置屏障最大值,其中:
- 通知到达屏障的所有线程:主要是通过Condition中的signalAll()来通知屏障中所有线程已经满足条件
- 准备下一轮屏障设置:意味着预定义设置屏障最大值与设置当前屏障数相同,主要设置count = parties
- 重置屏障状态:主要是通过generation = new Generation()来实现
�一般来说,假设我们允许控制的最大线程数量为N,预定义设置屏障最大值为Parties(N), 当前屏障的线程数量为Current(N) ,当前屏障中的等待线程数量为Waiting(N),那么我们会得到一个计算公式:
2.1 构造Generation屏障实例标记
private static class Generation {boolean broken = false;}
主要是构造了一个静态私有化的Generation类,其中定义了一个broken变量来作为屏障标记,默认初始值为false,表示还没达到屏障最大值。
2.1 线程阻塞等待核心dowait方法
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {// [1].实例化构建ReentrantLock的对象final ReentrantLock lock = this.lock;// [2].通过lock()获取锁或者说加锁操作lock.lock();try {// [3].实例化构建Generation屏障实例对象final Generation g = generation;// [4].判断Generation屏障实例标记状态if (g.broken)throw new BrokenBarrierException();// [5].判断Thread是包含中断标志位if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// [6].对倒计数器的屏障数量递减1运算int index = --count;// [7].依据结果index == 0表示当前指定的线程数量到达屏障最大值,需要触发Runnable任务if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 进行下一轮屏障设置nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// [7].自旋操作for (;;) {try {// 判断是否超时if (!timed)trip.await();else if (nanos > 0L)// 进行下一轮屏障设置nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// 是否发生线程中断Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;// 如果等待时间超过指定超时时间,throw new TimeoutExceptionif (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 最后释放锁操作lock.unlock();}}
- 加锁操作: 实例化构建ReentrantLock的对象,通过lock()方法进行加锁操作
- 判断屏障实例标记状态:实例化构建Generation实例标记,判断屏障实例标记状态是否一致,如果不一致则throw new BrokenBarrierException();
- 判断当前线程是否被中断: 判断Thread是包含中断标志位,如果中断throw new InterruptedException()并调用breakBarrier()重新设置屏障
- 屏障倒计数器递减运算:对倒计数器的屏障数量递减1运算,即就是对当前倒计数器的当前值减去1
- 触发节点线程任务: 当前倒计数器的当前值为0时,需要触发Runnable任务,并调用nextGeneration方法开启下一轮操作;否则,当前倒计数器的当前值不为0时,调用awaitNanos(nanos)方法进入等待状态
- 自旋操作判断超时: 如果使用了超时参数,调用awaitNanos(nanos)方法进入等待状态,其中如果发生中断则调用Thread.currentThread().interrupt()设置中断标记。如果等待时间> 指定超时时间,抛出throw new TimeoutException()异常
- 释放锁: 通过unlock()方法进行解锁操作,并释放锁
3. 具体实现
在CyclicBarrier类的JDK1.8版本中,对于CyclicBarrier的具体实现如下:
public class CyclicBarrier {/** CyclicBarrier锁—屏障lock实体 */private final ReentrantLock lock = new ReentrantLock();/** CyclicBarrier锁—屏障条件队列 */private final Condition trip = lock.newCondition();/** CyclicBarrier锁—屏障最大值 */private final int parties;/** CyclicBarrier锁—屏障触发线程任务目标 */private final Runnable barrierCommand;/** CyclicBarrier锁—当前计数器的最大值屏障实例 */private Generation generation = new Generation();/** CyclicBarrier锁—当前计数器的最大值屏障实例 */private int count;/** CyclicBarrier锁—屏障实例 */private static class Generation {boolean broken = false;}/** CyclicBarrier锁—构造一个屏障实例(不带触发任务的) */public CyclicBarrier(int parties) {this(parties, null);}/** CyclicBarrier锁—构造一个屏障实例(带触发任务的) */public CyclicBarrier(int parties, Runnable barrierAction) {if (parties 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;// 如果等待时间超过指定超时时间,throw new TimeoutExceptionif (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 最后释放锁操作lock.unlock();}}/** CyclicBarrier锁—获取当前等屏障等待数量 */public int getNumberWaiting() {final ReentrantLock lock = this.lock;lock.lock();try {return parties - count;} finally {lock.unlock();}}/** CyclicBarrier锁—获取当前等屏障数量 */public int getParties() {return parties;}/** CyclicBarrier锁—判断当前屏障 */public boolean isBroken() {final ReentrantLock lock = this.lock;lock.lock();try {return generation.broken;} finally {lock.unlock();}}/** CyclicBarrier锁—重置屏障数量 */public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {breakBarrier(); // break the current generationnextGeneration(); // start a new generation} finally {lock.unlock();}}}
主要需要注意如下几个方法,都是基于ReentrantLock来实现加锁和解锁操作的,其中:
- getNumberWaiting()方法: 获取当前屏障中等待的线程数量
- reset() 方法:当一轮屏障操作结束,需要重置屏障中最大线程数量
- isBroken() 方法:判断是否到达屏障最大值
综上所述,从一定意义上讲,CyclicBarrier是一种可重入锁,属于ReentrantLock的应用实例,其中加锁和解锁操作都是独占模式的。
八.Semaphore(信号量)的设计与实现
在Java领域中,Semaphore(信号量)是针对于Java多线程并发控制中实现对公共资源的线程数量进行并发同时访问控制,主要是采用指定一个最大许可数的思想和基于AQS基础同步器来实现的一种同步器工具类。
Semaphore可以用来控制在同一时刻访问共享资源的线程数量,通过协调各个线程以保证共享资源的合理使用。
Semaphore维护了一组虚拟许可,它的数量可以通过构造器的参数指定。
线程在访问共享资源前,必须调用Semaphore的acquire()方法获得许可,如果许可数量为0,该线程就一直阻塞。
线程在访问共享资源后,必须调用Semaphore的release()方法释放许可。
1. 设计思想
一般来说,通过定义一个倒计数器,为了控制最多N个线程同时访问公共资源,其计数器的最大值Max(N)是被许可的最多N个线程数量,即就是许可的最大值N。
Semaphore类最早是在JDK1.5版本提供的,从设计思想上来看,主要包括倒计数器的最大许可数,同步器工作模式,获取锁方法,释放锁方法等4个核心要素。其中:
- 同步器工作模式:基于AQS基础抽象队列同步器封装内置实现一个静态的内置同步器抽象类,然后基于这个抽象类分别实现了公平同步器和非公平同步器,用来指定和描述同步器工作模式是公平模式还是非公平模式。
- 公平/非公平模式:主要描述的是多个线程在同时获取锁时是否按照先到先得的顺序获取锁,如果是则为公平模式,否则为非公平模式。
- 获取锁方法:主要定义了一个lock()方法来获取锁,表示假如锁已经被其他线程占有或持有,其当前获取锁的线程则进入等待状态。
- 释放锁方法:主要定义了一个unlock()方法来释放锁,表示假如锁已经被其他线程放弃或释放,其当前获取锁的线程则获得该锁。
2. 基本实现
在JDK1.8版本中,对于Semaphore的基本实现如下:
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210 L; /** * Semaphore锁- 封装同步器 */ private final Sync sync; /** * Semaphore锁- 封装同步器 */ abstract static class Sync extends AbstractQueuedSynchronizer { //....其他代码 } /** * Semaphore锁- 构造一个令牌许可(默认非公模式) */ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * Semaphore锁- 构造一个令牌许可(可选公平/非公模式) */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } /** * Semaphore锁- 获取锁方法(默认一个且可中断机制) */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * Semaphore锁- 获取锁方法(可选指定多个且可中断机制) */ public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } /** * Semaphore锁- 获取锁方法(默认多个且不可中断机制) */ public void acquireUninterruptibly() { sync.acquireShared(1); } /** * Semaphore锁- 获取锁方法(指定多个且不可中断机制) */ public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } /** * Semaphore锁-释放锁方法(默认一个) */ public void release() { sync.releaseShared(1); } /** * Semaphore锁-释放锁方法(可选指定多个) */ public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }}
- 内部同步器:基于AQS基础同步器封装和定义了一个静态内部Sync抽象类,其中抽象了一个内置锁lock()方法
- 同步器工作模式:主要提供了 2个构造方法,其中无参数构造方法表示的是默认的工作模式,有参数构造方法主要依据参数来实现指定的工作模式
- 获取锁方法: 主要提供了3个基于acquire方法,用于获取锁共享锁,其中:
- 无参数acquire()方法:获取共享锁的一般模式,默认指定一个许可和支持可中断机制
- 有参数acquire()方法:获取共享锁的指定模式,可选指定多个许可且支持可中断机制
- 无参数acquireUninterruptibly()方法:获取共享锁的指定模式,默认指定一个许可且不支持可中断机制
- 释放锁方法: 主要是提供了2个release()方法用于释放锁共享锁,其中:
- 无参数release()方法:释放共享锁的一般模式,默认指定一个许可和支持可中断机制
- 有参数release()方法:释放共享锁的指定模式,可选指定多个许可且支持可中断机制
2.1 基于AQS同步器封装静态内部Sync抽象类
/** * Semaphore锁- 基于AQS基础同步器封装同步器 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933 L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } /** * Semaphore锁- 非公平模式获取共享锁 */ final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /** * Semaphore锁- 释放共享锁 */ protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } /** * Semaphore锁- 自旋+compareAndSetState通过CAS操作重置令牌许可数 */ final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
- 实现方式:主要是基于AQS基础同步器封装了一个静态的的Sync抽象类,通过构造方法指定一个最大的令牌许可数量
- 主要方法:主要是看共享锁的获取nonfairTryAcquireShared()方法和释放锁tryReleaseShared()方法,其中:
- 获取锁nonfairTryAcquireShared()方法:非公平模式下获取共享锁,利用自旋+compareAndSetState()方法通过CAS操作,保证并发修改令牌许可数量
- 释放锁tryReleaseShared(i)方法: 公平/非公平模式下释放共享锁,利用自旋+compareAndSetState()方法通过CAS操作释放,会把释放的令牌许可数量增加到当前剩余的令牌许可数量中。
- 令牌许可操作方法:主要提供了drainPermits() 方法 和reducePermits() 方法,其中:
- drainPermits() 方法:主要是利用自旋+compareAndSetState()方法通过CAS操作重置令牌许可数
- reducePermits() 方法:主要是自旋+compareAndSetState)方法通过CAS操作递减计算操作令牌许可数
- 获取锁方式:令牌许可数量QS基础同步器状态变量对应,通过getPermits() 方法来获取令牌许可数量,本质是调用AQS基础同步器中的getState()来获取状态变量。
特别指出的是,这里的非公平模式主要描述的是,在令牌许可数量允许的情况下,让所有线程进行自旋操作,其实就是不关心线程到来的顺序,将全部线程放到一起去参与竞争令牌许可。
其中,主要还利用compareAndSetState方法来进行CAS操作,保证修改令牌许可数量的原子性操作。
一般来说,假设我们允许控制的最大线程数量为N,剩余令牌许可数量为Remanent(N), 当前可用令牌许可数量为Current(N) , 消耗令牌许可数量为Reduction(N),那么我们会得到一个计算公式:
�即就意味着,剩余令牌许可数量等于当前可用令牌许可数量与消耗令牌许可数量之差。
由此可见,在公平/非公平模式下,我们对于对于获取锁和释放锁时,对于剩余令牌许可数量Remanent(N)计算都满足以下公式:
- 首先,在线程在访问共享资源前,我们可以允许的最大值为Available(N),自旋获取锁的数量为Acquires(N),那么我们在获取锁时:
- �其次,在线程在访问共享资源后,自旋释放锁的数量为Releases(N),那么我们在释放锁时:
当然,需要注意的的一个问题,就是当剩余令牌许可数量Remanent(N) < 0时,表示当前线程会进入阻塞等待状态。
2.2 基于Sync抽象类封装FairSync公平同步器
/** * Semaphore锁- 基于Sync抽象类封装FairSync公平同步器 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944 L; /** * Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可 */ FairSync(int permits) { super(permits); } /** * Semaphore锁- Semaphore锁- 公平模式释放共享锁 */ protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
- 实现方式: 主要是在基于静态内部Sync抽象类来实现,构造了一个可指定大小的的令牌许可
- 主要方法: 主要是提供了一个tryAcquireShared方法,其中利用hasQueuedPredecessors()来保证公平性
- 工作机制: 通过基于AQS基础同步器中的等待队列来实现公平机制
需要注意的是,在未达到最大的令牌许可数量时,所有线程都不会进入等待队列中。
2.3 基于Sync抽象类封装NonfairSync非公平同步器
/** * Semaphore锁- 基于Sync抽象类封装NonfairSync非公平同步器 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898 L; /** * Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可 */ NonfairSync(int permits) { super(permits); } /** * Semaphore锁- Semaphore锁- 非公平模式释放共享锁 */ protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
- 实现方式: 主要是在基于静态内部Sync抽象类来实现,构造了一个可指定大小的的令牌许可
- 主要方法: 主要是提供了一个tryAcquireShared方法,其中主要是调用了静态内部Sync抽象类nonfairTryAcquireShared方法。
- 工作机制: 通过自旋操作让所有线程竞争获取令牌许可,本质还是采用了AQS基础同步器中闯入策略到打破公平的
3. 具体实现
在JDK1.8版本中,对于Semaphore的具体实现如下:
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210 L; /** * Semaphore锁- 封装同步器 */ private final Sync sync; /** * Semaphore锁- 基于AQS基础同步器封装同步器 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933 L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } /** * Semaphore锁- 非公平模式获取共享锁 */ final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /** * Semaphore锁- 释放共享锁 */ protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } /** * Semaphore锁- 自旋+compareAndSetState通过CAS操作重置令牌许可数 */ final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } /** * Semaphore锁- 基于Sync抽象类封装FairSync公平同步器 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944 L; /** * Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可 */ FairSync(int permits) { super(permits); } /** * Semaphore锁- Semaphore锁- 公平模式释放共享锁 */ protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } /** * Semaphore锁- 基于Sync抽象类封装NonfairSync非公平同步器 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898 L; /** * Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可 */ NonfairSync(int permits) { super(permits); } /** * Semaphore锁- Semaphore锁- 非公平模式释放共享锁 */ protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * Semaphore锁- 构造一个令牌许可(默认非公模式) */ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * Semaphore锁- 构造一个令牌许可(可选公平/非公模式) */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } /** * Semaphore锁- 获取锁方法(默认一个且可中断机制) */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * Semaphore锁- 获取锁方法(可选指定多个且可中断机制) */ public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } /** * Semaphore锁- 获取锁方法(默认多个且不可中断机制) */ public void acquireUninterruptibly() { sync.acquireShared(1); } /** * Semaphore锁- 获取锁方法(指定多个且不可中断机制) */ public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } /** * Semaphore锁-释放锁方法(默认一个) */ public void release() { sync.releaseShared(1); } /** * Semaphore锁-释放锁方法(可选指定多个) */ public void release(int permits) { if (permits = 0; } /** * Semaphore锁-尝试获取锁方法(可选指定多个) */ public boolean tryAcquire(int permits) { if (permits = 0; } /** * Semaphore锁-尝试获取锁方法(可选指定多个并且支持超时机制) */ public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } /** * Semaphore锁-尝试获取锁方法(默认一个并且支持超时机制) */ public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * Semaphore锁-统计可以令牌许可数 */ public int availablePermits() { return sync.getPermits(); } /** * Semaphore锁-重置令牌许可数 */ public int drainPermits() { return sync.drainPermits(); } /** * Semaphore锁-递减计算令牌许可数 */ protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } /** * Semaphore锁-判断是否公平模式 */ public boolean isFair() { return sync instanceof FairSync; } /** * Semaphore锁-判断队列中是否存在线程对象 */ public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * Semaphore锁-获取队列长度 */ public final int getQueueLength() { return sync.getQueueLength(); } /** * Semaphore锁-获取队列的线程对象 */ protected Collection getQueuedThreads() { return sync.getQueuedThreads(); }}
- 信号量同步器: 主要是提供了2个构造方法来实现令牌许可的管理,其中:
- 默认非公平模式:依据指定传入的令牌许可数量permits直接实例化NonfairSync非公平同步器
- 可选公平/非公平模式:依据指定传入的令牌许可数量permits和公平标记fair来实例化NonfairSync非公平同步器和FairSync公平同步器,其中,当fair=true时,是公平平模式,否则为非公平模式
- �支持可中断机制:主要是提供了2个acquire()方法来获取锁,其中:
- 无参数acquire()方法:一般模式获取共享锁,主要是基于AQS基础同步器中的acquireSharedInterruptibly(int arg)来实现,其核心逻辑是doAcquireSharedInterruptibly(int arg)来操纵。
- 有参数acquire()方法:依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,直接调用acquireSharedInterruptibly(permits)方法。
- 支持不可中断机制:主要是提供了2个acquireUninterruptibly() 方法,其中:
- 无参数acquireUninterruptibly() 方法:一般模式获取共享锁,主要是基于AQS基础同步器中acquireShared(int arg)方法来实现,其核心逻辑是doAcquireShared(int arg) 来操纵。
- 有参数acquireUninterruptibly() 方法:依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,直接调用acquireShared(int arg)方法。
- 非公平模式获取锁方式: 主要提供了2个tryAcquire() 方法,其中:
- 无参数tryAcquire() 方法:非公平模式尝试获取共享锁,直接调用的是非公平同步器中的nonfairTryAcquireShared(int acquires) 方法。
- 有参数tryAcquire() 方法:依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,直接调用nonfairTryAcquireShared(int acquires) 方法。
- 公平模式获取锁方式:主要提供了2个tryAcquire() 方法,支持超时机制。其中:
- 无参数tryAcquire() 方法:公平模式尝试获取共享锁,依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,直接调用的是AQS基础同步器中的tryAcquire(int permits, long timeout, TimeUnit unit)方法,其核心逻辑是tryAcquireSharedNanos(int arg, long nanosTimeout)来操纵。
- 有参数tryAcquire() 方法:公平模式尝试获取共享锁,默认支持一个许可,直接调用的是AQS基础同步器中的tryAcquire(1,long timeout, TimeUnit unit)方法,其核心逻辑是tryAcquireSharedNanos(int arg, long nanosTimeout)来操纵。
- 释放锁操作方式:主要提供了2个release()方法,其中:
- 无参数release() 方法:公平/非公平模式示范锁操作,默认支持一个许可,主要是直接调用AQS基础同步器中的releaseShared(int arg) 方法
- 有参数release() 方法:公平/非公平模式示范锁操作,依据指定传入的令牌许可数量permits来判断,当permits< 0时,直接throw new IllegalArgumentException();否则,主要是直接调用AQS基础同步器中的releaseShared(int arg) 方法
- 令牌许可操作方法:主要提供了availablePermits() 方法,reducePermits(int reduction)方法 以及drainPermits() 方法,其中:
- availablePermits() 方法:获取可用的令牌许可数量,主要是调用内部同步器中getPermits()方法。
- reducePermits()方法:计算剩余可用令牌许可数量,依据指定传入的令牌许可数量reduction来判断,当reduction< 0时,直接throw new IllegalArgumentException();否则,调用内部同步器中reducePermits()方法。
- drainPermits() 方法:重置可用令牌许可数量,主要是调用内部同步器中drainPermits()方法。
- 队列操作方法:主要提供了hasQueuedThreads()方法,getQueuedThreads() 方法以及getQueueLength() 方法,其中:
- hasQueuedThreads()方法:主要是用于获取队列中是否存在等待获取令牌许可的线程对象,主要是直接使用AQS基础同步器的hasQueuedThreads()来实现。
- getQueuedThreads() 方法:主要是用于获取队列中等待获取令牌许可的线程对象,主要是直接使用AQS基础同步器的getQueuedThreads()来实现。
- getQueueLength() 方法:主要是用于获取队列中等待获取令牌许可的数量,主要是直接使用AQS基础同步器的getQueueLength()来实现。
综上所述,从一定意义上讲,Semaphore是一种共享锁,属于AQS基础抽象队列同步器中共享模式孵化的产物,支持公平模式与非公平模式,默认是使用非公平模式。
写在最后
通过对Java领域中,JDK内部提供的各种锁的实现来看,一直围绕的核心主要还是基于AQS基础同步器来实现的,但是AQS基础同步器不是一种非它不可的技术标准规范,更多的只是一套技术参考指南。
但是,实际上,Java对于锁的实现与运用远远不止这些,还有相位器(Phaser)和交换器(Exchanger),以及在Java JDK1.8版本之前并发容器ConcurrentHashMap中使用的分段锁(Segment)。
不论是何种实现和应用,在Java并发编程领域来讲,都是围绕线程安全问题的角度去考虑的,只是针对于各种各样的业务场景做的具体的实现。
一定意义上来讲,对线程加锁只是并发编程的实现方式之一,相对于实际应用来说,Java领域中的锁都只是一种单一应用的锁,只是给我们掌握Java并发编程提供一种思想没,三言两语也不可能详尽。
到此为止,这算是对于Java领域中并发锁的最终章,文中表述均为个人看法和个人理解,如有不到之处,忘请谅解也请给予批评指正。
最后,技术研究之路任重而道远,愿我们熬的每一个通宵,都撑得起我们想在这条路上走下去的勇气,未来仍然可期,与各位程序编程君共勉!
版权声明:本文为博主原创文章,遵循相关版权协议,如若转载或者分享请附上原文出处链接和链接来源。