Java中的並發工具類


一、等待多線程完成的CountDownLatch

需求場景:當我們需要解析一個Excel里多個sheet的數據,此時可以考慮使用多線程,每個線程解析一個sheet里的數據,等到sheet都解析完之后,程序需要提示解析完成。
當然我們可以使用join方法,join用於讓當前線程等待join線程執行結束。在JDK1.5之后的並發包中提供的CountDownLatch也可以實現join的功能。

CountDownLatch允許一個或多個線程等待其他線程完成操作。
CountDownLatch的構造函數接收一個int類型的參數作為計數器,如果你想等待N個點完成,就傳入N。當我們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變為零。由於countDown方法可以用在任何地方,所以這里說的N個點,可以是N個線程,也可以是1個線程里的N個執行步驟。用在多個線程時,只需要把這個CountDownLatch的引用傳遞到線程里即可。

public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
//        new Thread(new Runnable() {
//            @Override
//            public void run() {
//                System.out.println(1);
//                c.countDown();
//                System.out.println(2);
//                c.countDown();
//
//            }
//        }).start();
//        c.await();
//        System.out.println(3);
        
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                c.countDown();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(2);
                c.countDown();
            }
        }).start();

        c.await();
        System.out.println(3);
    }
}

上面執行結果可能是2 1 finished,也可能是1 2 finished,總之1、2在finished之前輸出。

二、同步屏障CyclicBarrier

CyclicBarrier要做的事是讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

因為主線程和子線程的調度是由CPU決定的,兩個線程都有可能先執行,所以輸出1 2,有可能是輸出2 1。

如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會永遠等待,因為沒有第三個線程執行await()方法,既沒有第三個線程到達屏障,所以兩個線程都不會繼續執行。

CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrieAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。

public class CyclicBarrierTest1 {
    static CyclicBarrier c=new CyclicBarrier(2, new Runnable() {
        @Override
        public void run() {
            System.out.println("initialize....");
        }
    });

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }).start();
        try {
            c.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

上面會首先輸出initialize....

應用場景:
CyclicBarrier可以用於多線程計算數據,最后合並計算結果的場景。例如,用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個賬戶近一年的每筆銀行流水,現在需要統計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水,都執行完之后,得到每個sheet的日均銀行流水,最后,再用barrierAction根據這些線程的計算結果,計算出整個Excel的日均銀行流水。

public class CyclicBarrierTest2 implements Runnable{
    private CyclicBarrier c = new CyclicBarrier(4, this);

    private Executor executor = Executors.newFixedThreadPool(4);

    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    private void count() {
        for (int i=0;i<4;i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //模擬計算當前sheet的流水數據。
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    try {
                        //銀流計算完成,插入一個屏障
                        c.await();
                    } catch (Exception e){
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    @Override
    public void run() {
        int result=0;
        //所有線程到達屏障后,匯總每個sheet計算出的結果
        for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result+=sheet.getValue();
        }
        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        CyclicBarrierTest2 barrierTest2=new CyclicBarrierTest2();
        barrierTest2.count();
    }
}

控制並發線程數的Semaphore

Semaphore用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理地使用公共資源。
應用場景:
Semaphore可以用於做流量控制,特別是公共資源有限的應用場景,比如數據庫連接。假如有一個需求要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程並發地讀取,但是如果讀到內存中,還需要存儲到數據庫中,而數據庫的連接數只有10個,這是我們必須控制只有10個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接。這個時候,就可以使用Semaphore來做流量控制。

public class SemaphoreTest {
    private static final int THREAD_COUNT=30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args){
        for (int i=0;i<THREAD_COUNT;i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}

在代碼中,雖然有30個線程在執行,但是只允許10個並發執行。Semaphore的構造方法Semaphore(int permits)接受一個整數的數字,表示可用的許可證數量。Semaphore(10)表示允許10個線程獲取許可證,也就是最大並發數10.Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完之后調用release()方法歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。

線程間交換數據的Exchanger

Exchanger是一個用於線程間協作的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過exchanger方法交換數據,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。

應用場景:
Exchanger可用與校對工作,比如我們需要將紙質銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行錄入,錄入到Excel之后,系統需要加載這兩個Excel,並對兩個Excel數據進行校對,看看是否錄入一致。

public class ExchangerTest {
    private static final Exchanger<String> exchanger=new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                String a = "銀行流水A";
                try {
                    a=exchanger.exchange(a);
                    System.out.println("交換后,a="+a);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                String b = "銀行流水B";
                try {
                    b=exchanger.exchange(b);
                    System.out.println("交換后,b="+b);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadPool.shutdown();
    }
}

上面輸出:

交換后,a=銀行流水B
交換后,b=銀行流水A

如果兩個線程有一個沒有執行exchange()方法,則會一直等待,如果擔心有特殊情況發生,避免一直等待,可以用exchange(V x, long timeout, TimeUnit unit)設置最大等待時長。

參考:《Java並發編程的藝術》-方騰飛


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM