一、等待多線程完成的CountDownLatch
需求場景:當我們需要解析一個Excel里多個sheet的數據,此時可以考慮使用多線程,每個線程解析一個sheet里的數據,等到sheet都解析完之后,程序需要提示解析完成。
當然我們可以使用join方法,join用於讓當前線程等待join線程執行結束。在JDK1.5之后的並發包中提供的CountDownLatch也可以實現join的功能。
CountDownLatch允許一個或多個線程等待其他線程完成操作。
CountDownLatch的構造函數接收一個int類型的參數作為計數器,如果你想等待N個點完成,就傳入N。當我們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變為零。由於countDown方法可以用在任何地方,所以這里說的N個點,可以是N個線程,也可以是1個線程里的N個執行步驟。用在多個線程時,只需要把這個CountDownLatch的引用傳遞到線程里即可。
public class CountDownLatchTest {
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
// new Thread(new Runnable() {
// @Override
// public void run() {
// System.out.println(1);
// c.countDown();
// System.out.println(2);
// c.countDown();
//
// }
// }).start();
// c.await();
// System.out.println(3);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println(3);
}
}
上面執行結果可能是2 1 finished
,也可能是1 2 finished
,總之1、2在finished之前輸出。
二、同步屏障CyclicBarrier
CyclicBarrier要做的事是讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(2);
}
}
因為主線程和子線程的調度是由CPU決定的,兩個線程都有可能先執行,所以輸出1 2,有可能是輸出2 1。
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會永遠等待,因為沒有第三個線程執行await()方法,既沒有第三個線程到達屏障,所以兩個線程都不會繼續執行。
CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrieAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。
public class CyclicBarrierTest1 {
static CyclicBarrier c=new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
System.out.println("initialize....");
}
});
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(2);
}
}
上面會首先輸出initialize....
應用場景:
CyclicBarrier可以用於多線程計算數據,最后合並計算結果的場景。例如,用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個賬戶近一年的每筆銀行流水,現在需要統計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水,都執行完之后,得到每個sheet的日均銀行流水,最后,再用barrierAction根據這些線程的計算結果,計算出整個Excel的日均銀行流水。
public class CyclicBarrierTest2 implements Runnable{
private CyclicBarrier c = new CyclicBarrier(4, this);
private Executor executor = Executors.newFixedThreadPool(4);
private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
private void count() {
for (int i=0;i<4;i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
//模擬計算當前sheet的流水數據。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
try {
//銀流計算完成,插入一個屏障
c.await();
} catch (Exception e){
e.printStackTrace();
}
}
});
}
}
@Override
public void run() {
int result=0;
//所有線程到達屏障后,匯總每個sheet計算出的結果
for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
result+=sheet.getValue();
}
sheetBankWaterCount.put("result", result);
System.out.println(result);
}
public static void main(String[] args) {
CyclicBarrierTest2 barrierTest2=new CyclicBarrierTest2();
barrierTest2.count();
}
}
控制並發線程數的Semaphore
Semaphore用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理地使用公共資源。
應用場景:
Semaphore可以用於做流量控制,特別是公共資源有限的應用場景,比如數據庫連接。假如有一個需求要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程並發地讀取,但是如果讀到內存中,還需要存儲到數據庫中,而數據庫的連接數只有10個,這是我們必須控制只有10個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接。這個時候,就可以使用Semaphore來做流量控制。
public class SemaphoreTest {
private static final int THREAD_COUNT=30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args){
for (int i=0;i<THREAD_COUNT;i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
}
在代碼中,雖然有30個線程在執行,但是只允許10個並發執行。Semaphore的構造方法Semaphore(int permits)接受一個整數的數字,表示可用的許可證數量。Semaphore(10)表示允許10個線程獲取許可證,也就是最大並發數10.Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完之后調用release()方法歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。
線程間交換數據的Exchanger
Exchanger是一個用於線程間協作的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過exchanger方法交換數據,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。
應用場景:
Exchanger可用與校對工作,比如我們需要將紙質銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行錄入,錄入到Excel之后,系統需要加載這兩個Excel,並對兩個Excel數據進行校對,看看是否錄入一致。
public class ExchangerTest {
private static final Exchanger<String> exchanger=new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
String a = "銀行流水A";
try {
a=exchanger.exchange(a);
System.out.println("交換后,a="+a);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
String b = "銀行流水B";
try {
b=exchanger.exchange(b);
System.out.println("交換后,b="+b);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threadPool.shutdown();
}
}
上面輸出:
交換后,a=銀行流水B
交換后,b=銀行流水A
如果兩個線程有一個沒有執行exchange()方法,則會一直等待,如果擔心有特殊情況發生,避免一直等待,可以用exchange(V x, long timeout, TimeUnit unit)
設置最大等待時長。
參考:《Java並發編程的藝術》-方騰飛