造輪子-ThreadPoolExecutor


以下代碼的實現邏輯出自於公眾號 碼農翻身

《你管這破玩意叫線程池?》

- PS:劉欣老師在我心中是軟件技術行業的大劉。

線程池接口

public interface Executor {
    public void execute(Runnable r);
}
View Code

接口中只有一個抽象方法,execute(Runnable r);它接收一個Runnable,無返回值實現它的子類只需要將傳入的Runnable執行即可。

NewsThreadExecutor

package com.datang.bingxiang.run;

import com.datang.bingxiang.run.intr.Executor;

public class NewsThreadExecutor implements Executor {
    //每次調用都創建一個新的線程
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();
    }

}
View Code

這個實現類最簡單也最明白,真的每次調用我們都創建一個Thread將參數Runnable執行。這么做的弊端就是每個調用者發布一個任務都需要創建一個新的線程,線程使用后就被銷毀了,對內存造成了很大的浪費。

SingThreadExecutor

package com.datang.bingxiang.run;

import java.util.concurrent.ArrayBlockingQueue;

import com.datang.bingxiang.run.intr.Executor;

//只有一個線程,在實例化后就啟動線程。用戶調用execute()傳遞的Runnable會添加到隊列中。
//隊列有一個固定的容量3,如果隊列滿則拋棄任務。
//線程的run方法不停的循環,從隊列里取Runnable然后執行其run()方法。
public class SingThreadExecutor implements Executor {

    // ArrayBlockingQueue 數組類型的有界隊列
    // LinkedBlockingDeque 鏈表類型的有界雙端隊列
    // LinkedBlockingQueue 鏈表類型的有界單向隊列
    private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);

    //線程不停的從隊列獲取任務
    private Thread worker = new Thread(() -> {
        while (true) {
            try {
                //take會在獲取不到任務時阻塞。並且也有Lock鎖
                Runnable r = queue.take();
                r.run();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    });

    // 構造函數啟動線程
    public SingThreadExecutor() {
        worker.start();
    }

    @Override
    public void execute(Runnable r) {
        // 這個offer和add不同的是offer有Lock鎖,如果隊列滿則返回false。
        // add則是隊列滿拋出異常,並且沒有Lock鎖。
        if (!queue.offer(r)) {
            System.out.println("線程等待隊列已滿,不可加入。本次任務丟棄!");
        }
    }

}
View Code

改變下思路,這次線程池實現類只創建一個線程,調用者發布的任務都存放到一個隊列中(隊列符合先進先出的需求)但是注意我們設計線程池一定要選擇有界隊列,因為我們不能無限制的往隊列中添加任務。在隊列滿后,在進來的任務就要被拒絕掉。ArrayBlockingQueue

是一個底層有數組實現的有界阻塞隊列,實例化一個ArrayBlockingQueue傳遞參數為1,表示隊列長度最大為1.唯一的一個工作線程也是成員變量,線程執行后不斷的自旋從隊列中獲取任務,take()方法將隊列頭的元素出隊,若隊列為空則阻塞,這個方法是線程安全的。

execute(r)方法接收到任務后,將任務添加到隊列中,offer()方法將元素添加到隊列若隊列已滿則返回false。execute(r)則直接拒絕掉本次任務。

CorePollThreadExecutor

SingThreadExecutor線程池的缺點是只有一個工作線程,這樣顯然是不夠靈活,CorePollThreadExecutor中增加了corePollSize核心線程數參數,由用戶規定有需要幾個工作線程。這次我們選用的隊列為LinkedBlockingQueue這是一個數據結構為鏈表的有界阻塞單向隊列。

initThread()方法根據corePollSize循環創建N個線程,線程創建后同樣調用take()方法從阻塞隊列中獲取元素,若獲取成功則執行Runnable的run()方法,若獲取隊列中沒有元素則阻塞。execute(r)則還是負責將任務添加到隊列中。

 

CountCorePollThreadExecutor

CorePollThreadExecutor中有三個問題

1 當隊列滿時線程池直接拒絕了任務,這應該讓用戶決定被拒絕的任務如何處理。

2 線程的創建策略也應該交給用戶做處理。

3 初始化后就創建了N個核心線程數,但是這些線程可能會用不到而造成浪費。

RejectedExecutionHandler接口的實現應該讓用戶決定如何處理隊列滿的異常情況。

package com.datang.bingxiang.run.intr;

public interface RejectedExecutionHandler {
    public void rejectedExecution();
}
View Code
package com.datang.bingxiang.run;

import com.datang.bingxiang.run.intr.RejectedExecutionHandler;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution() {
        System.out.println("隊列已經滿了!!!!當前task被拒絕");
    }

}
View Code

ThreadFactory接口的實現應該讓用戶決定創建線程的方法。

package com.datang.bingxiang.run.intr;

public interface ThreadFactory {
    public Thread newThread(Runnable r);
}
View Code
package com.datang.bingxiang.run;

import com.datang.bingxiang.run.intr.ThreadFactory;

public class CustomThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        System.out.println("創建了新的核心線程");
        return new Thread(r);
    }

}
View Code

CountCorePollThreadExecutor的構造函數接收三個參數corePollSize,rejectedExecutionHandler,threadFactory。因為現在我們需要按需創建核心線程,所以需要一個變量workCount記錄當前已經創建的工作線程,為了保證線程之間拿到的workCount是最新的(可見性),我們需要給變量workCount加上volatile修飾,保證改變了的修改能被所有線程看到。execute(r)首先要調用initThread(r)判斷是否有線程被創建,如果沒有線程創建則表示工作線程數已經和核心線程數相同了,此時需要將新的任務添加到隊列中,如果隊列滿,則執行傳入的拒絕策略。重要的方法在於initThread(r)。initThread(r)方法返回true表示有工作線程被創建任務將被工作線程直接執行,無需入隊列。返回false則將任務入隊,隊列滿則執行拒絕策略。

fill變量表示核心線程數是否全部創建,為了保證多線程的環境下不會創建多於corePoolSize個數的線程,所以需要使用同步鎖,initThread(r)都要使用鎖則會降低效率,尤其是當工作線程數已經到達核心線程數后,所以這一塊代碼使用到了雙重判斷,當加鎖后在此判斷工作線程是否已滿。如果已滿返回false。接下來使用threadFactory工廠創建線程,在線程中使用代碼塊,保證當前任務可以被新創建的工作線程執行。新的工作線程依然是從隊列中獲取任務並執行。線程開啟后工作線程++,如果工作線程數等於核心線程數則改變fill標記。返回true,成功創建線程,不要忘記在finally中釋放鎖。

package com.datang.bingxiang.run;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.datang.bingxiang.run.intr.Executor;
import com.datang.bingxiang.run.intr.RejectedExecutionHandler;
import com.datang.bingxiang.run.intr.ThreadFactory;

public class CountCorePollThreadExecutor implements Executor {

    // 核心線程數
    private Integer corePollSize;

    // 工作線程數,也就是線程實例的數量
    private volatile Integer workCount = 0;

    // 線程是否已滿
    private volatile boolean fill = false;

    // 拒絕策略,由調用者傳入,當隊列滿時,執行自定義策略
    private RejectedExecutionHandler rejectedExecutionHandler;

    // 線程工廠,由調用者傳入
    private ThreadFactory threadFactory;

    public CountCorePollThreadExecutor(Integer corePollSize, RejectedExecutionHandler rejectedExecutionHandler,
            ThreadFactory threadFactory) {
        this.corePollSize = corePollSize;
        this.rejectedExecutionHandler = rejectedExecutionHandler;
        this.threadFactory = threadFactory;
    }

    // 這次使用鏈表類型的單向隊列
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1);

    @Override
    public void execute(Runnable r) {
        // 如果沒有創建線程
        if (!initThread(r)) {
            // offer和ArrayBlockingQueue的offer相同的作用
            if (!queue.offer(r)) {
                rejectedExecutionHandler.rejectedExecution();
            }
        }

    }

    // 同步鎖,因為判斷核心線程數和工作線程數的操作需要線程安全
    Lock lock = new ReentrantLock();

    public boolean initThread(Runnable r) {
        // 如果工作線程沒有創建滿則需要創建。
        if (!fill) {
            try {
                lock.lock();// 把鎖 加在判斷里邊是為了不讓每次initThread方法執行時都加鎖
                // 此處進行雙重判斷,因為可能因為多線程原因多個線程都判斷工作線程沒有創建滿,但是不要緊
                // 只有一個線程可以進來,如果后續線程二次判斷已經滿了就直接返回。
                if (fill) {
                    return false;
                }
                Thread newThread = threadFactory.newThread(() -> {
                    // 因為線程是由任務觸發創建的,所以先把觸發線程創建的任務執行掉。
                    {
                        r.run();
                    }

                    while (true) {
                        // 然后該線程則不停的從隊列中獲取任務
                        try {
                            Runnable task = queue.take();
                            task.run();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                newThread.start();
                // 工作線程數+1
                workCount++;
                // 如果工作線程數已經與核心線程數相等,則不可創建
                if (workCount == corePollSize) {
                    fill = true;
                }
                return true;
            } finally {
                lock.unlock();// 釋放鎖
            }
        } else {
            // 工作線程已滿則不創建
            return false;
        }

    }

}
View Code

ThreadPoolExecutor

最后考慮下,當工作線程數到達核心線程數后,隊列也滿了以后,任務就被拒絕了。能不能想個辦法,當工作線程滿后,多增加幾個線程工作,當任務不多時在將擴展的線程銷毀。ThreadPoolExecutor的構造函數中新增三個參數maximumPoolSize最大線程數keepAliveTime空閑時間,unit空閑時間的單位。

和CountCorePollThreadExecutor相比較在流程上講我們只需要在隊列滿時判斷工作線程是否和最大線程數相等,如果不相等則創建備用線程,並且在備用線程長時間不工作時需要銷毀掉工作線程。create()方法雙重判斷workCount==maximumPoolSize如果已經相等表示已經不能創建線程了,此時只能執行拒絕策略。否則創建備用線程,備用線程創建后自旋的執行poll(l,u)方法,該方法也是取出隊列頭元素,和take()不同的是,poll如果一段時間后仍然從隊列中拿不到元素(隊列為空)則返回null,此時我們需要將該備用線程銷毀。在創建線程后將workCount++。此外需要注意,因為當前隊列滿了,所以才會創建備用線程所以不要將當前的任務給忘了,LinkedBlockingQueue的put(r)方法會阻塞的添加元素,直到添加成功。最后 stop()判讀如果workCount>corePollSize則在線程安全的環境下將線程停止,並且將workCount--。

package com.datang.bingxiang.run;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.datang.bingxiang.run.intr.Executor;
import com.datang.bingxiang.run.intr.RejectedExecutionHandler;
import com.datang.bingxiang.run.intr.ThreadFactory;

public class ThreadPoolExecutor implements Executor {

    // 核心線程數
    private Integer corePollSize;

    // 工作線程數,也就是線程實例的數量
    private Integer workCount = 0;

    // 當隊列滿時,需要創建新的Thread,maximumPoolSize為最大線程數
    private Integer maximumPoolSize;

    // 當任務不多時,需要刪除多余的線程,keepAliveTime為空閑時間
    private long keepAliveTime;

    // unit為空閑時間的單位
    private TimeUnit unit;

    // 線程是否已滿
    private boolean fill = false;

    // 拒絕策略,由調用者傳入,當隊列滿時,執行自定義策略
    private RejectedExecutionHandler rejectedExecutionHandler;

    // 線程工廠,由調用者傳入
    private ThreadFactory threadFactory;

    // 這次使用鏈表類型的單向隊列
    BlockingQueue<Runnable> workQueue;

    public ThreadPoolExecutor(Integer corePollSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler rejectedExecutionHandler) {
        this.corePollSize = corePollSize;
        this.rejectedExecutionHandler = rejectedExecutionHandler;
        this.threadFactory = threadFactory;
        this.workQueue = workQueue;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
    }

    @Override
    public void execute(Runnable r) {
        // 如果沒有創建線程
        if (!initThread(r)) {
            // offer和ArrayBlockingQueue的offer相同的作用
            if (!workQueue.offer(r)) {
                // 隊列滿了以后先不走拒絕策略而是查詢線程數是否到達最大線程數
                if (create()) {
                    Thread newThread = threadFactory.newThread(() -> {
                        while (true) {
                            // 然后該線程則不停的從隊列中獲取任務
                            try {
                                Runnable task = workQueue.poll(keepAliveTime, unit);
                                if (task == null) {
                                    stop();
                                } else {
                                    task.run();
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                    newThread.start();
                    // 工作線程數+1
                    workCount++;
                    // 增加線程后,還需要將本應該被拒絕的任務添加到隊列
                    try {
                        // 這個put()方法會在隊列滿時阻塞添加,直到添加成功
                        workQueue.put(r);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    rejectedExecutionHandler.rejectedExecution();
                }
            }
        }

    }

    Lock clock = new ReentrantLock();

    private boolean create() {
        //雙重檢查
        if (workCount == maximumPoolSize) {
            return false;
        }
        try {
            clock.lock();
            if (workCount < maximumPoolSize) {
                return true;
            } else {
                return false;
            }
        } finally {
            clock.unlock();
        }

    }

    Lock slock = new ReentrantLock();

    // 銷毀線程
    private void stop() {
        slock.lock();
        try {
            if (workCount > corePollSize) {
                System.out.println(Thread.currentThread().getName() + "線程被銷毀");
                workCount--;
                Thread.currentThread().stop();
            }
        } finally {
            slock.unlock();
        }

    }

    // 獲取當前的工作線程數
    public Integer getworkCount() {
        return workCount;
    }

    // 同步鎖,因為判斷核心線程數和工作線程數的操作需要線程安全
    Lock lock = new ReentrantLock();

    public boolean initThread(Runnable r) {
        // 如果工作線程沒有創建滿則需要創建。
        if (!fill) {
            try {
                lock.lock();// 把鎖 加在判斷里邊是為了不讓每次initThread方法執行時都加鎖
                // 此處進行雙重判斷,因為可能因為多線程原因多個線程都判斷工作線程沒有創建滿,但是不要緊
                // 只有一個線程可以進來,如果后續線程二次判斷已經滿了就直接返回。
                if (fill) {
                    return false;
                }
                Thread newThread = threadFactory.newThread(() -> {
                    // 因為線程是由任務觸發創建的,所以先把觸發線程創建的任務執行掉。
                    {
                        r.run();
                    }

                    while (true) {
                        // 然后該線程則不停的從隊列中獲取任務
                        try {
                            Runnable task = workQueue.take();
                            task.run();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                    }
                });
                newThread.start();
                // 工作線程數+1
                workCount++;
                // 如果工作線程數已經與核心線程數相等,則不可創建
                if (workCount == corePollSize) {
                    fill = true;
                }
                return true;
            } finally {
                lock.unlock();// 釋放鎖
            }
        } else {
            // 工作線程已滿則不創建
            return false;
        }
    }

}
View Code

 

 

測試代碼

package com.datang.bingxiang.run.test;

import java.time.LocalDateTime;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import com.datang.bingxiang.run.CorePollThreadExecutor;
import com.datang.bingxiang.run.CountCorePollThreadExecutor;
import com.datang.bingxiang.run.CustomRejectedExecutionHandler;
import com.datang.bingxiang.run.CustomThreadFactory;
import com.datang.bingxiang.run.NewsThreadExecutor;
import com.datang.bingxiang.run.SingThreadExecutor;
import com.datang.bingxiang.run.ThreadPoolExecutor;
import com.datang.bingxiang.run.intr.Executor;

@RestController
public class TestController {



    private int exe1Count = 1;
    Executor newsThreadExecutor = new NewsThreadExecutor();

    // 每次都創建新的線程執行
    @GetMapping(value = "exe1")
    public String exe1() {
        newsThreadExecutor.execute(() -> {
            System.out.println("正在執行" + exe1Count++);
        });
        return "success";
    }

    /*
     * 等待隊列長度為1,三個線程加入,第一個加入后會迅速的出隊列。剩下兩個只有一個可以成功 加入,另一個 則會被丟棄
     */
    private int exe2Count = 1;
    Executor singThreadExecutor = new SingThreadExecutor();

    @GetMapping(value = "exe2")
    public String exe2() {
        singThreadExecutor.execute(() -> {
            System.out.println("正在執行" + exe2Count++);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        return "success";
    }

    private int exe3Count = 1;
    Executor corePollThreadExecutor = new CorePollThreadExecutor(2);

    @GetMapping(value = "exe3")
    public String exe3() {
        corePollThreadExecutor.execute(() -> {
            System.out.println("正在執行" + exe3Count++);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        return "success";
    }

    private int exe4Count = 1;
    Executor countCorePollThreadExecutor = new CountCorePollThreadExecutor(2, new CustomRejectedExecutionHandler(),
            new CustomThreadFactory());

    @GetMapping(value = "exe4")
    public String exe4() {
        countCorePollThreadExecutor.execute(() -> {
            System.out.println("正在執行" + exe4Count++);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        return "success";
    }

    // 第一次創建線程並執行 1
    // 第二次進入隊列 2
    // 第三次創建線程取出隊列中的2,將3添加到隊列
    // 第四次拒絕
    // 等待3秒后只剩下一個隊列
    private int exe5Count = 1;
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1), new CustomThreadFactory(), new CustomRejectedExecutionHandler());

    @GetMapping(value = "exe5")
    public String exe5() {
        threadPoolExecutor.execute(() -> {
            System.out.println("正在執行" + exe5Count++);
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        return "success";
    }

    @GetMapping(value = "workCount")
    public Integer getWorkCount() {
        return threadPoolExecutor.getworkCount();
    }
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM