眾所周知,開啟線程2種方法:第一是實現Runable接口,第二繼承Thread類。(當然內部類也算...)常用的,這里就不再贅述。
一、線程池
1.newCachedThreadPool
(1)緩存型池子,先查看池中有沒有以前建立的線程,如果有,就reuse,如果沒有,就建立一個新的線程加入池中;
(2)緩存型池子,通常用於執行一些生存周期很短的異步型任務;因此一些面向連接的daemon型server中用得不多;
(3)能reuse的線程,必須是timeout IDLE內的池中線程,缺省timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。
(4)注意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT不活動,其會自動被終止
2.newFixedThreadPool--本人常用
(1)newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的線程
(2)其獨特之處:任意時間點,最多只能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子
(3)和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很穩定很固定的正規並發線程,多用於服務器
(4)從方法的源代碼看,cache池和fixed 池調用的是同一個底層池,只不過參數不同:
fixed池線程數固定,並且是0秒IDLE(無IDLE)
cache池線程數支持0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE
3.ScheduledThreadPool
(1)調度型線程池
(2)這個池子里的線程可以按schedule依次delay執行,或周期執行
4.SingleThreadExecutor
(1)單例線程,任意時間池中只能有一個線程
(2)用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE)
二、常用線程調度類
1.wait、notify、notifyAll-----不建議新手直接使用
顧名思義,wait是等待,notify是通知一個等待線程、notifyAll喚醒所有等待線程。
2.CountDownLatch----很適合用來將一個任務分為n個獨立的部分,等這些部分都完成后繼續接下來的任務
隸屬於java.util.concurrent包。CountDownLatch類是一個同步計數器,構造時傳入int參數,該參數就是計數器的初始值,每調用一次countDown()方法,計數器減1,計數器大於0 時,await()方法會阻塞程序繼續執行.當多個線程達到預期時(latch.countDown()),喚醒多個其他等待中的線程,即執行latch.await()后面的代碼。樣例是,張三、李四合作完成任務,張三5秒,李四8秒,當張三李四都完成后,總任務結束。代碼如下:
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; public class CountDownLatchDemo { final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws InterruptedException { CountDownLatch latch=new CountDownLatch(2);//兩個工人的協作 Worker worker1=new Worker("張三", 5000, latch); Worker worker2=new Worker("李四", 8000, latch); worker1.start(); worker2.start(); latch.await();//阻塞!等待所有工人完成工作 System.out.println("all work done at "+sdf.format(new Date())); } static class Worker extends Thread{ String workerName; int workTime; CountDownLatch latch; public Worker(String workerName ,int workTime ,CountDownLatch latch){ this.workerName=workerName; this.workTime=workTime; this.latch=latch; } public void run(){ System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date())); doWork();//工作了 System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date())); latch.countDown();//工人完成工作,計數器減一 } private void doWork(){ try { Thread.sleep(workTime); } catch (InterruptedException e) { e.printStackTrace(); } } }
-----------------------------------------------
Worker 李四 do work begin at 2016-11-02 18:25:28
Worker 張三 do work begin at 2016-11-02 18:25:28
Worker 張三 do work complete at 2016-11-02 18:25:33
Worker 李四 do work complete at 2016-11-02 18:25:36
all work done at 2016-11-02 18:25:36
測試可見,張三李四共同協作完成。
3.CyclicBarrier----適合多線程循環到達屏障后再執行
字面意思循環屏障,可理解為柵欄,協同多個線程都執行到barrier.await時,如果構造CyclicBarrier barrier=new CyclicBarrier(2, Runnable)時,第一個參數代碼線程數,如果有第二參Runnable,那么所有線程都await時,先執行Runnable,再各自執行await后續的代碼。
CyclicBarrier和CountDownLatch區別:
1.CountDownLatch在多個線程都執行完畢latch.countDown后喚醒await線程,多個countDown子線程在執行完countDown后可繼續執行后續代碼。
2.CyclicBarrier可循環使用,CountDownLatch只1次。見代碼示例:
3.CountDownLatch需要latch.countDown和latch.await()配合使用。CyclicBarrier就一個barrier.await。
下面舉例:鳥、魚2個線程同時運行問題。
1 package study.thread; 2 3 import java.util.concurrent.BrokenBarrierException; 4 import java.util.concurrent.CyclicBarrier; 5 6 /** 7 * 循環柵欄(屏障) 8 * 問題:一個池塘,有很多鳥和很多魚,鳥每分鍾產生一個后代,魚每30秒鍾產生2個后代。 9 * 鳥每10秒鍾要吃掉一條魚。建一個池塘,初始化一些魚和鳥,看看什么時候鳥把魚吃光。 10 * 11 */ 12 public class CyclicBarrierDemo { 13 14 long time ; 15 long birdNum ; 16 long fishNum ; 17 Object lock = new Object() ; 18 CyclicBarrier barrier ; 19 20 public CyclicBarrierDemo(long birdNum , long fishNum){ 21 this.birdNum = birdNum ; 22 this.fishNum = fishNum ; 23 } 24 25 /** 26 * 入口 27 * @param args 28 */ 29 public static void main(String[] args) { 30 //構造demo,初始化5只秒,20條魚 31 CyclicBarrierDemo bf = new CyclicBarrierDemo(5 , 20) ; 32 //生態圈開啟 33 bf.start(); 34 } 35 36 //生態圈開啟 37 public void start(){ 38 //構造魚,鳥,時間線 39 FishThread fish = new FishThread() ; 40 BirdThread bird = new BirdThread() ; 41 TimeLine tl = new TimeLine() ; 42 43 //初始化環形屏障,當barrier對象的await方法被調用兩次之后,將會執行tl線程 44 barrier = new CyclicBarrier(2, tl) ;//這里要注意第一個參數,如果大於調用await的線程數,會死鎖。 45 46 //魚、鳥動起來 47 fish.start(); 48 bird.start(); 49 50 } 51 52 public void printInfo(String source){ 53 System.out.printf(source+"time[%d]:birdNum[%d] ,fishNum[%d]\n" ,time , birdNum , fishNum); 54 } 55 56 private class TimeLine implements Runnable { 57 @Override 58 public void run() { //所有子任務都調用了await方法后,將會執行該方法, 然后所有子線程繼續執行 59 System.out.println("TimeLine start!"); 60 //如果魚數量<=0,結束程序 61 if(fishNum <= 0){ 62 System.exit(-1); 63 } 64 //時間加10秒 65 time += 10 ; 66 System.out.println("TimeLine end,時間加10秒!"); 67 } 68 } 69 70 private class FishThread extends Thread { 71 @Override 72 public void run() { 73 //循環 74 while(true){ 75 try { 76 System.out.println("魚已經就位!到達await!"); 77 barrier.await() ; //進入睡眠, 等待所有子任務都進入睡眠 然后再繼續 78 } catch (InterruptedException | BrokenBarrierException e) { 79 e.printStackTrace(); 80 } 81 synchronized (lock) { 82 //魚每30秒鍾產生2個后代 83 if(time % 30 == 0){ 84 fishNum += fishNum * 2; 85 printInfo("魚動作執行!"); 86 } 87 } 88 } 89 } 90 } 91 92 private class BirdThread extends Thread{ 93 @Override 94 public void run() { 95 //循環 96 while(true){ 97 try { 98 System.out.println("鳥已經就位!到達await!"); 99 barrier.await() ; //進入睡眠, 等待所有子任務都進入睡眠 然后再繼續 100 } catch (InterruptedException | BrokenBarrierException e) { 101 e.printStackTrace(); 102 } 103 synchronized (lock) { 104 //鳥每10秒鍾要吃掉一條魚 105 if(time % 10 == 0){ 106 fishNum = fishNum >= birdNum ? fishNum - birdNum : 0 ; 107 //鳥每分鍾產生一個后代 108 if(time % 60 == 0){ 109 birdNum += birdNum ; 110 } 111 printInfo("鳥動作執行!"); 112 } 113 } 114 115 } 116 117 } 118 119 } 120 121 }
4.Semaphore---通過控制操作系統的信號量數目來控制並發,比控制線程並發數粒度更細。
管理固定數值的信號量,用以控制並發的數量。把需要並發的代碼放在acquire、release之間即可。acquire獲取信號,release釋放信號。如果Semaphore管理一個信號量,就是互斥鎖。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { public static void main(String[] args) { // 線程池 ExecutorService exec = Executors.newCachedThreadPool(); // 只能5個線程同時訪問 final Semaphore semp = new Semaphore(5); // 模擬20個客戶端訪問 for (int index = 0; index < 20; index++) { final int NO = index; Runnable run = new Runnable() { public void run() { try { //獲取許可 semp.acquire(); System.out.println("Accessing: " + NO); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally{ //釋放 semp.release();
System.out.println("-----------------"+semp.availablePermits()); } } }; exec.execute(run); } // 退出線程池 exec.shutdown(); } }
5.Exchanger
用於兩個線程之間進行數據交換,先執行exchanger.exchange()的線程等待后來的線程到達,然后交換數據,最后再繼續向下執行。
import java.util.ArrayList; import java.util.List; import java.util.concurrent.Exchanger; /** * * @ClassName: ExchangerDemo * @Description: 用於兩個線程之間進行數據交換,先執行exchanger.exchange()的線程等待后來的線程到達,然后交換數據,最后再繼續向下執行。 * @author denny.zhang * @date 2016年11月4日 下午1:27:29 * */ public class ExchangerDemo { public static void main(String[] args) { final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>(); new Thread(){ public void run(){ List<Integer> list = new ArrayList<Integer>(); list.add(1); list.add(2); try { list = exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread1"+list); } }.start(); new Thread(){ public void run(){ List<Integer> list = new ArrayList<Integer>(); list.add(3); list.add(4); try { list = exchanger.exchange(list); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread2"+list); } }.start(); } }
6.Future和FutrueTask---常用!
Future是接口,FutrueTask是接口實現類。場景:多線程並發執行,返回結果放進list.
import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * * @ClassName: FutureDemo * @Description: Future * @author denny.zhang * @date 2016年11月4日 下午1:50:32 * */ public class FutureDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { //結果集 List<Integer> list = new ArrayList<Integer>(); //開啟多線程 ExecutorService exs = Executors.newFixedThreadPool(3); List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); //啟動線程池,固定線程數為3 for(int i=0;i<3;i++){ //提交任務,添加返回 futureList.add(exs.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { return 1; } })); } //結果歸集 for (Future<Integer> future : futureList) { while (true) { if (future.isDone()&& !future.isCancelled()) { Integer i = future.get(); list.add(i); break; } else { Thread.sleep(100); } } } System.out.println("list="+list); } }
返回:list=[1, 1, 1]
====================================
參考:
《大型網站系統與java中間件實踐》