目录
- 并发编程需要解决的问题
- 锁
- 内置锁(synchronized)
- 显式锁
- Lock
- 可重入锁(ReentrantLock)
- ReentrantLock
- ReentrantReadWriteLock
- StampedLock
- 线程的等待和唤醒
- LockSupport
- AQS
- AbstractOwnableSynchronizer
- AbstractQueuedSynchronizer
- AbstractQueuedLongSynchronizer
并发编程需要解决的问题
并发编程需要解决的三个问题:可见性、原子性、有序性。
同步与互斥是解决并发编程问题的方法:
- 互斥是指某一资源同一时间只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法控制对资源的访问顺序
- 同步是指在互斥的基础上实现对资源的有序访问,即:也是不可以同时访问,并且还需要按照某种顺序来运行。
锁
锁可以解决三大问题可见性、原子性、有序性,提供对资源的同步和互斥的访问。
Java中提供的锁分为两类:内置锁(synchronized)和显式锁(Lock)
锁类型 | 可重入性 | 公平性 | 读写分离 | 锁降级 | 锁升级 | 独占锁 | 悲观读锁 | 乐观读锁 | 非阻塞锁 | 超时 | 优点 | 缺点 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
synchronized | 可重入 | 非公平 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 不支持 | 1 不需要显式的获取锁和释放锁 | 1 不支持非阻塞锁,线程拿不到锁就会一直等待 ,除了获取锁没有其他办法能够让其结束等待 |
ReentrantLock | 可重入 | 可配置 | 不支持 | 不支持 | 不支持 | 支持 | 支持 | 不支持 | 支持 | 支持 | 1 大部分情况下,性能优于内置锁 | 1 需要显式的获取锁和释放锁,增加了编程复杂度和出错几率 |
ReentrantReadWriteLock | 可重入 | 可配置 | 支持 | 支持 | 不支持 | 支持 | 支持 | 不支持 | 支持 | 支持 | 1 支持读写锁分离 | |
StampedLock | 非可重入 | 可配置 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 | 1 支持乐观读锁,缓解锁饥饿问题,在读多写少的情况下效率提升 | 1 在读少写多的情况下效率不如其他锁 |
可重入性:同一个线程对于已经获得到的锁,可以多次继续申请到该锁的使用权
公平/非公平:如果在时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。
内置锁(synchronized)
Java内置锁不需要显式的获取锁和释放锁,由JVM内部来实现锁的获取与释放。而且任何一个对象都能作为一把内置锁。在 JDK1.4及之前就是使用 内置锁Synchronized来进行线程同步控制的
上文说,任何一个对象都能作为一把内置锁”,意味着synchronized关键字出现的地方,都有一个对象与之关联 ,具体表现为:
- 当synchronized作用于 普通方法 时,锁对象是 this ;
- 当synchronized作用于 静态方法 时,锁对象是 当前类的Class对象 ;
- 当synchronized作用于 代码块 时,锁对象是 synchronized(obj)中的这个obj。
package com.qupeng.concurrent.lock;public class SynchronizedObject {private Object lock = new Object();public static synchronized void lockClass() {}public static void lockClass1() {synchronized(SynchronizedObject.class) {}}public synchronized void lockThisObject() {}public void lockField() {synchronized(lock) {}}}
原理:
由 JVM 虚拟机内部实现,是基于 monitor 机制 ,每个对象都存在着一个 监视器(monitor)实与之关联,monitor的本质是依赖于底层操作系统的实现,称为内部锁或者Monitor锁。Monitor是线程私有的数据结构,每一个线程都有一个 可用monitor record列表 ,同时还有一个全局的可用列表。 每一个被锁住的对象都会和一个monitor关联 ,同时monitor中有一个 Owner字段 存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。
示例:用内置锁和Object.wait, Object.notify实现一个阻塞队列
package com.qupeng.concurrent.lock;import com.qupeng.concurrent.thread.MyThreadFactory;import com.qupeng.concurrent.thread.ThreadUtils;import java.util.PriorityQueue;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;public class SynchronizedTest {private static BlockingQueue<String> blockingQueue = new BlockingQueue<String>();private static AtomicInteger index = new AtomicInteger(1);public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(4, new MyThreadFactory());Producer producer1 = new Producer();Producer producer2 = new Producer();Consumer consumer1 = new Consumer();Consumer consumer2 = new Consumer();Future future1 = executorService.submit(producer1);Future future2 = executorService.submit(producer2);Future future3= executorService.submit(consumer1);Future future4 = executorService.submit(consumer2);try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}future1.cancel(true);future2.cancel(true);future3.cancel(true);future4.cancel(true);executorService.shutdown();System.out.println("Thread pool terminated.");}private static class Producer implements Callable<String> {@Overridepublic String call() throws Exception {while(!Thread.currentThread().isInterrupted()) {try {String item = String.valueOf(index.getAndIncrement());Thread.sleep(1000);blockingQueue.put(item);} catch (InterruptedException e) {ThreadUtils.printInterruptedMessage();break;}}return "";}}private static class Consumer implementsRunnable{@Overridepublic void run() {while(!Thread.currentThread().isInterrupted()) {try {String item = blockingQueue.take();Thread.sleep(1000);} catch (InterruptedException e) {ThreadUtils.printInterruptedMessage();break;}}}}private static class BlockingQueue<T> {private final int capacity = 1;private PriorityQueue<T> priorityQueue = new PriorityQueue<T>(capacity);public void put(T element) throws InterruptedException {synchronized (priorityQueue) {while (priorityQueue.size() == capacity) {ThreadUtils.printThreadMessage("Waiting.");priorityQueue.wait();}priorityQueue.add(element);ThreadUtils.printThreadMessage("Put item with index: " + element);priorityQueue.notifyAll();}}public T take() throws InterruptedException {synchronized (priorityQueue) {while (priorityQueue.size() == 0) {ThreadUtils.printThreadMessage("Waiting.");priorityQueue.wait();}T element =priorityQueue.poll();ThreadUtils.printThreadMessage("Poll item with index: " + element);priorityQueue.notifyAll();return element;}}}}
显式锁
Java内置锁的功能相对单一,不具备一些比较高级的锁功能:
- 限时抢锁:在抢锁时设置超时时长,如果超时还未获得锁就放弃,不至于无限等下去
- 中断抢锁:在抢锁时,外部线程给抢锁线程发出一个中断信号,就能唤起等待锁的线程,并且终止抢占过程。
- 一个锁多个等待队列:为锁维持多个等待队列,以便提高锁的效率。比如在生产者消费者模式实现中,生产者和消费者共用一把锁,该锁上维持两个等待队列,一个生产者队列,一个消费者队列。
除了以上的功能问题之外,Java对象锁还存在性能问题。在竞争稍微激烈的情况下,Java对象锁会膨胀为重量级锁(基于操作对象的Mutex Lock实现),而重量级锁的线程阻塞和唤醒操作,需要进程在内核状态和用户态之间来回切换,导致性能非常低。所以这个时候就需要引入一种新的锁。 Java显式锁就是为了解决这些Java对象的功能问题、性能问题而生。Lock是Java代码级别的锁。为了和Java对象锁区分,Lock接口叫显式锁接口,其对象实例叫显式锁对象。
Lock类结构如下图:
Lock
与使用synchronized方法和语句相比,Lock实现提供了更广泛的锁定操作。它们允许更灵活的结构,可能具有完全不同的属性,并且可能支持多个关联的 Condition 对象。
锁是一种控制多线程访问共享资源的工具。通常,锁提供对共享资源的独占访问:一次只有一个线程可以获取锁,并且对共享资源的所有访问都需要首先获取锁。但是,某些锁可能允许并发访问共享资源,例如 ReadWriteLock 的读锁。
同步方法或语句的使用提供了对与每个对象关联的隐式监视器锁的访问,但强制所有锁的获取和释放以块结构的方式发生:当获取多个锁时,它们必须以相反的顺序释放,并且所有锁必须在获得它们的相同词法范围内释放。
虽然同步方法和语句的作用域机制使使用监视器锁编程变得更加容易,并有助于避免许多涉及锁的常见编程错误,但在某些情况下,您需要以更灵活的方式使用锁。例如,一些遍历并发访问的数据结构的算法需要使用“hand-over-hand”或“链锁”:你获取节点 A 的锁,然后节点 B,然后释放 A 并获取 C,然后释放 B并获得 D 等等。 Lock 接口的实现通过允许在不同范围内获取和释放锁以及允许以任意顺序获取和释放多个锁来启用此类技术。
随着这种灵活性的增加,额外的责任也随之而来。块结构锁定的缺失消除了同步方法和语句发生的锁定的自动释放。在大多数情况下,应使用以下成语:
锁定 l = …;
l.lock();
尝试 {
// 访问受此锁保护的资源
} 最后 {
l.unlock();
}
当锁定和解锁发生在不同的作用域时,必须注意确保所有在持有锁时执行的代码都受到 try-finally 或 try-catch 的保护,以确保在必要时释放锁。
锁实现通过提供非阻塞尝试获取锁 (tryLock())、尝试获取可被中断的锁 (lockInterruptibly() 和尝试获取可以超时的锁(tryLock(long, TimeUnit))。
Lock 类还可以提供与隐式监视器锁完全不同的行为和语义,例如保证排序、不可重入使用或死锁检测。如果实现提供了这种专门的语义,那么实现必须记录这些语义。
请注意,Lock 实例只是普通对象,它们本身可以用作同步语句中的目标。获取 Lock 实例的监视器锁与调用该实例的任何 lock() 方法没有指定关系。建议为避免混淆,除非在它们自己的实现中,否则不要以这种方式使用 Lock 实例。
除非另有说明,否则为任何参数传递 null 值将导致引发 NullPointerException。
内存同步
所有 Lock 实现必须强制执行与内置监视器锁提供的相同的内存同步语义,如 Java 语言规范(17.4 内存模型)中所述:
成功的锁定操作与成功的锁定操作具有相同的内存同步效果。
成功的解锁操作与成功的解锁操作具有相同的内存同步效果。
不成功的锁定和解锁操作,以及重入锁定/解锁操作,不需要任何内存同步效果。
实施注意事项
三种形式的锁获取(可中断、不可中断和定时)在它们的性能特征、顺序保证或其他实现质量上可能不同。此外,中断正在进行的锁获取的能力在给定的 Lock 类中可能不可用。因此,实现不需要为所有三种形式的锁获取定义完全相同的保证或语义,也不需要支持正在进行的锁获取的中断。需要一个实现来清楚地记录每个锁定方法提供的语义和保证。它还必须遵守此接口中定义的中断语义,以支持获取锁的中断:完全或仅在方法入口上。
由于中断通常意味着取消,并且对中断的检查通常很少,因此实现可以更倾向于响应中断而不是正常的方法返回。即使可以证明在另一个操作可能已解除阻塞线程之后发生中断也是如此。一个实现应该记录这个行为。
可重入锁(ReentrantLock)
ReentrantLock
一种可重入互斥锁,其基本行为和语义与使用同步方法和语句访问的隐式监视器锁相同,但具有扩展功能。
ReentrantLock 由上次成功锁定但尚未解锁的线程拥有。当锁不被另一个线程拥有时,调用锁的线程将返回,成功获取锁。如果当前线程已经拥有锁,该方法将立即返回。这可以使用方法 isHeldByCurrentThread() 和 getHoldCount() 来检查。
此类的构造函数接受一个可选的公平参数。当设置为 true 时,在争用情况下,锁有利于授予对等待时间最长的线程的访问权限。否则,此锁不保证任何特定的访问顺序。使用由许多线程访问的公平锁的程序可能会显示出比使用默认设置的程序更低的整体吞吐量(即更慢;通常要慢得多),但在获取锁和保证不会出现饥饿的情况下具有较小的时间差异。但是请注意,锁的公平性并不能保证线程调度的公平性。因此,使用公平锁的许多线程之一可能会连续多次获得它,而其他活动线程没有进展并且当前没有持有锁。另请注意,不定时的 tryLock() 方法不遵守公平设置。如果锁可用,即使其他线程正在等待,它也会成功。
推荐的做法是总是在调用 lock 之后立即使用 try 块,最常见的是在构造之前/之后,例如:
X类{
私有最终 ReentrantLock 锁 = new ReentrantLock();
// …
公共无效 m() {
lock.lock(); // 阻塞直到条件成立
尝试 {
// … 方法体
} 最后 {
lock.unlock()
}
}
}
除了实现 Lock 接口之外,该类还定义了许多公共和受保护的方法来检查锁的状态。其中一些方法仅对仪器仪表和监控有用。
此类的序列化与内置锁的行为方式相同:反序列化锁处于未锁定状态,无论其在序列化时的状态如何。
此锁最多支持同一线程的 2147483647 个递归锁。尝试超过此限制会导致锁定方法抛出错误。
方法 | 说明 |
---|---|
public void lock() | 获取锁。如果没有被另一个线程持有,则获取锁并立即返回,将锁持有计数设置为 1。如果当前线程已经持有锁,则持有计数加一并且该方法立即返回。如果锁被另一个线程持有,那么当前线程将被禁用以用于线程调度目的并处于休眠状态,直到获得锁为止,此时锁持有计数设置为 1。 |
public void lockInterruptibly() throws InterruptedException | |
public boolean tryLock() | |
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException | |
public void unlock() | |
public Condition newCondition() | |
public int getHoldCount() | |
public boolean isHeldByCurrentThread() | |
public boolean isLocked() | |
public final boolean isFair() | |
protected Thread getOwner() | |
public final boolean hasQueuedThreads() | |
public final boolean hasQueuedThread(Thread thread) | |
public final int getQueueLength() | |
protected Collection getQueuedThreads() | |
public boolean hasWaiters(Condition condition) | |
public int getWaitQueueLength(Condition condition) | |
protected Collection getWaitingThreads(Condition condition) | |
public String toString() |
package com.qupeng.concurrent.lock;import com.qupeng.concurrent.thread.MyThreadFactory;import com.qupeng.concurrent.thread.ThreadUtils;import java.util.PriorityQueue;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class ReenterantLockTest {private static BlockingQueue<String> blockingQueue = new BlockingQueue<String>();private static AtomicInteger index = new AtomicInteger(1);public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(4, new MyThreadFactory());Producer producer1 = new Producer();Producer producer2 = new Producer();Consumer consumer1 = new Consumer();Consumer consumer2 = new Consumer();Future future1 = executorService.submit(producer1);Future future2 = executorService.submit(producer2);Future future3= executorService.submit(consumer1);Future future4 = executorService.submit(consumer2);try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}future1.cancel(true);future2.cancel(true);future3.cancel(true);future4.cancel(true);executorService.shutdown();System.out.println("Thread pool terminated.");}private static class Producer implements Callable<String> {@Overridepublic String call() throws Exception {while(!Thread.currentThread().isInterrupted()) {try {String item = String.valueOf(index.getAndIncrement());Thread.sleep(1000);blockingQueue.put(item);} catch (InterruptedException e) {ThreadUtils.printInterruptedMessage();break;}}return "";}}private static class Consumer implementsRunnable{@Overridepublic void run() {while(!Thread.currentThread().isInterrupted()) {try {String item = blockingQueue.take();Thread.sleep(1000);} catch (InterruptedException e) {ThreadUtils.printInterruptedMessage();break;}}}}private static class BlockingQueue<T> {private final int capacity = 1;private PriorityQueue<T> priorityQueue = new PriorityQueue<T>(capacity);ReentrantLock reentrantLock = new ReentrantLock();Condition putCondition = reentrantLock.newCondition();Condition takeConditon = reentrantLock.newCondition();public void put(T element) throws InterruptedException {reentrantLock.lockInterruptibly();try {while (priorityQueue.size() == capacity) {ThreadUtils.printThreadMessage("Waiting.");putCondition.await();}priorityQueue.add(element);ThreadUtils.printThreadMessage("Put item with index: " + element);takeConditon.signal();} finally {reentrantLock.unlock();}}public T take() throws InterruptedException {reentrantLock.lockInterruptibly();try {while (priorityQueue.size() == 0) {ThreadUtils.printThreadMessage("Waiting.");takeConditon.await();}T element =priorityQueue.poll();ThreadUtils.printThreadMessage("Poll item with index: " + element);putCondition.signal();return element;} finally {reentrantLock.unlock();}}}}
ReentrantReadWriteLock
ReadWriteLock 的实现,支持与 ReentrantLock 类似的语义。
此类具有以下属性:
获得顺序
此类不会为锁访问强加读取器或写入器的优先顺序。但是,它确实支持可选的公平策略。
非公平模式(默认)
当构造为非公平(默认)时,进入读写锁的顺序是未指定的,受重入约束。持续竞争的非公平锁可能会无限期地推迟一个或多个读取器或写入器线程,但通常比公平锁具有更高的吞吐量。
公平模式
当构造为公平时,线程使用近似到达顺序策略竞争进入。当当前持有的锁被释放时,要么为等待时间最长的单个写入线程分配写入锁,要么如果有一组读取线程等待的时间比所有等待写入线程的时间长,则为该组分配读取锁。
如果持有写锁或存在等待写入线程,则尝试获取公平读锁(不可重入)的线程将阻塞。直到当前等待的最早的写线程获得并释放写锁之后,该线程才会获得读锁。当然,如果一个等待的写入者放弃它的等待,留下一个或多个读取线程作为队列中最长的等待者并且没有写入锁,那么这些读取器将被分配读取锁。
除非读锁和写锁都空闲(这意味着没有等待的线程),否则试图获取公平写锁(不可重入)的线程将阻塞。 (请注意,非阻塞 ReentrantReadWriteLock.ReadLock.tryLock() 和 ReentrantReadWriteLock.WriteLock.tryLock() 方法不遵守此公平设置,如果可能,将立即获取锁,而不管等待线程如何。)
重入
此锁允许读取器和写入器以 ReentrantLock 的样式重新获取读取或写入锁。在写线程持有的所有写锁都被释放之前,不允许非重入读者。
此外,写入者可以获取读锁,但反之则不行。在其他应用程序中,当在调用或回调在读锁下执行读取的方法期间持有写锁时,重入可能很有用。如果读者试图获取写锁,它将永远不会成功。
锁定降级
重入还允许从写锁降级为读锁,方法是获取写锁,然后是读锁,然后释放写锁。但是,无法从读锁升级到写锁。
锁获取中断
读锁和写锁都支持获取锁时的中断。
条件支持
写锁提供了一个 Condition 实现,其行为方式与写锁的行为方式相同,就像 ReentrantLock.newCondition() 提供的 Condition 实现为 ReentrantLock 所做的那样。当然,这个条件只能与写锁一起使用。
读锁不支持 Condition 并且 readLock().newCondition() 抛出 UnsupportedOperationException。
仪器仪表
此类支持确定锁是否被持有或竞争的方法。这些方法是为监视系统状态而设计的,而不是为同步控制而设计的。
此类的序列化与内置锁的行为方式相同:反序列化锁处于未锁定状态,无论其在序列化时的状态如何。
方法 | 说明 |
---|---|
public final boolean isFair() | |
protected Thread getOwner() | |
public int getReadLockCount() | |
public boolean isWriteLocked() | |
public boolean isWriteLockedByCurrentThread() | |
public int getWriteHoldCount() | |
public int getReadHoldCount() | |
protected Collection getQueuedWriterThreads() | |
protected Collection getQueuedReaderThreads() | |
public final boolean hasQueuedThreads() | |
public final boolean hasQueuedThread(Thread thread) | |
public final int getQueueLength() | |
protected Collection getQueuedThreads() | |
public boolean hasWaiters(Condition condition) | |
public int getWaitQueueLength(Condition condition) | |
protected Collection getWaitingThreads(Condition condition) | |
public String toString() |
package com.qupeng.concurrent.lock;import com.qupeng.concurrent.thread.MyThreadFactory;import com.qupeng.concurrent.thread.ThreadUtils;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReentrantReadWriteLockTest {private static final ReentrantReadWriteLockList<String> list = new ReentrantReadWriteLockList<>();private static final AtomicInteger writeValue = new AtomicInteger(1);public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(4, new MyThreadFactory());Writer writer = new Writer();Reader reader1 = new Reader();Reader reader2 = new Reader();Reader reader3 = new Reader();Future future1 = executorService.submit(writer);Future future2 = executorService.submit(reader1);Future future3= executorService.submit(reader2);Future future4 = executorService.submit(reader3);try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}future1.cancel(true);future2.cancel(true);future3.cancel(true);future4.cancel(true);executorService.shutdown();System.out.println("Thread pool terminated.");}private static class Reader implements Runnable {@Overridepublic void run() {while(!Thread.currentThread().isInterrupted()) {try {list.size();Thread.sleep(200);} catch (InterruptedException e) {ThreadUtils.printInterruptedMessage();break;}}}}private static class Writer implements Runnable {@Overridepublic void run() {while(!Thread.currentThread().isInterrupted()) {int numberNew = writeValue.getAndIncrement();try {Thread.sleep(1000);list.add(String.valueOf(numberNew));} catch (InterruptedException e) {ThreadUtils.printInterruptedMessage();break;}}}}private static class ReentrantReadWriteLockList<T> {private List<T> list = new ArrayList<>();ReentrantReadWriteLock lock = new ReentrantReadWriteLock();Lock writeLock = lock.writeLock();Condition condition = writeLock.newCondition();public void add(T element) throws InterruptedException {lock.writeLock().lock();try {ThreadUtils.printThreadMessage("Add: " + element);Thread.sleep(1000);this.printLockInfo();this.list.add(element);}finally {lock.writeLock().unlock();}}public T get(int index) {lock.readLock().lock();try {return this.list.get(index);}finally {lock.readLock().unlock();}}public int size() {lock.readLock().lock();try {ThreadUtils.printThreadMessage("Size is: " + list.size());this.printLockInfo();return this.list.size();}finally {lock.readLock().unlock();}}public void printLockInfo() {ThreadUtils.printThreadMessage("Queue length: " + lock.getQueueLength());ThreadUtils.printThreadMessage("Read hold count: " + lock.getReadHoldCount());ThreadUtils.printThreadMessage("Read lock count: " + lock.getReadLockCount());ThreadUtils.printThreadMessage("Write hold count: " + lock.getWriteHoldCount());ThreadUtils.printThreadMessage("Has queued thread: " + lock.hasQueuedThread(Thread.currentThread()));ThreadUtils.printThreadMessage("Has queued threads: " + lock.hasQueuedThreads());ThreadUtils.printThreadMessage("Is fair: " + lock.isFair());ThreadUtils.printThreadMessage("Is write locked: " + lock.isWriteLocked());ThreadUtils.printThreadMessage("Is write locked by Current thread: " + lock.isWriteLockedByCurrentThread());}public ReentrantReadWriteLock getLock() {return this.lock;}}}
StampedLock
StampedLock除了能提供了读、写锁,还提供了乐观读锁。
一种基于能力的锁,具有三种用于控制读/写访问的模式。 StampedLock 的状态由版本和模式组成。 锁获取方法返回一个代表和控制与锁状态相关的访问的标记; 这些方法的“尝试”版本可能会返回特殊值零来表示无法获取访问权限。 锁释放和转换方法需要标记作为参数,如果它们与锁的状态不匹配,则会失败。 三种模式是:
- 写模式。方法 writeLock() 可能会阻塞等待独占访问,返回可以在方法 unlockWrite(long) 中使用以释放锁的标记。还提供了不定时和定时版本的 tryWriteLock。在写模式下持有锁时,可能无法获得读锁,所有乐观读验证都会失败。
- 读模式。方法 readLock() 可能会阻塞等待非独占访问,返回可用于方法 unlockRead(long) 以释放锁的标记。还提供了不定时和定时版本的 tryReadLock。
- 乐观读模式。方法 tryOptimisticRead() 仅当锁当前未处于写入模式时才返回非零标记。如果自获得给定标记后尚未以写入模式获得锁,则方法 validate(long) 返回 true。这种模式可以被认为是一个非常弱的读锁版本,可以随时被写入者打破。对短只读代码段使用乐观模式通常会减少争用并提高吞吐量。然而,它的使用本质上是脆弱的。乐观读取部分应该只读取字段并将它们保存在局部变量中以供验证后使用。在乐观模式下读取的字段可能非常不一致,因此仅当您对数据表示足够熟悉以检查一致性和/或重复调用方法 validate() 时才适用。例如,当首先读取对象或数组引用,然后访问其字段、元素或方法之一时,通常需要执行此类步骤。
此类还支持有条件地提供跨三种模式的转换的方法。例如,方法 tryConvertToWriteLock(long) 尝试“升级”模式,如果 (1) 已经处于写入模式 (2) 处于读取模式并且没有其他读取器或 (3) 处于乐观模式并且锁可用。这些方法的形式旨在帮助减少在基于重试的设计中出现的一些代码膨胀。
StampedLocks 旨在用作开发线程安全组件的内部实用程序。它们的使用依赖于对它们所保护的数据、对象和方法的内部属性的了解。它们不是可重入的,因此锁定的主体不应调用其他可能尝试重新获取锁的未知方法(尽管您可以将标记传递给可以使用或转换它的其他方法)。读锁模式的使用依赖于相关的代码段是无副作用的。未经验证的乐观读取部分不能调用未知的方法来容忍潜在的不一致。邮票使用有限的表示,并且在密码学上不安全(即,有效的邮票可能是可猜测的)。邮票值可以在(不早于)连续运行一年后回收。超过此期限而未使用或验证的邮票可能无法正确验证。 StampedLocks 是可序列化的,但总是反序列化为初始解锁状态,因此它们对于远程锁定没有用处。
StampedLock 的调度策略并不总是优先选择读者而不是作者,反之亦然。所有“尝试”方法都是尽力而为,不一定符合任何调度或公平策略。任何获取或转换锁的“尝试”方法的零返回不携带任何关于锁状态的信息;随后的调用可能会成功。
因为它支持跨多种锁定模式的协调使用,所以该类不直接实现 Lock 或 ReadWriteLock 接口。但是,在仅需要相关功能集的应用程序中,可以将 StampedLock 视为 ReadLock()、asWriteLock() 或 asReadWriteLock()。
方法 | 说明 |
---|---|
public long writeLock() | |
public long tryWriteLock() | |
public long tryWriteLock(long time, TimeUnit unit) throws InterruptedException | |
public long writeLockInterruptibly() throws InterruptedException | |
public long readLock() | |
public long tryReadLock() | |
public long tryReadLock(long time, TimeUnit unit) throws InterruptedException | |
public long readLockInterruptibly() throws InterruptedException | |
public long tryOptimisticRead() | |
public boolean validate(long stamp) | |
public void unlockWrite(long stamp) | |
public void unlockRead(long stamp) | |
public void unlock(long stamp) | |
public long tryConvertToWriteLock(long stamp) | |
public long tryConvertToReadLock(long stamp) | |
public long tryConvertToOptimisticRead(long stamp) | |
public boolean tryUnlockWrite() | |
public boolean tryUnlockRead() | |
public boolean isWriteLocked() | |
public boolean isReadLocked() | |
public int getReadLockCount() | |
public String toString() | |
public Lock asReadLock() | |
public Lock asWriteLock() | |
public ReadWriteLock asReadWriteLock() |
package com.qupeng.concurrent.lock;import com.qupeng.concurrent.thread.MyThreadFactory;import com.qupeng.concurrent.thread.ThreadUtils;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.StampedLock;public class StampedLockTest {private static final List<String> list = new ArrayList<>();private static final StampedLock stampedLock = new StampedLock();private static final AtomicInteger writeValue = new AtomicInteger(1);public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(4, new MyThreadFactory());Writer writer = new Writer();Reader reader1 = new Reader();Reader reader2 = new Reader();Reader reader3 = new Reader();Future future1 = executorService.submit(writer);Future future2 = executorService.submit(reader1);Future future3= executorService.submit(reader2);Future future4 = executorService.submit(reader3);try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}future1.cancel(true);future2.cancel(true);future3.cancel(true);future4.cancel(true);executorService.shutdown();System.out.println("Thread pool terminated.");}private static class Reader implements Runnable {@Overridepublic void run() {while(!Thread.currentThread().isInterrupted()) {long stamp = stampedLock.tryOptimisticRead(); // 为提升效率,首先尝试获取乐观读锁,得到读戳int size = list.size();ThreadUtils.printThreadMessage("Size under optimistic lock: " + size);if (!stampedLock.validate(stamp)) { // 如果有线程申请得到了读戳对应的写锁,临界区很有可能被改变,需要申请读锁stamp = stampedLock.readLock();try {size = list.size();// 获取到最新的临界区值ThreadUtils.printThreadMessage("Size under read lock: " + size);} finally {stampedLock.unlockRead(stamp); // 释放读锁}}try {Thread.sleep(200);} catch (InterruptedException e) {break;}}}}private static class Writer implements Runnable {@Overridepublic void run() {while(!Thread.currentThread().isInterrupted()) {long stamp = stampedLock.readLock(); // 获取读锁,并得到读戳try {int size = list.size(); // 读取临界区while (true) {long writeStamp = stampedLock.tryConvertToWriteLock(stamp); // 修改临界区时,升级为写锁,并得到写戳if (0 != writeStamp) { // 写戳不为0,锁升级成功stamp = writeStamp;if (3 == size) {list.clear();writeValue.set(0);ThreadUtils.printThreadMessage("Clear list.");Thread.sleep(1000);} else {int element = writeValue.getAndIncrement();ThreadUtils.printThreadMessage("Start adding: " + element);list.add(String.valueOf(element));Thread.sleep(1000);ThreadUtils.printThreadMessage("End adding: " + element);}break;} else { // 写戳为0,锁升级失败stampedLock.unlockRead(stamp); // 释放读锁stamp = stampedLock.writeLock(); // 直接申请写锁,继续修改临界区}}} catch (InterruptedException e) {break;} finally {stampedLock.unlock(stamp);// 释放锁}}}}}
线程的等待和唤醒
等待与唤醒是Java提供的一种线程之间的同步机制。线程可以为等待特定条件的达成而阻塞(成为WAITING状态),然后直到超时或被其他线程唤醒。
目前,JAVA提供了3种线程等待唤醒的机制。
- synchronized + Object的wait()和notify()方法
- Lock的lock()方法和unlock()方法 + Conditon的await()和signal()方法
- LockSupport的park()和unpark()方法
此外,使用Thread.sleep, Thread.suspend也可以实现线程的阻塞。
package com.qupeng.concurrent.lock;import com.qupeng.concurrent.thread.MyThreadFactory;import com.qupeng.concurrent.thread.ThreadUtils;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.LockSupport;import java.util.concurrent.locks.ReentrantLock;public class WaitNotifyTest {private static final ReentrantLock reentrantLock = new ReentrantLock();private static final Condition condition = reentrantLock.newCondition();private static final Object lockObject = new Object();private static Thread threadSuspended;private static Thread threadParked;public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(4, new MyThreadFactory());Future future1 = executorService.submit(new ThreadSuspendTest());Future future2 = executorService.submit(new WaitAndNotifyTest());Future future3 = executorService.submit(new AwaitAndSignalTest());Future future4 = executorService.submit(new ParkAndUnparkTest());ThreadUtils.sleep(2);System.out.println("Calling Thread.resume().");threadSuspended.resume();ThreadUtils.sleep(2);System.out.println("Calling Object.notify().");synchronized (lockObject) {lockObject.notify();}ThreadUtils.sleep(2);System.out.println("Calling Condition.signal().");reentrantLock.lock();try{condition.signal();} finally {reentrantLock.unlock();}ThreadUtils.sleep(2);System.out.println("Calling LockSupport.unpark().");LockSupport.unpark(threadParked);ThreadUtils.sleep(2);future1.cancel(true);future2.cancel(true);future3.cancel(true);future4.cancel(true);executorService.shutdown();System.out.println("Thread pool terminated.");}private static class ThreadSuspendTest implements Runnable {@Overridepublic void run() {ThreadUtils.printThreadMessage("Thread is suspended by calling currentThread.suspend().");threadSuspended = Thread.currentThread();Thread.currentThread().suspend();ThreadUtils.printThreadMessage("Thread is resumed.");}}private static class WaitAndNotifyTest implements Runnable {@Overridepublic void run() {ThreadUtils.printThreadMessage("Thread is waiting by calling Object.wait().");try {synchronized (lockObject) {lockObject.wait();}} catch (InterruptedException e) {e.printStackTrace();}ThreadUtils.printThreadMessage("Thread is awake.");}}private static class AwaitAndSignalTest implements Runnable {@Overridepublic void run() {ThreadUtils.printThreadMessage("Thread is waiting by calling Condition.awaitUninterruptibly().");reentrantLock.lock();condition.awaitUninterruptibly();ThreadUtils.printThreadMessage("Thread is awake.");}}private static class ParkAndUnparkTest implements Runnable {@Overridepublic void run() {ThreadUtils.printThreadMessage("Thread is waiting by calling LockSupport.park().");threadParked = Thread.currentThread();LockSupport.park(this);ThreadUtils.printThreadMessage("Thread is awake.");}}}
线程等待方式 | 是否释放持有锁 | 等待时间 | 唤醒方式 | 等待或唤醒前是否需要加锁 | 调用顺序 |
---|---|---|---|---|---|
Thread.sleep | 否 | 必须指定 | 超时自动唤醒 | 不需要 | |
Thread.suspend | 否 | 不能指定 | 必须调用Thread.resume,并且不释放锁,所以死锁风险极大,已废弃 | 不需要 | |
object.wait | 是 | 可选 | 1 超时自动唤醒 | 在synchronized块中调用 | wait必须在notify之前 |
condition.await | 是 | 可选 | 1 超时自动唤醒 | 在Lock.lock和Lock.unlock之间调用 | await必须在signal之前 |
LockSupport.park | 否 | 不需要 | LockSupport.unpark | 不需要 | unpark不必在park之前调用 |
LockSupport
用于创建锁和其他同步类的基本线程阻塞原语。
此类与使用它的每个线程关联一个permit(在 Semaphore 类的意义上)。如果permit可用,park调用将立即返回,并在此过程中消耗它;否则可能会阻塞。如果permit尚不可用,则调用 unpark 可使许可证可用。 (但与信号量不同,许可证不会累积。最多有一个。)
方法 park 和 unpark 提供了阻塞和解除阻塞线程的有效方法,这些线程不会遇到导致不推荐使用的方法 Thread.suspend 和 Thread.resume 无法用于此类目的的问题:一个调用 park 的线程和另一个试图解除它的线程之间的争用由于许可,将保持活力。此外,如果调用者的线程被中断,park 将返回,并且支持超时版本。 park 方法也可以在任何其他时间“无缘无故”地返回,因此通常必须在循环中调用,该循环在返回时重新检查条件。从这个意义上说,停车是“忙等待”的优化,不会浪费太多时间,但必须与取消停车配对才能有效。
三种形式的 park 都支持一个 blocker 对象参数。该对象在线程被阻塞时被记录,以允许监视和诊断工具识别线程被阻塞的原因。 (此类工具可以使用 getBlocker(Thread) 方法访问阻止程序。)强烈建议使用这些表单而不是没有此参数的原始表单。在锁实现中作为阻塞器提供的正常参数是 this。
这些方法旨在用作创建更高级别同步实用程序的工具,它们本身对大多数并发控制应用程序没有用处。 park 方法仅用于以下形式的构造:
while (!canProceed()) { … LockSupport.park(this); }
其中 canProceed 或在调用停车之前的任何其他操作都不会导致锁定或阻塞。因为每个线程只有一个许可,所以任何对 park 的中间使用都可能会干扰其预期效果。
方法 | 说明 |
---|---|
public static void park(Object blocker) | |
public static void unpark(Thread thread) | |
public static void parkNanos(Object blocker, long nanos) | |
public static void parkUntil(Object blocker, long deadline) | |
public static Object getBlocker(Thread t) | |
public static void park() | |
public static void parkNanos(long nanos) | |
public static void parkUntil(long deadline) |
AQS
java中AQS是AbstractQueuedSynchronizer类,AQS依赖FIFO队列来提供一个框架,这个框架用于实现锁以及锁相关的同步器,比如信号量、事件等。
AQS定义了一套多线程访问共享资源的同步器框架,是整个包的基石,Lock、ReadWriteLock、CountDowndLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor等都是在AQS的基础上实现的。
AQS的核心思想是:通过一个volatile修饰的int属性state代表同步状态,例如0是无锁状态,1是上锁状态。多线程竞争资源时,通过CAS的方式来修改state,例如从0修改为1,修改成功的线程即为资源竞争成功的线程,将其设为exclusiveOwnerThread,也称“工作线程”,资源竞争失败的线程会被放入一个FIFO的队列中并挂起休眠,当exclusiveOwnerThread线程释放资源后,会从队列中唤醒线程继续工作,循环往复。
在AQS中,主要有两部分功能,一部分是操作state变量,第二部分是实现排队和阻塞机制。
AbstractOwnableSynchronizer
可能由线程独占的同步器。 此类为创建可能需要所有权概念的锁和相关同步器提供了基础。 AbstractOwnableSynchronizer 类本身不管理或使用此信息。 但是,子类和工具可以使用适当维护的值来帮助控制和监视访问并提供诊断。
方法 | 说明 |
---|---|
protected final void setExclusiveOwnerThread(Thread thread) | 设置当前拥有独占访问权限的线程。 空参数表示没有线程拥有访问权限。 此方法不会强加任何同步或volatile字段访问 |
getExclusiveOwnerThread | 返回最后由 setExclusiveOwnerThread 设置的线程,如果从未设置,则返回 null。 此方法不会强加任何同步或volatile字段访问。 |
AbstractQueuedSynchronizer
提供一个框架来实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关的同步器(信号量、事件等)。此类旨在为大多数依赖单个原子 int 值来表示状态的同步器提供有用的基础。子类必须定义更改此状态的受保护方法,并定义该状态在获取或释放此对象方面的含义。鉴于这些,此类中的其他方法执行所有排队和阻塞机制。子类可以维护其他状态字段,但仅跟踪使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法操作的原子更新的 int 值以进行同步。
子类应定义为非公共内部帮助类,用于实现其封闭类的同步属性。 AbstractQueuedSynchronizer 类不实现任何同步接口。相反,它定义了诸如acquireInterruptibly(int)之类的方法,这些方法可以被具体的锁和相关的同步器适当地调用以实现它们的公共方法。
此类支持默认独占模式和共享模式中的一种或两种。以独占模式获取时,其他线程尝试获取时不会成功。多个线程的共享模式获取可能(但不一定)成功。此类不“理解”这些差异,除非在机械意义上,当共享模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也可以获取。不同模式下等待的线程共享同一个FIFO队列。通常,实现子类仅支持其中一种模式,但两者都可以发挥作用,例如在 ReadWriteLock 中。只支持独占或只支持共享模式的子类不需要定义支持未使用模式的方法。
该类定义了一个嵌套的 AbstractQueuedSynchronizer.ConditionObject 类,该类可以被支持独占模式的子类用作 Condition 实现,其中方法 isHeldExclusively() 报告是否针对当前线程独占保持同步,方法 release(int) 与当前线程一起调用getState() 值完全释放此对象,acquire(int) 给定此保存的状态值,最终将此对象恢复到其先前获取的状态。没有 AbstractQueuedSynchronizer 方法否则会创建这样的条件,因此如果无法满足此约束,请不要使用它。 AbstractQueuedSynchronizer.ConditionObject 的行为当然取决于其同步器实现的语义。
此类为内部队列提供检查、检测和监视方法,以及用于条件对象的类似方法。这些可以根据需要导出到类中,使用 AbstractQueuedSynchronizer 用于它们的同步机制。
此类的序列化仅存储底层原子整数维护状态,因此反序列化对象具有空线程队列。需要可序列化的典型子类将定义一个 readObject 方法,该方法在反序列化时将其恢复到已知的初始状态。
3,225 / 5,000
翻译结果
用法
要将此类用作同步器的基础,请在适用时重新定义以下方法,方法是使用 getState()、setState(int) 和/或 compareAndSetState(int, int) 检查和/或修改同步状态:
尝试获取(整数)
尝试释放(整数)
tryAcquireShared(int)
tryReleaseShared(int)
isHeldExclusively()
默认情况下,这些方法中的每一个都会引发 UnsupportedOperationException。这些方法的实现必须是内部线程安全的,并且通常应该是短的而不是阻塞的。定义这些方法是使用此类的唯一受支持的方法。所有其他方法都被声明为最终方法,因为它们不能独立变化。
您可能还会发现从 AbstractOwnableSynchronizer 继承的方法对于跟踪拥有独占同步器的线程很有用。鼓励您使用它们——这使监视和诊断工具能够帮助用户确定哪些线程持有锁。
即使此类基于内部 FIFO 队列,它也不会自动强制执行 FIFO 获取策略。独占同步的核心形式为:
获得:
而(!tryAcquire(arg)){
如果尚未排队,则将线程入队;
可能阻塞当前线程;
}
释放:
如果(尝试释放(arg))
解锁第一个排队的线程;
(共享模式类似,但可能涉及级联信号。)
因为在入队之前调用了获取中的检查,所以新获取的线程可能会抢在其他被阻塞和排队的线程之前。但是,如果需要,您可以定义 tryAcquire 和/或 tryAcquireShared 以通过内部调用一个或多个检查方法来禁用插入,从而提供公平的 FIFO 获取顺序。特别是,如果 hasQueuedPredecessors()(一种专门为公平同步器使用的方法)返回 true,大多数公平同步器可以定义 tryAcquire 返回 false。其他变化是可能的。
默认插入(也称为贪婪、放弃和避免护送)策略的吞吐量和可扩展性通常最高。虽然这不能保证公平或无饥饿,但允许较早排队的线程在稍后排队的线程之前重新竞争,并且每次重新竞争都有成功对抗传入线程的无偏机会。此外,虽然获取不是通常意义上的“旋转”,但它们可能会在阻塞之前执行多次调用 tryAcquire 并穿插其他计算。当独占同步只是短暂地保持时,这提供了自旋的大部分好处,而没有大部分责任。如果需要,您可以通过预先调用获取具有“快速路径”检查的方法来增加这一点,可能会预先检查 hasContended() 和/或 hasQueuedThreads() 以仅在可能不竞争同步器时才这样做。
此类通过将其使用范围专门用于可以依赖 int 状态、获取和释放参数以及内部 FIFO 等待队列的同步器,部分地为同步提供了高效且可扩展的基础。如果这还不够,您可以使用原子类、您自己的自定义队列类和 LockSupport 阻塞支持从较低级别构建同步器。
方法 | 说明 |
---|---|
protected final int getState() | 返回同步状态的当前值。 此操作具有volatile读取的内存语义。 |
protected final void setState(int newState) | 设置同步状态的值。 此操作具有volatile写入的内存语义。 |
protected final boolean compareAndSetState(int expect, int update) | 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值。 此操作具有volatile读写的内存语义。 |
protected boolean tryAcquire(int arg) | 尝试以独占模式获取。 此方法应查询对象的状态是否允许以独占模式获取它,如果允许则获取它。此方法始终由执行获取的线程调用。 如果此方法报告失败,如果该线程尚未排队,则获取方法可以将该线程排队,直到由某个其他线程的释放发出信号。 这可用于实现方法 Lock.tryLock()。默认实现抛出 UnsupportedOperationException。 |
protected boolean tryRelease(int arg) | |
protected int tryAcquireShared(int arg) | |
protected boolean tryReleaseShared(int arg) | |
protected boolean isHeldExclusively() | |
public final void acquire(int arg) | |
public final void acquireInterruptibly(int arg) throws InterruptedException | |
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException | |
public final boolean release(int arg) | |
public final void acquireShared(int arg) | |
public final void acquireSharedInterruptibly(int arg) throws InterruptedException | |
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException | |
public final boolean releaseShared(int arg) | |
public final boolean hasQueuedThreads() | |
public final boolean hasContended() | |
public final Thread getFirstQueuedThread() | |
public final boolean isQueued(Thread thread) | |
public final boolean hasQueuedPredecessors() | |
public final int getQueueLength() | |
public final Collection getQueuedThreads() | |
public final Collection getExclusiveQueuedThreads() | |
public final Collection getSharedQueuedThreads() | |
public String toString() | |
public final boolean owns(AbstractQueuedSynchronizer.ConditionObject condition) | |
public final boolean hasWaiters(AbstractQueuedSynchronizer.ConditionObject condition) | |
public final int getWaitQueueLength(AbstractQueuedSynchronizer.ConditionObject condition) | |
public final Collection getWaitingThreads(AbstractQueuedSynchronizer.ConditionObject condition) |
AbstractQueuedLongSynchronizer
AbstractQueuedSynchronizer 的一个版本,其中同步状态保持为 long。 此类具有与 AbstractQueuedSynchronizer 完全相同的结构、属性和方法,只是所有与状态相关的参数和结果都定义为 long 而不是 int。 在创建需要 64 位状态的多级锁和屏障等同步器时,此类可能很有用。