在 java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLock、 ReadWriteLock(实现类ReentrantReadWriteLock),其实现都依赖 java.util.concurrent.AbstractQueuedSynchronizer类,实现思路都大同小异。
1.AbstractQueuedSynchronizer
1.AQS主要的作用就是维护一个CLH线程阻塞队列(数据结构为双向链表,通过内部类Node实现,每个Node通过pred和next记录前后节点,并且还有个当前node的waitStatus)
2.CLH是一个FIFO队列,AQS中有记录队首(head)和队尾(tail)的字段,当向队列插入一个Node的时候,插入到tail后面,如果出队列的时候,修改head。
3.每次竞争锁的时候,从队列中取出head进行竞争(但不一定保证会取得锁,比如非公平锁)。
2.ReentrantLock
1.Sync ReentrantLock中有个抽象内部类Sync继承了AbstractQueuedSynchronizer, 而Sync又有2个实现子类FairSync和NonfairSync,而lock和和unlock方法主要是依靠Sync的具体实例来实现的。
2.1 lock()
以NonfairSync为例子
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
compareAndSetState(0, 1)主要是通过CAS的模式,如果当前没有竞争,则直接或者锁,否则,尝试获取(也就是acquire(1)方法)
。
然后我们再看看acquire(1)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先通过tryAcquire()方法尝试获取,如果不能的话,则通过AddWaiter()方法,用当前线程生成一个Node放入队尾,而acquireQueued()则是一种自旋锁的实现方式。最后把当前线程interrupt。 AQS的实现精妙之一就是把tryAcquire延迟到子类去实现。因为公平锁和非公平锁的实现方式肯定是不一样。
非公平锁的tryAcquire()的函数代码是通过nonfairTryAcquire()方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error(Maximum lock count exceeded);
setState(nextc);
return true;
}
return false;
}
AQS中有个state字段,当没有线程竞争锁的即初始化的为0,主要目的是为了重入锁的实现。每次线程获取一个锁时候++,释放的时候--。 c==0 表示没有线程竞争该锁,如果c!=0并且当前线程就是获取锁的线程,只用修改state就行了。 acquireQueued()方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
看似就是一个无限循环,直到获得锁为止。其实shouldParkAfterFailedAcquire()方法中,通过前一个Node的waitStatus来判断是否应该把当前线程阻塞,阻塞是通过parkAndCheckInterrupt()中的LockSupport.park()实现 2.2 unlock unlock中也是通过release方法实现
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease和tryAcquire一样,也是延迟到子类(Sync)实现的。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
c==0的时候,才能成功释放锁,所以多次锁定就需要多次释放才能解锁。
释放锁之后,就会唤醒队列的一个node中的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这 段代码的意思在于找出第一个可以unpark的线程,一般说来head.next == head,Head就是第一个线程,但Head.next可能被取消或被置为null,因此比较稳妥的办法是从后往前找第一个可用线程。貌似回溯会导致性 能降低,其实这个发生的几率很小,所以不会有性能影响。