一個線程池同時執行多個線程


一.CountdownLatch和CyclicBarrier的區別'

CountdownLatch和CyclicBarrier都屬於線程同步的工具。

CyclicBarrier對象時傳入了一個方法,當調用CyclicBarrier的await方法后,當前線程會被阻塞等到所有線程都調用了await方法后 調用傳入CyclicBarrier的方法,然后讓所有的被阻塞的線程一起運行

CountdownLatch可以當做一個計數器來使用,比如某線程需要等待其他幾個線程都執行過某個時間節點后才能繼續執行
countDown方法使計數器-1

1,CountdownLatch適用於所有線程通過某一點后通知方法,而CyclicBarrier則適合讓所有線程在同一點同時執行

2,CountdownLatch利用繼承AQS的共享鎖來進行線程的通知,利用CAS來進行--,而CyclicBarrier則利用ReentrantLock的Condition來阻塞和通知線程

二.ThreadPoolExecutor的參數:

corePoolSize:指定了線程池中的線程數量,它的數量決定了添加的任務是開辟新的線程去執行,還是放到workQueue任務隊列中去;

maximumPoolSize:指定了線程池中的最大線程數量,這個參數會根據你使用的workQueue任務隊列的類型,決定線程池會開辟的最大線程數量;

keepAliveTime:當線程池中空閑線程數量超過corePoolSize時,多余的線程會在多長時間內被銷毀;

unit:keepAliveTime的單位

workQueue:任務隊列,被添加到線程池中,但尚未被執行的任務;它一般分為直接提交隊列(SynchronousQueue)、有界任務隊列(ArrayBlockingQueue)、無界任務隊列(LinkedBlockingQueue)、優先任務隊列(PriorityBlockingQueue)幾種;

1、直接提交隊列:設置為SynchronousQueue隊列,SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,沒執行一個插入操作就會阻塞,需要再執行一個刪除操作才會被喚醒,反之每一個刪除操作也都要等待對應的插入操作。

復制代碼
public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize設置為2 ,拒絕策略為AbortPolic策略,直接拋出異常
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            pool.execute(new ThreadTask());
        }   
    }
}

public class ThreadTask implements Runnable{
    
    public ThreadTask() {
        
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}
復制代碼

輸出結果為

復制代碼
pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)
復制代碼

可以看到,當任務隊列為SynchronousQueue,創建的線程數大於maximumPoolSize時,直接執行了拒絕策略拋出異常。

使用SynchronousQueue隊列,提交的任務不會被保存,總是會馬上提交執行。如果用於執行任務的線程數量小於maximumPoolSize,則嘗試創建新的進程,如果達到maximumPoolSize設置的最大值,則根據你設置的handler執行拒絕策略。因此這種方式你提交的任務不會被緩存起來,而是會被馬上執行,在這種情況下,你需要對你程序的並發量有個准確的評估,才能設置合適的maximumPoolSize數量,否則很容易就會執行拒絕策略;

2、有界的任務隊列:有界的任務隊列可以使用ArrayBlockingQueue實現,如下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用ArrayBlockingQueue有界任務隊列,若有新的任務需要執行時,線程池會創建新的線程,直到創建的線程數量達到corePoolSize時,則會將新的任務加入到等待隊列中。若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續創建線程,直到線程數量達到maximumPoolSize設置的最大線程數量,若大於maximumPoolSize,則執行拒絕策略。在這種情況下,線程數量的上限與有界任務隊列的狀態有直接關系,如果有界隊列初始容量較大或者沒有達到超負荷的狀態,線程數將一直維持在corePoolSize以下,反之當任務隊列已滿時,則會以maximumPoolSize為最大線程數上限。

3、無界的任務隊列:有界任務隊列可以使用LinkedBlockingQueue實現,如下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用無界任務隊列,線程池的任務隊列可以無限制的添加新的任務,而線程池創建的最大線程數量就是你corePoolSize設置的數量,也就是說在這種情況下maximumPoolSize這個參數是無效的,哪怕你的任務隊列中緩存了很多未執行的任務,當線程池的線程數達到corePoolSize后,就不會再增加了;若后續有新的任務加入,則直接進入隊列等待,當使用這種任務隊列模式時,一定要注意你任務提交與處理之間的協調與控制,不然會出現隊列中的任務由於無法及時處理導致一直增長,直到最后資源耗盡的問題。

4、優先任務隊列:優先任務隊列通過PriorityBlockingQueue實現,下面我們通過一個例子演示下

復制代碼
public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //優先任務隊列
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<20;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    
    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public ThreadTask() {
        
    }
    
    public ThreadTask(int priority) {
        this.priority = priority;
    }

    //當前對象和其他對象做比較,當前優先級大就返回-1,優先級小就返回1,值越小優先級越高
    public int compareTo(ThreadTask o) {
         return  this.priority>o.priority?-1:1;
    }
    
    public void run() {
        try {
            //讓線程阻塞,使后續任務進入緩存隊列
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
}
復制代碼

我們來看下執行的結果情況

復制代碼
priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1
復制代碼

大家可以看到除了第一個任務直接創建線程執行外,其他的任務都被放入了優先任務隊列,按優先級進行了重新排列執行,且線程池的線程數一直為corePoolSize,也就是只有一個。

通過運行的代碼我們可以看出PriorityBlockingQueue它其實是一個特殊的無界隊列,它其中無論添加了多少個任務,線程池創建的線程數也不會超過corePoolSize的數量,只不過其他隊列一般是按照先進先出的規則處理任務,而PriorityBlockingQueue隊列可以自定義規則根據任務的優先級順序先后執行。

    指定一個實現了BlockingQueue接口的任務等待隊列。在ThreadPoolExecutor線程池的API文檔中,一共推薦了三種等待隊列,

    它們是:SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue;

    https://blog.csdn.net/wfzczangpeng/article/details/52015866

threadFactory:線程工廠,用於創建線程,一般用默認即可;

handler:拒絕策略;當任務太多來不及處理時,如何拒絕任務;

三.ThreadPoolExecutor的兩個常用執行方法

無返回值異步調用:execute()方法,底層使用Runnable的run()
有返回值異步調用:submit()方法,底層使用Callable的call(),可以提供Future < T > 類型的返回值。 

package com.lagou;

import javax.sound.midi.Soundbank;
import java.util.concurrent.*;

public class Test {
    public static void main(String args[]) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(10,10,1, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(100));
        for (int i = 1;i < 11 ; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        cyclicBarrier.await();
                        System.out.println(Thread.currentThread()+":"+System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        threadPoolExecutor.shutdown();
        System.out.println("======================");
        CountDownLatch countDownLatch = new CountDownLatch(10);
        ThreadPoolExecutor threadPoolExecutor1 =  new ThreadPoolExecutor(10,10,1, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(100));
        for (int i = 1;i < 11 ; i++) {
            threadPoolExecutor1.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread()+":"+System.currentTimeMillis());
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        //關閉線程處理
        try {
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        //關閉線程池
        threadPoolExecutor1.shutdown();
    }

}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM