java高並發編程(五)線程池


摘自馬士兵java並發編程

一、認識Executor、ExecutorService、Callable、Executors

/**
 * 認識Executor
 */
package yxxy.c_026;

import java.util.concurrent.Executor;

public class T01_MyExecutor implements Executor {

    public static void main(String[] args) {
        new T01_MyExecutor().execute(new Runnable(){

            @Override
            public void run() {
                System.out.println("hello executor");
            }
            
        });
    }

    @Override
    public void execute(Runnable command) {
        //new Thread(command).run();
        command.run();
    }

}
View Code
 
Executor執行器是一個接口,只有一個方法execute執行任務,在java的線程池的框架里邊,這個是最頂層的接口;
ExecutorService:從Executor接口繼承。
Callable:里面call方法,和Runnable接口很像,設計出來都是被其他線程調用的;但是Runnable接口里面run方法是沒有返回值的也不能拋出異常;而call方法有返回值可以拋異常;
Executors: 操作Executor的一個工具類;以及操作ExecutorService,ThreadFactory,Callable等;
 
二、ThreadPool:      
/**
 * 線程池的概念
 */
package yxxy.c_026;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class T05_ThreadPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(5); //execute submit
        for (int i = 0; i < 6; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);
        
        service.shutdown();
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
        
        TimeUnit.SECONDS.sleep(5);
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
    }
}
View Code

console:

java.util.concurrent.ThreadPoolExecutor@53d8d10a[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
false
true
java.util.concurrent.ThreadPoolExecutor@53d8d10a[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-5
pool-1-thread-4
pool-1-thread-1
true
true
java.util.concurrent.ThreadPoolExecutor@53d8d10a[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
View Code
創建了一個線程池,扔了5個線程,接下來要執行6個任務,扔進去線程池里面就啟一個線程幫你執行一個,因為這里最多就起5個線程,接下來扔第6個任務的時候,不好意思,它排隊了,排在線程池所維護的一個任務隊列里面,任務隊列大多數使用的都是BlockingQueue,這是線程池的概念;
有什么好處?好處在於如果這個任務執行完了,這個線程不會消失,它執行完任務空閑下來了,如果有新的任務來的時候,直接交給這個線程來運行就行了,不需要新啟動線程;從這個概念上講,如果你的任務和線程池線程數量控制的比較好的情況下,你不需要啟動新的線程就能執行很多很多的任務,效率會比較高,並發性好;
 
service.shutdown():關閉線程池,shutdown是正常的關閉,它會等所有的任務都執行完才會關閉掉;還有一個是shutdownNow,二話不說直接就給關了,不管線程有沒有執行完;
service.isTerminated(): 代表的是這里所有執行的任務是不是都執行完了。isShutdown()為true,注意它關了但並不代表它執行完了,只是代表正在關閉的過程之中(注意打印Shutting down)
打印5個線程名字,而且第一個線程執行完了之后,第6個任務來了,第1個線程繼續執行,不會有線程6;
 
當所有線程全部執行完畢之后,線程池的狀態為Terminated,表示正常結束,complete tasks=6
 
線程池里面維護了很多線程,等着你往里扔任務,而扔任務的時候它可以維護着一個任務列表,還沒有被執行的任務列表,同樣的它還維護着另外一個隊列,complete tasks,結束的任務隊列,任務執行結束扔到這個隊列里,所以,一個線程池維護着兩個隊列;
 
 
三、Future                                  
/**
 * 認識future
 */
package yxxy.c_026;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class T06_Future {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        /*FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>(){
            @Override
            public Integer call() throws Exception {
                TimeUnit.MILLISECONDS.sleep(3000);
                return 1000;
            }
        });*/
        
        FutureTask<Integer> task = new FutureTask<>(()->{
            TimeUnit.MILLISECONDS.sleep(3000);
            return 1000;
        });
        
        new Thread(task).start();
        
        System.out.println(task.get()); //阻塞
        
        //*******************************
        ExecutorService service = Executors.newFixedThreadPool(5);
        Future<Integer> f = service.submit(()->{
            TimeUnit.MILLISECONDS.sleep(5000);
            return 1;
        });
        System.out.println(f.isDone());
        System.out.println(f.get());
        System.out.println(f.isDone());
        
    }
}
View Code
1000
false
1
true
View Code
Future: ExecutorService里面有submit方法,它的返回值是Future類型,因為你扔一個任務進去需要執行一段時間,未來的某一個時間點上,任務執行完了產生給你一個結果,這個Future代表的就是那個Callable的返回值;
 
 
四、並行計算的例子:      
/**
 * 線程池的概念
 * nasa
 */
package yxxy.c_026;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class T07_ParallelComputing {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        List<Integer> results = getPrime(1, 200000); 
        long end = System.currentTimeMillis();
        System.out.println(end - start);
        
        final int cpuCoreNum = 4;
        
        ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
        
        MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
        MyTask t2 = new MyTask(80001, 130000);
        MyTask t3 = new MyTask(130001, 170000);
        MyTask t4 = new MyTask(170001, 200000);
        
        Future<List<Integer>> f1 = service.submit(t1);
        Future<List<Integer>> f2 = service.submit(t2);
        Future<List<Integer>> f3 = service.submit(t3);
        Future<List<Integer>> f4 = service.submit(t4);
        
        start = System.currentTimeMillis();
        f1.get();
        f2.get();
        f3.get();
        f4.get();
        end = System.currentTimeMillis();
        System.out.println(end - start);
    }
    
    static class MyTask implements Callable<List<Integer>> {
        int startPos, endPos;
        
        MyTask(int s, int e) {
            this.startPos = s;
            this.endPos = e;
        }
        
        @Override
        public List<Integer> call() throws Exception {
            List<Integer> r = getPrime(startPos, endPos);
            return r;
        }
        
    }
    
    //判斷是否是質數
    static boolean isPrime(int num) {
        for(int i=2; i<=num/2; i++) {
            if(num % i == 0) return false;
        }
        return true;
    }
    
    static List<Integer> getPrime(int start, int end) {
        List<Integer> results = new ArrayList<>();
        for(int i=start; i<=end; i++) {
            if(isPrime(i)) results.add(i);
        }
        
        return results;
    }
}
View Code

console:

2280
818
View Code
第二種方式使用了一個線程池,一般線程池有多少個線程,數量多少合適是需要調整的,大多數情況下cpu有幾個核至少就應該起多少個線程,可以多起一個但不能少於cpu核數,將20萬分成了4段;
這里為什么不將20萬平均分呢?
 
 
五、CachedThreadPool
package yxxy.c_026;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class T08_CachedPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println(service);
        
        for (int i = 0; i < 2; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        
        System.out.println(service);
        
        TimeUnit.SECONDS.sleep(80); //cachedthreadPool里面的線程空閑狀態默認60s后銷毀,這里保險起見
        
        System.out.println(service);
        
        
    }
}
View Code

console:

java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
View Code
FixedThreadPool為固定個數的線程池;
CachedThreadPool:剛開始一個線程都沒有,來一個任務就起一個線程,假設起了兩個線程A,B,如果來了第三個任務,這時候恰好線程B任務執行完了,線程池里面有空閑的,這時候直接讓線程池里空閑的線程B來執行;最多起多少個線程?你的系統能支撐多少個為止;默認的情況下,只要一個線程空閑的狀態超過60s,這個線程就自動的銷毀了,alivetime=60s;這個值也可以自己指定。
 
 
六、SingleThreadExecutor
package yxxy.c_026;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class T09_SingleThreadPool {
    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for(int i=0; i<5; i++) {
            final int j = i;
            service.execute(()->{
                
                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
    }
}
View Code

console:

0 pool-1-thread-1
1 pool-1-thread-1
2 pool-1-thread-1
3 pool-1-thread-1
4 pool-1-thread-1
View Code
SingleThreadExecutor:線程池里就1個線程;扔5個任務,也永遠只有1個線程執行;
它能保證任務前后一定是順序執行,先扔的任務一定先執行完;只有等第一個任務執行完才執行第二個任務。
 
 
七、ScheduledThreadPool
package yxxy.c_026;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class T10_ScheduledPool {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        service.scheduleAtFixedRate(()->{
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
}
View Code
ScheduledThreadPool: 執行定時的任務,定時器線程池,一般可以用來替代timer,而且它里面的線程是可以復用的,第一個線程執行完了之后,任務來了如果第一個線程是空閑的,還可以拿第一個線程來執行。而Timer每次都是new一個新的線程。
scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit),第1個參數是任務,第1個任務馬上執行,每隔500毫秒這個任務重復執行。
 
 
八、WorkStealingPool
/**
 *
 */
package yxxy.c_026;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class T11_WorkStealingPool {
    public static void main(String[] args) throws IOException {
        ExecutorService service = Executors.newWorkStealingPool();
        int count = Runtime.getRuntime().availableProcessors();    //看cpu多少核的;如果是4核,默認就幫你起4個線程
        System.out.println(count);    
        
        service.execute(new R(1000));
        for(int i=0; i<count; i++){
            service.execute(new R(2000));
        }
        
        //由於產生的是精靈線程(守護線程、后台線程),主線程不阻塞的話,看不到輸出
        System.in.read(); 
    }

    static class R implements Runnable {
        int time;

        R(int t) {
            this.time = t;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            System.out.println(time  + " " + Thread.currentThread().getName());
        }
    }
}
View Code

console:

8
1000 ForkJoinPool-1-worker-1
2000 ForkJoinPool-1-worker-2
2000 ForkJoinPool-1-worker-0
2000 ForkJoinPool-1-worker-5
2000 ForkJoinPool-1-worker-3
2000 ForkJoinPool-1-worker-6
2000 ForkJoinPool-1-worker-7
2000 ForkJoinPool-1-worker-4
2000 ForkJoinPool-1-worker-1
View Code
WorkStealingPool:工作竊取,假設有3個線程A、B、C在運行,workStealing可以簡單這么認為,每個線程都維護自己的一個隊列,線程A的隊列里頭積累了5個任務,線程B的隊列里1個任務,C的隊列里2個任務;那么當線程B執行完任務之后,他會去別的線程池所維護的隊列里面把任務偷過來繼續執行,主動的找活干。
本質上是使用ForkJoinPool來實現的:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
View Code

 

例子解釋:cpu多少核默認的起多少個線程,(這里是8),前面幾個任務都扔給1-8個線程了,第9個任務來的時候在那里等着了,誰會去執行它呢?先執行完任務的這個線程會去執行。第1個線程只睡1s鍾,首先執行完,所以第9個任務一定是第一個線程1去運行它,他會主動的把任務拿過去運行。
workStealing的線程是精靈線程,daemon線程,特點就是主線程main方法一旦結束了,它后台可能還在運行,但是你是看不到它任務輸出的;這里Syetem.in.read()讓主函數阻塞才能看到輸出。debug的時候能看到Daemon Thread[ForkJoinPool-1-worker-1]。為什么用精靈線程?它是在后台不斷的運行的,只要虛擬機不退出,這個線程就不會退出,你有任務來了之后,它永遠會主動去拿。
 
workStealing用於什么場景:就說任務分配的不是很均勻,有的線程維護的任務隊列比較長,有些線程執行完任務就結束了不太合適,所以他執行完了之后可以去別的線程維護的隊列里去偷任務,這樣效率更高。
 
 
 
九、ForkJoinPool    
ForkJoinPool: forkjoin的意思就是如果有一個難以完成的大任務,需要計算量特別大,時間特別長,可以把大任務切分成一個個小任務,如果小任務還是太大,它還可以繼續分,至於分成多少你可以自己指定,... 分完之后,把結果進行合並,最后合並到一起join一起,產生一個總的結果。而里面任務的切分你可以自己指定,線程的啟動根據你任務切分的規則,由ForkJoinPool這個線程池自己來維護。
 
 
例子1:
package yxxy.c_026;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();
    
    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
        
        System.out.println(Arrays.stream(nums).sum()); //stream api 
    }
    
    
    static class AddTask extends RecursiveAction { 
        
        int start, end;
        
        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected void compute() {
            
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                System.out.println("from:" + start + " to:" + end + " = " + sum);
            } else {
                int middle = start + (end-start)/2;
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }
        }
    }
    
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);
        
        System.in.read();
        
    }
}
View Code

console:

49494882
from:906250 to:937500 = 1545274
from:968750 to:1000000 = 1537201
from:593750 to:625000 = 1548289
from:718750 to:750000 = 1546396
from:468750 to:500000 = 1550373
from:843750 to:875000 = 1543421
from:218750 to:250000 = 1549856
from:93750 to:125000 = 1548384
from:562500 to:593750 = 1541814
from:812500 to:843750 = 1547885
from:187500 to:218750 = 1546831
from:687500 to:718750 = 1554064
from:437500 to:468750 = 1547434
from:937500 to:968750 = 1547676
from:875000 to:906250 = 1551839
from:62500 to:93750 = 1548576
from:531250 to:562500 = 1550943
from:656250 to:687500 = 1544991
from:156250 to:187500 = 1548367
from:406250 to:437500 = 1539881
from:125000 to:156250 = 1548128
from:500000 to:531250 = 1545229
from:781250 to:812500 = 1544296
from:625000 to:656250 = 1545283
from:375000 to:406250 = 1553931
from:31250 to:62500 = 1544024
from:750000 to:781250 = 1543573
from:343750 to:375000 = 1546407
from:0 to:31250 = 1539743
from:281250 to:312500 = 1549470
from:312500 to:343750 = 1552190
from:250000 to:281250 = 1543113
View Code
 
例子解釋:
對數組中100萬個數求和計算,第一種方式是普通的將所有數加在一起(for循環);
第二種方式使用ForkJoinPool計算,分而治之,它里面執行的任務必須是ForkJoinTask,這個任務可以自動進行切分,一般用的時候從RecursiveAction或RecursiveTask繼承,RecursiveTask遞歸任務,因為它切分任務還可以在切分。RecursiveAction沒有返回值,RecursiveTask有返回值。
 
例子2:
package yxxy.c_026;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();
    
    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
        
        System.out.println(Arrays.stream(nums).sum()); //stream api 
    }
    
    
    static class AddTask extends RecursiveTask<Long> {
        
        int start, end;
        
        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {
            
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            } 
            
            int middle = start + (end-start)/2;
            
            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();
            subTask2.fork();
            
            return subTask1.join() + subTask2.join();
        }
    }
    
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);
        
        long result = task.join();
        System.out.println(result);
    }
}
View Code

console:

49498457
49498457
View Code
和例子1差不多,唯一的區別是有返回值了,RecursiveTask<V>中的V泛型就是返回值類型。
long result = task.join(),因為join本身就是阻塞的,只有等所有的都執行完了,最后才得出總的執行結果。所以不需要System.in.read了;
 
 
 
十、自定義線程池 ThreadPoolExecutor
ThreadPoolExecutor:大多數的線程池的實現背后調用的都是ThreadPoolExecutor(前面6種就ForkJoinPool不是),它是線程池通用的一個類,可以自定義線程池;
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:核心的線程池里的線程數,自己指定。
maximumPoolSize:最多這個線程池里裝多少個線程;
keepAliveTime:線程呆多久沒有任務傳給它就會消失;
unit:和上面統一指定的;
blockingQueue:真正的裝任務的容器,往往都是用blockingQueue;阻塞式的;任務來了就扔進去,什么時候用到了都可以取。
 
例如,fixedThreadPool的實現是:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
View Code

 

十一、parallel stream  

package yxxy.c_026;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class T14_ParallelStreamAPI {
    public static void main(String[] args) {
        List<Integer> nums = new ArrayList<>();
        Random r = new Random();
        for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000));
        
        //System.out.println(nums);
        
        long start = System.currentTimeMillis();
        nums.forEach(v->isPrime(v));
        long end = System.currentTimeMillis();
        System.out.println(end - start);
        
        //使用parallel stream api
        
        start = System.currentTimeMillis();
        nums.parallelStream().forEach(T14_ParallelStreamAPI::isPrime);
        end = System.currentTimeMillis();
        
        System.out.println(end - start);
    }
    
    static boolean isPrime(int num) {
        for(int i=2; i<=num/2; i++) {
            if(num % i == 0) return false;
        }
        return true;
    }
}
View Code

console:

1526
337
View Code
paralleStream(): 運用多線程,如果把這1萬個數看成是數據流,我們用多線程去訪問里面的數,共同來做計算,默認使用多線程。
 
 
 
 
 
 
 
---------------


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM