Fork-Join(分而治之)
規模為N的問題,N<閾值,直接解決,N>閾值,將N分解為K個小規模子問題,子問題互相對立,與原問題形式相同,將子問題的解合並得到原問題的解
如何使用的流程圖
用法
1.Fork/Join的同步用法同時演示返回結果值:統計整形數組中所有元素的和
兩個main方法,SumSingleThread類里的main是單線程求和,每次休眠一秒;SumByForkJoin類里使用forkandjoin進行求和
下面是個生成隨機整數數組的類
import java.util.Random; /** * 產生一個整形的數組 * */ public class CreateArray { public static final int ARRAY_LENTH=1000; public static int[] createArray(){ Random r = new Random(); int[] result = new int[ARRAY_LENTH]; for(int i=0;i<ARRAY_LENTH;i++){ result[i]=r.nextInt(ARRAY_LENTH*3); } return result; } }
import com.thread.demo.SleepTools; public class SumSingleThread { public static void main(String[] args) { int count = 0; int[] src =CreateArray.createArray(); long start = System.currentTimeMillis(); for(int i= 0;i<src.length;i++){ SleepTools.ms(1); count = count + src[i]; } System.out.println("The count is "+count +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } }
public class SumByForkJoin { private static class SumTask extends RecursiveTask<Integer>{ private static final int THRESHOLD = CreateArray.ARRAY_LENTH/10; private int[] src; //表示我們要實際統計的數組 private int fromIndex;//開始統計的下標 private int toIndex;//統計到哪里結束的下標 public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } /** * 這個是有返回值的,在compute方法中按照需要的邏輯寫forkjoin邏輯 * */ @Override protected Integer compute() { //當滿足閾值范圍時,進入計算 if(toIndex-fromIndex<THRESHOLD){ int count = 0; for(int i=fromIndex;i<toIndex;i++){ count=count+src[i]; } return count; }else{//不滿足閾值時,繼續拆分 int mid = (fromIndex+toIndex)/2; SumTask left = new SumTask(src, fromIndex, mid); SumTask right = new SumTask(src, mid+1, toIndex); invokeAll(left, right); return left.join()+right.join(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); int[] src = CreateArray.createArray(); SumTask innerFind = new SumTask(src,0,src.length-1); long start = System.currentTimeMillis(); pool.invoke(innerFind);//同步調用,就是這個方法執行完才會繼續執行下面的sysout,所以以這個demo是同步的用法,異步調用的方法:execute(object) System.out.println("Task is Running....."); System.out.println("The count is "+innerFind.join() +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } }
Q:把循環求和中的sleep注掉,並且增大數組的長度,會發現,在小於一定長度時,單線程直接求和的速度比使用fork/jion快
A:因為使用forkJoin時cpu會進行上下問切換操作,這個操作相比較於計算型操作其實更費時間
2.Fork/Join的異步用法同時演示不要求返回值:遍歷指定目錄(含子目錄)尋找指定類型文件
import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class FindFile extends RecursiveAction{ private File path; public FindFile(File path) { this.path=path; } @Override protected void compute() { List<FindFile> subTasks = new ArrayList<FindFile>(); File[] files = path.listFiles(); if(files!=null){ for(File file:files){//循環文件路徑 if(file.isDirectory()){//判斷是不是目錄 subTasks.add(new FindFile(file)); }else{ if(file.getAbsolutePath().endsWith("avi")){ System.out.println("找到對應文件:"+file.getAbsolutePath()); } } } if(!subTasks.isEmpty()){ for(FindFile sub:invokeAll(subTasks)){//invokeAll的返回值和傳入的值一樣 sub.join(); } } } } public static void main(String[] args) { try { ForkJoinPool pool = new ForkJoinPool(); FindFile task = new FindFile(new File("D:/")); pool.execute(task);//異步調用 System.out.println("Task is Running......"); Thread.sleep(1); int otherWork = 0; for(int i=0;i<100;i++){ otherWork = otherWork+i; } System.err.println("Main Thread done sth......,otherWork="+otherWork); task.join();//阻塞的方法,此處是為了防止出現主線程走完,task被直接中斷的情況 System.out.println("Task end"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
常用的並發工具類(直接放課程里的demo了,很詳細了~)
CountDownLatch
作用:是一組線程等待其他的線程完成工作以后在執行,加強版join
await用來等待,countDown負責計數器的減一
import java.util.concurrent.CountDownLatch; import com.xiangxue.tools.SleepTools; /** *@author Mark老師 享學課堂 https://enjoy.ke.qq.com * *類說明:演示CountDownLatch,有5個初始化的線程,6個扣除點, *扣除完畢以后,主線程和業務線程才能繼續自己的工作 */ public class UseCountDownLatch { static CountDownLatch latch = new CountDownLatch(6); //初始化線程(只有一步,有4個) private static class InitThread implements Runnable{ @Override public void run() { System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work......"); latch.countDown();//初始化線程完成工作了,countDown方法只扣減一次; for(int i =0;i<2;i++) { System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work"); } } } //業務線程 private static class BusiThread implements Runnable{ @Override public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int i =0;i<3;i++) { System.out.println("BusiThread_"+Thread.currentThread().getId() +" do business-----"); } } } public static void main(String[] args) throws InterruptedException { //單獨的初始化線程,初始化分為2步,需要扣減兩次 new Thread(new Runnable() { @Override public void run() { SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 1st......"); latch.countDown();//每完成一步初始化工作,扣減一次 System.out.println("begin step 2nd......."); SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 2nd......"); latch.countDown();//每完成一步初始化工作,扣減一次 } }).start(); new Thread(new BusiThread()).start(); for(int i=0;i<=3;i++){ Thread thread = new Thread(new InitThread()); thread.start(); } latch.await(); System.out.println("Main do ites work........"); } }
CyclicBarrier
讓一組線程達到某個屏障,被阻塞,一直到組內最后一個線程達到屏障時,屏障開放,所有被阻塞的線程會繼續運行CyclicBarrier(int parties)
CyclicBarrier(int parties, Runnable barrierAction),屏障開放,barrierAction定義的任務會執行
CountDownLatch和CyclicBarrier辨析
1、countdownlatch放行由第三者控制,CyclicBarrier放行由一組線程本身控制
2、countdownlatch放行條件》=線程數,CyclicBarrier放行條件=線程數
import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; /** *@author Mark老師 享學課堂 https://enjoy.ke.qq.com * *類說明:CyclicBarrier的使用 */ public class UseCyclicBarrier { private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread()); private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>();//存放子線程工作結果的容器 public static void main(String[] args) { for(int i=0;i<=4;i++){ Thread thread = new Thread(new SubThread()); thread.start(); } } //負責屏障開放以后的工作 private static class CollectThread implements Runnable{ @Override public void run() { StringBuilder result = new StringBuilder(); for(Map.Entry<String,Long> workResult:resultMap.entrySet()){ result.append("["+workResult.getValue()+"]"); } System.out.println(" the result = "+ result); System.out.println("do other business........"); } } //工作線程 private static class SubThread implements Runnable{ @Override public void run() { long id = Thread.currentThread().getId();//線程本身的處理結果 resultMap.put(Thread.currentThread().getId()+"",id); Random r = new Random();//隨機決定工作線程的是否睡眠 try { if(r.nextBoolean()) { Thread.sleep(2000+id); System.out.println("Thread_"+id+" ....do something "); } System.out.println(id+"....is await"); barrier.await(); Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do its business "); } catch (Exception e) { e.printStackTrace(); } } } }
Semaphore
控制同時訪問某個特定資源的線程數量,用在流量控制
Exchange
兩個線程間的數據交換
import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Exchanger; /** *@author Mark老師 享學課堂 https://enjoy.ke.qq.com * *類說明:Exchange的使用 */ public class UseExchange { private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>(); public static void main(String[] args) { //第一個線程 new Thread(new Runnable() { @Override public void run() { Set<String> setA = new HashSet<String>();//存放數據的容器 try { /*添加數據 * set.add(.....) * */ setA = exchange.exchange(setA);//交換set /*處理交換后的數據*/ } catch (InterruptedException e) { } } }).start(); //第二個線程 new Thread(new Runnable() { @Override public void run() { Set<String> setB = new HashSet<String>();//存放數據的容器 try { /*添加數據 * set.add(.....) * set.add(.....) * */ setB = exchange.exchange(setB);//交換set /*處理交換后的數據*/ } catch (InterruptedException e) { } } }).start(); } }
(ps:所有內容參考享學課堂視頻)