理解線程池,自己實現一個線程池


線程池本質是一個生產者-消費者模式,一邊維護一些線程執行任務,一邊由主線程添加一些任務。現在我們拋棄源碼中一些繁雜的狀態判斷,自己寫一個線程池。

public class poolT {
   //可能頻繁增刪任務,鏈表隊列效率較高
private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); private final HashSet<Work> workers = new HashSet<Work>(); private static int num = 3; public poolT(int num) { this.num = num; for (int i = 0; i < num; i++) { Work w = new Work(); w.start(); workers.add(w); } } public void addWork(Runnable r) { workQueue.add(r); } public void close() throws Exception { while (!workQueue.isEmpty()) { Thread.sleep(500); } for (Work work : workers) { // 通知正在運行的結束 work.setDrop(); // 強制結束還在等待的 if (work.getState() == Thread.State.WAITING) { work.interrupt(); } } Thread.sleep(2000); for (Work work : workers) { System.out.println(work.getName() + "狀態:" + work.getState()); } } // 內部線程封裝 private class Work extends Thread { Runnable r = null; // 結束線程標志位 private boolean hasRunning = true; public void setDrop() { this.hasRunning = false; } public void run() { try { while (hasRunning || !workQueue.isEmpty()) { // 阻塞線程執行 r = workQueue.take(); if (r != null) { r.run(); } } } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { poolT p = new poolT(4); for (int i = 0; i < 2; i++) { Runnable newRun = new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "運行任務;"); } catch (InterruptedException e) { e.printStackTrace(); } } }; p.addWork(newRun); } p.close(); System.out.println("主程序完畢"); } }

這里面我使用了一個阻塞隊列,當任務添加時,由隊列隨機選取一個空閑線程進行處理,沒有任務時,進行阻塞。

當然也可以不用阻塞隊列,不過需要自己進行同步

public class MyThreadPool {

    List<Runnable> taskList = new LinkedList<Runnable>();

    private List<MyThread> threadList = new LinkedList<MyThread>();

    private static MyThreadPool threadPool;

    public MyThreadPool(int num) {
        for (int i = 0; i < num; i++) {
            threadList.add(new MyThread());
        }
        for (MyThread thread : threadList) {
            thread.start();
        }
    }

      public void destroy() {  
            while (!taskList.isEmpty()) {// 如果還有任務沒執行完成,就先睡會吧  
                try {  
                    Thread.sleep(10);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            // 工作線程停止工作,且置為null  
            for (MyThread thread : threadList) {
                thread.setDistroy();
            }
        }  
      
    public void execute(Runnable run) {

        synchronized (taskList) {
            taskList.add(run);
            taskList.notify();
        }
    }

    private class MyThread extends Thread {
        public boolean hasRun = true;

        private void setDistroy() {
            this.hasRun = false;
        }

        @Override
        public void run() {
            while (hasRun) {
                Runnable r = null;
                System.out.println(Thread.currentThread().getName() + "is running");
                synchronized (taskList) {
                    if (taskList.isEmpty() && hasRun) {
                        try {
                            taskList.wait(20);
                        } catch (InterruptedException e) {

                            e.printStackTrace();
                        }
                    } else {
                        r = taskList.remove(0);
                    }
                }
                if (r != null) {
                    r.run();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
//         ExecutorService excutor=Executors.newFixedThreadPool(3);
        MyThreadPool pool =new MyThreadPool(4);
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                    System.out.println("任務一");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        });
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                    System.out.println("任務貳");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        });

        System.out.println("End");
        pool.destroy();
    }
}

 參考:http://blog.csdn.net/hsuxu/article/details/8985931


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM