線程池詳解


前提:線程池創建有兩種方式,一種是Executors使用默認方法創建,另一種是通過ThreadPoolExecutor自定義,不推薦前者是因為前者的配置很多都是取得integer得最大值,很容易造成OOM

1、線程池核心概念:

  int corePoolSize    核心線程數
  int maximumPoolSize  最大線程數
  long keepAliveTime   核心線程數滿了之后創建的最大線程多久后釋放
  TimeUnit unit      釋放時間的單位,m,h,d
  BlockingQueue<Runnable> workQueue  阻塞消息隊列
  ThreadFactory threadFactory    線程工廠
  RejectedExecutionHandler handler  拒絕策略

2、詳細解釋

  2.1 核心線程數

    當線程是IO密集型時,主要消耗磁盤的讀寫性能,可以設置為2*n,n為當前服務器核數(比如8核16G的服務器設置為16,Runtime.getRuntime().availableProcessors()獲取)

    當線程是CPU密集型時,主要消耗cpu性能,設置為n+1

  2.2最大線程數

    當核心線程核消息隊列都滿了之后才會去創建最大線程,直到達到最大線程數,之后的線程就會執行拒絕策略

  2.3 阻塞消息隊列

    ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小,讀寫用一把鎖,性能較差;

    LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;一般是用這個,指定了大小則限制具體大小,寫核讀分兩把鎖進行操作,所以性能較好

    synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

    注意:當核心線程數滿了之后,新線程會先存儲在消息隊列中,當消息隊列也滿了之后才會去創建最大線程,直到達到最大線程數,之后的線程就會執行拒絕策略

  2.4 線程工廠

    創建線程的類,可以用默認工廠,也可以自定義線程工廠實現 implements ThreadFactory類,實現newThread方法,自定義工廠的話可以設置線程名或者定義輔助線程

  2.5拒絕策略

    ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。

    ThreadPoolExecutor.DiscardPolicy:丟棄任務,但是不拋出異常。

    ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新提交被拒絕的任務

    ThreadPoolExecutor.CallerRunsPolicy:由調用線程(提交任務的線程)處理該任務

    

    自定義拒絕策略

      需要繼承implements RejectedExecutionHandler 實現public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法,此方法中可以通過類型轉換的形式拿到線程具體的類,從而獲取線程相關信息,詳細代碼見下方詳細代碼,需要注意的是,只能轉換線程池execute的Runnable類,如果線程池執行的線程是通過實現Callable完成的,在執行前會把線程封裝成FutureTask,這樣相當於轉換再轉換,就沒法轉換成原來的對象了。

     備注:execute只能提交Runnable線程,submit可以提交所有的Runnable、Callable、Thread線程

-------------------------------分割線-------------------------------------------------------------

詳細實現代碼如下

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author xiufengd
* @date 2021/2/22 14:34
* 未來可期
* 線程池
*/
public class ExecutorTest {

public static class MyThread implements Runnable{

private Integer temp;

public MyThread(Integer temp){
this.temp=temp;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":MyThread--------------"+temp);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static class MyThread2 extends Thread{

@Override
public void run() {

}

}

public static class MyThread3 implements Callable<Integer>{
Integer temp;
public MyThread3(Integer temp){
this.temp=temp;
}
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+":MyThread3--------------"+temp);
Thread.sleep(3000);
return temp;
}
}

public static void main(String[] args) {
ExecutorService pools =
new ThreadPoolExecutor(3, 10, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(5),new Factory(),new Handler());
try {
for (int i = 0; i < 50; i++) {
pools.execute(new MyThread(i));
}
} finally {
pools.shutdown();
}
}



private static class Factory implements ThreadFactory{

private AtomicInteger count = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = "Factory----------------" + count.addAndGet(1);
t.setName(threadName);
System.out.println("線程" + threadName+"創建完成");
return t;
}
}

private static class Handler implements RejectedExecutionHandler{

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
MyThread myThread3 = (MyThread) r;
System.out.println(myThread3.temp+"被阻塞了");
// r.run();
}
}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM