在JDK 1.5之前,提到並發,java程序員們一般想到的是wait()、notify()、Synchronized關鍵字等,但是並發除了要考慮競態資源、死鎖、資源公平性等問題,往往還需要考慮性能問題,在一些業務場景往往還會比較復雜,這些都給java coder們造成不小的難題。JDK 1.5的concurrent包幫我們解決了不少問題。
Concurrent包中包含了幾個比較常用的並發模塊,這個系列,LZ就和大家一起來學習各個模塊,Let’s Go!
一、線程池的基本用法
一般並發包里有三個常用的線程池實例化方法,在Executors這個工廠類中。
- ·newFixedThreadPool(int size):
創建一個可重用固定線程數的線程池,以共享的無界隊列(LinkedBlockingQueue)方式來運行這些線程。任何時刻最多只有size大小的線程在執行各自任務。當線程池中所有線程都在運行,后續新來的線程將在無界隊列中等待,直到有空閑線程為止。如果有線程在執行任務期間終止退出,一個新的線程會替代原先的線程繼續后面任務的執行。線程池中的線程會一直存在,除非顯示的關閉某個線程。
ExecuterService fixedPool = Executors. newFixedThreadPool(5); //創建一個固定包含5個線程的線程池
- ·newCachedThreadPool()
創建一個可以根據需要創建線程的線程池,當一個任務過來,如果線程池中有空閑可用線程,先用空閑線程,沒有就創建一個新線程。這種線程池對於執行short-lived asynchronous tasks(短期異步任務)通常會提高性能。默認空閑60s的線程會從線程池中移除,如果長時間空閑的話,這個線程池是不會占用任何資源的。但是以我個人的經驗,這種線程池要慎用,用不好經常會造成堆外內存溢出,從而造成應用反應慢,最后導致宕機。
ExecuterService fixedPool = Executors. newCachedThreadPool();
- ·newSingleThreadExecutor()
創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。可以保證順序執行各個任務。有人說這不就是newFixedThreadPool(1)么,錯矣。看源碼就知道外部還包了一層FinalizableDelegatedExecutorService,這個是Executors的一個靜態內部類,這個內部類繼承了DelegatedExecutorService,返回的這個ExecutorService實現只包含ExecutorService接口定義的一些方法,不同於newFixedThreadPool有自己擴展的方法。源碼注釋的區別是:newSingleThreadExecutor可以保證使用其他線程時,返回的Executor不用重新配置。
ExecuterService fixedPool = Executors. newSingleThreadExecutor();
- ·newScheduledThreadPool(int size)
創建一個線程池,它可安排在給定延遲后運行命令或者定期地執行。這是個無界大小的線程池。其中主要的執行方法是scheduleAtFixedRate和scheduleWithFixedDelay方法,最終的核心方法是delayedExecute,這兩個方法有什么區別呢,簡單來說,scheduleAtFixedRate是以任務執行時間開始為延遲時間起點的,scheduleWithFixedDelay是以任務執行結束時間為延時時間起點的。
接下來對並發包里面常用的一些類進行使用解析
1) 信號量 Semaphore:
一個計數信號量,維護了一個許可集。通俗的講,就是對有限共享資源記錄使用情況。
舉個日常生活中的例子好了:有20個人去登記信息表格,但是只有3只筆,一個人用完之后給下面一個人用,直到所有人都用完。
沒有並發包之前,我們是怎么設計的呢?
定義一個變量a,用synchronized修飾,值為3,新建20個線程模擬20個人訪問,每個人用筆之前,先看是否有空余的筆,如果有,就使用,並將變量a減少1,使用完成之后,再放回去,即將變量a加1,各個線程之間對於變量a的訪問需要互斥進行。Synchronized是一個重量級的同步操作,涉及到用戶態與核心態的切換,所以性能一般。
讓我們再看看信號量是怎么實現的,Semaphore實現的思想跟上面差不多。只不過Semaphore已經幫我們做了同步處理,有兩個關鍵的方法:
- ·acquire() 獲取一個信號量許可
- ·release() 釋放一個信號量許可
此外還有其他一些方法:availablePermits() 返回信號量可用的許可數,talk is cheap show me the code:
public class SemaphoreTest extends Thread{
private String name;
private Semaphore sh;
public SemaphoreTest(String name,Semaphore sh){
this.name = name;
this.sh = sh;
}
public void run(){
if(sh.availablePermits()>0){
System.out.println("有筆");
}else{
System.out.println("筆沒了,等等");
}
try {
sh.acquire();//信號量減1
System.out.println(this.name+"號在用筆");
Thread.sleep((long) (Math.random()*1000));
sh.release();
System.out.println(this.name+"號用完了");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
Semaphore sh = new Semaphore(3);
for(int i=0;i<20;i++){
es.submit(new SemaphoreTest(i+"",sh));
}
es.shutdown();
}
}
上述代碼直接運行即可。
這個信號量是如何同步資源的,就需要讀讀jdk相關源碼了。核心類是AbstractQueuedSynchronizer,該類實現了一個FIFO的列表,列表中Node表示列表節點,該Node有prev、next、Thread等屬性,表示前節點后節點以及線程等。並發包中每個需要實現個性化同步機制的都要擴展該類,以實現不同的同步功能。比如Semaphore、countDownLatch、CyclicBarrier等,具體原理要分析源碼,篇幅較長,這里不做展開,有興趣的可以研究一下,對volatile關鍵字、CAS等都會有更深的理解。
2) CountDownLatch
用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。
比較重要的2個方法:countDown()和await()
下面來個常見的例子:導游帶了20人團到了飯店吃飯,要等到20個人到齊了,才開始吃飯,還要等到20人都吃完了,才可以繼續下個景點。
來看看不用並發包你會怎么實現,首先會設置20個人到來以及吃完的標志,每個線程過來更改自己的標志,主程序for或者while不停的循環監聽,制止全部到來以及吃完才繼續。如果是100人1000人呢,未免臃腫。
CountDownLatch的實現:
public class CountDownLautchTest {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(20);
final CountDownLatch endFlag = new CountDownLatch(20);
final CountDownLatch startFlag = new CountDownLatch(1);
final CountDownLatch comeFlag = new CountDownLatch(20);
for(int i=0;i<20;i++){
final int j = i + 1;
Runnable person = new Runnable(){
@Override
public void run() {
System.out.println(j+"號游客來了");
comeFlag.countDown();
try {
startFlag.await();
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
System.out.println(j+"號吃完了");
endFlag.countDown();
}
}
};
es.submit(person);
}
try {
comeFlag.await(); //保證所有人都到齊了
System.out.println("人都齊了,大家一起吃飯");
startFlag.countDown();//開吃
endFlag.await();//等待所有人都吃完了
System.out.println("全都吃完了,繼續下個景點");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
es.shutdown();
}
}
CountDownLatch是一種不可逆的降序計數操作,所以上述代碼里面定義了2個分別表示等候以及吃飯的標志。如果需要重復計數,就需要用到下面這個類:CyclicBarrier。
3) CyclicBarrier
一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier。
還是上述的例子吧,再加一點場景,吃完飯還要游玩,游玩結束返程回家。讓我們來看看CyclicBarrier是如何實現的,上代碼:
public class CyclicBarrierTest {
public static void main(String[] args) {
final CyclicBarrier barrier = new CyclicBarrier(20);
final ExecutorService es = Executors.newFixedThreadPool(20);
System.out.println("****等人齊****");
for(int i=0;i<20;i++){
final int j = i+1;
Runnable person = new Runnable(){
@Override
public void run() {
try {
System.out.println(j+"號來了");
if(barrier.await()==0){
System.out.println("****人都到齊了****");
}
System.out.println(j+"號開吃");
if(barrier.await()==0){
System.out.println("****吃完了出發****");
}
System.out.println(j+"號玩好了");
if(barrier.await()==0){
System.out.println("****游玩結束回家****");
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
es.submit(person);
}
es.shutdown();
}
}
代碼直接運行即可,應該比較好懂吧,這里有個小竅門,代碼中if(barrier.await()==0)這個條件,有點類似於CountDownLatch的await往下執行的條件了,每次判斷都會減1,減到0,即完成所有線程的等待,繼續下面的操作。
結語
關於並發包一些常用的類就介紹到這里吧,工作中具體怎么使用要具體分析各個業務場景,選擇合適的方法,隨機應變,觸類旁通吧,后續會介紹一下Lock源碼以及實現,涉及AQS的原理等。
未完,待續……