自己實現簡易線程池


使用線程池的原因

服務器創建和銷毀線程要花費時間和系統資源,每個線程本身都會占用一定的內存(200多K)

系統不可能無限的創建線程 ,線程數太多,cpu在切換線程時候,會影響效率

使用線程池的好處:提高效率和復用

 

線程池

任務隊列:可以往里面添加任務

拒絕策略:向任務隊列提交的任務大於某一個數量限制時,可以拒絕接收提交的任務,或者阻塞

線程集合:包含所有的線程

線程數量控制:可以根據任務隊列中任務數量,控制線程集合中線程的數量

 

public class MyThreadPool extends Thread {

    private static volatile int seq = 0; //線程序號

    private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>(); //任務隊列

    private final static List<Worker> THREAD_COLLECTION = new ArrayList<>();   //線程集合

    private int size; //當前線程數量

    private int min;  //最小線程數量

    private int corePoolSize; //核心線程數量
    
    private int max;  //最大線程數量

    private final int queueSize;  // 任務隊列大小

    private final DiscardPolicy discardPolicy;
    
    private volatile boolean destroy = false;
    
    private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
    
    public MyThreadPool() {
        this(4, 8, 12, 100, () -> {
            throw new DiscardException("Discard This Runnable.");
        });
    }

    public MyThreadPool(int min, int corePoolSize, int max, int queueSize, DiscardPolicy discardPolicy) {
        this.min = min;
        this.corePoolSize = corePoolSize;
        this.max = max;
        this.queueSize = queueSize;
        this.discardPolicy = discardPolicy;
        init();
    }

    private void init() {
        for (int i = 0; i < this.min; i++) {
            createWorker();
        }
        this.size = min;
        this.start();
    }

    //提交任務
    public void submit(Runnable runnable) {
        if (destroy)
            throw new IllegalStateException("The thread pool already destroy and not allow submit task.");

        synchronized (TASK_QUEUE) {
            if (TASK_QUEUE.size() > queueSize)
                discardPolicy.discard();
            TASK_QUEUE.addLast(runnable);
            TASK_QUEUE.notifyAll();
        }
    }

    @Override
    public void run() {
        while (!destroy) {
            System.out.printf("Pool#Min:%d,corePoolSize:%d,Max:%d,Current:%d,QueueSize:%d\n",
                    this.min, this.corePoolSize, this.max, this.size, TASK_QUEUE.size());
            try {
                //當任務隊列中的任務個數過多時,增加線程數
                Thread.sleep(5_000L);
                if (TASK_QUEUE.size() > corePoolSize && size < corePoolSize) {
                    for (int i = size; i < corePoolSize; i++) {
                        createWorker();
                    }
                    System.out.println("The pool has incremented to corePoolSize.");
                    size = corePoolSize;
                } else if (TASK_QUEUE.size() > max && size < max) {
                    for (int i = size; i < max; i++) {
                        createWorker();
                    }
                    System.out.println("The pool has incremented to max.");
                    size = max;
                }
                //任務隊列為空,將超過核心線程數量的線程關閉
                synchronized (THREAD_COLLECTION) {
                    if (TASK_QUEUE.isEmpty() && size > corePoolSize) {
                        System.out.println("=========Reduce========");
                        int releaseSize = size - corePoolSize;
                        for (Iterator<Worker> it = THREAD_COLLECTION.iterator(); it.hasNext(); ) {
                            if (releaseSize <= 0)
                                break;
                            Worker worker = it.next();
                            if(worker.getTaskState() == TaskState.BLOCKED) {
                                worker.close();
                                worker.interrupt();
                                releaseSize--;
                            }
                            it.remove();
                        }
                        size = corePoolSize;
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void createWorker() {
        Worker worker = new Worker(GROUP, "THREAD_POOL-" + (++seq));
        worker.start();
        THREAD_COLLECTION.add(worker);
    }
    
    //線程的包裝類
    private static class Worker extends Thread {

        private volatile TaskState taskState = TaskState.FREE;

        public Worker(ThreadGroup group, String name) {
            super(group, name);
        }

        public TaskState getTaskState() {
            return this.taskState;
        }

        public void run() {
            OUTER:
            while (this.taskState != TaskState.DEAD) {
                Runnable runnable;
                synchronized (TASK_QUEUE) {
                    while (TASK_QUEUE.isEmpty()) {
                        try {
                            taskState = TaskState.BLOCKED;
                            TASK_QUEUE.wait();
                        } catch (InterruptedException e) {
                            System.out.println("Closed.");
                            break OUTER;
                        }
                    }
                    runnable = TASK_QUEUE.removeFirst();
                }

                if (runnable != null) {
                    taskState = TaskState.RUNNING;
                    runnable.run();
                    taskState = TaskState.FREE;
                }
            }
        }

        public void close() {
            this.taskState = TaskState.DEAD;
        }
    }
    
    //自定義狀態
    private enum TaskState {
        FREE, RUNNING, BLOCKED, DEAD
    }
    
    //拒絕策略拋出異常
    public static class DiscardException extends RuntimeException {

        private static final long serialVersionUID = 1L;

        public DiscardException(String message) {
            super(message);
        }
    }
    //拒絕策略
    public interface DiscardPolicy {

        void discard() throws DiscardException;
    }
    
    //關閉線程池
    public void shutdown() throws InterruptedException {
        while (!TASK_QUEUE.isEmpty()) { //任務隊列不為空,繼續等待
            Thread.sleep(50);
        }
        synchronized (THREAD_COLLECTION) {
            int initVal = THREAD_COLLECTION.size();
            while (initVal > 0) {
                for (Worker worker : THREAD_COLLECTION) {
                    if (worker.getTaskState() == TaskState.BLOCKED) { //如果是blocked狀態,直接關閉
                        worker.close();
                        worker.interrupt();
                        initVal--;
                    } else {   //其他狀態,就等待10毫秒
                        Thread.sleep(10);
                    }
                }
            }
        }
        this.destroy = true;
        System.out.println("The thread pool is shutdown");
    }


    public static void main(String[] args) throws InterruptedException {
        MyThreadPool threadPool = new MyThreadPool();
        for (int i = 1; i <= 40; i++) {
            threadPool.submit(new MyRunnable(i));
        }

        Thread.sleep(40000);
        threadPool.shutdown();

    }
    
    public static class MyRunnable  implements Runnable {
        int index ;
        public  MyRunnable(int index) {
            this.index = index;
        }
        @Override
        public void run() {
            System.out.println("The runnable "+ index +" be serviced by " + Thread.currentThread() + " start.");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("The runnable "+ index +" be serviced by " + Thread.currentThread() + " finished.");
        }
        
    }
    
    
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM