第5章 Java中的锁
5.1 Lock接口
锁 是用来控制多个线程访问共享资源 的方式,一般来说,一个锁能够防止多个线程同时访问共享资源。
在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增了Lock接口 (以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能 ,只是在使用时需要显式地获取和释放锁 。虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却更灵活 ,扩展性更好 。
Lock的使用非常简单
1 2 3 4 5 6 7 Lock lock = new ReentrantLock(); lock.lock(); try { }finally { lock.unlock(); }
在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放 。
Lock接口的API如下表所示。
方法名称
描述
void lock()
获取锁 :调用该方法当前线程将会获取锁,当锁获取后,从该方法返回
void lockInterruptibly() throws InterruptedException
可中断地获取锁 :该方法可以响应中断,即在锁的获取中可以中断当前线程
boolean tryLock()
尝试非阻塞获取锁 :获取到立刻返回ture,获取不到立刻返回false,不会等待
boolean tryLock(long time, TimeUnit unit)
超时获取锁 :当前线程在以下三种情况下会返回 1、当前线程在超时时间内获取了锁。2、当前线程在超时时间内被中断。3、超时时间结束,返回false
void unlock()
释放锁
Condition newCondition()
获取等待通知组件:该组件和当前的锁绑定
5.2 队列同步器
队列同步器 AbstractQueuedSynchronizer
(以下简称同步器),是用来构建锁 或者其他同步组件的基础框架.
同步器是实现锁的关键 ,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:
锁是面向使用者的 ,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节 ;
同步器面向的是锁的实现者 ,它简化了锁的实现方式 ,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。
锁和同步器很好地隔离了使用者和实现者所需关注的领域。
在正式介绍同步器之前,我们要先了解共享锁 和独占锁
独占锁就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁
而共享锁允许同一时刻多个线程获取到锁
5.2.1 队列同步器的接口与示例
同步器的设计 是基于模板方法 这种设计模式的,也就是说,
使用者 需要继承同步器 并重写指定的方法 ,
随后将同步器组合 在自定义同步组件的实现中,并调用同步器提供的模板方法 ,
而这些模板方法将会调用使用者重写的方法 。
下面介绍继承后,我们需要重写的5个方法,如下(不重写也行,同步器里提供了默认的空实现)
方法名称
描述
protected boolean tryAcquire(int arg)
独占式获取同步状态:实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryRelease(int arg)
独占式释放同步状态:等待获取同步状态的线程有机会获取同步状态
protected int tryAcquireShared(int arg)
共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败
protected boolean tryReleaseShared(int arg)
共享式释放同步状态
protected boolean isHeldExclusively()
判断当前同步器是否在独占模式下被线程占用
下面再介绍用于访问或者修改同步状态的方法 ,这三个方法同步器已经帮我们实现好了,它们是线程安全的,我们在重写上面5个方法时可能会用到它们来修改同步状态
方法名称
描述
getState()
获取当前同步状态
setState()
设置当前同步状态
compareAndSet(int expect, int update)
使用CAS设置当前状态,该方法是原子的
下面介绍实现一个锁时,锁可以调用同步器提供的各种方法 ,也就是前文提到的模版方法
方法名称
描述
void acquire(int arg)
独占式获取同步状态:如果当前线程获取同步状态成功,则由该方法返回,否则将进入同步队列等待
void acquireInterruptibly(int arg)
与acquire方法相同,但是该方法可以响应中断。如果当前线程在同步队列中收到了中断,则抛出InterruptedException并返回
boolean tryAcquireNanos(int arg, long nanos)
在acquireInterruptibly的基础上增加了超时限制,如果当前线程在超时时间内没有获得同步状态,则返回false,否则返回true
void acqureShared(int arg)
共享式的获取同步状态,如果当前线程未获取到同步状态,将进入同步队列等待
void acquireSharedInterruptibly(int arg)
共享式的获取同步状态,并且可以响应中断
boolean tryAcquireSharedNanos(int arg, long nanos)
在acquireSharedInterruptibly(int arg)基础上增加了超时限制
boolean release(int arg)
独占式释放同步状态,该方法释放同步状态之后,还会从同步队列中将头结点唤醒
boolean releaseShared(int arg)
共享式的释放同步状态
Collection<Thread>getQueuedThreads()
获取等待在同步队列上的线程集合
下面我们实现一个独占锁,来体会一下同步器的工作原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 package ThreadPoolSample;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;public class Mutex implements Lock { private static class MutexSynchronizer extends AbstractQueuedSynchronizer { @Override protected boolean isHeldExclusively () { return getState() == 1 ; } @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 , 1 )){ this .setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { if (getState() == 0 ) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null ); setState(0 ); return true ; } Condition newCondition () { return new ConditionObject(); } } private final MutexSynchronizer synchronizer = new MutexSynchronizer(); @Override public void lock () { synchronizer.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { synchronizer.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return synchronizer.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return synchronizer.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { synchronizer.release(1 ); } @Override public Condition newCondition () { return synchronizer.newCondition(); } }
由此可以看出,有了AQS之后,我们要实现一个可靠的自定义同步组件变得简单了
5.2.2 队列同步器的实现分析
5.2.2.1 同步队列
同步器 依赖内部的同步队列 (一个FIFO双向队列)来完成同步状态的管理
当前线程获取同步状态失败时 ,同步器会将当前线程 以及等待状态等信息构造成为一个节点(Node)并将其
加入同步队列的尾部 ,同时会 阻塞当前线程
当同步状态被释放 时,会把首节点中的线程唤醒 ,使其再次尝试获取同步状态
同步器中有一个头节点和一个尾节点,当新节点入队时,插入到尾节点 。这个操作必须是线程安全 的。同步器使用了CAS方法 :compareAndSetTail(Node expect, Node update)
来保证 这个操作的线程安全 。
而当某个节点释放锁然后出队时,则不需要保证线程安全 。因为同步器定义了一个规则:当头节点出队时,只有头结点的后继节点有资格获取锁。因此不需要考虑线程安全,因为没有其他节点有资格和头结点的后继节点竞争。
5.2.2.2 独占式同步状态获取与释放
我们先给出独占式获取和释放同步状态的步骤,然后分别解释每一步
在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;
移出队列(或停止自旋)的条件是前驱节点为头节点且前驱节点释放了同步状态。
在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
独占式获取同步状态的流程图 如下
该方法的源码如下
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先调用tryAcquire
尝试非阻塞的获取同步状态 ,成功则直接返回
假如上一步失败,则调用addWaiter
将这个线程加入到同步队列中
然后调用acquireQueued
,这个方法是个死循环,自旋地获取同步状态
selfInterrupt()
应该是自中断,我也不知道有啥用,先跳过吧
接着我们看看addWaiter
方法,它用一种线程安全的方式将当前节点加入到同步队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
然后再看看acquireQueued()方法是怎样自旋地获取同步状态的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
最后我们看看独占式地释放同步状态的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
5.2.2.3 共享式同步状态获取与释放
左半部分,共享式访问资源时,其他共享式的访问均被允许,而独占式访问被阻塞
右半部分是独占式访问资源时,同一时刻其他访问均被阻塞。
下面来看共享式获取同步状态的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
下面是共享式释放同步状态的源码
1 2 3 4 5 6 7 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
5.2.2.4 独占式超时获取同步状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 private boolean doAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return true ; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) return false ; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
5.2.2.5 自定义同步组件——TwinsLock
本节我们再设计一个简单的同步组件来加深理解,需求如下:
设计一个同步工具,该工具在同一时刻,只允许至多两个线程同时访问
超过两个线程的访问将被阻塞
我们将这个同步工具命名为TwinsLock。
设计过程如下
首先,确定访问模式 。TwinsLock能够在同一时刻支持多个线程的访问,这显然是共享式访问 ,因此,要
求TwinsLock必须重写tryAcquireShared(int args)方法和tryReleaseShared(int args)方法
其次,定义资源数 。TwinsLock在同一时刻允许至多两个线程的同时访问,表明同步资源数为2,这样可以设置初始状态status为2,当一个线程进行获取,status减1,该线程释放,则status加1
最后,组合自定义同步器 。前面的章节提到,自定义同步组件通过组合自定义同步器来完成同步功能,一般情况下自定义同步器会被定义为自定义同步组件的内部类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 package ThreadPoolSample;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;public class TwinsLock implements Lock { private Sync sync = new Sync(2 ); static class Sync extends AbstractQueuedSynchronizer { public Sync (int count) { if (count <= 0 ){ throw new IllegalArgumentException("count must larger than 0" ); } setState(count); } @Override protected int tryAcquireShared (int reduceCount) { for (;;){ int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)){ return newCount; } } } @Override protected boolean tryReleaseShared (int returnCount) { for (;;){ int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)){ return true ; } } } public Condition newCondition () { return new ConditionObject(); } } @Override public void lock () { sync.acquireShared(1 ); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } @Override public boolean tryLock () { return sync.tryAcquireShared(1 ) > 0 ; } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(time)); } @Override public void unlock () { sync.release(1 ); } @Override public Condition newCondition () { return newCondition(); } }
5.3 重入锁(ReentrantLock)
ReentrantLock是Lock接口 最常用的实现 之一,ReentrantLock支持重进入:在调用lock()
方法时,已经获取到锁的线程,能够再次调用lock()
方法获取锁而不被阻塞 。
然后解释一下锁获取的公平性问题 ,
如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的 ,反之,是不公平的。
公平的获取锁 ,也就是等待时间最长的线程最优先获取锁 ,也可以说锁获取是顺序的。
ReentrantLock提供了一个构造函数,能够控制锁是否是公平的 。
5.4.1 实现重进入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞 ,该特性的实现需要解决以下两个问题 :
线程再次获取锁 :锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
锁的最终释放 :
下面查看一下非公平获取锁的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
然后查看一下释放锁的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
5.4.2 公平与非公平获取锁的区别
下面查看公平地获取锁的源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断
对比公平锁与非公平锁
公平锁可以保证不引起饥饿,但是这意味着大量线程切换,极为消耗资源
非公平锁可以减少线程切换,但是会引起饥饿
如果不特殊声明,ReentrantLock默认为非公平锁
5.4 读写锁
读写锁 在同一时刻可以允许多个读线程访问 ,但是在写线程访问 时,所有的读线程和其他写线程均被阻塞 。
在使用读写锁时,只需要在读操作时获取读锁,写操作时获取写锁即可。当写锁被获取到时,后续(非当前写
操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行,
一般情况下,读写锁的性能都会比排它锁好 ,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock ,它提供的特性如下 :
公平性选择 :用户可以选择使用公平锁还是非公平锁
重进入 :支持重进入,读线程在获取读锁之后,能再次获取读锁。写线程在获取写锁之后能再次获取写锁,同时也可以获取读锁。
锁降级 :在某些特定情况下,写锁可以降级为读锁
5.4.1 读写锁的接口与示例
下面通过一个缓存的示例来介绍读写锁的使用方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package ThreadPoolSample;import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class Cache { static Map<String, Object> map = new HashMap<>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); public static final Object get (String key) { r.lock(); try { return map.get(key); }finally { r.unlock(); } } public static final Object put (String key, Object value) { w.lock(); try { return map.put(key, value); } finally { w.unlock(); } } public static final void clear () { w.lock(); try { map.clear(); }finally { w.unlock(); } } }
上述示例中,Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证Cache是线程安全的。
在读操作 get(String key)方法中,需要获取读锁 ,这使得并发访问该方法时不会被阻塞。
写操作 put(String key,Object value)方法和clear()方法,在更新HashMap时必须提前获取写锁 ,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而只有写锁被释放之后,其他读写操作才能继续。
Cache使用读写锁提升读操作的并发性 ,也保证每次写操作对所有的读写操作的可见性 ,同时简化了编程方式。
5.4.2 读写锁的实现分析
接下来分析ReentrantReadWriteLock的实现,主要包括:
读写状态的设计
写锁的获取与释放
读锁的获取与释放
锁降级 (以下没有特别说明读写锁均可认为是ReentrantReadWriteLock)
5.4.2.1 读写状态的设计
读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态 ,使得该状态的设计成为读写锁实现的关键。
如果在一个整型变量上维护多种状态,就一定需要“按位切割使用 ”这个变量,读写锁将变量切分成了两个部分,高16位表示读 ,低16位表示写
读写锁是如何迅速确定读和写各自的状态呢 ?
答案是通过位运算 。假设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。
当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。
5.4.2.2 写锁的获取与释放
写锁 是一个支持重进入 的排它锁 。
如果当前线程已经获取了写锁,则增加写状态。
如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态
获取写锁的代码如代码清单5-17所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; } static int exclusiveCount (int c) { return c & EXCLUSIVE_MASK; }
该方法保证了如果存在读锁则写锁不能被获取,原因是:如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。
5.4.2.3 读锁的获取与释放
读锁 是一个支持重进入 的共享锁 ,它能够被多个线程同时获取 ,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。
下面查看获取读锁的删减版源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected final int tryAcquireShared (int unused) { for (;;) { int c = getState(); int nextc = c + (1 << 16 ); if (nextc < c) throw new Error("Maximum lock count exceeded" ); if (exclusiveCount(c) != 0 && owner != Thread.currentThread()) return -1 ; if (compareAndSetState(c, nextc)) return 1 ; } }
5.4.2.4 锁降级
锁降级 是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
下面是一个锁降级的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void processData () { readLock.lock(); if (!update) { readLock.unlock(); writeLock.lock(); try { if (!update) { update = true ; } readLock.lock(); } finally { writeLock.unlock(); } } try { } finally { readLock.unlock(); } }
5.5 LockSupport工具
LockSupport定义了一组的公共静态方法 ,这些方法提供了最基本的线程阻塞和唤醒功能 ,而LockSupport也成为构建同步组件的基础工具。
LockSupport定义了一组以park开头的方法 用来阻塞当前线程
以及unpark(Thread thread)方法 来唤醒一个被阻塞的线程 。
方法名称
描述
void park()
阻塞当前线程 ,如果调用unpark(Thread thread)方法或者当前线程被中断,才能从park()方法返回
void parkNanos(long nanos)
阻塞当前线程 ,最长不超过nanos纳秒,返回条件在park()
的基础上增加了超时返回
void parkUntil(long deadline)
阻塞当前线程 ,直到deadline
时间
void unpark(Thread thread)
唤醒处于阻塞状态的线程 thread
5.6 Condition接口
任意一个Java对象,都拥有一组监视器方法 (定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。
Condition接口 也提供了类似Object的监视器方法 ,与Lock配合可以实现等待/通知模式 ,但是这两者在使用方式以及功能特性上还是有差别的。
5.6.1 Condition接口与示例
Condition定义了等待/通知两种类型的方法,**当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。**Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。
Condition
的使用方式如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait () throws InterruptedException { lock.lock(); try { condition.await(); } finally { lock.unlock(); } } public void conditionSignal () throws InterruptedException { lock.lock(); try { condition.signal(); } finally { lock.unlock(); } }
如示例所示,一般都会将Condition对象作为成员变量。
当调用await()方法后 ,当前线程会释放锁并在此等待 ,
而其他线程调用Condition对象的signal()方法 ,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。
Condition定义的部分方法如下表
获取一个Condition必须通过Lock的newCondition()方法
下面通过一个有界队列的示例来深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package ThreadPoolSample;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class BoundedQueue <T > { private Object[] items; private int addIndex, removeIndex, count; private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public BoundedQueue (int size) { items = new Object[size]; } public void add (T t) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[addIndex] = t; if (++addIndex == items.length) addIndex = 0 ; ++count; notEmpty.signal(); }finally { lock.unlock(); } } public T remove () throws InterruptedException { lock.lock(); try { while (count == 0 ) notEmpty.await(); Object x = items[removeIndex]; if (++removeIndex == items.length) removeIndex = 0 ; --count; notFull.signal(); return (T)x; }finally { lock.unlock(); } } }
5.6.2 Condition的实现分析
每个Condition对象都包含着一个队列 (以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。
下面将分析Condition的实现 ,主要包括:等待队列、等待和通知,
5.6.2.1 等待队列
等待队列是一个FIFO的队列 ,在队列中的每个节点都包含了一个线程引用 ,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
一个Condition包含一个等待队列,Condition拥有首节点 (firstWaiter)和尾节点 (lastWaiter)。
当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列
如图所示,Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列
5.6.2.2 等待
略
5.6.2.3 通知
略