線程池本質是一個生產者-消費者模式,一邊維護一些線程執行任務,一邊由主線程添加一些任務。現在我們拋棄源碼中一些繁雜的狀態判斷,自己寫一個線程池。
/**
 * A minimal fixed-size thread pool built on a blocking queue.
 *
 * <p>Callers submit tasks with {@link #addWork(Runnable)}; a fixed set of worker threads
 * takes tasks off the shared queue and runs them. {@link #close()} lets the workers drain
 * whatever is still queued, waits for them to terminate, and prints each worker's final state.
 *
 * <p>Tasks submitted after {@link #close()} has returned are queued but never executed,
 * because no worker is left to run them.
 */
public class poolT {
    /** Longest time, in milliseconds, an idle worker waits before re-checking its stop flag. */
    private static final long POLL_TIMEOUT_MILLIS = 50L;

    // Tasks may be added and removed frequently, so a linked queue is a good fit.
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    private final HashSet<Work> workers = new HashSet<Work>();
    // Per-instance worker count; a static field here would be overwritten by every new pool.
    private final int num;

    /**
     * Creates the pool and immediately starts its worker threads.
     *
     * @param num number of worker threads to start; a value of zero or less starts none
     */
    public poolT(int num) {
        this.num = num;
        for (int i = 0; i < num; i++) {
            Work w = new Work();
            w.start();
            workers.add(w);
        }
    }

    /**
     * Queues a task for execution by one of the workers.
     *
     * @param r the task to run
     */
    public void addWork(Runnable r) {
        workQueue.add(r);
    }

    /**
     * Shuts the pool down: asks every worker to stop once the queue is empty, waits for all
     * of them to finish, then prints each worker's name and state.
     *
     * <p>Running tasks are never interrupted; they are allowed to complete.
     *
     * @throws Exception if the calling thread is interrupted while waiting for the workers
     */
    public void close() throws Exception {
        // Signal first: workers keep draining the queue because their loop also checks
        // whether tasks remain, so no separate "wait until empty" polling is needed.
        for (Work work : workers) {
            work.setDrop();
        }
        // Joining replaces fixed sleeps and guarantees every queued task has finished.
        for (Work work : workers) {
            work.join();
        }
        for (Work work : workers) {
            System.out.println(work.getName() + "狀態:" + work.getState());
        }
    }

    /** Worker thread that repeatedly takes tasks from the shared queue and runs them. */
    private class Work extends Thread {
        // Stop flag; volatile so the write made by close() is visible to this worker.
        private volatile boolean hasRunning = true;

        /** Requests that this worker exit once the queue has been drained. */
        public void setDrop() {
            this.hasRunning = false;
        }

        @Override
        public void run() {
            while (hasRunning || !workQueue.isEmpty()) {
                Runnable r;
                try {
                    // A timed poll (instead of a blocking take) lets the worker re-check the
                    // stop flag, so it can never be stranded waiting on an empty queue.
                    r = workQueue.poll(POLL_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and stop this worker.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (r == null) {
                    continue;
                }
                try {
                    r.run();
                } catch (RuntimeException e) {
                    // A failing task must not kill the worker, or later tasks would be lost.
                    e.printStackTrace();
                }
            }
        }
    }

    /** Demo: runs two one-second tasks on a pool of four workers, then shuts the pool down. */
    public static void main(String[] args) throws Exception {
        poolT p = new poolT(4);
        for (int i = 0; i < 2; i++) {
            Runnable newRun = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + "運行任務;");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                }
            };
            p.addWork(newRun);
        }
        p.close();
        System.out.println("主程序完畢");
    }
}
這裡我使用了一個阻塞隊列:空閑的工作線程會阻塞在隊列上等待任務,當有任務添加時,其中一個等待的線程會被喚醒並取走任務進行處理;沒有任務時,工作線程保持阻塞。
當然也可以不用阻塞隊列,不過需要自己用 synchronized 配合 wait/notify 進行同步:
/**
 * A minimal fixed-size thread pool that synchronizes by hand on a plain list instead of
 * using a blocking queue.
 *
 * <p>All access to {@code taskList} and to the stop flag happens while holding the
 * {@code taskList} monitor. Idle workers wait on that monitor and are woken by
 * {@link #execute(Runnable)} or {@link #destroy()}.
 *
 * <p>Tasks submitted after {@link #destroy()} has returned are queued but never executed,
 * because no worker is left to run them.
 */
public class MyThreadPool {
    // Pending tasks; guarded by its own monitor.
    final List<Runnable> taskList = new LinkedList<Runnable>();
    private final List<MyThread> threadList = new LinkedList<MyThread>();
    // Set once by destroy(); guarded by the taskList monitor.
    private boolean destroyed = false;

    /**
     * Creates the pool and starts its worker threads.
     *
     * @param num number of worker threads to start; a value of zero or less starts none
     */
    public MyThreadPool(int num) {
        for (int i = 0; i < num; i++) {
            threadList.add(new MyThread());
        }
        for (MyThread thread : threadList) {
            thread.start();
        }
    }

    /**
     * Shuts the pool down: workers finish every task that is still queued, then exit.
     * This method blocks until all workers have terminated. If the calling thread is
     * interrupted while waiting, its interrupt status is restored and the method returns
     * early.
     */
    public void destroy() {
        synchronized (taskList) {
            destroyed = true;
            // Wake every idle worker so it can observe the flag and exit.
            taskList.notifyAll();
        }
        for (MyThread thread : threadList) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Queues a task and wakes one idle worker to run it.
     *
     * @param run the task to execute
     */
    public void execute(Runnable run) {
        synchronized (taskList) {
            taskList.add(run);
            taskList.notify();
        }
    }

    /** Worker thread that repeatedly removes the oldest queued task and runs it. */
    private class MyThread extends Thread {
        @Override
        public void run() {
            // Printed once per worker rather than on every loop iteration.
            System.out.println(Thread.currentThread().getName() + "is running");
            while (true) {
                Runnable r;
                synchronized (taskList) {
                    // Loop around wait(): wake-ups can be spurious, and another worker
                    // may already have taken the task that triggered the notify.
                    while (taskList.isEmpty() && !destroyed) {
                        try {
                            taskList.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    if (taskList.isEmpty()) {
                        // Stop was requested and nothing is left to run. Checking this
                        // before remove(0) avoids removing from an empty list.
                        return;
                    }
                    r = taskList.remove(0);
                }
                // Run outside the lock so other workers and submitters are not blocked.
                try {
                    r.run();
                } catch (RuntimeException e) {
                    // A failing task must not kill the worker, or later tasks would be lost.
                    e.printStackTrace();
                }
            }
        }
    }

    /** Demo: submits two half-second tasks to a pool of four workers, then destroys the pool. */
    public static void main(String[] args) throws Exception {
        // The JDK equivalent of this pool is Executors.newFixedThreadPool(n).
        MyThreadPool pool = new MyThreadPool(4);
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                    System.out.println("任務一");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        });
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                    System.out.println("任務貳");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        });
        System.out.println("End");
        pool.destroy();
    }
}
參考:http://blog.csdn.net/hsuxu/article/details/8985931