推薦閱讀:
多線程編程的核心
在前面,我們了解了多線程的底層運作機制,我們終於知道,原來多線程環境下存在着如此之多的問題。
在JDK5之前,我們只能選擇synchronized
關鍵字來實現鎖,而JDK5之后,由於volatile
關鍵字得到了升級,所以並發框架包便出現了,相比傳統的synchronized
關鍵字,我們對於鎖的實現,有了更多的選擇。
Doug Lea — JUC並發包的作者
如果IT的歷史,是以人為主體串接起來的話,那么肯定少不了Doug Lea。這個鼻梁掛着眼鏡,留着德王威廉二世的胡子,臉上永遠掛着謙遜靦腆笑容,服務於紐約州立大學Oswego分校計算機科學系的老大爺。
說他是這個世界上對Java影響力最大的一個人,一點也不為過。因為兩次Java歷史上的大變革,他都間接或直接的扮演了舉足輕重的角色。2004年所推出的Tiger。Tiger廣納了15項JSRs(Java Specification Requests)的語法及標准,其中一項便是JSR-166。JSR-166是來自於Doug編寫的util.concurrent包。
讓我們來感受一下,JUC為我們帶來了什么。
鎖框架
在JDK 5之后,並發包中新增了Lock接口(以及相關實現類)用來實現鎖功能,Lock接口提供了與synchronized關鍵字類似的同步功能,但需要在使用時手動獲取鎖和釋放鎖。
Lock和Condition接口
使用並發包中的鎖和我們傳統的synchronized
鎖不太一樣,這里的鎖我們可以認為是一把真正意義上的鎖,
每個鎖都是一個對應的鎖對象,我只需要向鎖對象獲取鎖或是釋放鎖即可。
我們首先來看看,此接口中定義了什么:
public interface Lock {
//獲取鎖,拿不到鎖會阻塞,等待其他線程釋放鎖,獲取到鎖后返回
void lock();
//同上,但是等待過程中會響應中斷
void lockInterruptibly() throws InterruptedException;
//嘗試獲取鎖,但是不會阻塞,如果能獲取到會返回true,不能返回false
boolean tryLock();
//嘗試獲取鎖,但是可以限定超時時間,如果超出時間還沒拿到鎖返回false,否則返回true,可以響應中斷
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//釋放鎖
void unlock();
//暫時可以理解為替代傳統的Object的wait()、notify()等操作的工具
Condition newCondition();
}
這里我們可以演示一下,如何使用Lock類來進行加鎖和釋放鎖操作:
public class Main {
private static int i = 0;
public static void main(String[] args) throws InterruptedException {
//可重入鎖ReentrantLock類是Lock類的一個實現
Lock testLock = new ReentrantLock();
Runnable action = () -> {
for (int j = 0; j < 100000; j++) { //還是以自增操作為例
//加鎖,加鎖成功后其他線程如果也要獲取鎖,會阻塞,等待當前線程釋放
testLock.lock();
i++;
//解鎖,釋放鎖之后其他線程就可以獲取這把鎖了(注意在這之前一定得加鎖,不然報錯)
testLock.unlock();
}
};
new Thread(action).start();
new Thread(action).start();
Thread.sleep(1000); //等上面兩個線程跑完
System.out.println(i);
}
}
可以看到,和我們之前使用synchronized
相比,我們這里是真正在操作一個"鎖"對象,
當我們需要加鎖時,只需要調用lock()
方法,而需要釋放鎖時,只需要調用unlock()
方法。
程序運行的最終結果和使用synchronized
鎖是一樣的。
那么,我們如何像傳統的加鎖那樣,調用對象的wait()
和notify()
方法呢,並發包提供了Condition接口:
public interface Condition {
//與調用鎖對象的wait方法一樣,會進入到等待狀態,
//但是這里需要調用Condition的signal或signalAll方法進行喚醒,
//等待狀態下是可以響應中斷的
void await() throws InterruptedException;
//同上,但不響應中斷(看名字都能猜到)
void awaitUninterruptibly();
//等待指定時間,如果在指定時間(納秒)內被喚醒,會返回剩余時間,如果超時,會返回0或負數,可以響應中斷
long awaitNanos(long nanosTimeout) throws InterruptedException;
//等待指定時間(可以指定時間單位),如果等待時間內被喚醒,返回true,否則返回false,可以響應中斷
boolean await(long time, TimeUnit unit) throws InterruptedException;
//可以指定一個明確的時間點,如果在時間點之前被喚醒,返回true,否則返回false,可以響應中斷
boolean awaitUntil(Date deadline) throws InterruptedException;
//喚醒一個處於等待狀態的線程,注意還得獲得鎖才能接着運行
void signal();
//同上,但是是喚醒所有等待線程
void signalAll();
}
演示一下:
public static void main(String[] args) throws InterruptedException {
Lock testLock = new ReentrantLock();
Condition condition = testLock.newCondition();
new Thread(() -> {
testLock.lock(); //和synchronized一樣,必須持有鎖的情況下才能使用await
System.out.println("線程1進入等待狀態!");
try {
condition.await(); //進入等待狀態
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程1等待結束!");
testLock.unlock();
}).start();
Thread.sleep(100); //防止線程2先跑
new Thread(() -> {
testLock.lock();
System.out.println("線程2開始喚醒其他等待線程");
condition.signal(); //喚醒線程1,但是此時線程1還必須要拿到鎖才能繼續運行
System.out.println("線程2結束");
testLock.unlock(); //這里釋放鎖之后,線程1就可以拿到鎖繼續運行了
}).start();
}
可以發現,Condition對象使用方法和傳統的對象使用差別不是很大。
思考:下面這種情況跟上面有什么不同?
public static void main(String[] args) throws InterruptedException {
Lock testLock = new ReentrantLock();
new Thread(() -> {
testLock.lock();
System.out.println("線程1進入等待狀態!");
try {
testLock.newCondition().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程1等待結束!");
testLock.unlock();
}).start();
Thread.sleep(100);
new Thread(() -> {
testLock.lock();
System.out.println("線程2開始喚醒其他等待線程");
testLock.newCondition().signal();
System.out.println("線程2結束");
testLock.unlock();
}).start();
}
通過分析可以得到,在調用newCondition()
后,會生成一個新的Condition對象,
並且同一把鎖內是可以存在多個Condition對象的(實際上原始的鎖機制等待隊列只能有一個,而這里可以創建很多個Condition來實現多等待隊列),
而上面的例子中,實際上使用的是不同的Condition對象,只有對同一個Condition對象進行等待和喚醒操作才會有效,而不同的Condition對象是分開計算的。
最后我們再來講解一下時間單位,這是一個枚舉類,也是位於java.util.concurrent
包下:
public enum TimeUnit {
/**
* Time unit representing one thousandth of a microsecond
*/
NANOSECONDS {
public long toNanos(long d) { return d; }
public long toMicros(long d) { return d/(C1/C0); }
public long toMillis(long d) { return d/(C2/C0); }
public long toSeconds(long d) { return d/(C3/C0); }
public long toMinutes(long d) { return d/(C4/C0); }
public long toHours(long d) { return d/(C5/C0); }
public long toDays(long d) { return d/(C6/C0); }
public long convert(long d, TimeUnit u) { return u.toNanos(d); }
int excessNanos(long d, long m) { return (int)(d - (m*C2)); }
},
//....
可以看到時間單位有很多的,比如DAY
、SECONDS
、MINUTES
等,我們可以直接將其作為時間單位,比如我們要讓一個線程等待3秒鍾,可以像下面這樣編寫:
public static void main(String[] args) throws InterruptedException {
Lock testLock = new ReentrantLock();
new Thread(() -> {
testLock.lock();
try {
System.out.println("等待是否未超時:"+testLock.newCondition().await(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
testLock.unlock();
}).start();
}
當然,Lock類的tryLock方法也是支持使用時間單位的,各位可以自行進行測試。
TimeUnit除了可以作為時間單位表示以外,還可以在不同單位之間相互轉換:
public static void main(String[] args) throws InterruptedException {
System.out.println("60秒 = "+TimeUnit.SECONDS.toMinutes(60) +"分鍾");
System.out.println("365天 = "+TimeUnit.DAYS.toSeconds(365) +" 秒");
}
也可以更加便捷地使用對象的wait()
方法:
public static void main(String[] args) throws InterruptedException {
synchronized (Main.class) {
System.out.println("開始等待");
TimeUnit.SECONDS.timedWait(Main.class, 3); //直接等待3秒
System.out.println("等待結束");
}
}
我們也可以直接使用它來進行休眠操作:
public static void main(String[] args) throws InterruptedException {
TimeUnit.SECONDS.sleep(1); //休眠1秒鍾
}
可重入鎖
前面,我們講解了鎖框架的兩個核心接口,那么我們接着來看看鎖接口的具體實現類,
我們前面用到了ReentrantLock,它其實是鎖的一種,叫做可重入鎖,那么這個可重入代表的是什么意思呢?
簡單來說,就是同一個線程,可以反復進行加鎖操作:
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.lock(); //連續加鎖2次
new Thread(() -> {
System.out.println("線程2想要獲取鎖");
lock.lock();
System.out.println("線程2成功獲取到鎖");
}).start();
lock.unlock();
System.out.println("線程1釋放了一次鎖");
TimeUnit.SECONDS.sleep(1);
lock.unlock();
System.out.println("線程1再次釋放了一次鎖"); //釋放兩次后其他線程才能加鎖
}
可以看到,主線程連續進行了兩次加鎖操作(此操作是不會被阻塞的),
在當前線程持有鎖的情況下繼續加鎖不會被阻塞,並且,加鎖幾次,就必須要解鎖幾次,否則此線程依舊持有鎖。
我們可以使用getHoldCount()
方法查看當前線程的加鎖次數:
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.lock();
System.out.println("當前加鎖次數:"+lock.getHoldCount()+",是否被鎖:"+lock.isLocked());
TimeUnit.SECONDS.sleep(1);
lock.unlock();
System.out.println("當前加鎖次數:"+lock.getHoldCount()+",是否被鎖:"+lock.isLocked());
TimeUnit.SECONDS.sleep(1);
lock.unlock();
System.out.println("當前加鎖次數:"+lock.getHoldCount()+",是否被鎖:"+lock.isLocked());
}
可以看到,當鎖不再被任何線程持有時,值為0
,並且通過isLocked()
方法查詢結果為false
。
實際上,如果存在線程持有當前的鎖,那么其他線程在獲取鎖時,是會暫時進入到等待隊列的,我們可以通過getQueueLength()
方法獲取等待中線程數量的預估值:
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
lock.lock();
Thread t1 = new Thread(lock::lock), t2 = new Thread(lock::lock);;
t1.start();
t2.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("當前等待鎖釋放的線程數:"+lock.getQueueLength());
System.out.println("線程1是否在等待隊列中:"+lock.hasQueuedThread(t1));
System.out.println("線程2是否在等待隊列中:"+lock.hasQueuedThread(t2));
System.out.println("當前線程是否在等待隊列中:"+lock.hasQueuedThread(Thread.currentThread()));
}
我們可以通過hasQueuedThread()
方法來判斷某個線程是否正在等待獲取鎖狀態。
同樣的,Condition也可以進行判斷:
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
}).start();
TimeUnit.SECONDS.sleep(1);
lock.lock();
System.out.println("當前Condition的等待線程數:"+lock.getWaitQueueLength(condition));
condition.signal();
System.out.println("當前Condition的等待線程數:"+lock.getWaitQueueLength(condition));
lock.unlock();
}
通過使用getWaitQueueLength()
方法能夠查看同一個Condition目前有多少線程處於等待狀態。
公平鎖與非公平鎖
前面我們了解了如果線程之間爭搶同一把鎖,會暫時進入到等待隊列中,
那么多個線程獲得鎖的順序是不是一定是根據線程調用lock()
方法時間來定的呢?
我們可以看到,ReentrantLock
的構造方法中,是這樣寫的:
public ReentrantLock() {
sync = new NonfairSync(); //看名字貌似是非公平的
}
其實鎖分為公平鎖和非公平鎖,默認我們創建出來的ReentrantLock是采用的非公平鎖作為底層鎖機制。
那么什么是公平鎖什么又是非公平鎖呢?
- 公平鎖:多個線程按照申請鎖的順序去獲得鎖,線程會直接進入隊列去排隊,永遠都是隊列的第一位才能得到鎖。
- 非公平鎖:多個線程去獲取鎖的時候,會直接去嘗試獲取,獲取不到,再去進入等待隊列,如果能獲取到,就直接獲取到鎖。
簡單來說,公平鎖不讓插隊,都老老實實排着;
非公平鎖讓插隊,但是排隊的人讓不讓你插隊就是另一回事了。
我們可以來測試一下公平鎖和非公平鎖的表現情況:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
這里我們選擇使用第二個構造方法,可以選擇是否為公平鎖實現:
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock(false);
Runnable action = () -> {
System.out.println("線程 "+Thread.currentThread().getName()+" 開始獲取鎖...");
lock.lock();
System.out.println("線程 "+Thread.currentThread().getName()+" 成功獲取鎖!");
lock.unlock();
};
for (int i = 0; i < 10; i++) { //建立10個線程
new Thread(action, "T"+i).start();
}
}
這里我們只需要對比將在1秒后開始獲取鎖...
和成功獲取鎖!
的順序是否一致即可,如果是一致,那說明所有的線程都是按順序排隊獲取的鎖,如果不是,那說明肯定是有線程插隊了。
運行結果可以發現,在公平模式下,確實是按照順序進行的,而在非公平模式下,一般會出現這種情況:線程剛開始獲取鎖馬上就能搶到,並且此時之前早就開始的線程還在等待狀態,很明顯的插隊行為。
那么,接着下一個問題,公平鎖在任何情況下都一定是公平的嗎?
到隊列同步器中再進行討論。
讀寫鎖過了就是隊列同步器AQS
讀寫鎖
除了可重入鎖之外,還有一種類型的鎖叫做讀寫鎖,當然它並不是專門用作讀寫操作的鎖,
它和可重入鎖不同的地方在於,可重入鎖是一種排他鎖,當一個線程得到鎖之后,另一個線程必須等待其釋放鎖,否則一律不允許獲取到鎖。
而讀寫鎖在同一時間,是可以讓多個線程獲取到鎖的,它其實就是針對於讀寫場景而出現的。
讀寫鎖維護了一個讀鎖和一個寫鎖,這兩個鎖的機制是不同的。
- 讀鎖:在沒有任何線程占用寫鎖的情況下,同一時間可以有多個線程加讀鎖。
- 寫鎖:在沒有任何線程占用讀鎖的情況下,同一時間只能有一個線程加寫鎖。
讀寫鎖也有一個專門的接口:
public interface ReadWriteLock {
//獲取讀鎖
Lock readLock();
//獲取寫鎖
Lock writeLock();
}
此接口有一個實現類ReentrantReadWriteLock(實現的是ReadWriteLock接口,不是Lock接口,它本身並不是鎖),注意我們操作ReentrantReadWriteLock時,不能直接上鎖,而是需要獲取讀鎖或是寫鎖,再進行鎖操作:
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
new Thread(lock.readLock()::lock).start();
}
這里我們對讀鎖加鎖,可以看到可以多個線程同時對讀鎖加鎖。
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
new Thread(lock.writeLock()::lock).start();
}
有讀鎖狀態下無法加寫鎖,反之亦然:
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
new Thread(lock.readLock()::lock).start();
}
並且,ReentrantReadWriteLock不僅具有讀寫鎖的功能,還保留了可重入鎖和公平/非公平機制,比如同一個線程可以重復為寫鎖加鎖,並且必須全部解鎖才真正釋放鎖:
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
lock.writeLock().lock();
new Thread(() -> {
lock.writeLock().lock();
System.out.println("成功獲取到寫鎖!");
}).start();
System.out.println("釋放第一層鎖!");
lock.writeLock().unlock();
TimeUnit.SECONDS.sleep(1);
System.out.println("釋放第二層鎖!");
lock.writeLock().unlock();
}
通過之前的例子來驗證公平和非公平:
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
Runnable action = () -> {
System.out.println("線程 "+Thread.currentThread().getName()+" 將在1秒后開始獲取鎖...");
lock.writeLock().lock();
System.out.println("線程 "+Thread.currentThread().getName()+" 成功獲取鎖!");
lock.writeLock().unlock();
};
for (int i = 0; i < 10; i++) { //建立10個線程
new Thread(action, "T"+i).start();
}
}
可以看到,結果是一致的。
鎖降級和鎖升級
鎖降級指的是寫鎖降級為讀鎖。
當一個線程持有寫鎖的情況下,雖然其他線程不能加讀鎖,但是線程自己是可以加讀鎖的:
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
lock.readLock().lock();
System.out.println("成功加讀鎖!");
}
那么,如果我們在同時加了寫鎖和讀鎖的情況下,釋放寫鎖,是否其他的線程就可以一起加讀鎖了呢?
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
lock.readLock().lock();
new Thread(() -> {
System.out.println("開始加讀鎖!");
lock.readLock().lock();
System.out.println("讀鎖添加成功!");
}).start();
TimeUnit.SECONDS.sleep(1);
lock.writeLock().unlock(); //如果釋放寫鎖,會怎么樣?
}
可以看到,一旦寫鎖被釋放,那么主線程就只剩下讀鎖了,因為讀鎖可以被多個線程共享,所以這時第二個線程也添加了讀鎖。
而這種操作,就被稱之為"鎖降級"(注意不是先釋放寫鎖再加讀鎖,而是持有寫鎖的情況下申請讀鎖再釋放寫鎖)
注意在僅持有讀鎖的情況下去申請寫鎖,屬於"鎖升級",ReentrantReadWriteLock是不支持的:
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
lock.writeLock().lock();
System.out.println("所升級成功!");
}
可以看到線程直接卡在加寫鎖的那一句了。
隊列同步器AQS
前面我們了解了可重入鎖和讀寫鎖,那么它們的底層實現原理到底是什么樣的呢?
比如我們執行了ReentrantLock的lock()
方法,那它的內部是怎么在執行的呢?
public void lock() {
sync.lock();
}
可以看到,它的內部實際上啥都沒做,而是交給了Sync對象在進行,並且,不只是這個方法,其他的很多方法都是依靠Sync對象在進行:
public void unlock() {
sync.release(1);
}
那么這個Sync對象是干什么的呢?
可以看到,公平鎖和非公平鎖都是繼承自Sync,而Sync是繼承自AbstractQueuedSynchronizer,簡稱隊列同步器:
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
所以,要了解它的底層到底是如何進行操作的,還得看隊列同步器,我們就先從這里下手吧!
底層實現
AbstractQueuedSynchronizer(下面稱為AQS)是實現鎖機制的基礎,它的內部封裝了包括鎖的獲取、釋放、以及等待隊列。
一個鎖(排他鎖為例)的基本功能就是:
獲取鎖、釋放鎖、當鎖被占用時,其他線程來爭搶會進入等待隊列,
AQS已經將這些基本的功能封裝完成了,
其中等待隊列是核心內容,等待隊列是由雙向鏈表數據結構實現的,
每個等待狀態下的線程都可以被封裝進結點中並放入雙向鏈表中,而對於雙向鏈表是以隊列的形式進行操作的,它像這樣:
AQS中有一個head
字段和一個tail
字段分別記錄雙向鏈表的頭結點和尾結點,而之后的一系列操作都是圍繞此隊列來進行的。我們先來了解一下每個結點都包含了哪些內容:
//每個處於等待狀態的線程都可以是一個節點,並且每個節點是有很多狀態的
static final class Node {
//每個節點都可以被分為獨占模式節點或是共享模式節點,分別適用於獨占鎖和共享鎖
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//等待狀態,這里都定義好了
//唯一一個大於0的狀態,表示已失效,可能是由於超時或中斷,此節點被取消。
static final int CANCELLED = 1;
//此節點后面的節點被掛起(進入等待狀態)
static final int SIGNAL = -1;
//在條件隊列中的節點才是這個狀態
static final int CONDITION = -2;
//傳播,一般用於共享鎖
static final int PROPAGATE = -3;
volatile int waitStatus; //等待狀態值
volatile Node prev; //雙向鏈表基操
volatile Node next;
volatile Thread thread; //每一個線程都可以被封裝進一個節點進入到等待隊列
Node nextWaiter; //在等待隊列中表示模式,條件隊列中作為下一個結點的指針
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
在一開始的時候,head
和tail
都是null
,state
為默認值0
:
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
不用擔心雙向鏈表不會進行初始化,初始化是在實際使用時才開始的,我們接着來看其他的初始化內容:
//直接使用Unsafe類進行操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
//記錄類中屬性的在內存中的偏移地址,方便Unsafe類直接操作內存進行賦值等(直接修改對應地址的內存)
private static final long stateOffset; //這里對應的就是AQS類中的state成員字段
private static final long headOffset; //這里對應的就是AQS類中的head頭結點成員字段
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static { //靜態代碼塊,在類加載的時候就會自動獲取偏移地址
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
//通過CAS操作來修改頭結點
private final boolean compareAndSetHead(Node update) {
//調用的是Unsafe類的compareAndSwapObject方法,通過CAS算法比較對象並替換
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
//同上,省略部分代碼
private final boolean compareAndSetTail(Node expect, Node update) {
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
可以發現,隊列同步器由於要使用到CAS算法,所以,直接使用了Unsafe工具類,
Unsafe類中提供了CAS操作的方法(底層由C++實現)所有對AQS類中成員字段的修改,都有對應的CAS操作封裝。
現在我們大致了解了一下它的底層運作機制,
我們接着來看這個類是如何進行使用的,它提供了一些可重寫的方法(根據不同的鎖類型和機制,可以自由定制規則,並且為獨占式和非獨占式鎖都提供了對應的方法),
以及一些已經寫好的模板方法(模板方法會調用這些可重寫的方法),使用此類只需要將可重寫的方法進行重寫,並調用提供的模板方法,從而實現鎖功能(學習過設計模式會比較好理解一些)
我們首先來看可重寫方法:
//獨占式獲取同步狀態,查看同步狀態是否和參數一致,如果返沒有問題,那么會使用CAS操作設置同步狀態並返回true
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//獨占式釋放同步狀態
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享式獲取同步狀態,返回值大於0表示成功,否則失敗
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//共享式釋放同步狀態
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//是否在獨占模式下被當前線程占用(鎖是否被當前線程持有)
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
可以看到,這些需要重寫的方法默認是直接拋出UnsupportedOperationException
,也就是說根據不同的鎖類型,我們需要去實現對應的方法,
我們可以來看一下ReentrantLock(此類是全局獨占式的)中的公平鎖是如何借助AQS實現的:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
//加鎖操作調用了模板方法acquire
//為了防止各位繞暈,請時刻記住,lock方法一定是在某個線程下為了加鎖而調用的,
//並且同一時間可能會有其他線程也在調用此方法
final void lock() {
acquire(1);
}
...
}
我們先看看加鎖操作干了什么事情,這里直接調用了AQS提供的模板方法acquire()
,我們來看看它在AQS類中的實現細節:
@ReservedStackAccess
//這個是JEP 270添加的新注解,它會保護被注解的方法,
//通過添加一些額外的空間,防止在多線程運行的時候出現棧溢出
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //節點為獨占模式Node.EXCLUSIVE
selfInterrupt();
}
首先會調用tryAcquire()
方法(這里是由FairSync類實現的),如果嘗試加獨占鎖失敗(返回false了)說明可能這個時候有其他線程持有了此獨占鎖,所以當前線程得先等着,那么會調用addWaiter()
方法將線程加入等待隊列中:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先嘗試使用CAS直接入隊,
//如果這個時候其他線程也在入隊(就是不止一個線程在同一時間爭搶這把鎖)就進入enq()
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//此方法是CAS快速入隊失敗時調用
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋形式入隊,可以看到這里是一個無限循環
for (;;) {
Node t = tail;
if (t == null) { //這種情況只能說明頭結點和尾結點都還沒初始化
if (compareAndSetHead(new Node())) //初始化頭結點和尾結點
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
//只有CAS成功的情況下,才算入隊成功,
//如果CAS失敗,那說明其他線程同一時間也在入隊,並且手速還比當前線程快,
//剛好走到CAS操作的時候,其他線程就先入隊了,
//那么這個時候node.prev就不是我們預期的節點了,
//而是另一個線程新入隊的節點,所以說得進下一次循環再來一次CAS,這種形式就是自旋
return t;
}
}
}
}
在了解了addWaiter()
方法會將節點加入等待隊列之后,我們接着來看,addWaiter()
會返回已經加入的節點,acquireQueued()
在得到返回的節點時,也會進入自旋狀態,等待喚醒(也就是開始進入到拿鎖的環節了):
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //可以看到當此節點位於隊首(node.prev == head)時,會再次調用tryAcquire方法獲取鎖,如果獲取成功,會返回此過程中是否被中斷的值
setHead(node); //新的頭結點設置為當前結點
p.next = null; // 原有的頭結點沒有存在的意義了
failed = false; //沒有失敗
return interrupted; //直接返回等待過程中是否被中斷
}
//依然沒獲取成功,
if (shouldParkAfterFailedAcquire(p, node) && //將當前節點的前驅節點等待狀態設置為SIGNAL,如果失敗將直接開啟下一輪循環,直到成功為止,如果成功接着往下
parkAndCheckInterrupt()) //掛起線程進入等待狀態,等待被喚醒,如果在等待狀態下被中斷,那么會返回true,直接將中斷標志設為true,否則就是正常喚醒,繼續自旋
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //通過unsafe類操作底層掛起線程(會直接進入阻塞狀態)
return Thread.interrupted();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true; //已經是SIGNAL,直接true
if (ws > 0) { //不能是已經取消的節點,必須找到一個沒被取消的
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node; //直接拋棄被取消的節點
} else {
//不是SIGNAL,先CAS設置為SIGNAL(這里沒有返回true因為CAS不一定成功,需要下一輪再判斷一次)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; //返回false,馬上開啟下一輪循環
}
所以,acquire()
中的if條件如果為true,那么只有一種情況,就是等待過程中被中斷了,其他任何情況下都是成功獲取到獨占鎖,所以當等待過程被中斷時,會調用selfInterrupt()
方法:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
這里就是直接向當前線程發送中斷信號了。
上面提到了LockSupport類,它是一個工具類,我們也可以來玩一下這個park
和unpark
:
public static void main(String[] args) throws InterruptedException {
Thread t = Thread.currentThread(); //先拿到主線程的Thread對象
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("主線程可以繼續運行了!");
LockSupport.unpark(t);
//t.interrupt(); 發送中斷信號也可以恢復運行
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("主線程被掛起!");
LockSupport.park();
System.out.println("主線程繼續運行!");
}
接着我們來看公平鎖的tryAcquire()
方法:
static final class FairSync extends Sync {
//可重入獨占鎖的公平實現
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //先獲取當前線程的Thread對象
int c = getState(); //獲取當前AQS對象狀態(獨占模式下0為未占用,大於0表示已占用)
if (c == 0) { //如果是0,那就表示沒有占用,現在我們的線程就要來嘗試占用它
if (!hasQueuedPredecessors() && //等待隊列是否不為空且當前線程沒有拿到鎖,其實就是看看當前線程有沒有必要進行排隊,如果沒必要排隊,就說明可以直接獲取鎖
compareAndSetState(0, acquires)) { //CAS設置狀態,如果成功則說明成功拿到了這把鎖,失敗則說明可能這個時候其他線程在爭搶,並且還比你先搶到
setExclusiveOwnerThread(current); //成功拿到鎖,會將獨占模式所有者線程設定為當前線程(這個方法是父類AbstractOwnableSynchronizer中的,就表示當前這把鎖已經是這個線程的了)
return true; //占用鎖成功,返回true
}
}
else if (current == getExclusiveOwnerThread()) { //如果不是0,那就表示被線程占用了,這個時候看看是不是自己占用的,如果是,由於是可重入鎖,可以繼續加鎖
int nextc = c + acquires; //多次加鎖會將狀態值進行增加,狀態值就是加鎖次數
if (nextc < 0) //加到int值溢出了?
throw new Error("Maximum lock count exceeded");
setState(nextc); //設置為新的加鎖次數
return true;
}
return false; //其他任何情況都是加鎖失敗
}
}
在了解了公平鎖的實現之后,是不是感覺有點恍然大悟的感覺,雖然整個過程非常復雜,但是只要理清思路,還是比較簡單的。
加鎖過程已經OK,我們接着來看,它的解鎖過程,unlock()
方法是在AQS中實現的:
public void unlock() {
sync.release(1); //直接調用了AQS中的release方法,參數為1表示解鎖一次state值-1
}
@ReservedStackAccess
public final boolean release(int arg) {
if (tryRelease(arg)) { //和tryAcquire一樣,也得子類去重寫,釋放鎖操作
Node h = head; //釋放鎖成功后,獲取新的頭結點
if (h != null && h.waitStatus != 0) //如果新的頭結點不為空並且不是剛剛建立的結點(初始狀態下status為默認值0,而上面在進行了shouldParkAfterFailedAcquire之后,會被設定為SIGNAL狀態,值為-1)
unparkSuccessor(h); //喚醒頭節點下一個節點中的線程
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 將等待狀態waitStatus設置為初始值0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//獲取下一個結點
Node s = node.next;
if (s == null || s.waitStatus > 0) { //如果下一個結點為空或是等待狀態是已取消,那肯定是不能通知unpark的,這時就要遍歷所有節點再另外找一個符合unpark要求的節點了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) //這里是從隊尾向前,因為enq()方法中的t.next = node是在CAS之后進行的,而 node.prev = t 是CAS之前進行的,所以從后往前一定能夠保證遍歷所有節點
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //要是找到了,就直接unpark,要是還是沒找到,那就算了
LockSupport.unpark(s.thread);
}
那么我們來看看tryRelease()
方法是怎么實現的,具體實現在Sync中:
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //先計算本次解鎖之后的狀態值
if (Thread.currentThread() != getExclusiveOwnerThread()) //因為是獨占鎖,那肯定這把鎖得是當前線程持有才行
throw new IllegalMonitorStateException(); //否則直接拋異常
boolean free = false;
if (c == 0) { //如果解鎖之后的值為0,表示已經完全釋放此鎖
free = true;
setExclusiveOwnerThread(null); //將獨占鎖持有線程設置為null
}
setState(c); //狀態值設定為c
return free; //如果不是0表示此鎖還沒完全釋放,返回false,是0就返回true
}
綜上,我們來畫一個完整的流程圖:
這里我們只講解了公平鎖。
公平鎖一定公平嗎?
前面我們講解了公平鎖的實現原理,那么,我們嘗試分析一下,在並發的情況下,公平鎖一定公平嗎?
我們再次來回顧一下tryAcquire()
方法的實現:
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //注意這里,公平鎖的機制是,一開始會查看是否有節點處於等待
compareAndSetState(0, acquires)) { //如果前面的方法執行后發現沒有等待節點,就直接進入占鎖環節了
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
所以hasQueuedPredecessors()
這個環節容不得半點閃失,否則會直接破壞掉公平性,假如現在出現了這樣的情況:
線程1已經持有鎖了,這時線程2來爭搶這把鎖,走到hasQueuedPredecessors()
,判斷出為 false
,線程2繼續運行,然后線程2肯定獲取鎖失敗(因為鎖這時是被線程1占有的),因此就進入到等待隊列中:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 線程2進來之后,肯定是要先走這里的,因為head和tail都是null
if (compareAndSetHead(new Node()))
tail = head; //這里就將tail直接等於head了,注意這里完了之后還沒完,這里只是初始化過程
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { //由於一開始head和tail都是null,所以線程2直接就進enq()了
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //請看上面
return node;
}
而碰巧不巧,這個時候線程3也來搶鎖了,按照正常流程走到了hasQueuedPredecessors()
方法,而在此方法中:
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//這里直接判斷h != t,而此時線程2才剛剛執行完 tail = head,所以直接就返回false了
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
因此,線程3這時就緊接着准備開始CAS操作了,又碰巧,這時線程1釋放鎖了,現在的情況就是,線程3直接開始CAS判斷,而線程2還在插入節點狀態,結果可想而知,居然是線程3先拿到了鎖,這顯然是違背了公平鎖的公平機制。
一張圖就是:
因此公不公平全看hasQueuedPredecessors()
,而此方法只有在等待隊列中存在節點時才能保證不會出現問題。所以公平鎖,只有在等待隊列存在節點時,才是真正公平的。
Condition實現原理
通過前面的學習,我們知道Condition類實際上就是用於代替傳統對象的wait/notify操作的,
同樣可以實現等待/通知模式,並且同一把鎖下可以創建多個Condition對象。
那么我們接着來看看,它又是如何實現的呢,我們先從單個Condition對象進行分析:
在AQS中,Condition有一個實現類ConditionObject,而這里也是使用了鏈表實現了條件隊列:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 條件隊列的頭結點 */
private transient Node firstWaiter;
/** 條件隊列的尾結點 */
private transient Node lastWaiter;
//...
這里是直接使用了AQS中的Node類,但是使用的是Node類中的nextWaiter字段連接節點,並且Node的status為CONDITION:
我們知道,當一個線程調用await()
方法時,會進入等待狀態,直到其他線程調用signal()
方法將其喚醒,而這里的條件隊列,正是用於存儲這些處於等待狀態的線程。
我們先來看看最關鍵的await()
方法是如何實現的,為了防止一會繞暈,在開始之前,我們先明確此方法的目標:
- 只有已經持有鎖的線程才可以使用此方法
- 當調用此方法后,會直接釋放鎖,無論加了多少次鎖
- 只有其他線程調用
signal()
或是被中斷時才會喚醒等待中的線程 - 被喚醒后,需要等待其他線程釋放鎖,拿到鎖之后才可以繼續執行,並且會恢復到之前的狀態(await之前加了幾層鎖喚醒后依然是幾層鎖)
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //如果在調用await之前就被添加了中斷標記,那么會直接拋出中斷異常
Node node = addConditionWaiter(); //為當前線程創建一個新的節點,並將其加入到條件隊列中
int savedState = fullyRelease(node); //完全釋放當前線程持有的鎖,並且保存一下state值,因為喚醒之后還得恢復
int interruptMode = 0; //用於保存中斷狀態
while (!isOnSyncQueue(node)) { //循環判斷是否位於同步隊列中,如果等待狀態下的線程被其他線程喚醒,那么會正常進入到AQS的等待隊列中(之后我們會講)
LockSupport.park(this); //如果依然處於等待狀態,那么繼續掛起
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //看看等待的時候是不是被中斷了
break;
}
//出了循環之后,那線程肯定是已經醒了,這時就差拿到鎖就可以恢復運行了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //直接開始acquireQueued嘗試拿鎖(之前已經講過了)從這里開始基本就和一個線程去搶鎖是一樣的了
interruptMode = REINTERRUPT;
//已經拿到鎖了,基本可以開始繼續運行了,這里再進行一下后期清理工作
if (node.nextWaiter != null)
unlinkCancelledWaiters(); //將等待隊列中,不是Node.CONDITION狀態的節點移除
if (interruptMode != 0) //依然是響應中斷
reportInterruptAfterWait(interruptMode);
//OK,接着該干嘛干嘛
}
實際上await()
方法比較中規中矩,大部分操作也在我們的意料之中,那么我們接着來看signal()
方法是如何實現的,同樣的,為了防止各位繞暈,先明確signal的目標:
- 只有持有鎖的線程才能喚醒鎖所屬的Condition等待的線程
- 優先喚醒條件隊列中的第一個,如果喚醒過程中出現問題,接着找往下找,直到找到一個可以喚醒的
- 喚醒操作本質上是將條件隊列中的結點直接丟進AQS等待隊列中,讓其參與到鎖的競爭中
- 拿到鎖之后,線程才能恢復運行
public final void signal() {
if (!isHeldExclusively()) //先看看當前線程是不是持有鎖的狀態
throw new IllegalMonitorStateException(); //不是?那你不配喚醒別人
Node first = firstWaiter; //獲取條件隊列的第一個結點
if (first != null) //如果隊列不為空,獲取到了,那么就可以開始喚醒操作
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) //如果當前節點在本輪循環沒有后繼節點了,條件隊列就為空了
lastWaiter = null; //所以這里相當於是直接清空
first.nextWaiter = null; //將給定節點的下一個結點設置為null,因為當前結點馬上就會離開條件隊列了
} while (!transferForSignal(first) && //接着往下看
(first = firstWaiter) != null); //能走到這里只能說明給定節點被設定為了取消狀態,那就繼續看下一個結點
}
final boolean transferForSignal(Node node) {
/*
* 如果這里CAS失敗,那有可能此節點被設定為了取消狀態
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//CAS成功之后,結點的等待狀態就變成了默認值0,接着通過enq方法直接將節點丟進AQS的等待隊列中,相當於喚醒並且可以等待獲取鎖了
//這里enq方法返回的是加入之后等待隊列隊尾的前驅節點,就是原來的tail
Node p = enq(node);
int ws = p.waitStatus; //保存前驅結點的等待狀態
//如果上一個節點的狀態為取消, 或者嘗試設置上一個節點的狀態為SIGNAL失敗(可能是在ws>0判斷完之后馬上變成了取消狀態,導致CAS失敗)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); //直接喚醒線程
return true;
}
其實最讓人不理解的就是倒數第二行,明明上面都正常進入到AQS等待隊列了,應該是可以開始走正常流程了,那么這里為什么還要提前來一次unpark呢?
這里其實是為了進行優化而編寫,直接unpark會有兩種情況:
- 如果插入結點前,AQS等待隊列的隊尾節點就已經被取消,則滿足wc > 0
- 如果插入node后,AQS內部等待隊列的隊尾節點已經穩定,滿足tail.waitStatus == 0,但在執行ws >
0之后!compareAndSetWaitStatus(p, ws,
Node.SIGNAL)之前被取消,則CAS也會失敗,滿足compareAndSetWaitStatus(p, ws,
Node.SIGNAL) == false
如果這里被提前unpark,那么在await()
方法中將可以被直接喚醒,並跳出while循環,直接開始爭搶鎖,因為前一個等待結點是被取消的狀態,沒有必要再等它了。
所以,大致流程下:
只要把整個流程理清楚,還是很好理解的。
自行實現鎖類
既然前面了解了那么多AQS的功能,那么我就仿照着這些鎖類來實現一個簡單的鎖:
- 要求:同一時間只能有一個線程持有鎖,不要求可重入(反復加鎖無視即可)
public class Main {
public static void main(String[] args) throws InterruptedException {
}
/**
* 自行實現一個最普通的獨占鎖
* 要求:同一時間只能有一個線程持有鎖,不要求可重入
*/
private static class MyLock implements Lock {
/**
* 設計思路:
* 1. 鎖被占用,那么exclusiveOwnerThread應該被記錄,並且state = 1
* 2. 鎖沒有被占用,那么exclusiveOwnerThread為null,並且state = 0
*/
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if(isHeldExclusively()) return true; //無需可重入功能,如果是當前線程直接返回true
if(compareAndSetState(0, arg)){ //CAS操作進行狀態替換
setExclusiveOwnerThread(Thread.currentThread()); //成功后設置當前的所有者線程
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0)
throw new IllegalMonitorStateException(); //沒加鎖情況下是不能直接解鎖的
if(isHeldExclusively()){ //只有持有鎖的線程才能解鎖
setExclusiveOwnerThread(null); //設置所有者線程為null
setState(0); //狀態變為0
return true;
}
return false;
}
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
protected Condition newCondition(){
return new ConditionObject(); //直接用現成的
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
}
原子類
前面我們說到,如果要保證i++
的原子性,那么我們的唯一選擇就是加鎖,那么,除了加鎖之外,還有沒有其他更好的解決方法呢?
JUC為我們提供了原子類,底層采用CAS算法,它是一種用法簡單、性能高效、線程安全地更新變量的方式。
所有的原子類都位於java.util.concurrent.atomic
包下。
原子類介紹
常用基本數據類,有對應的原子類封裝:
- AtomicInteger:原子更新int
- AtomicLong:原子更新long
- AtomicBoolean:原子更新boolean
那么,原子類和普通的基本類在使用上有沒有什么區別呢?我們先來看正常情況下使用一個基本類型:
public class Main {
public static void main(String[] args) {
int i = 1;
System.out.println(i++);
}
}
現在我們使用int類型對應的原子類,要實現同樣的代碼該如何編寫:
public class Main {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(1);
System.out.println(i.getAndIncrement()); //如果想實現i += 2這種操作,可以使用 addAndGet() 自由設置delta 值
}
}
我們可以將int數值封裝到此類中(注意必須調用構造方法,它不像Integer那樣有裝箱機制),並且通過調用此類提供的方法來獲取或是對封裝的int值進行自增,乍一看,這不就是基本類型包裝類嘛,有啥高級的。
確實,還真有包裝類那味,但是它可不僅僅是簡單的包裝,它的自增操作是具有原子性的:
public class Main {
private static AtomicInteger i = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Runnable r = () -> {
for (int j = 0; j < 100000; j++)
i.getAndIncrement();
System.out.println("自增完成!");
};
new Thread(r).start();
new Thread(r).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(i.get());
}
}
同樣是直接進行自增操作,我們發現,使用原子類是可以保證自增操作原子性的,就跟我們前面加鎖一樣。怎么會這么神奇?
我們來看看它的底層是如何實現的,直接從構造方法點進去:
private volatile int value;
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}
可以看到,它的底層是比較簡單的,其實本質上就是封裝了一個volatile
類型的int值,這樣能夠保證可見性,在CAS操作的時候不會出現問題。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
可以看到最上面是和AQS采用了類似的機制,因為要使用CAS算法更新value的值,所以得先計算出value字段在對象中的偏移地址,CAS直接修改對應位置的內存即可(可見Unsafe類的作用巨大,很多的底層操作都要靠它來完成)
接着我們來看自增操作是怎么在運行的:
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
可以看到這里調用了unsafe.getAndAddInt()
,套娃時間到,我們接着看看Unsafe里面寫了什么:
public final int getAndAddInt(Object o, long offset, int delta) { //delta就是變化的值,++操作就是自增1
int v;
do {
//volatile版本的getInt()
//能夠保證可見性
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta)); //這里是開始cas替換int的值,每次都去拿最新的值去進行替換,如果成功則離開循環,不成功說明這個時候其他線程先修改了值,就進下一次循環再獲取最新的值然后再cas一次,直到成功為止
return v;
}
可以看到這是一個do-while
循環,那么這個循環在做一個什么事情呢?感覺就和我們之前講解的AQS隊列中的機制差不多,也是采用自旋形式,來不斷進行CAS操作,直到成功。
可見,原子類底層也是采用了CAS算法來保證的原子性,包括getAndSet
、getAndAdd
等方法都是這樣。原子類也直接提供了CAS操作方法,我們可以直接使用:
public static void main(String[] args) throws InterruptedException {
AtomicInteger integer = new AtomicInteger(10);
System.out.println(integer.compareAndSet(30, 20));
System.out.println(integer.compareAndSet(10, 20));
System.out.println(integer);
}
如果想以普通變量的方式來設定值,那么可以使用lazySet()
方法,這樣就不采用volatile
的立即可見機制了。
AtomicInteger integer = new AtomicInteger(1);
integer.lazySet(2);
除了基本類有原子類以外,基本類型的數組類型也有原子類:
- AtomicIntegerArray:原子更新int數組
- AtomicLongArray:原子更新long數組
- AtomicReferenceArray:原子更新引用數組
其實原子數組和原子類型一樣的,不過我們可以對數組內的元素進行原子操作:
public static void main(String[] args) throws InterruptedException {
AtomicIntegerArray array = new AtomicIntegerArray(new int[]{0, 4, 1, 3, 5});
Runnable r = () -> {
for (int i = 0; i < 100000; i++)
array.getAndAdd(0, 1);
};
new Thread(r).start();
new Thread(r).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(array.get(0));
}
在JDK8之后,新增了DoubleAdder
和LongAdder
,
在高並發情況下,LongAdder
的性能比AtomicLong
的性能更好,主要體現在自增上,
它的大致原理如下:
在低並發情況下,和AtomicLong
是一樣的,對value值進行CAS操作,
但是出現高並發的情況時,AtomicLong
會進行大量的循環操作來保證同步,
而LongAdder
會將對value值的CAS操作分散為對數組cells
中多個元素的CAS操作(內部維護一個Cell[] as數組,每個Cell里面有一個初始值為0的long型變量,
在高並發時會進行分散CAS,就是不同的線程可以對數組中不同的元素進行CAS自增,這樣就避免了所有線程都對同一個值進行CAS),只需要最后再將結果加起來即可。
使用如下:
public static void main(String[] args) throws InterruptedException {
LongAdder adder = new LongAdder();
Runnable r = () -> {
for (int i = 0; i < 100000; i++)
adder.add(1);
};
for (int i = 0; i < 100; i++)
new Thread(r).start(); //100個線程
TimeUnit.SECONDS.sleep(1);
System.out.println(adder.sum()); //最后求和即可
}
兩者的性能對比(這里用到了CountDownLatch):
public class Main {
public static void main(String[] args) throws InterruptedException {
System.out.println("使用AtomicLong的時間消耗:"+test2()+"ms");
System.out.println("使用LongAdder的時間消耗:"+test1()+"ms");
}
private static long test1() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(100);
LongAdder adder = new LongAdder();
long timeStart = System.currentTimeMillis();
Runnable r = () -> {
for (int i = 0; i < 100000; i++)
adder.add(1);
latch.countDown();
};
for (int i = 0; i < 100; i++)
new Thread(r).start();
latch.await();
return System.currentTimeMillis() - timeStart;
}
private static long test2() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(100);
AtomicLong atomicLong = new AtomicLong();
long timeStart = System.currentTimeMillis();
Runnable r = () -> {
for (int i = 0; i < 100000; i++)
atomicLong.incrementAndGet();
latch.countDown();
};
for (int i = 0; i < 100; i++)
new Thread(r).start();
latch.await();
return System.currentTimeMillis() - timeStart;
}
}
除了對基本數據類型支持原子操作外,對於引用類型,也是可以實現原子操作的:
public static void main(String[] args) throws InterruptedException {
String a = "Hello";
String b = "World";
AtomicReference<String> reference = new AtomicReference<>(a);
reference.compareAndSet(a, b);
System.out.println(reference.get());
}
JUC還提供了字段原子更新器,可以對類中的某個指定字段進行原子操作(注意字段必須添加volatile關鍵字):
public class Main {
public static void main(String[] args) throws InterruptedException {
Student student = new Student();
AtomicIntegerFieldUpdater<Student> fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(Student.class, "age");
System.out.println(fieldUpdater.incrementAndGet(student));
}
public static class Student{
volatile int age;
}
}
ABA問題及解決方案
我們來想象一下這種場景:
線程1和線程2同時開始對a
的值進行CAS修改,但是線程1的速度比較快,將a的值修改為2之后緊接着又修改回1,這時線程2才開始進行判斷,發現a的值是1,所以CAS操作成功。
很明顯,這里的1已經不是一開始的那個1了,而是被重新賦值的1,這也是CAS操作存在的問題(無鎖雖好,但是問題多多),它只會機械地比較當前值是不是預期值,但是並不會關心當前值是否被修改過,這種問題稱之為ABA
問題。
那么如何解決這種ABA
問題呢,JUC提供了帶版本號的引用類型,只要每次操作都記錄一下版本號,並且版本號不會重復,那么就可以解決ABA問題了:
public static void main(String[] args) throws InterruptedException {
String a = "Hello";
String b = "World";
AtomicStampedReference<String> reference = new AtomicStampedReference<>(a, 1); //在構造時需要指定初始值和對應的版本號
reference.attemptStamp(a, 2); //可以中途對版本號進行修改,注意要填寫當前的引用對象
System.out.println(reference.compareAndSet(a, b, 2, 3)); //CAS操作時不僅需要提供預期值和修改值,還要提供預期版本號和新的版本號
}
並發容器
傳統容器線程安全嗎
我們來測試一下,100個線程同時向ArrayList中添加元素會怎么樣:
public class Main {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
Runnable r = () -> {
for (int i = 0; i < 100; i++)
list.add("lbwnb");
};
for (int i = 0; i < 100; i++)
new Thread(r).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(list.size());
}
}
不出意外的話,肯定是會報錯的:
Exception in thread "Thread-0" java.lang.ArrayIndexOutOfBoundsException: 73
at java.util.ArrayList.add(ArrayList.java:465)
at com.test.Main.lambda$main$0(Main.java:13)
at java.lang.Thread.run(Thread.java:750)
Exception in thread "Thread-19" java.lang.ArrayIndexOutOfBoundsException: 1851
at java.util.ArrayList.add(ArrayList.java:465)
at com.test.Main.lambda$main$0(Main.java:13)
at java.lang.Thread.run(Thread.java:750)
9773
那么我們來看看報的什么錯,從棧追蹤信息可以看出,是add方法出現了問題:
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e; //這一句出現了數組越界
return true;
}
也就是說,同一時間其他線程也在瘋狂向數組中添加元素,那么這個時候有可能在ensureCapacityInternal
(確認容量足夠)執行之后,elementData[size++] = e;
執行之前,其他線程插入了元素,導致size的值超出了數組容量。這些在單線程的情況下不可能發生的問題,在多線程下就慢慢出現了。
我們再來看看比較常用的HashMap呢?
public static void main(String[] args) throws InterruptedException {
Map<Integer, String> map = new HashMap<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 100; j++)
map.put(finalI * 1000 + j, "lbwnb");
}).start();
}
TimeUnit.SECONDS.sleep(2);
System.out.println(map.size());
}
經過測試發現,雖然沒有報錯,但是最后的結果並不是我們期望的那樣,實際上它還有可能導致Entry對象出現環狀數據結構,引起死循環。
所以,在多線程環境下,要安全地使用集合類,我們得找找解決方案了。
並發容器介紹
怎么才能解決並發情況下的容器問題呢?
我們首先想到的肯定是給方法前面加個synchronzed
關鍵字,這樣總不會搶了吧,在之前我們可以使用Vector或是Hashtable來解決,但是它們的效率實在是太低了,完全依靠鎖來解決問題,因此現在已經很少再使它們了
JUC提供了專用於並發場景下的容器,比如我們剛剛使用的ArrayList,在多線程環境下是沒辦法使用的,我們可以將其替換為JUC提供的多線程專用集合類:
public static void main(String[] args) throws InterruptedException {
List<String> list = new CopyOnWriteArrayList<>(); //這里使用CopyOnWriteArrayList來保證線程安全
Runnable r = () -> {
for (int i = 0; i < 100; i++)
list.add("lbwnb");
};
for (int i = 0; i < 100; i++)
new Thread(r).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(list.size());
}
我們發現,使用了CopyOnWriteArrayList
之后,再沒出現過上面的問題。
那么它是如何實現的呢,我們先來看看它是如何進行add()
操作的:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); //直接加鎖,保證同一時間只有一個線程進行添加操作
try {
Object[] elements = getArray(); //獲取當前存儲元素的數組
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); //直接復制一份數組
newElements[len] = e; //修改復制出來的數組
setArray(newElements); //將元素數組設定為復制出來的數組
return true;
} finally {
lock.unlock();
}
}
可以看到添加操作是直接上鎖,並且會先拷貝一份當前存放元素的數組,然后對數組進行修改,再將此數組替換(CopyOnWrite)接着我們來看讀操作:
public E get(int index) {
return get(getArray(), index);
}
因此,CopyOnWriteArrayList
對於讀操作不加鎖,而對於寫操作是加鎖的,類似於我們前面講解的讀寫鎖機制,這樣就可以保證不丟失讀性能的情況下,寫操作不會出現問題。
接着我們來看對於HashMap的並發容器ConcurrentHashMap
:
public static void main(String[] args) throws InterruptedException {
Map<Integer, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 100; j++)
map.put(finalI * 100 + j, "lbwnb");
}).start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println(map.size());
}
可以看到這里的ConcurrentHashMap就沒有出現之前HashMap的問題了。
因為線程之間會爭搶同一把鎖,我們之前在講解LongAdder的時候學習到了一種壓力分散思想,
既然每個線程都想搶鎖,那我就干脆多搞幾把鎖,讓你們每個人都能拿到,這樣就不會存在等待的問題了,
而JDK7之前,ConcurrentHashMap的原理也比較類似,它將所有數據分為一段一段地存儲,先分很多段出來,每一段都給一把鎖,當一個線程占鎖訪問時,只會占用其中一把鎖,也就是僅僅鎖了一小段數據,而其他段的數據依然可以被其他線程正常訪問。
這里我們重點講解JDK8之后它是怎么實現的,它采用了CAS算法配合鎖機制實現,我們先來回顧一下JDK8下的HashMap是什么樣的結構:
HashMap就是利用了哈希表,
哈希表的本質其實就是一個用於存放后續節點的頭結點的數組,數組里面的每一個元素都是一個頭結點(也可以說就是一個鏈表),當要新插入一個數據時,會先計算該數據的哈希值,找到數組下標,然后創建一個新的節點,添加到對應的鏈表后面。
當鏈表的長度達到8時,會自動將鏈表轉換為紅黑樹,這樣能使得原有的查詢效率大幅度降低!當使用紅黑樹之后,我們就可以利用二分搜索的思想,快速地去尋找我們想要的結果,而不是像鏈表一樣挨個去看。
由於ConcurrentHashMap的源碼比較復雜,所以我們先從最簡單的構造方法開始下手:
我們發現,它的構造方法和HashMap的構造方法有很大的出入,但是大體的結構和HashMap是差不多的,也是維護了一個哈希表,並且哈希表中存放的是鏈表或是紅黑樹,所以我們直接來看put()
操作是如何實現的,只要看明白這個,基本上就懂了:
public V put(K key, V value) {
return putVal(key, value, false);
}
//有點小亂,如果看着太亂,可以在IDEA中折疊一下代碼塊,不然有點難受
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException(); //鍵值不能為空,基操
int hash = spread(key.hashCode()); //計算鍵的hash值,用於確定在哈希表中的位置
int binCount = 0; //一會用來記錄鏈表長度的,忽略
for (Node<K,V>[] tab = table;;) { //無限循環,而且還是並發包中的類,盲猜一波CAS自旋鎖
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //如果數組(哈希表)為空肯定是要進行初始化的,然后再重新進下一輪循環
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果哈希表該位置為null,直接CAS插入結點作為頭結即可(注意這里會將f設置當前哈希表位置上的頭結點)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // 如果CAS成功,直接break結束put方法,失敗那就繼續下一輪循環
} else if ((fh = f.hash) == MOVED) //頭結點哈希值為-1,這里只需要知道是因為正在擴容即可
tab = helpTransfer(tab, f); //幫助進行遷移,完事之后再來下一次循環
else { //特殊情況都完了,這里就該是正常情況了,
V oldVal = null;
synchronized (f) { //在前面的循環中f肯定是被設定為了哈希表某個位置上的頭結點,這里直接把它作為鎖加鎖了,防止同一時間其他線程也在操作哈希表中這個位置上的鏈表或是紅黑樹
if (tabAt(tab, i) == f) {
if (fh >= 0) { //頭結點的哈希值大於等於0說明是鏈表,下面就是針對鏈表的一些列操作
...實現細節略
} else if (f instanceof TreeBin) { //肯定不大於0,肯定也不是-1,還判斷是不是TreeBin,所以不用猜了,肯定是紅黑樹,下面就是針對紅黑樹的情況進行操作
//在ConcurrentHashMap並不是直接存儲的TreeNode,而是TreeBin
...實現細節略
}
}
}
//根據鏈表長度決定是否要進化為紅黑樹
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); //注意這里只是可能會進化為紅黑樹,如果當前哈希表的長度小於64,它會優先考慮對哈希表進行擴容
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
怎么樣,是不是感覺看着挺復雜,其實也還好,總結一下就是:
我們接着來看看get()
操作:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); //計算哈希值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果頭結點就是我們要找的,那直接返回值就行了
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//要么是正在擴容,要么就是紅黑樹,負數只有這兩種情況
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//確認無誤,肯定在列表里,開找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
//沒找到只能null了
return null;
}
綜上,ConcurrentHashMap的put操作,實際上是對哈希表上的所有頭結點元素分別加鎖,
理論上來說哈希表的長度很大程度上決定了ConcurrentHashMap在同一時間能夠處理的線程數量,
這也是為什么treeifyBin()
會優先考慮為哈希表進行擴容的原因。
顯然,這種加鎖方式比JDK7的分段鎖機制性能更好。
其實這里也只是簡單地介紹了一下它的運行機制,ConcurrentHashMap真正的難點在於擴容和遷移操作,我們主要了解的是他的並發執行機制,有關它的其他實現細節,這里暫時不進行講解。
阻塞隊列
除了我們常用的容器類之外,JUC還提供了各種各樣的阻塞隊列,用於不同的工作場景。
阻塞隊列本身也是隊列,但是它是適用於多線程環境下的,基於ReentrantLock實現的,它的接口定義如下:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
//入隊,如果隊列已滿,返回false否則返回true(非阻塞)
boolean offer(E e);
//入隊,如果隊列已滿,阻塞線程直到能入隊為止
void put(E e) throws InterruptedException;
//入隊,如果隊列已滿,阻塞線程直到能入隊或超時、中斷為止,入隊成功返回true否則false
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//出隊,如果隊列為空,阻塞線程直到能出隊為止
E take() throws InterruptedException;
//出隊,如果隊列為空,阻塞線程直到能出隊超時、中斷為止,出隊成功正常返回,否則返回null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//返回此隊列理想情況下(在沒有內存或資源限制的情況下)可以不阻塞地入隊的數量,如果沒有限制,則返回 Integer.MAX_VALUE
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
//一次性從BlockingQueue中獲取所有可用的數據對象(還可以指定獲取數據的個數)
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
比如現在有一個容量為3的阻塞隊列,這個時候一個線程put
向其添加了三個元素,第二個線程接着put
向其添加三個元素,那么這個時候由於容量已滿,會直接被阻塞,
而這時第三個線程從隊列中取走2個元素,線程二停止阻塞,先丟兩個進去,還有一個還是進不去,所以說繼續阻塞。
利用阻塞隊列,我們可以輕松地實現消費者和生產者模式。
所謂的生產者消費者模型,是通過一個容器來解決生產者和消費者的強耦合問題。
通俗的講,就是生產者在不斷的生產,消費者也在不斷的消費,
可是消費者消費的產品是生產者生產的,這就必然存在一個中間容器,
我們可以把這個容器想象成是一個貨架,當貨架空的時候,生產者要生產產品,
此時消費者在等待生產者往貨架上生產產品,而當貨架有貨物的時候,消費者可以從貨架上拿走商品,
生產者此時等待貨架出現空位,進而補貨,這樣不斷的循環。
通過多線程編程,來模擬一個餐廳的2個廚師和3個顧客,假設廚師炒出一個菜的時間為3秒,顧客吃掉菜品的時間為4秒,窗口上只能放一個菜。
我們來看看,使用阻塞隊列如何實現,這里我們就使用ArrayBlockingQueue
實現類:
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
Runnable supplier = () -> {
while (true){
try {
String name = Thread.currentThread().getName();
System.err.println(time()+"生產者 "+name+" 正在准備餐品...");
TimeUnit.SECONDS.sleep(3);
System.err.println(time()+"生產者 "+name+" 已出餐!");
queue.put(new Object());
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
};
Runnable consumer = () -> {
while (true){
try {
String name = Thread.currentThread().getName();
System.out.println(time()+"消費者 "+name+" 正在等待出餐...");
queue.take();
System.out.println(time()+"消費者 "+name+" 取到了餐品。");
TimeUnit.SECONDS.sleep(4);
System.out.println(time()+"消費者 "+name+" 已經將飯菜吃完了!");
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
};
for (int i = 0; i < 2; i++) new Thread(supplier, "Supplier-"+i).start();
for (int i = 0; i < 3; i++) new Thread(consumer, "Consumer-"+i).start();
}
private static String time(){
SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
return "["+format.format(new Date()) + "] ";
}
}
可以看到,阻塞隊列在多線程環境下的作用是非常明顯的,一共有三種常用的阻塞隊列:
- ArrayBlockingQueue:有界帶緩沖阻塞隊列(就是隊列是有容量限制的,裝滿了肯定是不能再裝的,只能阻塞,數組實現)
- SynchronousQueue:無緩沖阻塞隊列(相當於沒有容量的ArrayBlockingQueue,因此只有阻塞的情況)
- LinkedBlockingQueue:無界帶緩沖阻塞隊列(沒有容量限制,也可以限制容量,也會阻塞,鏈表實現)
這里我們以ArrayBlockingQueue為例進行源碼解讀,我們先來看看構造方法:
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); //底層采用鎖機制保證線程安全性,這里我們可以選擇使用公平鎖或是非公平鎖
notEmpty = lock.newCondition(); //這里創建了兩個Condition(都屬於lock)一會用於入隊和出隊的線程阻塞控制
notFull = lock.newCondition();
}
接着我們來看put
和offer
方法是如何實現的:
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock; //可以看到這里也是使用了類里面的ReentrantLock進行加鎖操作
lock.lock(); //保證同一時間只有一個線程進入
try {
if (count == items.length) //直接看看隊列是否已滿,如果沒滿則直接入隊,如果已滿則返回false
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; //同樣的,需要進行加鎖操作
lock.lockInterruptibly(); //注意這里是可以響應中斷的
try {
while (count == items.length)
notFull.await(); //可以看到當隊列已滿時會直接掛起當前線程,在其他線程出隊操作時會被喚醒
enqueue(e); //直到隊列有空位才將線程入隊
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //出隊操作會調用notFull的signal方法喚醒被掛起處於等待狀態的線程
return x;
}
接着我們來看出隊操作:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock(); //出隊同樣進行加鎖操作,保證同一時間只能有一個線程執行
try {
return (count == 0) ? null : dequeue(); //如果隊列不為空則出隊,否則返回null
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //可以響應中斷進行加鎖
try {
while (count == 0)
notEmpty.await(); //和入隊相反,也是一直等直到隊列中有元素之后才可以出隊,在入隊時會喚醒此線程
return dequeue();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); //對notEmpty的signal喚醒操作
}
接着我們來看一個比較特殊的隊列SynchronousQueue,它沒有任何容量,也就是說正常情況下出隊必須和入隊操作成對出現,我們先來看它的內部,可以看到內部有一個抽象類Transferer,它定義了一個transfer
方法:
abstract static class Transferer<E> {
/**
* 可以是put也可以是take操作
*
* @param e 如果不是空,即作為生產者,那么表示會將傳入參數元素e交給消費者
* 如果為空,即作為消費者,那么表示會從生產者那里得到一個元素e並返回
* @param 是否可以超時
* @param 超時時間
* @return 不為空就是從生產者那里返回的,為空表示要么被中斷要么超時。
*/
abstract E transfer(E e, boolean timed, long nanos);
}
乍一看,有點迷惑,難不成還要靠這玩意去實現put和take操作嗎?
實際上它是直接以生產者消費者模式進行的,由於不需要依靠任何容器結構來暫時存放數據,所以我們可以直接通過transfer
方法來對生產者和消費者之間的數據進行傳遞。
比如一個線程put一個新的元素進入,這時如果沒有其他線程調用take方法獲取元素,那么會持續被阻塞,直到有線程取出元素,而transfer
正是需要等生產者消費者雙方都到齊了才能進行交接工作,單獨只有其中一方都需要進行等待。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException(); //判空
if (transferer.transfer(e, false, 0) == null) { //直接使用transfer方法進行數據傳遞
Thread.interrupted(); //為空表示要么被中斷要么超時
throw new InterruptedException();
}
}
它在公平和非公平模式下,有兩個實現,這里我們來看公平模式下的SynchronousQueue是如何實現的:
static final class TransferQueue<E> extends Transferer<E> {
//頭結點(頭結點僅作為頭結點,后續節點才是真正等待的線程節點)
transient volatile QNode head;
//尾結點
transient volatile QNode tail;
/** 節點有生產者和消費者角色之分 */
static final class QNode {
volatile QNode next; // 后繼節點
volatile Object item; // 存儲的元素
volatile Thread waiter; // 處於等待的線程,和之前的AQS一樣的思路,每個線程等待的時候都會被封裝為節點
final boolean isData; // 是生產者節點還是消費者節點
公平模式下,Transferer的實現是TransferQueue,是以先進先出的規則的進行的,內部有一個QNode類來保存等待的線程。
好了,我們直接上transfer()
方法的實現(這里再次提醒各位,多線程環境下的源碼分析和單線程的分析不同,我們需要時刻關注當前代碼塊的加鎖狀態,如果沒有加鎖,一定要具有多線程可能會同時運行的意識,這個意識在以后你自己處理多線程問題伴隨着你,才能保證你的思路在多線程環境下是正確的):
E transfer(E e, boolean timed, long nanos) { //注意這里面沒加鎖,肯定會多個線程之間競爭
QNode s = null;
boolean isData = (e != null); //e為空表示消費者,不為空表示生產者
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 頭結點尾結點任意為空(但是在構造的時候就已經不是空了)
continue; // 自旋
if (h == t || t.isData == isData) { // 頭結點等於尾結點表示隊列中只有一個頭結點,肯定是空,或者尾結點角色和當前節點一樣,這兩種情況下,都需要進行入隊操作
QNode tn = t.next;
if (t != tail) // 如果這段時間內t被其他線程修改了,如果是就進下一輪循環重新來
continue;
if (tn != null) { // 繼續校驗是否為隊尾,如果tn不為null,那肯定是其他線程改了隊尾,可以進下一輪循環重新來了
advanceTail(t, tn); // CAS將新的隊尾節點設置為tn,成不成功都無所謂,反正這一輪肯定沒戲了
continue;
}
if (timed && nanos <= 0) // 超時返回null
return null;
if (s == null)
s = new QNode(e, isData); //構造當前結點,准備加入等待隊列
if (!t.casNext(null, s)) // CAS添加當前節點為尾結點的下一個,如果失敗肯定其他線程又搶先做了,直接進下一輪循環重新來
continue;
advanceTail(t, s); // 上面的操作基本OK了,那么新的隊尾元素就修改為s
Object x = awaitFulfill(s, e, timed, nanos); //開始等待s所對應的消費者或是生產者進行交接,比如s現在是生產者,那么它就需要等到一個消費者的到來才會繼續(這個方法會先進行自旋等待匹配,如果自旋一定次數后還是沒有匹配成功,那么就掛起)
if (x == s) { // 如果返回s本身說明等待狀態下被取消
clean(t, s);
return null;
}
if (!s.isOffList()) { // 如果s操作完成之后沒有離開隊列,那么這里將其手動丟棄
advanceHead(t, s); // 將s設定為新的首節點(注意頭節點僅作為頭結點,並非處於等待的線程節點)
if (x != null) // 刪除s內的其他信息
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e; //假如當前是消費者,直接返回x即可,x就是從生產者那里拿來的元素
} else { // 這種情況下就是與隊列中結點類型匹配的情況了(注意隊列要么為空要么只會存在一種類型的節點,因為一旦出現不同類型的節點馬上會被交接掉)
QNode m = h.next; // 獲取頭結點的下一個接口,准備進行交接工作
if (t != tail || m == null || h != head)
continue; // 判斷其他線程是否先修改,如果修改過那么開下一輪
Object x = m.item;
if (isData == (x != null) || // 判斷節點類型,如果是相同的操作,那肯定也是有問題的
x == m || // 或是當前操作被取消
!m.casItem(x, e)) { // 上面都不是?那么最后再進行CAS替換m中的元素,成功表示交接成功,失敗就老老實實重開吧
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // 成功交接,新的頭結點可以改為m了,原有的頭結點直接不要了
LockSupport.unpark(m.waiter); // m中的等待交接的線程可以繼續了,已經交接完成
return (x != null) ? (E)x : e; // 同上,該返回什么就返回什么
}
}
}
所以,總結為以下流程:
在JDK7的時候,基於SynchronousQueue產生了一個更強大的TransferQueue,它保留了SynchronousQueue的匹配交接機制,並且與等待隊列進行融合。
我們知道,SynchronousQueue並沒有使用鎖,而是采用CAS操作保證生產者與消費者的協調,但是它沒有容量,而LinkedBlockingQueue雖然是有容量且無界的,但是內部基本都是基於鎖實現的,性能並不是很好。
這時,我們就可以將它們各自的優點單獨拿出來,揉在一起,就成了性能更高的LinkedTransferQueue
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
queue.put("1"); //插入時,會先檢查是否有其他線程等待獲取,如果是,直接進行交接,否則插入到存儲隊列中
queue.put("2"); //不會像SynchronousQueue那樣必須等一個匹配的才可以
queue.forEach(System.out::println); //直接打印所有的元素,這在SynchronousQueue下只能是空,因為單獨的入隊或出隊操作都會被阻塞
}
相比 SynchronousQueue
,它多了一個可以存儲的隊列,我們依然可以像阻塞隊列那樣獲取隊列中所有元素的值,簡單來說,LinkedTransferQueue
其實就是一個多了存儲隊列的SynchronousQueue
。
接着我們來了解一些其他的隊列:
- PriorityBlockingQueue - 是一個支持優先級的阻塞隊列,元素的獲取順序按優先級決定。
- DelayQueue - 它能夠實現延遲獲取元素,同樣支持優先級。
我們先來看優先級阻塞隊列:
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Integer> queue =
new PriorityBlockingQueue<>(10, Integer::compare); //可以指定初始容量(可擴容)和優先級比較規則,這里我們使用升序
queue.add(3);
queue.add(1);
queue.add(2);
System.out.println(queue); //注意保存順序並不會按照優先級排列,所以可以看到結果並不是排序后的結果
System.out.println(queue.poll()); //但是出隊順序一定是按照優先級進行的
System.out.println(queue.poll());
System.out.println(queue.poll());
}
我們的重點是DelayQueue,它能實現延時出隊,也就是說當一個元素插入后,如果沒有超過一定時間,那么是無法讓此元素出隊的。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
可以看到此類只接受Delayed的實現類作為元素:
public interface Delayed extends Comparable<Delayed> { //注意這里繼承了Comparable,它支持優先級
//獲取剩余等待時間,正數表示還需要進行等待,0或負數表示等待結束
long getDelay(TimeUnit unit);
}
這里我們手動實現一個:
private static class Test implements Delayed {
private final long time; //延遲時間,這里以毫秒為單位
private final int priority;
private final long startTime;
private final String data;
private Test(long time, int priority, String data) {
this.time = TimeUnit.SECONDS.toMillis(time); //秒轉換為毫秒
this.priority = priority;
this.startTime = System.currentTimeMillis(); //這里我們以毫秒為單位
this.data = data;
}
@Override
public long getDelay(TimeUnit unit) {
long leftTime = time - (System.currentTimeMillis() - startTime); //計算剩余時間 = 設定時間 - 已度過時間(= 當前時間 - 開始時間)
return unit.convert(leftTime, TimeUnit.MILLISECONDS); //注意進行單位轉換,單位由隊列指定(默認是納秒單位)
}
@Override
public int compareTo(Delayed o) {
if(o instanceof Test)
return priority - ((Test) o).priority; //優先級越小越優先
return 0;
}
@Override
public String toString() {
return data;
}
}
接着我們在主方法中嘗試使用:
public static void main(String[] args) throws InterruptedException {
DelayQueue<Test> queue = new DelayQueue<>();
queue.add(new Test(1, 2, "2號")); //1秒鍾延時
queue.add(new Test(3, 1, "1號")); //1秒鍾延時,優先級最高
System.out.println(queue.take()); //注意出隊順序是依照優先級來的,即使一個元素已經可以出隊了,依然需要等待優先級更高的元素到期
System.out.println(queue.take());
}
我們來研究一下DelayQueue是如何實現的,首先來看add()
方法:
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); //注意這里是向內部維護的一個優先級隊列添加元素,並不是DelayQueue本身存儲元素
if (q.peek() == e) { //如果入隊后隊首就是當前元素,那么直接進行一次喚醒操作(因為有可能之前就有其他線程等着take了)
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public void put(E e) {
offer(e);
}
可以看到無論是哪種入隊操作,都會加鎖進行,屬於常規操作。我們接着來看take()
方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //出隊也要先加鎖,基操
lock.lockInterruptibly();
try {
for (;;) { //無限循環,常規操作
E first = q.peek(); //獲取隊首元素
if (first == null) //如果為空那肯定隊列為空,先等着吧,等有元素進來
available.await();
else {
long delay = first.getDelay(NANOSECONDS); //獲取延遲,這里傳入的時間單位是納秒
if (delay <= 0)
return q.poll(); //如果獲取到延遲時間已經小於0了,那說明ok,可以直接出隊返回
first = null;
if (leader != null) //這里用leader來減少不必要的等待時間,如果不是null那說明有線程在等待,為null說明沒有線程等待
available.await(); //如果其他線程已經在等元素了,那么當前線程直接進永久等待狀態
else {
Thread thisThread = Thread.currentThread();
leader = thisThread; //沒有線程等待就將leader設定為當前線程
try {
available.awaitNanos(delay); //獲取到的延遲大於0,那么就需要等待延遲時間,再開始下一次獲取
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); //當前take結束之后喚醒一個其他永久等待狀態下的線程
lock.unlock(); //解鎖,完事
}
}
到此,有關並發容器的講解就到這里。