一,Fork-Join
1,定義:
Fork-Join框架:就是在必要的情況下,將一個大任務,進行拆分(fork)成若干個小任務(拆到不能再拆時),再將一個個的小任務運算的結果進行join匯總。
2,,Fork-Join體現了分而治之。什么是分而治之?
規模為N的問題, 當N < 閾值,直接解決。當 N > 閾值, 將N分解為k個小規模子問題,子問題互相獨立,與原問題形式相同。將子問題的解合並得到原問題大的解。
3,工作密取(workStealing)
4,Fork-Join實戰
4.1,Fork/Join的同步調用同時演示返回值結果: 統計整型數組中所有元素的和
/** * 產生整型數組工具類 */ public class MakeArray { //數組長度 public static final int ARRAY_LENGTH = 4000; public static int[] makeArray(){ //new一個隨機數發生器 Random rd = new Random(); int[] result = new int[ARRAY_LENGTH]; for (int i = 0; i < ARRAY_LENGTH;i++){ //用隨機數填充數組 result[i] = rd.nextInt(ARRAY_LENGTH*3); } return result; } }
/** * 使用Fork-Join框架進行計算 */ public class SumArray { private static class SumTask extends RecursiveTask<Integer>{ private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10; private int[] src;//要實際應用的數組 private int fromIndex;//開始統計的下標 private int toIndex;//統計到哪里結束的下標 public SumTask(int[] src,int fromIndex,int toIndex){ this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } @Override protected Integer compute() { if(toIndex - fromIndex < THRESHOLD){ int count = 0; for(int i = fromIndex;i <= toIndex;i++){ try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } count = count + src[i]; } return count; }else { //fromIndex ..... mid....... toIndex。這里我們自己定義的算法:大於閾值就平均分為兩部分 int mid = (fromIndex + toIndex)/2; SumTask left = new SumTask(src,fromIndex,mid); SumTask right = new SumTask(src,mid,toIndex); invokeAll(left,right); return left.join() + right.join(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); int[] src = MakeArray.makeArray(); SumTask innerFind = new SumTask(src,0,src.length-1); long start = System.currentTimeMillis(); pool.invoke(innerFind);//同步調用 System.out.println("Task is Running......."); System.out.println("the count is "+ innerFind.join()+" spend time:"+(System.currentTimeMillis()-start)+"ms"); } /** * 注意: * 對於這種簡單的相加操作,其實單線程處理的速度更快。 * 使用forkjoin后,使用多線程進行處理。由於需要線程間的切換(上下文切換),導致forkjoin的處理方式花的時間更多。 * 所以使用forkjoin一定要注意場合。 * 這也是redis雖然使用單進程單線程模式,但是處理能力非常強的原因,就是因為redis處理的數據比較簡單(String)。 * 並且使用單線程處理避免了進程間的切換。 */ }
4.2,Fork/Join的異步調用同時演示不要求返回值:遍歷指定目錄(含子目錄),尋找指定類型文件
/** * 遍歷指定目錄(含子目錄),找尋指定類型文件 * 不需要返回值的的Fork/Join */ public class FindDirsFiles extends RecursiveAction{ //當前任務需要搜尋的目錄 private File path; public FindDirsFiles(File path){ this.path = path; } @Override protected void compute() { List<FindDirsFiles> subTasks = new ArrayList<>(); File[] files = path.listFiles();//拿到目錄下文件 if (files != null){ for (File file : files){ if (file.isDirectory()){ //對每一個子目錄都新建一個子任務 subTasks.add(new FindDirsFiles(file)); }else { //遇到文件,檢查 if (file.getAbsolutePath().endsWith("txt")){ System.out.println("文件:"+ file.getAbsolutePath()); } } } if (!subTasks.isEmpty()){ for (FindDirsFiles subTask:invokeAll(subTasks)){ //上面的invlkeAll():用來遞交子任務 subTask.join();//等待子任務 } } } } public static void main(String[] args) { try { //用一個ForkJoinPool 實例調度總任務 ForkJoinPool pool = new ForkJoinPool(); FindDirsFiles task = new FindDirsFiles(new File("D:\\yishang")); pool.execute(task); System.out.println("task is running........"); //主線程做一些自己的事情 try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } int otherWork = 0; for (int i = 0; i<100;i++){ otherWork = otherWork + i; } System.out.println("main Thread done sth ....., otherWork = "+otherWork); task.join();//阻塞方法, System.out.println("task end"); }catch (Exception e){ e.printStackTrace(); } } }
二,CountDownLatch:計數器
1,方法:
latch.countDown():調用該方法,計數器的數量減一
latch.await():調用該方法,如果計數器的數量沒有減為0,那么就該方法會阻塞,知道計數器的數量為0才繼續執行后面的代碼
2,示例代碼:當初始化工作完成以后,才執行業務邏輯代碼
/** * 演示CountDownLatch,有5個初始化的線程,6個扣除點。 * 扣除完畢以后,主線程和業務線程才能繼續自己的工作 */ public class UseCountDownLatch { static CountDownLatch latch = new CountDownLatch(6); /** * 初始化線程 */ private static class InitThread implements Runnable{ @Override public void run() { System.out.println("InitThread_"+Thread.currentThread().getId()+ " ready init work ......."); latch.countDown();//初始化線程完成工作了 //初始化線程調用了countDown()以后,還是可以繼續走自己的邏輯的 for (int i = 0; i < 2; i++) { System.out.println("InitThread_"+Thread.currentThread().getId()+ ".....continue to its work"); } } } /** * 業務線程 * 等所有的初始化線程的初始化工作做完了,業務線程才能執行 */ private static class BusiThread implements Runnable{ @Override public void run() { try { //業務線程阻塞,直到countDown減為0,才往下執行 latch.await();//阻塞方法 } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < 2; i++) { System.out.println("BusiThread_"+Thread.currentThread().getId()+ " do business"); } } } public static void main(String[] args)throws InterruptedException { //單獨的初始化線程,初始化分為2,需要扣減2次 new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread_"+Thread.currentThread().getId()+"ready init work step 1st...."); latch.countDown();//每完成一步初始化工作,扣減一次 System.out.println("begin step 2nd......."); try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread_"+Thread.currentThread().getId()+"ready init work step 2nd...."); latch.countDown();//每完成一步初始化工作,扣減一次 } }).start(); new Thread(new BusiThread()).start(); for (int i = 0; i < 4; i++) { Thread thread = new Thread(new InitThread()); thread.start(); } //主線程阻塞,必須等countDown減為0,才往下執行 latch.await(); System.out.println("main do its work ........."); } }
三,CyclicBarrier:柵欄
1,方法:
barrier.await():等所有線程執行到該方法時,才能繼續向前執行。否則,一直阻塞在這里
2,示例代碼:
/** * 演示:CyclicBarrier,當所有的線程都來到了barrier.await();線程才繼續往下執行。不然就一直阻塞在這個方法前 * 可以類比人員到指定的集合場地,然后在一起出發的場景。比如出去旅游,等所有的人都來到集合地點,然后大家才一起出發。 */ public class UseCyslicBarrier { private static CyclicBarrier barrier = new CyclicBarrier(5); //工作線程 private static class SubThread implements Runnable{ @Override public void run() { long id = Thread.currentThread().getId(); //為了模擬真實環境,每個線程到達barrier.await()方法的時間不一樣。隨即決定工作線程是否睡眠 Random random = new Random(); try { if (random.nextBoolean()){ Thread.sleep(1000+id); System.out.println("Thread_"+id+" 在來的路上堵車了,堵車時間 "+(1000+id)+"ms"); } System.out.println("Thread_"+id+" 在來的路上沒有堵車,提前到達集合地點,然后在集合地點等待其他人員.... "); //當5個線程都執行到了這個地方,然后所有的線程繼續往下執行。 barrier.await(); Thread.sleep(1000+id); System.out.println("Thread_"+id+"開始上車"); }catch (Exception e){ e.printStackTrace(); } } } public static void main(String[] args) { for (int i = 0; i < 5; i++) { Thread thread = new Thread(new SubThread()); thread.start(); } } }
四,Semaphore:信號燈(控制並發執行的線程個數)
1,方法:
sp.acquire():獲得信號燈
sp.release():釋放信號燈
2,圖示理解:
3,示例代碼:
/** * 信號燈:控制並發執行的線程個數 */ public class SemaphoreTest { public static void main(String[] args) { //最多運行3個線程並發執行 final Semaphore sp=new Semaphore(3); Runnable runnable=new Runnable() { @Override public void run() { try { sp.acquire();//獲得信號燈 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程"+Thread.currentThread().getName()+"進入,還有"+(3-sp.availablePermits())+"個線程"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程"+Thread.currentThread().getName()+"離開,還有"+(3-sp.availablePermits())+"個線程"); //釋放信號燈 sp.release(); } }; //開啟20個線程 for (int i = 0; i < 20; i++) { Thread thread = new Thread(runnable); thread.start(); } } }
五,Exchanger(兩個線程之間做數據交換)
1,方法:
exchanger.exchange(data):該方法一直阻塞到另外一個線程過來交換數據
2,示例代碼:
public class ExchangerTest { public static void main(String[] args) { final Exchanger exchanger = new Exchanger(); //線程1 new Thread(new Runnable() { @Override public void run() { try { String data1 = "aaa"; System.out.println("線程"+Thread.currentThread().getName()+":當前的數據是"+data1+ ",該線程正在准備把 "+data1+"換出去"); String data2 = (String) exchanger.exchange(data1); System.out.println("線程"+Thread.currentThread().getName()+ "換回的數據為"+data2); }catch (InterruptedException e){ } } }).start(); //線程二 new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(new Random().nextInt(3000)); String data1 = "bbb"; System.out.println("線程"+Thread.currentThread().getName()+":當前的數據是"+data1+ ",該線程正在准備把 "+data1+"換出去"); String data2 = (String) exchanger.exchange(data1); System.out.println("線程"+Thread.currentThread().getName()+ "換回的數據為"+data2); }catch (InterruptedException e){ } } }).start(); } }