實現簡單的線程池


什么是線程池

  線程池就是以一個或多個線程[循環執行]多個應用邏輯的線程集合.

線程池的作用:

  線程池作用就是限制系統中執行線程的數量。

  根據系統的環境情況,可以自動或手動設置線程數量,達到運行的最佳效果;少了浪費了系統資源,多了造成系統擁擠效率不高。用線程池控制線程數量,其他線程排隊等候。一個任務執行完畢,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待進程,線程池的這一資源處於等待。當一個新任務需要運行時,如果線程池中有等待的工作線程,就可以開始運行了;否則進入等待隊列。

線程池接口:

/**
 * 線程池方法定義
 */
public interface ThreadPools<Job extends Runnable>{

    /**
     * 執行一個任務(Job),這個Job必須實現Runnable
     * @param job
     */
    public void execute(Job job);

    /**
     * 關閉線程池
     */
    public void shutdown();

    /**
     * 增加工作者線程,即用來執行任務的線程
     * @param num
     */
    public void addWorkers(int num);

    /**
     * 減少工作者線程
     * @param num
     */
    public void removeWorker(int num);

    /**
     * 獲取正在等待執行的任務數量
     */
    public int getJobSize();
}

  客戶端可以通過execute(Job)方法將Job提交入線程池來執行,客戶端完全不用等待Job的執行完成。除了execute(Job)方法以外,線程池接口提供了增加/減少工作者線程以及關閉線程池的方法。每個客戶端提交的Job都會進入到一個工作隊列中等待工作者線程的處理。

 

線程池默認實現

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 線程池
 * @param <Job>
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPools<Job>{
    /**
     * 線程池維護工作者線程的最大數量
     */
    private static final int MAX_WORKER_NUMBERS=30;

    /**
     * 線程池維護工作者線程的最默認工作數量
     */
    private static final int DEFAULT_WORKER_NUMBERS = 5;

    /**
     * 線程池維護工作者線程的最小數量
     */
    private static final int MIN_WORKER_NUMBERS = 1;

    /**
     * 維護一個工作列表,里面加入客戶端發起的工作
     */
    private final LinkedList<Job> jobs = new LinkedList<Job>();

    /**
     * 工作者線程的列表
     */
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());

    /**
     * 工作者線程的數量
     */
    private int workerNum;
    /**
     *每個工作者線程編號生成
     */
    private AtomicLong threadNum = new AtomicLong();

    /**
     * 第一步:構造函數,用於初始化線程池
     * 首先判斷初始化線程池的線程個數是否大於最大線程數,如果大於則線程池的默認初始化值為 DEFAULT_WORKER_NUMBERS
     */
    public DefaultThreadPool(int num){
        if (num > MAX_WORKER_NUMBERS) {
            this.workerNum =DEFAULT_WORKER_NUMBERS;
        } else {
            this.workerNum = num;
        }
        initializeWorkers(workerNum);
    }

    /**
     * 初始化每個工作者線程
     */
    private void initializeWorkers(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            //添加到工作者線程的列表
            workers.add(worker);
            //啟動工作者線程
            Thread thread = new Thread(worker);
            thread.start();
        }
    }

    /**
     * 執行一個任務(Job),這個Job必須實現Runnable
     * @param job
     */
    @Override
    public void execute(Job job) {
        //如果job為null,拋出空指針
        if (job==null){
            throw new NullPointerException();
        }
        //這里進行執行 TODO 當供大於求時候,考慮如何臨時添加線程數
        if (job != null) {
            //根據線程的"等待/通知機制"這里必須對jobs加鎖
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }

    }

    /**
     * 關閉線程池
     */
    @Override
    public void shutdown() {
        for (Worker worker:workers) {
            worker.shutdown();
        }
    }

    /**
     * 增加工作者線程,即用來執行任務的線程
     * @param num
     */
    @Override
    public void addWorkers(int num) {
        //加鎖,防止該線程還沒增加完成而下個線程繼續增加導致工作者線程超過最大值
        synchronized (jobs) {
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWorkers(num);
            this.workerNum += num;
        }
    }

    /**
     * 減少工作者線程
     * @param num
     */
    @Override
    public void removeWorker(int num) {
        synchronized (jobs) {
            if(num>=this.workerNum){
                throw new IllegalArgumentException("超過了已有的線程數量");
            }
            for (int i = 0; i < num; i++) {
                Worker worker = workers.get(i);
                if (worker != null) {
                    //關閉該線程並從列表中移除
                    worker.shutdown();
                    workers.remove(i);
                }
            }
            this.workerNum -= num;
        }

    }

    /**
     * 獲取正在等待執行的任務數量
     */
    @Override
    public int getJobSize() {
        return workers.size();
    }

    /**
     * 消費者
     */
    class Worker implements Runnable {
        // 表示是否運行該worker
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) { //這個工作線程就一直循環,不停的檢測是否還有任務區執行
                Job job = null;
                //線程的等待/通知機制
                synchronized (jobs) {
                    if (jobs.isEmpty()) {//工作隊列為空
                        try {
                            jobs.wait();//線程等待喚醒
                        } catch (InterruptedException e) {
                            //感知到外部對該線程的中斷操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // 取出一個job
                    job = jobs.removeFirst();
                }
                //執行job
                if (job != null) {
                    job.run();
                }
            }
        }

        /**
         * 終止該線程
         */
        public void shutdown() {
            running = false;
        }
    }

}

  從線程池的實現中可以看出,當客戶端調用execute(Job)方法時,會不斷地向任務列表jobs中添加Job,而每個工作者線程會不讀的從jobs上獲取Job來執行,當jobs為空時,工作者線程進入WAITING狀態。

  當添加一個Job后,對工作隊列jobs調用其notify()方法來喚醒一個工作者線程。此處我們不調用notifyAll(),避免將等待隊列中的線程全部移動到阻塞隊列中而造成資源浪費。

  線程池的本質就是使用了一個線程安全的工作隊列連接工作者線程和客戶端線程。客戶端線程把任務放入工作隊列后便返回,而工作者線程則不端的從工作隊列中取出工作並執行。當工作隊列為空時,工作者線程進入WAITING狀態,當有客戶端發送任務過來后會通過任意一個工作者線程,隨着大量任務的提交,更多的工作者線程被喚醒。

Job實現

public class Job implements Runnable{

    @Override
    public void run() {
        try {
            Thread.sleep(2500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("當前線程名稱:"+Thread.currentThread().getName()+";"+"job被指執行了");
    }
}

測試程序

public class WorkTest {
    public static void main(String[] args) {
        DefaultThreadPool defaultThreadPool = new DefaultThreadPool(10);
        for (int i=0;i<10000;i++){
            if (i==30){
                defaultThreadPool.addWorkers(10);
            }
            Job job = new Job();
            defaultThreadPool.execute(job);
        }
    }
}

console結果:

   

 

 

使用Executors工具類創建線程池:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 運行結果:總共只會創建5個線程, 開始執行五個線程,
 * 當五個線程都處於活動狀態,再次提交的任務都會加入隊列等到其他線程運行結束,當線程處於空閑狀態時會被下一個任務復用
 *
 */
public class newFixedThreadPoolTest {
    public static void main(String[] args) {
        //Executors工廠類創建一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for(int i = 0; i < 20; i++) {
            Runnable synRunnable = new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            };
            executorService.execute(synRunnable);
        }
    }
}

console結果:

    

 

 

出處:https://www.jianshu.com/p/d5d7035b6f26

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM