Java多線程系列之:線程的並發工具類


一,Fork-Join

1,定義:

Fork-Join框架:就是在必要的情況下,將一個大任務,進行拆分(fork)成若干個小任務(拆到不能再拆時),再將一個個的小任務運算的結果進行join匯總。

2,,Fork-Join體現了分而治之。什么是分而治之?

規模為N的問題, 當N < 閾值,直接解決。當 N > 閾值, 將N分解為k個小規模子問題,子問題互相獨立,與原問題形式相同。將子問題的解合並得到原問題大的解。

3,工作密取(workStealing)

4,Fork-Join實戰

4.1,Fork/Join的同步調用同時演示返回值結果: 統計整型數組中所有元素的和

/**
 * 產生整型數組工具類
 */
public class MakeArray {

    //數組長度
    public static final int ARRAY_LENGTH = 4000;
    public static int[] makeArray(){
        //new一個隨機數發生器
        Random rd = new Random();
        int[] result = new int[ARRAY_LENGTH];
        for (int i = 0; i < ARRAY_LENGTH;i++){
            //用隨機數填充數組
            result[i] = rd.nextInt(ARRAY_LENGTH*3);
        }
        return result;
    }
}
/**
 * 使用Fork-Join框架進行計算
 */
public class SumArray {

    private static class SumTask extends RecursiveTask<Integer>{

        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;
        private int[] src;//要實際應用的數組
        private int fromIndex;//開始統計的下標
        private int toIndex;//統計到哪里結束的下標

        public SumTask(int[] src,int fromIndex,int toIndex){
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }
        
        @Override
        protected Integer compute() {
            if(toIndex - fromIndex < THRESHOLD){
                int count = 0;
                for(int i = fromIndex;i <= toIndex;i++){
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    count = count + src[i];
                }
                return count;
            }else {
                //fromIndex ..... mid....... toIndex。這里我們自己定義的算法:大於閾值就平均分為兩部分
                int mid = (fromIndex + toIndex)/2;
                SumTask left = new SumTask(src,fromIndex,mid);
                SumTask right = new SumTask(src,mid,toIndex);
                invokeAll(left,right);
                return left.join() + right.join();
            }
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        int[] src = MakeArray.makeArray();
        SumTask innerFind = new SumTask(src,0,src.length-1);
        long start = System.currentTimeMillis();
        pool.invoke(innerFind);//同步調用
        System.out.println("Task is Running.......");
        System.out.println("the count is "+ innerFind.join()+" spend time:"+(System.currentTimeMillis()-start)+"ms");
    }

    /**
     * 注意:
     * 對於這種簡單的相加操作,其實單線程處理的速度更快。
     * 使用forkjoin后,使用多線程進行處理。由於需要線程間的切換(上下文切換),導致forkjoin的處理方式花的時間更多。
     * 所以使用forkjoin一定要注意場合。
     * 這也是redis雖然使用單進程單線程模式,但是處理能力非常強的原因,就是因為redis處理的數據比較簡單(String)。
     * 並且使用單線程處理避免了進程間的切換。
     */
}

4.2,Fork/Join的異步調用同時演示不要求返回值:遍歷指定目錄(含子目錄),尋找指定類型文件

/**
 * 遍歷指定目錄(含子目錄),找尋指定類型文件
 * 不需要返回值的的Fork/Join
 */
public class FindDirsFiles extends RecursiveAction{

    //當前任務需要搜尋的目錄
    private File path;

    public FindDirsFiles(File path){
        this.path = path;
    }

    @Override
    protected void compute() {
        List<FindDirsFiles> subTasks = new ArrayList<>();
        File[] files = path.listFiles();//拿到目錄下文件
        if (files != null){
            for (File file : files){
                if (file.isDirectory()){
                    //對每一個子目錄都新建一個子任務
                    subTasks.add(new FindDirsFiles(file));
                }else {
                    //遇到文件,檢查
                    if (file.getAbsolutePath().endsWith("txt")){
                        System.out.println("文件:"+ file.getAbsolutePath());
                    }
                }
            }
            if (!subTasks.isEmpty()){

                for (FindDirsFiles subTask:invokeAll(subTasks)){
                    //上面的invlkeAll():用來遞交子任務
                    subTask.join();//等待子任務
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            //用一個ForkJoinPool 實例調度總任務
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("D:\\yishang"));
            pool.execute(task);

            System.out.println("task is running........");

            //主線程做一些自己的事情
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int otherWork = 0;
            for (int i = 0; i<100;i++){
                otherWork = otherWork + i;
            }

            System.out.println("main Thread done sth ....., otherWork = "+otherWork);
            task.join();//阻塞方法,
            System.out.println("task end");

        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

 二,CountDownLatch:計數器

1,方法:

  latch.countDown():調用該方法,計數器的數量減一

  latch.await():調用該方法,如果計數器的數量沒有減為0,那么就該方法會阻塞,知道計數器的數量為0才繼續執行后面的代碼

2,示例代碼:當初始化工作完成以后,才執行業務邏輯代碼

/**
 * 演示CountDownLatch,有5個初始化的線程,6個扣除點。
 * 扣除完畢以后,主線程和業務線程才能繼續自己的工作
 */
public class UseCountDownLatch {

    static CountDownLatch latch = new CountDownLatch(6);

    /**
     * 初始化線程
     */
    private static class InitThread implements Runnable{
        @Override
        public void run() {
            System.out.println("InitThread_"+Thread.currentThread().getId()+
            " ready init work .......");
            latch.countDown();//初始化線程完成工作了
            //初始化線程調用了countDown()以后,還是可以繼續走自己的邏輯的
            for (int i = 0; i < 2; i++) {
                System.out.println("InitThread_"+Thread.currentThread().getId()+
                ".....continue to its work");
            }
        }
    }

    /**
     * 業務線程
     * 等所有的初始化線程的初始化工作做完了,業務線程才能執行
     */
    private static class BusiThread implements Runnable{
        @Override
        public void run() {
            try {
                //業務線程阻塞,直到countDown減為0,才往下執行
                latch.await();//阻塞方法
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int i = 0; i < 2; i++) {
                System.out.println("BusiThread_"+Thread.currentThread().getId()+
                " do business");
            }
        }
    }
    
    public static void main(String[] args)throws InterruptedException {

        //單獨的初始化線程,初始化分為2,需要扣減2次
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread_"+Thread.currentThread().getId()+"ready init work step 1st....");
                latch.countDown();//每完成一步初始化工作,扣減一次
                System.out.println("begin step 2nd.......");
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread_"+Thread.currentThread().getId()+"ready init work step 2nd....");
                latch.countDown();//每完成一步初始化工作,扣減一次

            }
        }).start();

        new Thread(new BusiThread()).start();

        for (int i = 0; i < 4; i++) {
            Thread thread = new Thread(new InitThread());
            thread.start();
        }
        //主線程阻塞,必須等countDown減為0,才往下執行
        latch.await();
        System.out.println("main do its work .........");


    }
}

三,CyclicBarrier:柵欄

1,方法:

  barrier.await():等所有線程執行到該方法時,才能繼續向前執行。否則,一直阻塞在這里

2,示例代碼:

/**
 * 演示:CyclicBarrier,當所有的線程都來到了barrier.await();線程才繼續往下執行。不然就一直阻塞在這個方法前
 * 可以類比人員到指定的集合場地,然后在一起出發的場景。比如出去旅游,等所有的人都來到集合地點,然后大家才一起出發。
 */
public class UseCyslicBarrier {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    //工作線程
    private static class SubThread implements Runnable{
        @Override
        public void run() {
            long id = Thread.currentThread().getId();

            //為了模擬真實環境,每個線程到達barrier.await()方法的時間不一樣。隨即決定工作線程是否睡眠
            Random random = new Random();
            try {
                if (random.nextBoolean()){
                    Thread.sleep(1000+id);
                    System.out.println("Thread_"+id+" 在來的路上堵車了,堵車時間 "+(1000+id)+"ms");
                }
                System.out.println("Thread_"+id+" 在來的路上沒有堵車,提前到達集合地點,然后在集合地點等待其他人員.... ");
                //當5個線程都執行到了這個地方,然后所有的線程繼續往下執行。
                barrier.await();
                Thread.sleep(1000+id);
                System.out.println("Thread_"+id+"開始上車");
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new SubThread());
            thread.start();
        }
    }
}

四,Semaphore:信號燈(控制並發執行的線程個數

1,方法:

  sp.acquire():獲得信號燈

  sp.release():釋放信號燈

2,圖示理解:

  

3,示例代碼:

/**
 * 信號燈:控制並發執行的線程個數
 */
public class SemaphoreTest {
    public static void main(String[] args) {

        //最多運行3個線程並發執行
        final Semaphore sp=new Semaphore(3);

        Runnable runnable=new Runnable() {
            @Override
            public void run() {
                try {
                    sp.acquire();//獲得信號燈
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("線程"+Thread.currentThread().getName()+"進入,還有"+(3-sp.availablePermits())+"個線程");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("線程"+Thread.currentThread().getName()+"離開,還有"+(3-sp.availablePermits())+"個線程");
                //釋放信號燈
                sp.release();
            }
        };

        //開啟20個線程
        for (int i = 0; i < 20; i++) {
            Thread thread = new Thread(runnable);
            thread.start();
        }
    }
}

 五,Exchanger(兩個線程之間做數據交換)

1,方法:

  exchanger.exchange(data):該方法一直阻塞到另外一個線程過來交換數據

2,示例代碼:

public class ExchangerTest {

    public static void main(String[] args) {

        final Exchanger exchanger = new Exchanger();

        //線程1
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String data1 = "aaa";
                    System.out.println("線程"+Thread.currentThread().getName()+":當前的數據是"+data1+
                            ",該線程正在准備把 "+data1+"換出去");
                    String data2 = (String) exchanger.exchange(data1);
                    System.out.println("線程"+Thread.currentThread().getName()+
                            "換回的數據為"+data2);
                }catch (InterruptedException e){
                }
            }
        }).start();

        //線程二
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(new Random().nextInt(3000));
                    String data1 = "bbb";
                    System.out.println("線程"+Thread.currentThread().getName()+":當前的數據是"+data1+
                            ",該線程正在准備把 "+data1+"換出去");
                    String data2 = (String) exchanger.exchange(data1);
                    System.out.println("線程"+Thread.currentThread().getName()+
                            "換回的數據為"+data2);
                }catch (InterruptedException e){
                }
            }
        }).start();

    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM