JUC學習筆記(三):線程池和ForkJoin


使用線程池以前,需要先了解阻塞隊列:

阻塞隊列

  • BlockingQueue

  • 先進先出的數據結構。

  • 寫:如果隊列滿了,就必須阻塞等待消費。

  • 取:如果隊列是空的,就必須阻塞等待生產。

  • 一般在多線程並發處理和線程池中使用。

  • 有這些實現類,重要的是紅色中的

 

 

  • 繼承結構其實和List類似

 

 

主要API

阻塞隊列有4組API,其實就是添加、移除和查看隊首元素的4組方法。

這4組方法面對無法執行的時候會有着不同的行為,可以根據業務需求去使用。

比如隊列滿了,無法添加元素的時候,add方法會拋出異常,offer則會返回false....

方式 拋出異常 有返回值 阻塞等待 超時等待
添加 add offer put offer(,int,TimeUnit)
移除 remove poll take poll(int,TimeUnit)
判斷隊列首 element peek    

 

  1. 拋出異常

        //1.拋出異常
        public static void test1(){
            //實例化一個容納3個元素的隊列
            ArrayBlockingQueue<Object> blockQueue = new ArrayBlockingQueue<>(3);
            //添加元素會返回true false
            System.out.println(blockQueue.add("A"));
            System.out.println(blockQueue.add("B"));
            System.out.println(blockQueue.add("C"));
            //添加第4個元素會拋出異常 :java.lang.IllegalStateException: Queue full
    //        System.out.println(blockQueue.add("D"));
    //彈出元素會返回元素值
            System.out.println(blockQueue.remove());
            System.out.println(blockQueue.remove());
            System.out.println(blockQueue.remove());
    ​
            //空的集合再彈出元素會拋出異常: java.util.NoSuchElementException
            System.out.println(blockQueue.remove());
        }

     

  2. 返回值

       //2.不拋出異常給返回值
        public static void test2(){
            ArrayBlockingQueue<Object> blockQueue = new ArrayBlockingQueue<>(3);
            System.out.println(blockQueue.offer("A"));
            System.out.println(blockQueue.offer("B"));
            System.out.println(blockQueue.offer("C"));
            System.out.println(blockQueue.offer("D")); //不拋出異常,返回false
    ​
            
            System.out.println(blockQueue.element());//查看隊首元素值,拋異常
            System.out.println(blockQueue.poll());
            System.out.println(blockQueue.element());
            System.out.println(blockQueue.poll());
            System.out.println(blockQueue.poll());
            System.out.println(blockQueue.poll());//不拋出異常,返回null
            System.out.println(blockQueue.peek());//查看隊首元素值,返回null
            System.out.println(blockQueue.element());
        }
    ​

     

     
  3. 阻塞等待

       //3.阻塞,一直等着直到隊列中有空的位置
        public static void test3() throws InterruptedException {
            ArrayBlockingQueue<String> blockQueue = new ArrayBlockingQueue<>(3);
            blockQueue.put("A");
            blockQueue.put("B");
            blockQueue.put("C");
    //        blockQueue.put("D");//會一直阻塞等待
    ​
            System.out.println(blockQueue.take());
            System.out.println(blockQueue.take());
            System.out.println(blockQueue.take());
            System.out.println(blockQueue.take());//會一直阻塞等待
    ​
    ​
        }
    ​

     

     

  4. 超時等待


       //超時退出
        public static void test4() throws InterruptedException {
            ArrayBlockingQueue<String> blockQueue = new ArrayBlockingQueue<>(3);
            blockQueue.offer("A");
            blockQueue.offer("B");
            blockQueue.offer("C");
            blockQueue.offer("D",2, TimeUnit.SECONDS); //等待超過兩秒退出
    ​
            System.out.println(blockQueue.poll());
            System.out.println(blockQueue.poll());
            System.out.println(blockQueue.poll());
            System.out.println(blockQueue.poll(2,TimeUnit.SECONDS));
        }
    ​
     

     

 

 

同步隊列

  • 同步隊列:SynchronousQueue

  • 實際上是AbstractQueue(非阻塞隊列)的子類,但是又繼承了BlockingQueue(阻塞隊列)

public class SynchronousQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable

 

  • 這個隊列沒有容量。

  • 1個線程要往隊列里插入內容,必須是有另一個往隊列里刪除內容的線程存在,兩個線程共存才能插入、刪除成功。否則無法插入,也沒有元素可以刪除( poll() 會返回 null)。

  • 這個隊列也不能迭代,因為沒有容量。

package com.rzp.rw;
​
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
​
//同步隊列
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"put 1");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName()+"put 2");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName()+"put 3");
                synchronousQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T1").start();
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName()+"="+synchronousQueue.take());
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName()+"="+synchronousQueue.take());
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName()+"="+synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T2").start();
    }
}

 

線程池

池化技術

  • 程序的運行,本質就是占用系統的資源,為了優化資源的使用,就出現了池化技術。

  • 線程池、連接池、內存池、對象池。。。

  • 池化技術:因為開啟和關閉特別消耗資源,池化技術的核心就是事先准備好一定的資源,需要用的時候在這里取,用完以后再還回去。

好處

  • 因為減少了創建和關閉資源的行為,因此:

    • 降低資源的消耗

    • 提高響應的速度

    • 方便管理

  • 線程復用,可以控制最大並發數,管理線程。

阿里巴巴

  • 摘自阿里巴巴開發手冊

【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣
的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明:Executors 返回的線程池對象的弊端如下:  
1)FixedThreadPool SingleThreadPool :
   允許的請求隊列長度為 Integer.MAX_VALUE(約為21億),可能會堆積大量的請求,從而導致 OOM。
2)CachedThreadPool ScheduledThreadPool :
   允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。

三大方法

package com.rzp.pool;
​
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
​
public class Demo1 {
​
    public static void main(String[] args) {
        ExecutorsTest();
    }
​
    public static void ExecutorsTest() {
        //這個就是使用Executor創建線程池的3大方法。
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();//單個線程
//        ExecutorService threadPool = Executors.newFixedThreadPool(5); //創建一個固定大小的線程池
        ExecutorService threadPool = Executors.newCachedThreadPool(); //創建可伸縮線程池
try {
            for (int i = 0; i < 10; i++) {
                //從線程池中獲取線程
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉線程池
            threadPool.shutdown();
        }
    }
}

 

 

 

 

 

七大參數

三大方法的源碼:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            //new ThreadPoolExecutor
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newFixedThreadPool(int nThreads) {
        //new ThreadPoolExecutor
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool() {
        //new ThreadPoolExecutor
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

 


7大參數

  • 可以看到,3個方法其實都是調用new ThreadPoolExecutor

//    ThreadPoolExecutor構造方法的7個參數,就是所謂的7大參數
public ThreadPoolExecutor(int corePoolSize, //核心線程池大小
                              int maximumPoolSize, //最大線程池大小
                              long keepAliveTime, //超時等待時間
                              TimeUnit unit, //超時單位
                              BlockingQueue<Runnable> workQueue //阻塞隊列
                               ThreadFactory threadFactory, //線程工廠
                              RejectedExecutionHandler handler //拒絕策略
                         ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
​

 


作用

  • 7大參數是如何起作用的:

  1. 線程工廠:創建線程的工廠,這個不需要改變。

  2. 核心線程池大小:開啟線程池后,就會啟用的線程數量,設為2。

    • 假如核心線程池都被占用,也就是2條都在使用了,這時候如果有第3個程序要求調用,第3個程序就會進入阻塞隊列之中等待:

  3. 阻塞隊列:用於存放等待獲取線程的容器。

    • 假如核心線程池中有線程釋放了,3就會獲得線程。

    • 假如核心線程一直在執行,這時又來了4、5...,這些都會進入阻塞隊列,一旦阻塞隊列也滿了,這時候就會開啟新的線程:

  4. 最大線程池大小:最多允許開始的線程數量,阻塞隊列滿員后逐個開放使用,直到達到最大值。

    • 一旦達到最大值了、並且阻塞隊列也滿了,就會啟用拒絕策略:

  5. 拒絕策略:最大線程池、阻塞隊列都滿員的情況下,如果還有新的程序要求調用線程,就會按拒絕策略對該程序反饋。

  6. 超時等待時間、超時單位:核心線程池中的線程,超過等待時間都沒有被調用,就會被關閉。

    • 也就是說,上面的例子中,如果最后阻塞隊列空了,所有線程也都釋放了,那么除了核心線程中的2條線程會持續開啟以外,其他線程超過等待時間后就會關閉掉。

 

創建線程池

  public static void ThreadPoolTest() {
​
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2, 5, 3, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
​
        try {
            for (int i = 0; i < 15; i++) {
                int num = i;
                //從線程池中獲取線程
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "=="+num+"ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉線程池
            threadPool.shutdown();
        }
    }

 


拒絕策略

  • 就是這4個類

 

 

    /**  拋出RejectedExecutionException異常
     */
    public static class AbortPolicy implements RejectedExecutionHandler 
    //測試結果:拋出異常
    
        
    /**  在原線程中直接調用run方法,讓原線程執行
     * 在下面例子中,把for循環數量調高,可以看到線程名稱是main
     * 如果再來就會阻塞等待
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler 
    //測試結果:表面看沒有異常,而且所有任務都會執行完
    
  /** 把隊首的任務丟棄,然后把新任務放在隊尾
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler
    //測試結果,會有任務丟失
        
        
    /**隊列滿了,不會拋出異常,但是任務會被拋棄
     */
    public static class DiscardPolicy implements RejectedExecutionHandler
     //測試結果,會有任務丟失

 

使用策略

  • 如何定義線程池的最大數量?一般有兩種策略

  1. Cpu密集型 :按處理器數量定義,Cpu利用率最高。

            //獲取處理器數量
            System.out.println(Runtime.getRuntime().availableProcessors());
    ​
            //用處理器數量定義
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                    2, Runtime.getRuntime().availableProcessors(), 3, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(3),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.CallerRunsPolicy());
  2. IO密集型 :判斷程序中消耗IO的線程,定義為IO線程的兩倍

學習ForkJoin前需要對1.8的函數式接口學習

四大函數式接口

函數式接口

  • 只有一個抽象方法的接口。

四大函數式接口

  • 是指java.util.function這個包下的這四個接口:

Consumer
Function
Predicate
Supplier
  • 這四個是function這個包下主要的四個接口,其他都是這四個接口的輔助包,可以從一些名字看出來

 

 

Function

  • Function這個接口要調用apply()方法,輸入和輸入都可以是object(任意類型)

    public static void main(String[] args) {
        //Function函數型接口原始寫法
        Function function = new Function<String,String>() {
            @Override
            public String apply(String o) {
                return o;
            }
        };
        System.out.println(function.apply("asd0"));;
​
        //Function lamda表達式簡寫 ,寫一個工具方法就很簡單了
        Function<String,String> functiong = (str)->{return str;};
​
    }

 


對比scala語言,scala寫一個函數,雖然沒scala簡單,但是確實比沒函數式接口之前簡單很多了:

    val f2 = (n1:Int,n2:Int) => n1+n2

 

Predicate

  • 調用test方法,輸入是object,返回是Boolean,,用於判斷

    //predicate的特點是輸入是object,返回的只能是Boolean,調用的方法名是test,用於判斷
    public static void predicate(){
        Predicate<String> emptyIf = str -> { return str.isEmpty();};
​
        System.out.println(emptyIf.test(" "));;
    }

 

Consumer

  • 調用accept()方法,輸入是object,沒有返回值

public static void ComsumeTest(){
    //Consumer  accept(T t); 只有輸入,沒有返回值
    Consumer<String> consumer = str ->{System.out.println(str); };
    consumer.accept("123");
​
}

 

Supplier

  • 調用get(T t); 沒有輸入,只有返回值

    public static void SupplierTest(){
        //Consumer  get(T t); 沒有輸入,只有返回值
        Supplier supplier = () ->{return 1024;  };
        System.out.println(supplier.get());
​
    }

 

Stream流式計算

  • 和scala那套那不多,寫起來還復雜點

package com.rzp.streamCount;
​
import com.sun.xml.internal.ws.api.model.wsdl.WSDLOutput;
​
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
​
public class Demo1 {
    public static void main(String[] args) {
        User user1 = new User(1, "a", 21);
        User user2 = new User(2, "b", 22);
        User user3 = new User(3, "c", 23);
        User user4 = new User(4, "d", 24);
        User user5 = new User(5, "e", 25);
        //存儲交給集合
        List<User> users = Arrays.asList(user1, user2, user3, user4, user5);
​
        //計算交給流
        //轉換成流
        Stream<User> stream = users.stream();
​
        //使用流的filter函數式接口,filter只能輸入Predicate函數
        //和scala非常類似,就是把容器中的每個元素放在函數里,把判定為true的放在一個新的數組里
        Stream<User> filterStream = stream.filter(u -> {
            return u.getId() % 2 == 0;
        });
        //map,輸入Function函數
        Stream<String> userStream = filterStream.map(u -> {
            return u.getName().toUpperCase();
        });
​
        //排序
        Stream<String> sorted = userStream.sorted((u1, u2) -> {
            return u1.compareTo(u2);
        });
​
        //只要第一個
        Stream<String> limit = sorted.limit(1);
​
        //foreach,只能輸入Consumer函數遍歷每個元素
        limit.forEach(System.out::println);
​
        //和scala一樣,寫成一行代碼
        users.stream().filter(u->{return u.getId() % 2 == 0;})
                .map(u -> {return u.getName().toUpperCase();})
                .sorted((u1, u2) -> {return u1.compareTo(u2);}).limit(1).forEach(System.out::println);
​
    }
}

 


ForkJoin

  • JDK 1.7中出現。

  • 並行執行任務,提高效率,大數據量計算。

  • 類似MR,就是把大任務拆分成小任務並行計算。

  • ForkJoin的特點是工作竊取,就是B先完成自己的任務了,可以去把A未完成的任務拿來做。

案例:計算10億的求和

ForkJoin使用方法

  • 繼承這兩個類中的一個,重寫compute方法。

  • 在compute方法里面要實現拆分的邏輯,拆分要遞歸調用自身構造器。

 

 

package com.rzp.forkjoin;
​
import java.util.concurrent.RecursiveTask;
​
public class Demo1 extends RecursiveTask<Long> {
​
    /**
     * ForkJoin使用方法
     */private Long start;
    private Long end;
    //臨界值
    private Long temp = 10000L;
​
    public Demo1(Long start, Long end) {
        this.start = start;
        this.end = end;
    }
​
    @Override
    protected Long compute() {
        if ((end - start) < temp) {
            Long sum = 0l;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            long middle = (start + end) / 2;
            //拆分任務
            Demo1 task1 = new Demo1(start, middle);
            task1.fork();//把任務壓入線程隊列
            Demo1 task2 = new Demo1(middle+1,end);
            task2.fork();
            return task1.join()+task2.join();
        }
    }
}

 

測試對比

package com.rzp.forkjoin;
​
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
​
public class TestA {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        test1(); //耗時:5517
//        test2(); //耗時:4325 可以修改臨界值,增加更多的任務來計算
        test3(); //耗時: 148
    }
​
    //傳統方式
    public static void test1() {
        Long sum = 0l;
        long start = System.currentTimeMillis();
        for (Long i = 1l; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+" 時間: "+(end - start));
    }
​
    //ForkJoin
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        //實例化forkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //通過execute方法輸入ForkJoinTask執行(我們繼承了RecursiveTask,也是ForkJoinTask的子類)
//        forkJoinPool.execute(new Demo1(0L,10_0000_0000l)); //執行任務
        ForkJoinTask<Long> submit = forkJoinPool.submit(new Demo1(0L, 10_0000_0000l));//提交任務,和執行任務的差異是提交任務會返回結果
​
        Long sum = submit.get();
​
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+" 時間: "+(end - start));
    }
​
    public static void test3(){
        long start = System.currentTimeMillis();
        //Stream並行流
        //parallel 使用多線程並行計算
        //reduce 和scala一樣,參數1是初始值,后面是遞歸計算的函數
        //Long::sum就是調用Long的sum方法,可以寫成:(u1,u2) ->{ return  Long.sum(u1,u2);}
        long sum = LongStream.rangeClosed(0l, 10_0000_0000l).parallel().reduce(0,Long::sum) ;
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+" 時間: "+(end - start));
    }
}

 


備注:多次修改數量級測試:

  • 發現千萬級以上流式處理才優於傳統方法(幾十微秒),億級流式是傳統速度的10倍,因此不要盲目使用流式。

異步回調

Future

  • 類似Ajax,是Java原生的異步方式。

runAsync

package com.rzp.future;
​
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
​
//異步調用
public class Demo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //發起一個請求
        //沒有返回值的異步回調,使用runAsync方法,如果沒有返回值,泛型可以寫Void
        CompletableFuture<Void> comFuture = CompletableFuture.runAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"runAsync => Void");
        });
        //不需要等待返回值,直接往下執行
        System.out.println("1111111111111111");
        //輸出
        //1111111111111111
        //ForkJoinPool.commonPool-worker-9runAsync => Void
​
        comFuture.get();
    }
​
}

 


supplyAsync

  • 有返回值的異步回調

package com.rzp.future;
​
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
​
public class Demo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //發起一個請求
        //有返回值的異步回調,使用runAsync方法,如果沒有返回值,泛型可以寫Void
        CompletableFuture<Integer> supFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
            int i = 10 /0;
            return 1024;
        });
​
        System.out.println(supFuture.whenComplete((t,u)->{
            System.out.println("t="+t); //正常的返回結果
            System.out.println("u="+u); //正常的時候是null,如果出現錯誤,就是錯誤信息
        }).exceptionally((e)->{
            System.out.println(e.getMessage());  
            return 233; //可以獲得錯誤的返回結果
        }).get());;
​
    }
}
​

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM