Java多線程之ExecutorService使用說明


一、簡介

ExecutorService是Java中對線程池定義的一個接口,它java.util.concurrent包中,在這個接口中定義了和后台任務執行相關的方法。

 

二、線程池

Java給我們提供了一個Executors工廠類,它可以幫助我們很方便的創建各種類型ExecutorService線程池,Executors一共可以創建下面這四類線程池:

  • newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。 
  • newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。 
  • newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。 
  • newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

 

三、代碼舉例

 

import org.junit.Test;
import java.util.concurrent.*;

/**
 * @author :gongxr
 * @description:ExecutorService是Java中對線程池定義的一個接口,它java.util.concurrent包中,在這個接口中定義了和后台任務執行相關的方法。 Java通過Executors提供四種創建線程池的工廠類,分別為:
 * newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
 * newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
 * newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
 * newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
 */

public class TestExecutorService {
    /**
     * newCachedThreadPool:創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
     */
    @Test
    public void test1() throws InterruptedException {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " 開始工作!");
                    Thread.sleep((long) (Math.random() * 3000));
                    System.out.println(Thread.currentThread().getName() + " 工作結束!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 5; i++) {
            cachedThreadPool.execute(runnable);
        }
        Thread.sleep(3000);
        System.out.println("主線程結束!");
        cachedThreadPool.shutdown();
    }

    /**
     * newFixedThreadPool:創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
     */
    @Test
    public void test2() throws InterruptedException {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " 開始工作!");
                    Thread.sleep((long) (Math.random() * 3000));
                    System.out.println(Thread.currentThread().getName() + " 工作結束!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 6; i++) {
            final int index = i;
            fixedThreadPool.execute(runnable);
        }
        Thread.sleep(6000);
        fixedThreadPool.shutdown();
        System.out.println("主線程結束!");
    }

    /**
     * newScheduledThreadPool:創建一個定長線程池,支持定時及周期性任務執行。
     * 該例中延遲3秒執行。
     */
    @Test
    public void test3_1() throws InterruptedException {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("delay 3 seconds");
            }
        };
        scheduledThreadPool.schedule(runnable, 3, TimeUnit.SECONDS);
        Thread.sleep(10000);
        scheduledThreadPool.shutdown();
    }

    /**
     * newScheduledThreadPool:創建一個定長線程池,支持定時及周期性任務執行。
     * 延遲1秒后每3秒執行一次。
     */
    @Test
    public void test3_2() throws InterruptedException {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("delay 1 seconds, and excute every 3 seconds");
            }
        };
        scheduledThreadPool.scheduleAtFixedRate(runnable, 1, 3, TimeUnit.SECONDS);
        Thread.sleep(10000);
        scheduledThreadPool.shutdown();
    }

    /**
     * newSingleThreadExecutor:創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
     */
    @Test
    public void test4() throws InterruptedException {
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(index);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        Thread.sleep(10000);
        singleThreadExecutor.shutdown();
    }

    /**
     * submit(Callable)
     * submit(Callable)和submit(Runnable)類似,也會返回一個Future對象,
     * 但是除此之外,submit(Callable)接收的是一個Callable的實現,
     * Callable接口中的call()方法有一個返回值,可以返回任務的執行結果,
     * 而Runnable接口中的run()方法是void的,沒有返回值。
     */
    @Test
    public void testCallable() throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Callable callable = new Callable() {
            @Override
            public Object call() throws InterruptedException {
                String name = Thread.currentThread().getName();
                System.out.println("Thread Name:" + name + " 開始工作!");
                Thread.sleep((long) (Math.random() * 3000));
                System.out.println("Thread Name:" + name + " 結束工作!");
                return name;
            }
        };

        Future future = executorService.submit(callable);

        System.out.println("主線程繼續執行!");
        Thread.sleep(3000);

        // future.get()使得主線程阻塞,等待子線程執行結束
        System.out.println("子線程執行完回調結果:" + future.get().toString());

        System.out.println("主線程結束!");
        executorService.shutdown();
    }

    /**
     * ExecutorService的shutdown、shutdownNow、awaitTermination用法:
     * 調用shutdown方法之后,ExecutorService不再接收新的任務,直到當前所有線程執行完成才會關閉,所有之前提交的任務都會被執行。
     * 調用shutdownNow方法后,將試圖阻止所有正在執行的任務和被提交還沒有執行的任務。
     * awaitTermination方法的作用:監視各任務的狀態是否已經結束,經過指定的時間后,全部完成返回true,反之返回false。
     */
    @Test
    public void testShutDown() throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " 開始工作!");
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " 工作結束!");
                } catch (InterruptedException e) {
                    System.out.println(Thread.currentThread().getName() + " 子線程中斷!");
                    e.printStackTrace();
                }
            }
        };
        // 線程1
        executorService.execute(runnable);
        Thread.sleep(1000);
        // 線程2
        executorService.execute(runnable);
        executorService.shutdown();
        // 等待一秒后檢查子線程運行狀態,全部完成返回true
        if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
            System.out.println("主線程強制中斷子線程!");
            executorService.shutdownNow();
        }
        Thread.sleep(4000);
        System.out.println("主線程結束!");
    }

}

 

與多線程同步工具結合使用:

    /**
     * ExecutorService實戰用法:
     * 與CountDownLatch線程同步工具結合使用
     */
    @Test
    public void testExe() throws Exception {
        int count = 5; // 任務數量
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(count);  //線程同步工具

        for (int i = 1; i <= count; i++) {
            int taskId = i;
            executorService.execute(() -> task(taskId, countDownLatch));
        }

        executorService.shutdown();  // 不再接受新任務
        countDownLatch.await();  // 等待所有任務完成
        // 等待一秒后檢查子線程運行狀態,全部完成返回true
        if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
            System.out.println("主線程強制中斷子線程!");
            executorService.shutdownNow();
        }
        System.out.println("主線程結束!");
    }

    private void task(int index, CountDownLatch countDownLatch) {
        try {
            System.out.println("任務" + index);
            System.out.println(Thread.currentThread().getName() + " 開始工作!");
            Thread.sleep((long) (Math.random() * 3000));
            System.out.println(Thread.currentThread().getName() + " 工作結束!");
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " 子線程中斷!");
            e.printStackTrace();
        }
    }

 運行結果:

任務1
任務3
任務4
pool-1-thread-4 開始工作!
任務2
pool-1-thread-2 開始工作!
任務5
pool-1-thread-3 開始工作!
pool-1-thread-1 開始工作!
pool-1-thread-5 開始工作!
pool-1-thread-3 工作結束!
pool-1-thread-2 工作結束!
pool-1-thread-5 工作結束!
pool-1-thread-4 工作結束!
pool-1-thread-1 工作結束!
主線程結束!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM