線程同步輔助類,主要學習兩點:
1、上述幾種同步輔助類的作用以及常用的方法
2、適用場景,如果有適當的場景可以用到,那無疑是最好的
semaphore(seməˌfôr)
含義
信號量就是可以聲明多把鎖(包括一把鎖:此時為互斥信號量)。
舉個例子:一個房間如果只能容納5個人,多出來的人必須在門外面等着。如何去做呢?一個解決辦法就是:房間外面掛着五把鑰匙,每進去一個人就取走一把鑰匙,沒有鑰匙的不能進入該房間而是在外面等待。每出來一個人就把鑰匙放回原處以方便別人再次進入。

常用方法
acquire():獲取信號量,信號量內部計數器減1
release():釋放信號量,信號量內部計數器加1
tryAcquire():這個方法試圖獲取信號量,如果能夠獲取返回true,否則返回false
信號量控制的線程數量在聲明時確定。例如:
Semphore s = new Semphore(2);
一個例子
實現一個功能:一個打印隊列,被三台打印機打印
public class PrintQueue { private Semaphore semaphore; private boolean freePrinters[]; private Lock lockPrinters; public PrintQueue(){ semaphore=new Semaphore(3); freePrinters=new boolean[3]; for (int i=0; i<3; i++){ freePrinters[i]=true; } lockPrinters=new ReentrantLock(); } public void printJob (Object document){ try { semaphore.acquire(); int assignedPrinter=getPrinter(); Long duration=(long)(Math.random()*10); System.out.printf("%s: PrintQueue: Printing a Job in Printer %d during %d seconds\n",Thread.currentThread().getName(),assignedPrinter,duration); TimeUnit.SECONDS.sleep(duration); freePrinters[assignedPrinter]=true; } catch (InterruptedException e) { e.printStackTrace(); } finally { // Free the semaphore semaphore.release(); } } private int getPrinter() { int ret=-1; try { lockPrinters.lock(); for (int i=0; i<freePrinters.length; i++) { if (freePrinters[i]){ ret=i; freePrinters[i]=false; break; } } } catch (Exception e) { e.printStackTrace(); } finally { lockPrinters.unlock(); } return ret; } }
聲明一個Job類,使用打印隊列
1 public class Job implements Runnable { 2 private PrintQueue printQueue; 3 4 public Job(PrintQueue printQueue){ 5 this.printQueue=printQueue; 6 } 7 8 @Override 9 public void run() { 10 System.out.printf("%s: Going to print a job\n",Thread.currentThread().getName()); 11 printQueue.printJob(new Object()); 12 System.out.printf("%s: The document has been printed\n",Thread.currentThread().getName()); 13 } 14 }
Main方法
public static void main (String args[]){ PrintQueue printQueue=new PrintQueue(); Thread thread[]=new Thread[12]; for (int i=0; i<12; i++){ thread[i]=new Thread(new Job(printQueue),"Thread "+i); } for (int i=0; i<12; i++){ thread[i].start(); } }
需要注意的地方
1、對於信號量聲明的臨界區,雖然可以控制線程訪問的數量,但是不能保證代碼塊之間是線程安全的。所以上面的例子在方法printJob()方法里面使用了鎖保證數據安全性。
2、信號量也涉及到公平性問題。和鎖公平性一樣,這里默認是非公平的。可以通過構造器顯示聲明鎖的公平性。
public Semaphore(int permits, boolean fair)
應用場景
流量控制,即控制能夠訪問的最大線程數。
CountDownLatch
含義
CountDownLatch可以理解為一個計數器在初始化時設置初始值,當一個線程需要等待某些操作先完成時,需要調用await()方法。這個方法讓線程進入休眠狀態直到等待的所有線程都執行完成。每調用一次countDown()方法內部計數器減1,直到計數器為0時喚醒。這個可以理解為特殊的CyclicBarrier。線程同步點比較特殊,為內部計數器值為0時開始。
方法
核心方法兩個:countDown()和await()
countDown():使CountDownLatch維護的內部計數器減1,每個被等待的線程完成的時候調用
await():線程在執行到CountDownLatch的時候會將此線程置於休眠
例子
開會的例子:會議室里等與會人員到齊了會議才能開始。
1 public class VideoConference implements Runnable{ 2 private final CountDownLatch controller; 3 4 public VideoConference(int number) { 5 controller=new CountDownLatch(number); 6 } 7 public void arrive(String name){ 8 System.out.printf("%s has arrived.\n",name); 9 10 controller.countDown();//調用countDown()方法,使內部計數器減1 11 System.out.printf("VideoConference: Waiting for %d participants.\n",controller.getCount()); 12 } 13 14 @Override 15 public void run() { 16 System.out.printf("VideoConference: Initialization: %d participants.\n",controller.getCount()); 17 try { 18 19 controller.await();//等待,直到CoutDownLatch計數器為0 20 21 System.out.printf("VideoConference: All the participants have come\n"); 22 System.out.printf("VideoConference: Let's start...\n"); 23 } catch (InterruptedException e) { 24 e.printStackTrace(); 25 } 26 } 27 }
參加會議人員類
1 public class Participant implements Runnable { 2 private VideoConference conference; 3 4 private String name; 5 6 public Participant(VideoConference conference, String name) { 7 this.conference=conference; 8 this.name=name; 9 } 10 @Override 11 public void run() { 12 Long duration=(long)(Math.random()*10); 13 try { 14 TimeUnit.SECONDS.sleep(duration); 15 } catch (InterruptedException e) { 16 e.printStackTrace(); 17 } 18 conference.arrive(name);//每到一個人員,CountDownLatch計數器就減少1 19 } 20 }
主函數
1 public static void main(String[] args) { 2 VideoConference conference = new VideoConference(10); 3 Thread threadConference = new Thread(conference); 4 threadConference.start();//開啟await()方法,在內部計數器為0之前線程處於等待狀態 5 for (int i = 0; i < 10; i++) { 6 Participant p = new Participant(conference, "Participant " + i); 7 Thread t = new Thread(p); 8 t.start(); 9 } 10 }
需要注意的地方
CountDownLatch比較容易記憶的是他的功能,是一個線程計數器。等計數器為0時那些先前因調用await()方法休眠的線程被喚醒。
CountDownLatch能夠控制的線程是哪些?是那些調用了CountDownLatch的await()方法的線程
具體使用方式,容易忘記:先運行await()方法的線程,例子中是視頻會議的線程。然后是執行與會者 線程,這里的處理是每到一位(每創建一個線程並運行run()方法時就使計數器減1)就讓計數器減1,等計數器減為0時喚醒因調用await()方法進入休眠的線程。這里的這些與會者就是要等待的線程。
應用場景
等人到齊了才能開始開會;
CyclicBarrier
含義
柵欄允許兩個或者多個線程在某個集合點同步。當一個線程到達集合點時,它將調用await()方法等待其它的線程。線程調用await()方法后,CyclicBarrier將阻塞這個線程並將它置入休眠狀態等待其它線程的到來。等最后一個線程調用await()方法時,CyclicBarrier將喚醒所有等待的線程然后
這些線程將繼續執行。
CyclicBarrier可以傳入另一個Runnable對象作為初始化參數。當所有的線程都到達集合點后,CyclicBarrier類將Runnable對象作為線程執行。
方法
await():使線程置入休眠直到最后一個線程的到來之后喚醒所有休眠的線程
例子
在矩陣(二維數組)中查找一個指定的數字。矩陣將被分為多個子集,每個子集交給一個線程去查找。當所有線程查找完畢后交給最后的線程匯總結果。
查找類:在一個子集中查找指定數字,找到之后把結果存儲后調用await()方法置入休眠等待最后一個線程的到來喚醒
1 public class Searcher implements Runnable { 2 private final CyclicBarrier barrier; 3 @Override 4 public void run() { 5 int counter; 6 System.out.printf("%s: Processing lines from %d to %d.\n",Thread.currentThread().getName(),firstRow,lastRow); 7 for (int i=firstRow; i<lastRow; i++){ 8 int row[]=mock.getRow(i); 9 counter=0; 10 for (int j=0; j<row.length; j++){ 11 if (row[j]==number){ 12 counter++; 13 } 14 } 15 results.setData(i, counter); 16 } 17 System.out.printf("%s: Lines processed.\n",Thread.currentThread().getName()); 18 try { 19 barrier.await(); 20 } catch (InterruptedException e) { 21 e.printStackTrace(); 22 } catch (BrokenBarrierException e) { 23 e.printStackTrace(); 24 } 25 } 26 }
匯總類:匯總每個Searcher找到的結果
1 public class Grouper implements Runnable { 2 private Results results; 3 4 public Grouper(Results results){ 5 this.results=results; 6 } 7 @Override 8 public void run() { 9 int finalResult=0; 10 System.out.printf("Grouper: Processing results...\n"); 11 int data[]=results.getData(); 12 for (int number:data){ 13 finalResult+=number; 14 } 15 System.out.printf("Grouper: Total result: %d.\n",finalResult); 16 } 17 }
主函數,如何把Searcher和Grouper類配合起來呢??
1 public static void main(String[] args) { 2 final int ROWS=10000; 3 final int NUMBERS=1000; 4 final int SEARCH=5; 5 final int PARTICIPANTS=5; 6 final int LINES_PARTICIPANT=2000; 7 MatrixMock mock=new MatrixMock(ROWS, NUMBERS,SEARCH);//矩陣的聲明 8 9 Results results=new Results(ROWS);//結果集 10 11 Grouper grouper=new Grouper(results);//匯總線程 12 13 CyclicBarrier barrier=new CyclicBarrier(PARTICIPANTS,grouper);//柵欄,傳入參數含義:線程同步個數,匯總線程 14 15 Searcher searchers[]=new Searcher[PARTICIPANTS]; 16 for (int i=0; i<PARTICIPANTS; i++){ 17 searchers[i]=new Searcher(i*LINES_PARTICIPANT, (i*LINES_PARTICIPANT)+LINES_PARTICIPANT, mock, results, 5,barrier); 18 Thread thread=new Thread(searchers[i]); 19 thread.start(); 20 } 21 System.out.printf("Main: The main thread has finished.\n"); 22 }
運行結果:
Mock: There are 999286 ocurrences of number in generated data. Thread-0: Processing lines from 0 to 2000. Main: The main thread has finished. Thread-0: Lines processed. Thread-1: Processing lines from 2000 to 4000. Thread-1: Lines processed. Thread-3: Processing lines from 6000 to 8000. Thread-3: Lines processed. Thread-2: Processing lines from 4000 to 6000. Thread-2: Lines processed. Thread-4: Processing lines from 8000 to 10000. Thread-4: Lines processed. Grouper: Processing results... Grouper: Total result: 999286.
需要注意的地方
線程完成任務后調用CyclicBarrier的await()方法休眠等待。在所有線程在集合點均到達時,柵欄調用傳入的Runnable對象進行最后的執行。
與CountDownLatch的區別:
- 在所有線程到達集合點后接受一個Runnable類型的對象作為后續的執行
- 沒有顯示調用CountDown()方法
- CountDownLatch一般只能使用一次,CyclicBarrier可以多次使用
應用場景
多個線程做任務,等到達集合點同步后交給后面的線程做匯總
Phaser
含義
更加復雜和強大的同步輔助類。它允許並發執行多階段任務。當我們有並發任務並且需要分解成幾步執行時,(CyclicBarrier是分成兩步),就可以選擇使用Phaser。Phaser類機制是在每一步結束的位置對線程進行同步,當所有的線程都完成了這一步,才允許執行下一步。
跟其他同步工具一樣,必須對Phaser類中參與同步操作的任務數進行初始化,不同的是,
可以動態的增加或者減少任務數。
函數
arriveAndAwaitAdvance():類似於CyclicBarrier的await()方法,等待其它線程都到來之后同步繼續執行
arriveAndDeregister():把執行到此的線程從Phaser中注銷掉
isTerminated():判斷Phaser是否終止
register():將一個新的參與者注冊到Phaser中,這個新的參與者將被當成沒有執行完本階段的線程
forceTermination():強制Phaser進入終止態
... ...
例子
使用Phaser類同步三個並發任務。這三個任務將在三個不同的文件夾及其子文件夾中查找過去24小時內修改過擴展為為.log的文件。這個任務分成以下三個步驟:
1、在執行的文件夾及其子文件夾中獲取擴展名為.log的文件
2、對每一步的結果進行過濾,刪除修改時間超過24小時的文件
3、將結果打印到控制台
在第一步和第二步結束的時候,都會檢查所查找到的結果列表是不是有元素存在。如果結果列表是空的,對應的線程將結束執行,並從Phaser中刪除。(也就是動態減少任務數)
文件查找類
1 public class FileSearch implements Runnable { 2 private String initPath; 3 4 private String end; 5 6 private List<String> results; 7 8 private Phaser phaser; 9 10 public FileSearch(String initPath, String end, Phaser phaser) { 11 this.initPath = initPath; 12 this.end = end; 13 this.phaser=phaser; 14 results=new ArrayList<>(); 15 } 16 @Override 17 public void run() { 18 19 phaser.arriveAndAwaitAdvance();//等待所有的線程創建完成,確保在進行文件查找的時候所有的線程都已經創建完成了 20 21 System.out.printf("%s: Starting.\n",Thread.currentThread().getName()); 22 23 // 1st Phase: 查找文件 24 File file = new File(initPath); 25 if (file.isDirectory()) { 26 directoryProcess(file); 27 } 28 29 // 如果查找結果為false,那么就把該線程從Phaser中移除掉並且結束該線程的運行 30 if (!checkResults()){ 31 return; 32 } 33 34 // 2nd Phase: 過濾結果,過濾出符合條件的(一天內的)結果集 35 filterResults(); 36 37 // 如果過濾結果集結果是空的,那么把該線程從Phaser中移除,不讓它進入下一階段的執行 38 if (!checkResults()){ 39 return; 40 } 41 42 // 3rd Phase: 顯示結果 43 showInfo(); 44 phaser.arriveAndDeregister();//任務完成,注銷掉所有的線程 45 System.out.printf("%s: Work completed.\n",Thread.currentThread().getName()); 46 } 47 private void showInfo() { 48 for (int i=0; i<results.size(); i++){ 49 File file=new File(results.get(i)); 50 System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath()); 51 } 52 // Waits for the end of all the FileSearch threads that are registered in the phaser 53 phaser.arriveAndAwaitAdvance(); 54 } 55 private boolean checkResults() { 56 if (results.isEmpty()) { 57 System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase()); 58 System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase()); 59 //結果為空,Phaser完成並把該線程從Phaser中移除掉 60 phaser.arriveAndDeregister(); 61 return false; 62 } else { 63 // 等待所有線程查找完成 64 System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size()); 65 phaser.arriveAndAwaitAdvance(); 66 return true; 67 } 68 } 69 private void filterResults() { 70 List<String> newResults=new ArrayList<>(); 71 long actualDate=new Date().getTime(); 72 for (int i=0; i<results.size(); i++){ 73 File file=new File(results.get(i)); 74 long fileDate=file.lastModified(); 75 76 if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){ 77 newResults.add(results.get(i)); 78 } 79 } 80 results=newResults; 81 } 82 private void directoryProcess(File file) { 83 // Get the content of the directory 84 File list[] = file.listFiles(); 85 if (list != null) { 86 for (int i = 0; i < list.length; i++) { 87 if (list[i].isDirectory()) { 88 // If is a directory, process it 89 directoryProcess(list[i]); 90 } else { 91 // If is a file, process it 92 fileProcess(list[i]); 93 } 94 } 95 } 96 } 97 private void fileProcess(File file) { 98 if (file.getName().endsWith(end)) { 99 results.add(file.getAbsolutePath()); 100 } 101 } 102 }
主函數:
1 public static void main(String[] args) { 2 Phaser phaser = new Phaser(3); 3 4 FileSearch system = new FileSearch("C:\\Windows", "log", phaser); 5 FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser); 6 FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser); 7 8 Thread systemThread = new Thread(system, "System"); 9 systemThread.start(); 10 Thread appsThread = new Thread(apps, "Apps"); 11 appsThread.start(); 12 Thread documentsThread = new Thread(documents, "Documents"); 13 documentsThread.start(); 14 try { 15 systemThread.join(); 16 appsThread.join(); 17 documentsThread.join(); 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } 21 System.out.printf("Terminated: %s\n", phaser.isTerminated()); 22 }
注意的地方
例子中Phaser分了三個步驟:查找文件、過濾文件、打印結果。並且在查找文件和過濾文件結束后對結果進行分析,如果是空的,將此線程從Phaser中注銷掉。也就是說,下一階段,該線程將不參與運行。
在run()方法中,開頭調用了phaser的arriveAndAwaitAdvance()方法來保證所有線程都啟動了之后再開始查找文件。在查找文件和過濾文件階段結束之后,都對結果進行了處理。即:如果結果是空的,那么就把該條線程移除,如果不空,那么等待該階段所有線程都執行完該步驟之后在統一執行下一步。最后,任務執行完后,把Phaser中的線程均注銷掉。
Phaser其實有兩個狀態:活躍態和終止態。當存在參與同步的線程時,Phaser就是活躍的。並且在每個階段結束的時候同步。當所有參與同步的線程都取消注冊的時候,Phase就處於終止狀態。在這種狀態下,Phaser沒有任務參與者。
Phaser主要功能就是執行多階段任務,並保證每個階段點的線程同步。在每個階段點還可以條件或者移除參與者。主要涉及方法arriveAndAwaitAdvance()和register()和arriveAndDeregister()
使用場景
多階段任務