JAVA 多線程(11):阻塞隊列與線程池


說線程池必須說隊列,因為線程池跟隊列有着莫大的關系

 

一、阻塞隊列(7個):
數組阻塞隊列、鏈表阻塞隊列、優先級排序隊列,還有對應的無界阻塞隊列,另外還有雙向阻塞隊列,排序規則分為先進先出FIFO 與先進后出LIFO兩種。

對於阻塞隊列,針對插入與移除有有4種操作方式。如下:

方法 拋出異常 返回特殊值 一直阻塞 超時退出
插入 add(e) offer put offer(e,time,unit)
移除 remove poll take poll(time,unit)
檢查 element peek 不可用 不可用

 

測試(有界隊列):

1.拋出異常:

 

 輸出:

 

第一次正常:

 

 輸出:

 

 第二次隊列設置長度為1,add方法調用了2次,結果拋出異常。

 

2.返回特殊值

 

 輸出:

由結果看出,使用offer方法不會拋出異常,在添加元素時如果隊列已滿,返回失敗標識,當使用移除方法poll時,如果隊列已空,則會返回null

 

 

3.一直阻塞:put/take

輸出:

 

 由結果看出,因為有界隊列為長度為1,主線程執行了2次put方法,第二次因為隊列已滿,所以會一直阻塞當前線程知道隊列不滿時才會繼續執行,修改一下,如下:

 

 

 

輸出:

 

 

 執行完畢,由結果看出,上面代碼執行開始執行后,主線程與子線程亂序輸出,但是可以看出當主線程執行第二次put方法后會等待子線程take后,主線程再執行take,也就是說當線程滿后put方法會導致當前線程阻塞,當線程空了也會當作當前線程阻塞,我們可以再原先的代碼上再多加一句。如下:

 

 輸出:

首先主線程等待子線程先執行,子線程首先執行take方法,因為隊列中沒有元素,所以子線程等待,2秒后主線程開始執行,第一次put后,子線程發現隊列中有元素了,所以不再阻塞,進入runable狀態,接着線程再執行take 方法。結束。

總之,關於上面3類方法,各有特色。(當然也有可能主線程會先執行take方法,但是這種可能性為0,想一想,因為put方法是連續執行的,再執行第二個put時,因為隊列已滿,所以必須要等待隊列非滿時,主線程才不會阻塞)

 

 

 

 

1.ArrayBlockingQueue   數組有界阻塞隊列FIFO

按照阻塞的先后順序訪問隊列,默認情況下不保證線程公平的訪問隊列~如果要保證公平性,會降低一定的吞吐量,源碼如下:

 

 2.LinkedBlockingQueue

鏈表有界阻塞隊列,默認最大長度為Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。

3.PriorityBlockingQueue

優先級隊列,可以自定義排序方法,但是對於同級元素不能保證順序

4.DelayQueue

延遲獲取元素隊列,指定時間后獲取,為無界阻塞隊列。

5.SynchronousQueue

不存儲元素的阻塞隊列。每一個put操作必須訂單tabke 操作,否則不能繼續添加元素。

6.LinkedTransfetQueue

無界阻塞隊列,多了tryTransfer 和transfet方法

7.LinkedBlockingQueue

鏈表結構組成的雙向阻塞隊列。可以從隊列的兩端插入和移除元素。

 

另外:非阻塞算法的安全隊列- ConcurrentLinkedQueue(CAS 實現 compare and swap)

首先它是一個基礎鏈接節點的無界的線程安全隊列,有head和tail組成~沒了,個人對線程理解比較淺,有興趣的小伙伴可以研究一下~

 

二、線程池

使用線程池的好處:

1.降低資源消耗:重復利用已創建的線程降低線程創建和銷毀造成的消耗

2.提高響應速度:任務到達時,任務不需要等待線程創建

3.提高線程的可管理性:可以對線程統一分配、調優和監控。

 

關於線程池的實現原理:

1.判斷核心線程線程是否都在執行任務,如不是,則創建一個工作線程來執行任務,否則進入2

2.判斷隊列是不是滿了,如果沒有則提交任務到工作隊列,否則進入3

3.判斷線程池是否都在工作,如果沒有則創建一個新的工作線程執行任務。否則,交給飽和策略來處理這個任務。

 

線程池中創建新線程執行任務的時候需要獲取全局鎖,所以java在執行任務的時候,如果在第二個步驟盡量進入到隊列中,因為其不需要獲取全局鎖,在執行execute方法時,工作線程在執行完任務后會從隊列中獲取工作任務執行。

 

構造方法:

參數:

 

1. corePoolSize:線程池基本大小,當提交一個新任務到工作線程時,線程池會創建一個線程來執行任務,即使空閑的基本線程能夠執行新任務也會創建新線程,等到任務數大於線程基本大小時就不再創建,如果

調用prestartAllCoreThreads方法,線程池會提前創建並啟動所有基本線程

2.maximumPoolSize:線程池中允許的最大線程數。如果核心線程池滿了,會丟到隊列中,如果隊列也滿了(這里說的是有界隊列),會創建一個新的線程執行這個工作任務。打個比方:過節回家的時候,高速和國道,假設高速允許100輛車同時上高速,

允許有100輛車可以在入口等待,國道允許200輛車通行,那么也就是說允許同時出現100+100+200 輛車,也就是400輛,這就是最大線程數(不是很貼切,理解就好)

3.keeyAliveTime:線程活動保持時間,線程空閑后,保持的時間。如果超過時間則銷毀線程。

4.TimeUnit:時間單位

5.阻塞隊列:用於保存等待執行任務的阻塞隊列,數組有界隊列、鏈表隊列、同步隊列、優先級無界隊列。newFixedThreadPool 使用的是鏈表隊列。

6.ThreadFactory:創建線程的工廠。

7.RejectedRxecutionHandler:飽和策略。也就是說當超過了最大線程池數量,那么會執行飽和策略。以2中的例子接着說,如果車輛超過了400輛,那么從第401輛車開始需要給出另外的解決方法,可以是在國道入口排隊,或則直接讓你坐火車去~

 

 

提交任務的方法:

1.execute:不返回值

2.submit:返回值(future類型對象)

 

如下:

public static void main(String[] args) {
        // 創建線程池
        ExecutorService executor = Executors.newFixedThreadPool(10);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());

            }
        };
        Callable callable = new Callable() {
            @Override
            public Object call() throws Exception {
                return "hi 我是返回值";
            }
        };
        executor.execute(runnable);

        Future future = executor.submit(callable);
        try {
            Object o = future.get();
            System.out.println(o);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }

輸出結果:

future 是一個接口類,其實現類是FutureTask該類源碼如下:

 

 

 

Callable 類似Runnable 但是它會有返回值。

 

 

關於關閉線程:

推薦使用shutdown 而不是shutdownNiw,shutdown 會遍歷線程池中的線程,並逐個關閉其狀態,對於正在執行線程會等待其執行結束(interrupt),shutdownNow類似立即停止,所以~推薦使用shutdown就好,當然如果有特殊情況除外。

 

監控:

通過繼承線程池可以自定義線程池,重寫beforeExecute、afterExecute、terminated方法,在開始前、執行后、中止的時候調用。如:

static class TestCustomMyPool extends ThreadPoolExecutor {
        public TestCustomMyPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            System.out.println("開始執行");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            System.out.println("執行結束");
        }

        @Override
        protected void terminated() {
            super.terminated();
            System.out.println("中止");
        }
    }

    public static void main(String[] args) {
        TestCustomMyPool testCustomMyPool = new TestCustomMyPool(10, 10, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("正在執行線程");
            }
        };
        testCustomMyPool.execute(runnable);

        testCustomMyPool.shutdown();
    }

輸出:

由結果看出,線程在執行前后執行了重寫的方法。

 

另外,除了這些,線程池還有其他一些方法可以來監控線程池的情況,如:

public static void main(String[] args) {
        TestCustomMyPool testCustomMyPool = new TestCustomMyPool(10, 10, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("正在執行線程");
            }
        };
        testCustomMyPool.execute(runnable);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 需要執行的任務數量
        System.out.println("需要執行的任務數量:"+testCustomMyPool.getTaskCount());
        // 已完成的任務數量
        System.out.println("已完成的任務數量:"+testCustomMyPool.getCompletedTaskCount());
        // 創建最大線程數量
        System.out.println("創建最大線程數量:"+testCustomMyPool.getLargestPoolSize());
        // 線程池的線程數量
        System.out.println("線程池的線程數量:"+testCustomMyPool.getPoolSize());
        // 線程池活動的線程數
        System.out.println("線程池活動的線程數:"+testCustomMyPool.getActiveCount());

        testCustomMyPool.shutdown();
    }

 

輸出:

 

關於future,這里有個參考鏈接,有興趣的小伙伴也可以參考這位博主的文章 https://www.cnblogs.com/dolphin0520/p/3949310.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM