java線程並發工具類


  本次內容主要講Fork-Join、CountDownLatch、CyclicBarrier以及Callable、Future和FutureTask,最后再手寫一個自己的FutureTask,絕對干貨滿滿!

 

1、Fork-Join

1.1 什么是Fork-Join

  Java多線程的開發可以我們自己啟用多線程,線程池,還可以使用forkjoin。forkjoin可以讓我們不去了解諸如Thread、Runnable等相關的知識,只要遵循forkjoin的開發模式,就可以寫出很好的多線程並發程序。

   forkjoin采用的是分而治之。分而治之思想是:將一個難以直接解決的大問題,分割成一些規模較小的相同問題,以便各個擊破,分而治之。分而治之的策略是:對於一個規模為n的問題,若該問題可以容易地解決(比如說規模n較小)則直接解決,否則將其分解為m個規模較小的子問題,這些子問題互相獨立且與原問題形式相同(子問題相互之間有聯系就會變為動態規范算法),遞歸地解這些子問題,然后將各子問題的解合並得到原問題的解,這種算法設計策略叫做分治法。用一張圖來表示forkjoin原理。

  我們可以了解一下計算機的十大經典算法:快速排序、堆排序、歸並排序 、二分查找、BFPRT(線性查找)、DFS(深度優先搜索)、BFS(廣度優先搜索)、Dijkstra、動態規划、朴素貝葉斯分類。其中有哪一些用到的是分而治之呢?有3個,分別是快速排序、歸並排序和二分查找。

  歸並排序是建立在歸並操作上的一種有效的排序算法。該算法是采用分治法的一個非常典型的應用。將已有序的子序列合並,得到完全有序的序列;即先使每個子序列有序,再使子序列段間有序。若將兩個有序表合並成一個有序表,稱為2路歸並,與之對應的還有多路歸並。對於給定的一組數據,利用遞歸與分治技術將數據序列划分成為越來越小的半子表,在對半子表排序后,再用遞歸方法將排好序的半子表合並成為越來越大的有序序列。為了提升性能,有時我們在半子表的個數小於某個數(比如15)的情況下,對半子表的排序采用其他排序算法,比如插入排序。下面演示一下歸並排序的過程。

1.2 歸並排序(升序)示例

 先將數組划分為左右2個子表:

 然后繼續對左右2個子表進行拆分:

對拆分好的4個子表進行排序:

 對有序子表進行比較合並:

 對合並后的子表繼續比較合並:

 第二次合並后,數組呈有序排列。

 1.3 Fork-Join工作竊取

  工作竊取是指當前線程的Task已經全被執行完畢,則自動取到其他線程的Task隊列中取出Task繼續執行。ForkJoinPool中維護着多個線程在不斷地執行Task,每個線程除了執行自己職務內的Task之外,還會根據自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task,如此一來就能能夠減少線程阻塞或是閑置的時間,提高CPU利用率。用一張圖進行說明。

1.3 Fork-Join使用

   Fork-Join使用兩個類來完成以上兩件事情:ForkJoinTask和ForkJoinPool。我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork和join的操作機制,通常我們不直接繼承ForkjoinTask類,只需要直接繼承其子類。

(1)RecursiveAction,用於沒有返回結果的任務

(2)RecursiveTask,用於有返回值的任務

 task要通過ForkJoinPool來執行,使用submit 或 invoke 提交,兩者的區別是:invoke是同步執行,調用之后需要等待任務完成,才能執行后面的代碼;submit是異步執行。join()和get方法當任務完成的時候返回計算結果。調用get/join方法的時候會阻塞。還是用一個圖來說明forkjoin的工作流程。

  在我們自己實現的compute方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用invokeAll方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並得到其結果。

 1.4 Fork-Join VS 單線程

  假設有一個業務場景,數據庫中有編號為0到1千萬的會員信息,要統計所有會員的余額總和。為了對比結果的一致性,用戶的余額不用隨機數表示,就用編號代表用戶的余額。現在的做法是每次從數據庫查詢出5000條數據進行統計,直到所有數據統計完成,進行匯總。對比看看單線程和Fork-Join的差異。

先看單線程場景:

public class SingleThreadSumNumber {
    /**
     * 每次查詢5000條進行統計
     */
    private static final int THRESHOLD = 5000;

    /**
     * 最小值
     */
    private static final int MIN = 0;

    /**
     * 最大值
     */
    private static final int MAX = 10000000;

    public void sumNumber() {
        long sum = 0;
        long startTime = System.currentTimeMillis();
        int start = MIN;
        int end = MIN + THRESHOLD;

        boolean isFirstTime = true;
        while (end <= MAX) {
            sum = sum + batchSum(start, end);
            if (isFirstTime) {
                start = start + THRESHOLD + 1;
                isFirstTime = false;
            } else {
                start = start + THRESHOLD;
            }
            end = end + THRESHOLD;
        }
        System.out.println("The result is " + sum
                + " spend time:" + (System.currentTimeMillis() - startTime) + "ms");
    }

    /**
     * 統計每次查詢出來的余額總和
     * @param start
     * @param end
     * @return
     */
    public long batchSum(int start, int end) {
        long sum = 0;
        try {
            Thread.sleep(15);//休眠15毫秒模擬查詢業務
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = start; i <= end; i++) {
            sum += i;
        }
        return sum;
    }

    public static void main(String[] args) {
        SingleThreadSumNumber thread = new SingleThreadSumNumber();
        thread.sumNumber();
    }
}

運行程序輸出以下結果:

余額總和為50000005000000,花費了30119毫秒,下面使用forkjoin來進行統計:

 1 import java.util.concurrent.ForkJoinPool;
 2 import java.util.concurrent.RecursiveTask;
 3 
 4 public class ForkJoinDemo {
 5     /**
 6      * 門限值,如果任務門限低於此值,則進行計算
 7      */
 8     private static final int THRESHOLD = 5000;
 9 
10     /**
11      * 最小值
12      */
13     private static final int MIN = 0;
14 
15     /**
16      * 最大值
17      */
18     private static final int MAX = 10000000;
19 
20     private static class SumNumberTask extends RecursiveTask<Long> {
21         private int start;
22         private int end;
23 
24         public SumNumberTask(int start, int end) {
25             this.start = start;
26             this.end = end;
27         }
28 
29         @Override
30         protected Long compute() {
31             if (end - start < THRESHOLD) {
32                 return sumBatch(start, end);
33             } else {
34                 int mid = (start + end) / 2;
35                 SumNumberTask left = new SumNumberTask(start, mid);
36                 SumNumberTask right = new SumNumberTask(mid + 1, end);
37                 invokeAll(left, right);
38                 long leftResult = left.join();
39                 long rightResult = right.join();
40                 return leftResult + rightResult;
41             }
42         }
43     }
44 
45     public void sumNumber() {
46         ForkJoinPool pool = new ForkJoinPool();
47         long start = System.currentTimeMillis();
48         int recordMin = MIN;
49         int recordMax = MAX;
50         SumNumberTask sumTask = new SumNumberTask(recordMin, recordMax);
51         pool.invoke(sumTask);
52         System.out.println("Task is Running.....");
53         Long result = sumTask.join();
54         System.out.println("The result is " + result + " spend time:"
55                 + (System.currentTimeMillis() - start) + "ms");
56     }
57 
58     /**
59      * 統計每次任務的總和
60      * @param fromId
61      * @param toId
62      * @return
63      */
64     public static long sumBatch(int fromId, int toId) {
65         long sum = 0;
66         try {
67             Thread.sleep(15);//休眠15毫秒模擬查詢業務
68         } catch (InterruptedException e) {
69             e.printStackTrace();
70         }
71         for (int i = fromId; i <= toId; i++) {
72             sum += i;
73         }
74         return sum;
75     }
76 
77     public static void main(String[] args) {
78         ForkJoinDemo forkJoinDemo = new ForkJoinDemo();
79         forkJoinDemo.sumNumber();
80     }
81 }

輸出結果:

   余額總和為50000005000000,和使用單線程統計時一致,使用forkjoin達到了同樣的目的,但是只花費了4078毫秒,性能提升了7倍多。為了使性能有進一步提升,我們可以在第44行指定並發數量。不傳參情況下,默認並發量是當前服務器的邏輯CPU個數。我們把並發量調整成64,即ForkJoinPool pool = new ForkJoinPool(16 * 4),執行程序,輸出結果為:

 統計結果一致,花費了567毫秒,比起單線程統計,性能提升了53倍之多,由此可見forkjoin的並發威力。

2、CountDownLatch

 2.1 什么是CountDownLatch

  JDK對CountDownLatch的解釋是:一種同步輔助器,它允許一個或多個線程等待,直到在其他線程中執行的一組操作完成為止。舉個例子來理解CountDownLatch:隔壁寢室的老王今天要參加學校運動會的400米決賽,跟小王一起爭奪冠軍的還有另外5個人,不管這6位選手的內心多激動多澎湃,也要等裁判的發令槍響了之后才能起跑,裁判不發出指令,選手就只能在起跑線等待,這就是CountDownLatch的作用。但是實際場景並不只有一個發令裁判,參加過學校運動會的同學都知道,還可能需要若干個裁判進行手動計時,要等所有的裁判都就位后,發令槍一響,運動員才能起跑。假設有3個計時裁判,一個發令裁判,用一個圖來說明。

   在比賽開始前,發令裁判會用洪荒之力吼一聲,各~就~各~位,此時發令裁判會用炯炯有神的目光和3位計時裁判交流,3位裁判分別點頭示意已經准備好了,此時發令裁判會再次大吼一聲,預備~~~跑!!!此時憋了許久的6位運動員飛奔出去,當然老王遙遙領先,畢竟女神給他說了跑第一名的話晚上有獎勵。發令裁判的任務完成,不用繼續執行下去,而3個計時裁判繼續工作,對6位選手的成績進行一個記錄。

 2.1 CountDownLatch實戰

用一段程序來模擬老王參加運動會400米決賽的場景。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

    /**
     * 運動員在計時裁判和發令裁判就位后才能起跑
     */
    static CountDownLatch sportsManLatch = new CountDownLatch(4);

    /**
     * 發令裁判在3個計時裁判准備好之后才能發令
     */
    static CountDownLatch orderRefereeLatch = new CountDownLatch(3);

    /**
     * 計時裁判
     */
    static class TimeReferee implements Runnable {
        private int no;

        public TimeReferee(int no) {
            this.no = no;
        }

        @Override
        public void run() {
            System.out.println(no + "號計時裁判就位");
            orderRefereeLatch.countDown();
            sportsManLatch.countDown();
        }
    }

    /**
     * 發令裁判
     */
    static class OrderReferee implements Runnable {
        @Override
        public void run() {
            try {
                orderRefereeLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("發令裁判發出指令~~~~~~");
            sportsManLatch.countDown();
        }
    }

    /***
     * 運動員
     */
    static class SportsMan implements Runnable {
        private int no;

        public SportsMan(int no) {
            this.no = no;
        }

        @Override
        public void run() {
            try {
                System.out.println(no + "號運動員已經就位");
                sportsManLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(no + "號選手說,我要跑第一");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //6個運動員就位
        for (int i = 0; i < 6; i++) {
            new Thread(new SportsMan(i)).start();
        }

        //發令裁判和計時裁判眼神確認,等計時裁判都准備好之后發令
        new Thread(new OrderReferee()).start();

        //3個計時裁判就位
        for (int i = 0; i < 3; i++) {
            new Thread(new TimeReferee(i)).start();
        }
    }
}

程序輸出:

3、CyclicBarrier

 3.1 什么是CyclicBarrier

  JDK對CyclicBarrier的解釋是:一種同步輔助工具,它允許一組線程全部互相等待以到達一個公共的障礙點。我們可以從字面意思理解它,可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會打開,所有被屏障攔截的線程才能繼續運行。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),parties表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然后當前線程被阻塞。CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties,Runnable barrierAction),用於在parties個線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。還用一張圖來說明。

 3.2 CyclicBarrier實戰

  CyclicBarrier可以用於多線程計算數據,最后合並計算結果的場景。我們模擬3個子線程向一個map中添加數據,它們添加數據完成后,到一個屏障點進行等待,由統計線程對數據進行打印,統計線程工作結束后,3個子線程再被統一釋放去干其他工作。我們設置2個屏障點來演示,,體現其可循環使用的特征。

public class CyclicBarrierDemo {
    private static CyclicBarrier barrier = new CyclicBarrier(3, new CollectThread());

    /**存放子線程產生數據的容器*/
    private static ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(new WorkThread());
            thread.start();
        }
        Thread.sleep(5);
    }

    /**
     * 負責對子線程的結果進行其他處理
     */
    private static class CollectThread implements Runnable {
        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> workResult : map.entrySet()) {
                result.append("[" + workResult.getValue() + "]");
            }
            System.out.println("the result = " + result);
            System.out.println("CollectThread do other things");
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("CollectThread end........");
        }
    }

    /**
     * 實際工作的子線程
     */
    private static class WorkThread implements Runnable {
        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            map.put(id + "", id);
            Random r = new Random();
            try {
                Thread.sleep(r.nextInt(1000));
                System.out.println("Thread_" + id + " first do something ");
                //第一次到達屏障
                barrier.await();
                System.out.println("Thread_" + id + " first do other things");

                Thread.sleep(r.nextInt(500));
                map.put(id * 2 + "", id * 2);
                System.out.println("Thread_" + id + " second do something ");
                //第二次到達屏障
                barrier.await();
                System.out.println("Thread_" + id + " second other things ");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

程序輸出:

3.3 CountDownLatch和CyclicBarrier對比

  CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以反復使用。CountDownLatch.await()一般阻塞工作線程,所有的進行預備工作的線程執行countDown(),而CyclicBarrier通過工作線程調用await()從而自行阻塞,直到所有工作線程達到指定屏障,再大家一起往下走。在控制多個線程同時運行上,CountDownLatch可以不限線程數量,而CyclicBarrier是固定線程數。同時,CyclicBarrier還可以提供一個barrierAction,合並多線程計算結果。

4、Callable、Future和FutureTask

4.1 Runnable、Callable、Future和FutureTask之間的關系

  Runnable是一個接口,在它里面只聲明了一個run()方法,由於run()方法返回值為void類型,所以在執行完任務之后無法返回任何結果。Callable位於java.util.concurrent包下,它也是一個接口,在它里面也只聲明了一個方法,只不過這個方法叫做call(),這是一個泛型接口,call()函數返回的類型就是傳遞進來的V類型。Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。要獲取返回結果時可以調用get方,該方法會阻塞直到任務返回結果。因為Future只是一個接口,所以是無法直接用來創建對象使用的,因此就有了FutureTask。FutureTask類實現了RunnableFuture接口,RunnableFuture繼承了Runnable接口和Future接口,所以它既可以作為Runnable被線程執行,又可以作為Future得到Callable的返回值。用一個圖來說明。

  因此當我們想通過一個線程運行Callable,但是Thread不支持構造方法中傳遞Callable的實例,我們需要通過FutureTask把一個Callable包裝成Runnable,然后再通過這個FutureTask拿到Callable運行后的返回值。要想new出一個FutureTask的實例,有2種方式,直接貼出代碼。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }


    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

4.2 Callable和FutureTask實戰

  這個例子比較簡單,在一個主線程中創建一個callable來對1到10000進行累加,再休眠3秒,然后把這個callable封裝成一個futureTask,交給一個線程去運行,最終查看callable的返回結果和阻塞效果。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<Long> callable = new Callable<Long>() {
            long sum = 0;

            @Override
            public Long call() throws Exception {
                for (int i = 0; i <= 10000; i++) {
                    sum += i;
                }
                Thread.sleep(3000);//主要是為了演示get()時候的阻塞效果
                return sum;
            }
        };
        FutureTask<Long> futureTask = new FutureTask<>(callable);
        new Thread(futureTask).start();
        Thread.sleep(10);
        System.out.println("main線程繼續執行");
        System.out.println("獲取callable計算結果 = " + futureTask.get());
        System.out.println("main線程繼續執行 ");
    }
}

程序輸出:

 可以看到當futureTask.get()沒有獲取到返回結果時,主線程是處於阻塞狀態。

4.3 手寫一個FutureTask

   要實現一個簡易的FutureTask,通過上面對幾個接口之間關系的介紹,以及閱讀FutureTask代碼可以看出,只需定義一個類,實現Runnable和Future接口,並實現run()方法和get()方法就可以了,核心思想就是上一篇文章中提到的通知/等待機制。直接上代碼:

import java.util.concurrent.*;

public class MyFutureTask<V> implements Runnable, Future<V> {
    private Callable<V> callable;

    private V result = null;

    public MyFutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        V temp = null;
        try {
            temp = callable.call();
        } catch (Exception e) {
            e.printStackTrace();
        }
        synchronized (this) {
            result = temp;
            this.notifyAll();
        }
    }

    @Override
    public V get() throws InterruptedException {
        if (result != null) {
            return result;
        }
        System.out.println("等待結果執行完成。。。。。");
        synchronized (this) {
            this.wait();
        }
        return result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }


    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }
}

 為了驗證效果,把上一段代碼中的FutureTask改成MyFutureTask,其余代碼都不變。

import java.util.concurrent.Callable;

public class FutureTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        Callable<Long> callable = new Callable<Long>() {
            long sum = 0;

            @Override
            public Long call() throws Exception {
                for (int i = 0; i <= 10000; i++) {
                    sum += i;
                }
                Thread.sleep(3000);//主要是為了演示get()時候的阻塞效果
                return sum;
            }
        };
        MyFutureTask<Long> futureTask = new MyFutureTask<>(callable);
        new Thread(futureTask).start();
        Thread.sleep(10);
        System.out.println("main線程繼續執行");
        System.out.println("獲取callable計算結果 = " + futureTask.get());
        System.out.println("main線程繼續執行 ");
    }
}

運行程序,可以看到輸出結果和阻塞現象與使用FutureTask一致:

 5、結語

  這篇隨筆就介紹這么多內容,希望大家看了有收獲。原子操作CAS在下一篇文章中介紹,閱讀過程中如發現描述有誤,請指出,謝謝。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM