jdk中的簡單並發,需要掌握


前言

  開心一刻

    小時候有一次爸爸帶我去偷村頭別人家的梨子,我上樹摘,爸爸在下面放風,正摘着主人來了,爸爸指着我破口大罵:臭小子,趕緊給我滾下來,敢偷吃別人家梨子,看我不打死你。主人家趕緊說:沒事沒事,小孩子淘氣嘛,多摘點回家吃。我……這坑兒子的爹...

純正的海豹突擊隊

  路漫漫其修遠兮,吾將上下而求索!

  github:https://github.com/youzhibing

  碼雲(gitee):https://gitee.com/youzhibing

Runnable

  如果是簡單的實現一個線程,我們會通過實現Runnable接口或繼承Thread類來完成。JDK1.0中就已經存在Runnable和Thread,Thread實現了Runnable接口。Runnable使用方式一般如下

public class RunnableTest {

    public static void main(String[] args) {
        // Java 8之前:
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Before Java8, 我是子線程1");
            }
        }).start();

        //Java 8方式:
        new Thread( () -> {
            System.out.println("In Java8");
            System.out.println("我是子線程2");
        } ).start();

    }
}
View Code

  一般我們的線程不是以匿名內部類的方式存在的,而是以如下方式存在

public class RunnableTest {

    public static void main(String[] args) throws InterruptedException {
        Runnable myRunnable = new MyRunnable();
        new Thread(myRunnable).start();
        Thread.sleep(1000);
        System.out.println("我是主線程");
    }
}
class MyRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println("我是子線程1");
    }
}
View Code

  當然線程的實現方式還有Thread類,Thread實現了Runnable接口,本質還是一樣;無論是Runnable,還是Thread,實現的線程有一個很明顯的缺點,就是沒有返回值,執行完任務之后無法獲取執行結果。

Callable

  Callable接口是JDK1.5中引入的,和Runnable類似,都是用來實現多線程,不同的是,Callable能返回結果和拋出checked exception。源代碼如下

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

  可以看到,Callable是一個泛型接口,call()函數返回的類型就是傳遞進來的泛型類型,也是返回的結果類型。那么怎么使用Callable呢?一般情況下是配合ExecutorService來使用的,而ExecutorService的創建又是用Executors來完成的。

線程池

  Executors

    也是JDK1.5新增內容,是創建ExecutorService、ScheduledExecutorService、ThreadFactory和Callable的工廠,並提供了一些有效的工具方法。有很多創建ExecutorService的方法

    主要分為6類方法,每一類都兩兩重載,一個有ThreadFactory threadFactory參數,一個沒有ThreadFactory threadFactory參數,也就是我們可以自定義ThreadFactory來定制Thread;若沒有ThreadFactory參數,則使用默認的DefaultThreadFactory來構建Thread。6類方法如下

      newCachedThreadPool(...)

        創建一個可緩存的線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程;返回類型是:ThreadPoolExecutor。

      newFixedThreadPool(...)

        創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待;返回類型是:ThreadPoolExecutor。

      newScheduledThreadPool(...)

        創建一個定長線程池,支持定時及周期性任務執行;返回類型是:ScheduledThreadPoolExecutor。多數情況下可用來替代Timer類。

      newSingleThreadExecutor(...)

        創建一個單線程化的線程池,只有唯一的一個工作線程來執行任務,保證所有任務按照指定順序執行;返回類型是:ThreadPoolExecutor的代理,我們可以認為就是ThreadPoolExecutor。

      newSingleThreadScheduledExcutor(...)

        創建一個單線程化的線程池,與newSingleThreadExecutor類似,但支持定時及周期性任務執行;返回類型是:ScheduledThreadPoolExecutor。

      newWorkStealingPool(...)

        創建持有足夠線程的線程池來支持給定的並行級別,並通過使用多個隊列,減少競爭;它需要穿一個並行級別的參數,如果不傳,則被設定為默認的CPU數量。JDK1.8中新增,返回類型是:ForkJoinPool。ForkJoinFool通常配合ForkJoinTask的子類RecursiveAction或RecursiveTask使用。

    常用的主要是以下3類:newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool。至於newWorkStealingPool,我還沒用過,不太好評論。

  ExecutorService

    ExecutorService是一個interface,繼承了Executor,是Java中對線程池定義的一個接口,類圖如下:

    ExecutorService接口中常用方法如下

void execute(Runnable command);    // 從Executor繼承而來,用來執行Runnale,沒有返回值
<T> Future<T> submit(Callable<T> task);    // 執行Callable類型的task,並返回Future
<T> Future<T> submit(Runnable task, T result);    // 這種方式很少使用
Future<?> submit(Runnable task);    // 執行Runnable類型的task,並返回Future

    當然還有invokeAll、invokeAny,感興趣的可以去看下。關於Future,下面會講到。

    當我們使用完成ExecutorService之后應該關閉它,否則它里面的線程會一直處於運行狀態,導致應用無法停止。關閉ExecutorService的方式有兩種,其一是ExecutorService.shutdown()方法,在調用shutdown()方法之后,ExecutorService不會立即關閉,但是它不再接收新的任務,直到當前所有線程執行完成才會關閉,所有在shutdown()執行之前提交的任務都會被執行;其二是調用ExecutorService.shutdownNow()方法,它將跳過所有正在執行的任務和被提交還沒有執行的任務,但是它並不對正在執行的任務做任何保證,有可能它們都會停止,也有可能執行完成。一般推薦的關閉方式是ExecutorService.shutdown()。

  Future

    對具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。相關類圖如下

public interface Future<V> {

    /**
     * 取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false
     * @param mayInterruptIfRunning 是否允許取消正在執行卻沒有執行完畢的任務,如果設置true,則表示可以取消正在執行過程中的任務。
     *         如果任務已經完成,則無論mayInterruptIfRunning為true還是false,此方法肯定返回false,即如果取消已經完成的任務會返回false。
     *         如果任務正在執行,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false,則返回false。
     *         如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,肯定返回true。
     * @return
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 任務是否被取消成功,如果在任務正常完成前被取消成功,則返回true。
     * @return
     */
    boolean isCancelled();

    /**
     * 任務是否已經完成,若完成則返回true。
     * @return
     */
    boolean isDone();

    /**
     * 獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回。
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 獲取執行結果,如果在指定時間內沒獲取到結果,就直接返回null
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}
View Code

    從如上代碼可以看出Future提供了三種功能:

      1、判斷任務是否完成;2、中斷任務;3、獲取任務執行結果。

線程池使用示例

  Runnable使用示例

    示例一,定時周期的執行某個任務

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RunnableTest {

    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        MyRunnable myRunnable = new MyRunnable();
        // 應用啟動3秒開始執行myRunnable,之后每隔5秒執行一次; scheduleAtFixedRate是有返回值的,配合Runnable的話,我們不關注返回值
        executorService.scheduleAtFixedRate(myRunnable, 3, 5, TimeUnit.SECONDS);
        System.out.println("我是主線程...");
    }
}
class MyRunnable implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("我是子線程1");
    }
}
View Code

    示例二,單線程化的線程池執行某個任務,並顯示的關閉線程池

public class RunnableTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        MyRunnable myRunnable = new MyRunnable();

        // 為了可取消性而使用Future但又不提供可用的結果,則可以聲明 Future<?> 形式類型、並返回null作為底層任務的結果
        Future<?> submit = executorService.submit(myRunnable);

        // 如果不shutdown,那它里面的線程會一直處於運行狀態,應用不會停止
        executorService.shutdown();

        // 輸出任務執行結果,由於Runnable沒有返回值,所以get的是null
        System.out.println(submit.get(4, TimeUnit.SECONDS));
        System.out.println("我是主線程...");
    }
}
class MyRunnable implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("我是子線程1");
    }
}
View Code

  Callable使用示例

    示例一,Callable + Future獲取結果;采用緩存線程池執行任務

import java.util.concurrent.*;
import java.util.concurrent.Future;

public class CallableTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<String> result = executorService.submit(task);
        executorService.shutdown();

        Thread.sleep(1000);
        System.out.println("我是主線程, 執行另外的業務...");

        System.out.println("task執行結果:" + result.get());

        System.out.println("任務全部執行完畢...");
    }
}
class Task implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("子線程, 業務處理中...");
        Thread.sleep(2000);
        return "業務執行成功";
    }
}
View Code

    示例二,Callable + FutureTask獲取結果;采用定長線程池執行定時任務

import java.util.concurrent.*;

public class CallableTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<String> futureTask = new FutureTask<>(task);
        executorService.submit(futureTask);
        executorService.shutdown();

        Thread.sleep(1000);
        System.out.println("我是主線程, 執行另外的業務...");

        System.out.println("task執行結果:" + futureTask.get(5, TimeUnit.SECONDS));

        System.out.println("任務全部執行完畢...");
    }
}
class Task implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("子線程, 業務處理中...");
        Thread.sleep(2000);
        return "業務執行成功";
    }
}
View Code

shiro中session驗證定時任務

  shiro源碼篇 - shiro的session的查詢、刷新、過期與刪除,你值得擁有中講到了session驗證定時任務,我們AbstractValidatingSessionManager中createSession方法開始

  可以看到,調用Executors.newSingleThreadScheduledExcutor(ThreadFactory threadFactory)方法創建了一個支持定時及周期性執行的單線程化線程池,支持定時及周期性地執行task,並且線程池中只有一個線程。ExecutorServiceSessionValidationScheduler本身就是一個Runnable,那么會定時、周期性的執行其run()。說的簡單點就是:應用啟動60分鍾后,單線程化的線程池中的單個線程開始執行ExecutorServiceSessionValidationScheduler的run()方法,之后每隔60分鍾執行一次,60分鍾是默認設置;ExecutorServiceSessionValidationScheduler的run()中,會調用sessionManager的validateSessions()方法完成session的驗證。

總結

  1、無需返回結果,簡單的線程實現可以用Runnable(或Thread);需要返回結果的、稍復雜的線程實現可以用Callable;如果線程操作頻繁、需要連接池管理的可以考慮用ExecutorService來實現線程池;更復雜的任務調度,則可以用三方工具,比如:quartz,更多三方調度工具可查閱spring-boot-2.0.3之quartz集成,不是你想的那樣哦!,具體選擇哪個,需要結合我們的具體業務來考慮,沒有絕對的選擇誰而不選擇誰,就看誰更契合;

  2、一般情況下,Callable(或Runnale)、Executors、ExecutorService、Future會配合來使用,很多時候我們不需要返回值,則可以不關注Future;推薦使用線程池的方式,有與數據庫連接池類似的優點;

  3、很多三方的框架、工具都沿用了jdk的線程池實現,而沒有引用第三方調度工具,例如shiro中,session的驗證定時任務就是沿用的jdk中的Executors.newSingleThreadScheduledExcutor(ThreadFactory threadFactory)來創建的線程池;

  4、jdk中的線程還有很多內容,本文只是涉及到了冰山一角,更深入的學習有待大家自行去進行。

參考

  Java 8 教程匯總

  Java並發編程:Callable、Future和FutureTask

  深入理解 Java 線程池:ThreadPoolExecutor


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM