java 線程池(線程的復用)


一. 線程池簡介 

1. 線程池的概念:

          線程池就是首先創建一些線程,它們的集合稱為線程池。使用線程池可以很好地提高性能,線程池在系統啟動時即創建大量空閑的線程,程序將一個任務傳給線程池,線程池就會啟動一條線程來執行這個任務,執行結束以后,該線程並不會死亡,而是再次返回線程池中成為空閑狀態,等待執行下一個任務。

2. 線程池的工作機制

         2.1 在線程池的編程模式下,任務是提交給整個線程池,而不是直接提交給某個線程,線程池在拿到任務后,就在內部尋找是否有空閑的線程,如果有,則將任務交給某個空閑的線程。

         2.1 一個線程同時只能執行一個任務,但可以同時向一個線程池提交多個任務。

3. 使用線程池的好處

 Java中的線程池是運用場景最多的並發框架,幾乎所有需要異步或並發執行任務的程序都可以使用線程池。在開發過程中,合理地使用線程池能夠帶來3個好處:

  第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。

  第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。

  第三:提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。但是,要做到合理利用線程池,必須對其實現原理了如指掌。


 二:JDK對線程池的支持

 JDK提供的Executor框架

  JDK提供了Executor框架,可以讓我們有效的管理和控制我們的線程,其實質也就是一個線程池。Executor下的接口和類繼承關系如下:

 Executors:提供了一系列靜態工廠方法用於創建各種線程池

 其中常用幾類如下:

public static ExecutorService newFixedThreadPool()
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool()

  1、newFixedThreadPool:該方法返回一個固定線程數量的線程池;

  2、newSingleThreadExecutor:該方法返回一個只有一個現成的線程池;

  3、newCachedThreadPool:返回一個可以根據實際情況調整線程數量的線程池;

  4、newSingleThreadScheduledExecutor:該方法和newSingleThreadExecutor的區別是給定了時間執行某任務的功能,可以進行定時執行等;

  5、newScheduledThreadPool:在4的基礎上可以指定線程數量。

 創建線程池實質調用的還是ThreadPoolExecutor

 在Executors類中,我們拿出來一個方法簡單分析一下:

  可以看出,類似的其他方法一樣,在Executors內部創建線程池的時候,實際創建的都是一個ThreadPoolExecutor對象,只是對ThreadPoolExecutor構造方法,進行了默認值的設定。ThreadPoolExecutor的構造方法如下:

參數含義如下:

1、corePoolSize 核心線程池大小;
2、maximumPoolSize 線程池最大容量大小;
3、keepAliveTime 線程池空閑時,線程存活的時間;
4、TimeUnit 時間單位;
5、ThreadFactory 線程工廠;
6、BlockingQueue任務隊列;
7、RejectedExecutionHandler 線程拒絕策略;

Executor框架實例

1、實例一:

public class ThreadPoolDemo {
 
    public static void main(String[] args) {
 
        ExecutorService executorService = Executors.newFixedThreadPool(4);
 
        for (int i = 0; i < 10; i++) {
            int index = i;
            executorService.submit(() -> System.out.println("i:" + index +
                    " executorService"));
        }
        executorService.shutdown();
    }
}

submit(Runnable task)方法提交一個線程。

但是使用最新的“阿里巴巴編碼規范插件”檢測一下會發現:

線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,
這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。 
說明:Executors各個方法的弊端:
 
1)newFixedThreadPoolnewSingleThreadExecutor:
  主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。
2)newCachedThreadPoolnewScheduledThreadPool:
  主要問題是線程數最大數是Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至OOM

2、實例二:

遵循阿里巴巴編碼規范的提示,示例如下:

public class ThreadPoolDemo {
 
    public static void main(String[] args) {
 
        ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, 
                TimeUnit.MILLISECONDS, 
                new LinkedBlockingQueue<>(10), 
                Executors.defaultThreadFactory(), 
                new ThreadPoolExecutor.AbortPolicy());
 
        for (int i = 0; i < 10; i++) {
            int index = i;
            executorService.submit(() -> System.out.println("i:" + index + 
                    " executorService"));
        }
        executorService.shutdown();
    }
}

或者這樣:

public class ThreadPoolDemo {
 
    public static void main(String[] args) {
 
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
 
        for (int i = 0; i < 10; i++) {
            int index = i;
            pool.submit(() -> System.out.println("i:" + index +
                    " executorService"));
        }
        pool.shutdown();
    }
}

3、實例三:

自定義ThreadFactory、自定義線程拒絕策略

public static void main(String[] args) {
 
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10),
                new ThreadFactory() { //自定義ThreadFactory
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName(r.getClass().getName());
                        return thread;
                    }
                },
                new ThreadPoolExecutor.AbortPolicy()); //自定義線程拒絕策略
 
        for (int i = 0; i < 10; i++) {
            int index = i;
            executorService.submit(() -> System.out.println("i:" + index));
        }
 
        executorService.shutdown();
    }
}

使用submit的坑

首先看一下實例:

public class ThreadPoolDemo3 {
 
    public static void main(String[] args) {
 
        ExecutorService executorService = Executors.newFixedThreadPool(4);
 
        for (int i = 0; i < 5; i++) {
            int index = i;
            executorService.submit(() -> divTask(100, index));
        }
        executorService.shutdown();
    }
private static void divTask(int a, int b) { double result = a / b; System.out.println(result); } }

運行結果:

  上述代碼,可以看出運行結果為4個,因該是有5個的,但是當i=0的時候,100/0是會報錯的,但是日志信息中沒有任何信息,是為什么那?如果使用了submit(Runnable task) 就會出現這種情況,任何的錯誤信息都出現不了!

  這是因為使用submit(Runnable task) 的時候,錯誤的堆棧信息跑出來的時候會被內部捕獲到,所以打印不出來具體的信息讓我們查看,解決的方法有如下兩種:

 1、使用execute()代替submit()

public class ThreadPoolDemo3 {
 
    public static void main(String[] args) {
 
        ExecutorService executorService = Executors.newFixedThreadPool(4);
 
        for (int i = 0; i < 5; i++) {
            int index = i;
            executorService.execute(() -> divTask(100, index));
        }
        executorService.shutdown();
    }
 
    private static void divTask(int a, int b) {
        double result = a / b;
        System.out.println(result);
    }
}

運行結果:

 

2、使用Future

public class ThreadPoolDemo3 {
 
    public static void main(String[] args) {
 
        ExecutorService executorService = Executors.newFixedThreadPool(4);
 
        for (int i = 0; i < 5; i++) {
            int index = i;
            Future future = executorService.submit(() -> divTask(200, index));
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        executorService.shutdown();
    }
 
    private static void divTask(int a, int b) {
        double result = a / b;
        System.out.println(result);
    }
}

運行結果:

 

3、execute和submit的區別

(1)execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。通過以下代碼可知execute()方法輸入的任務是一個Runnable類的實例。

(2)submit()方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執行完。

 


思考:線程池中的工作線程是如何實現線程復用的?

  一個線程一般在執行完任務后就結束了,怎么再讓他執行下一個任務呢? 

  線程重用的核心是,我們知道, Thread.start()只能調用一次,一旦這個調用結束,則該線程就到了stop狀態,不能再次調用start
則要達到復用的目的,則必須從Runnable接口的run()方法上入手,可以這樣設計這個Runnable.run()方法(就叫外面的run()方法):
本質上是個無限循環,跑的過程中不斷檢查我們是否有新加入的子Runnable對象(就叫內部的runnable:run()吧,它就是用來實現我們自己的任務),有就調一下我們的run(),其實就一個大run()把其它小run()#1,run()#2,...給串聯起來了,基本原理就這么簡單
不停地處理我們提交的Runnable任務。
public void run() {
    while(true) {
        if(tasks available) {
           Runnable task = taskqueue.dequeue();
           task.run();
        } else {
           // wait or whatever
        }
    }
}

 下面舉個代碼實例來模擬實現線程池復用線程

  生產了兩個 線程作為工人

  生產了10個同樣的任務,讓他們執行

  利用復用讓 2個線程完成10個任務

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

public class Mythreadpool {
    LinkedList<Task> taskList = new LinkedList<Task>();
    
    class Task { //任務類
        int id;
        Task(int id){
            this.id=id;
            System.out.println("第"+id+"個任務產生");
        }
        public void run() {//具體的工作
            System.out.println("第"+id+"個任務正在執行");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第"+id+"個任務執行完畢");
        }
    }
    
    class Worker extends Thread { //工人實體
        String name;
        Worker(String name) {
            this.name = name;
        }
        
        public void run() {
            while(true) {
                if(taskList.size() == 0) {
                    try {
                        synchronized (taskList) {
                            System.out.println("Worker " + name+" 沒有任務");
                            taskList.wait(); //沒得到任務,進入tasklist的等待隊列
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                synchronized (taskList) {
                    System.out.println("Worker " + name+" 得到任務");
                    taskList.removeFirst().run();
                }
            }
        }
    }
    
    void pool() {  //工人。只生產了兩個工人
         ArrayList<Worker> wokerlist=new ArrayList<Worker>();
         for(int i=0;i<2;i++) {
             Worker k = new Worker("第"+(i+1)+"個工人");
             k.start();
             wokerlist.add(k);//
         }
    }
    
    class Factory extends Thread{ //生產任務的線程,總共會生產10個任務
        public void run() {
            for(int i=0;i<10;i++) {
                synchronized(taskList) {
                    taskList.addLast(new Task(i+1));
                    taskList.notify();
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    public static void main(String[] args) {
        Mythreadpool mythreadpool = new Mythreadpool();
        mythreadpool.pool(); //初始化工人
        Mythreadpool.Factory m= mythreadpool.new Factory();
        m.start();
    }
}

執行效果:


分析jdk中是如何實現線程復用的

線程復用

即,如何將放入線程中的諸多任務,在N個線程中執行的。

ThreadPoolExecutor.execute()

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
分析:可以看出:ThreadPoolExecutor.execute()的功能就是:
1、將任務添加至阻塞隊列workQueue,workQueue.offer(command)
2、根據core和maxPool,選擇是否創建Worker,addWorker()

 因此,線程復用的實現應該在worker中,打開addWorker()方法觀察

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
                //創建worker
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
                //啟動worker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
            //ThreadExecutor的全局鎖,在創建\銷毀worker工作池的時候,才會用到
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
分析:addworker分為兩部分:1、創建worker,2、啟動worker
規則校驗:
與core和maxPool數量的規則相同
創建worker:
獲取ThreadLocal的全局鎖。 安全的創建Worker。
t.start();

因此:重點又回到了Worker的run方法上

Worker.run()

public void run() {
            runWorker(this);
}
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
分析:這里就比較清晰了:
1、通過getTask()方法,獲取待執行的任務。
2、通過task.run();執行具體的任務。
3、正常情況,只有當所有任務執行完畢才會停止運行。

 因此:
  1、進一步分析getTask()
  2、執行task.run()方法。-->>這里可以看出,事實上線程在執行任務的時候,本質上是調用了任務自身的run/call方法。

==》》有點像是thread.get(threadlocal) 本質上是調用了 threadlocalMap.get(thread) 的感覺

getTask()

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
分析:也不用把代碼完全細節完全深究,可以發現方法是從workQueue中獲取task的,所以最終的問題就是看這個變量workQueue是誰的成員變量。
public class ThreadPoolExecutor extends AbstractExecutorService {
    private final BlockingQueue<Runnable> workQueue;
    。。。
}
分析,getTask是從線程池中,獲取的任務。即所有的任務都放在ThreadPoolExecutor中,線程池啟動多個Worker去執行任務,每個worker不停的從ThreadPoolExector的workQueue中取出任務,比你高執行task.run()方法,直至所有的任務執行完畢。

 至此分析完畢。

 

資料出處:

  https://blog.csdn.net/yinni11/article/details/81348210

    https://www.jianshu.com/p/93c26498a3c5

    https://blog.csdn.net/qq_38966984/article/details/80415736


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM