java並發系列(八)-----java異步編程


同步計算與異步計算

從多個任務的角度來看,任務是可以串行執行的,也可以是並發執行的。從單個任務的角度來看,任務的執行方式可以是同步的,也可以是異步的。

Runnable、Callable、FutureTask

1、Runnable

先說一下java.lang.Runnable吧,它是一個接口,在它里面只聲明了一個run()方法:

public interface Runnable {
    public abstract void run();
}

由於run()方法返回值為void類型,所以在執行完任務之后無法返回任何結果。

2、Callable

Callable位於java.util.concurrent包下,它也是一個接口,在它里面也只聲明了一個方法,只不過這個方法叫做call():

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

可以看到,這是一個泛型接口,call()函數返回的類型就是傳遞進來的V類型。

那么怎么使用Callable呢?一般情況下是配合ExecutorService來使用的,在ExecutorService接口中聲明了若干個submit方法的重載版本:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

3、Future

Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。

Future類位於java.util.concurrent包下,它是一個接口:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中聲明了5個方法,下面依次解釋每個方法的作用:

  • cancel方法用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。參數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設置true,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayInterruptIfRunning為true還是false,此方法肯定返回false,即如果取消已經完成的任務會返回false;如果任務正在執行,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false,則返回false;如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,肯定返回true。
  • isCancelled方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true。
  • isDone方法表示任務是否已經完成,若任務完成,則返回true;
  • get()方法用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回;
  • get(long timeout, TimeUnit unit)用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null。

  也就是說Future提供了三種功能:

  • 1)判斷任務是否完成;
  • 2)能夠中斷任務;
  • 3)能夠獲取任務執行結果。

  因為Future只是一個接口,所以是無法直接用來創建對象使用的,因此就有了下面的FutureTask。

4、FutureTask

我們先來看一下FutureTask的實現:

public class FutureTask<V> implements RunnableFuture<V>

FutureTask類實現了RunnableFuture接口,我們看一下RunnableFuture接口的實現:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

可以看出RunnableFuture繼承了Runnable接口和Future接口,而FutureTask實現了RunnableFuture接口。所以它既可以作為Runnable被線程執行,又可以作為Future得到Callable的返回值。

FutureTask提供了2個構造器:

public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

事實上,FutureTask是Future接口的一個唯一實現類。

示例:

public class Test {
    public static void main(String[] args) {
        //第一種方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二種方式,注意這種方式和第一種方式效果是類似的,只不過一個使用的是ExecutorService,一個使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主線程在執行任務");
         
        try {
            //futureTask其實是Runnable+Future的綜合體,因此可以通過futureTask.get()獲取執行結果
            System.out.println("task運行結果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任務執行完畢");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子線程在進行計算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}    

java Executor框架

Runnable接口和Callable接口是對任務處理邏輯的抽象,不管是什么樣的任務,其處理邏輯總是展現為一個具有統一簽名的方法——Runnable.run()或者Callable.call()。二者區別如下:

  1. 方法簽名不同,void Runnable.run()V Callable.call() throws Exception
  2. 是否允許有返回值,Callable允許有返回值
  3. 是否允許拋出異常,Callable允許拋出異常。

java.util.concurrent.Executor接口則是對任務的執行進行的抽象,接口定義了如下方法:

void execute(Runnable command)

ExecutorService在Executor的基礎上增加了“service”特性的方法:

  • shutdown()、shutdownNow():都是關閉當前service服務,釋放Executor的所有資源(參見實現類);它所觸發的動作就是取消隊列中任務的執行。shutdown是一種“友好”的關閉,它將不再(事實上是不能)接受新的任務提交,同時把已經提交到隊列中的任務執行完畢。shutdownNow更加直接一些,它將會把尚未執行的任務不再執行,正在執行的任務,通過“線程中斷”(thread.interrupt),如果線程無法響應“中斷”,那么將不會通過此方式被立即結束。shutdowNow是個有返回類型的方法,它返回那些等待執行的任務列表(List<Runnable>)
  • isShutdown:程序是否已經關閉,1)方法將導致其返回true。
  • isTerminated:是否已經結束,如果關閉后,所有的任務都執行完成,將返回true,否則其他情況均返回false。
  • awaitTermination(timeout):會拋出interruptException,此方法就是個廢柴,大概意思是等待一段之間直到“任務全部結束”,如果超時就返回false。
  • Future submit(callable/runnale):向Executor提交任務,並返回一個結果未定的Future。
  • List<Future> invokeAll(Collection<Callable>):一個廢柴方法,同步的方法,執行所有的任務列表,當所有任務都執行完成后,返回Future列表。這方法有啥用??貌似,可以對一批任務進行批量跟蹤。此方法會拋出interruptException。
  • T invokeAny(Collection<Callable>): 任務集合中,任何一個任務完成就返回。

這些方法都會被ExecutorService的子類實現,其實Executor的子類的實現原理,才是最有意義的。其實基於Executor接口自己也能創造世界。

Executors

java中提供了Executors工具類,能夠返回默認線程工廠、能夠將runnable實例轉換為callable實例。

方法名 功能
newFixedThreadPool(int nThreads) 創建固定大小的線程池
newSingleThreadExecutor() 創建只有一個線程的線程池
newCachedThreadPool() 創建一個不限線程數上限的線程池,任何提交的任務都將立即執行

不過阿里開發手冊中禁止使用這種方式去創建線程池,需要使用ThreadPoolExecutor,這個后面介紹。

1、newCachedThreadPool 

創建一個可緩存線程池,應用中存在的線程數可以無限大,示例代碼如下:

package com.alivn.sockets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Threadpools {

    /**
     * 我們獲取四次次線程,觀察4個線程地址
     * @param args
     */
    public static  void main(String[]args)
    {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        System.out.println("****************************newCachedThreadPool*******************************");
        for(int i=0;i<4;i++)
        {
            final int index=i;
     //submit本質上還是調用execute(),submit()方法可以有返回結果future newCachedThreadPool.submit(
new ThreadForpools(index)); } } }
package com.alivn.sockets;

public class ThreadForpools implements Runnable{

    private Integer index;
    public  ThreadForpools(Integer index)
    {
     this.index=index;
    }
    @Override
    public void run() {
        /***
         * 業務......省略
          */
        try {
            System.out.println("開始處理線程!!!");
            Thread.sleep(index*100);
            System.out.println("我的線程標識是:" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出結果是:可以有無限大的線程數進來(線程地址不一樣)

 

2、newFixedThreadPool

創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。示例代碼如下:

package com.alivn.sockets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Threadpools {

    /**
     * 我們獲取四次次線程,觀察4個線程地址
     * @param args
     */
    public static  void main(String[]args)
    {
        //線程池允許同時存在兩個線程
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        System.out.println("****************************newFixedThreadPool*******************************");
        for(int i=0;i<4;i++)
        {
            final int index=i;
            newFixedThreadPool.submit(new ThreadForpools(index));
        }
    }
}

輸出結果:每次只有兩個線程在處理,當第一個線程執行完畢后,新的線程進來開始處理(線程地址不一樣)

 

3、newScheduledThreadPool

 

package com.ty.thread;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Threadpools {
    /**
     * 我們獲取四次次線程,觀察4個線程地址
     * @param args
     */
    public static  void main(String[]args)
    {
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2);
        System.out.println("****************************newScheduledThreadPool*******************************");
        for(int i=0;i<4;i++)
        {
            final int index=i;
            //延遲三秒執行
            newScheduledThreadPool.schedule(new ThreadForpools(index), 3, TimeUnit.SECONDS);
        }
    }
}

執行結果:延遲三秒之后執行,除了延遲執行之外和newFixedThreadPool基本相同,可以用來執行定時任務

4、newSingleThreadExecutor

 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。示例代碼如下:

package com.ty.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Threadpools {
    /**
     * 我們獲取四次次線程,觀察4個線程地址
     * @param args
     */
    public static  void main(String[]args)
    {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        System.out.println("****************************newSingleThreadExecutor*******************************");
        for(int i=0;i<4;i++)
        {
            final int index=i;
            newSingleThreadExecutor.submit(new ThreadForpools(index));
        }
    }
}

執行結果:只存在一個線程,順序執行

ThreadPoolExecutor

Executors中創建線程池的快捷方法,實際上是調用了ThreadPoolExecutor的構造方法(定時任務使用的是ScheduledThreadPoolExecutor),該類構造方法參數列表如下:

// Java線程池的完整構造函數
public ThreadPoolExecutor(
  int corePoolSize, // 線程池長期維持的線程數,即使線程處於Idle狀態,也不會回收。
  int maximumPoolSize, // 線程數的上限
  long keepAliveTime, TimeUnit unit, // 超過corePoolSize的線程的idle時長,超過這個時間,多余的線程會被回收。
  BlockingQueue<Runnable> workQueue, // 任務的排隊隊列
  ThreadFactory threadFactory, // 新線程的產生方式
  RejectedExecutionHandler handler) // 拒絕策略

這些參數中,比較容易引起問題的有corePoolSize, maximumPoolSize, workQueue以及handler:

  • corePoolSizemaximumPoolSize設置不當會影響效率,甚至耗盡線程;
  • workQueue設置不當容易導致OOM;
  • handler設置不當會導致提交任務時拋出異常。

1、線程池的工作順序

corePoolSize -> 任務隊列 -> maximumPoolSize -> 拒絕策略

2、提交任務的方式:

提交方式 是否關心返回結果
Future<T> submit(Callable<T> task)
void execute(Runnable command)
Future<?> submit(Runnable task) 否,雖然返回Future,但是其get()方法總是返回null

3、如何正確使用線程池

3.1避免使用無界隊列

不要使用Executors.newXXXThreadPool()快捷方法創建線程池,因為這種方式會使用無界的任務隊列,為避免OOM,我們應該使用ThreadPoolExecutor的構造方法手動指定隊列的最大長度:

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 
                0, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(512), // 使用有界隊列,避免OOM
                new ThreadPoolExecutor.DiscardPolicy());

3.2明確拒絕任務時的行為

任務隊列總有占滿的時候,這是再submit()提交新的任務會怎么樣呢?RejectedExecutionHandler接口為我們提供了控制方式,接口定義如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

線程池給我們提供了幾種常見的拒絕策略:

拒絕策略 拒絕行為
AbortPolicy 拋出RejectedExecutionException
DiscardPolicy 什么也不做,直接忽略
DiscardOldestPolicy 丟棄執行隊列中最老的任務,嘗試為當前提交的任務騰出位置
CallerRunsPolicy 直接由提交任務者執行這個任務

線程池默認的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionException異常,該異常是非受檢異常,很容易忘記捕獲。如果不關心任務被拒絕的事件,可以將拒絕策略設置成DiscardPolicy,這樣多余的任務會悄悄的被忽略。

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 
                0, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(512), 
                new ThreadPoolExecutor.DiscardPolicy());// 指定拒絕策略

3.3獲取處理結果和異常

線程池的處理結果、以及處理過程中的異常都被包裝到Future中,並在調用Future.get()方法時獲取,執行過程中的異常會被包裝成ExecutionExceptionsubmit()方法本身不會傳遞結果和任務執行過程中的異常。獲取執行結果的代碼可以這樣寫:

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            throw new RuntimeException("exception in call~");// 該異常會在調用Future.get()時傳遞給調用者
        }
    });
     
try {
  Object result = future.get();
} catch (InterruptedException e) {
  // interrupt
} catch (ExecutionException e) {
  // exception in Callable.call()
  e.printStackTrace();
}

上述代碼輸出類似如下:

CompletionService異步任務的批量執行

普通情況下,我們使用Runnable作為主要的任務表示形式,可是Runnable是一種有非常大局限的抽象,run方法中僅僅能記錄日志,打印,或者把數據匯總入某個容器(一方面內存消耗大,還有一方面須要控制同步,效率非常大的限制),總之不能返回運行的結果;比方同一時候1000個任務去網絡上抓取數據,然后將抓取到的數據進行處理(處理方式不定),我認為最好的方式就是提供回調接口,把處理的方式最為回調傳進去;可是如今我們有了更好的方式實現:CompletionService + Callable
Callable的call方法能夠返回運行的結果;
CompletionService將Executor(線程池)和BlockingQueue(堵塞隊列)結合在一起,同一時候使用Callable作為任務的基本單元,整個過程就是生產者不斷把Callable任務放入堵塞對了,Executor作為消費者不斷把任務取出來運行,並返回結果;

通過一個示例來明確CompletionService的作用,

1、不使用CompletionService

任務類

package net.aty.completeservice;  
  
import java.util.concurrent.Callable;  
import java.util.concurrent.TimeUnit;  
  
public class ReturnAfterSleepCallable implements Callable<Integer>  
{  
    private int sleepSeconds;  
  
    private int returnValue;  
  
    public ReturnAfterSleepCallable(int sleepSeconds, int returnValue)  
    {  
        this.sleepSeconds = sleepSeconds;  
        this.returnValue = returnValue;  
    }  
  
    @Override  
    public Integer call() throws Exception  
    {  
        System.out.println("begin to execute.");  
  
        TimeUnit.SECONDS.sleep(sleepSeconds);  
  
        System.out.println("end to execute.");  
  
        return returnValue;  
    }  
}  

通過一個List來保存每個任務返回的Future,然后輪詢這些Future,直到每個Future都已完成。我們不希望出現因為排在前面的任務阻塞導致后面先完成的任務的結果沒有及時獲取的情況,所以在調用get方式時,需要將超時時間設置為0。

package net.aty.completeservice;  
  
import java.util.ArrayList;  
import java.util.List;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Future;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.TimeoutException;  
  
public class TraditionalTest  
{  
    public static void main(String[] args)  
    {  
        int taskSize = 5;  
  
        ExecutorService executor = Executors.newFixedThreadPool(taskSize);  
  
        List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();  
  
        for (int i = 1; i <= taskSize; i++)  
        {  
            int sleep = taskSize - i; // 睡眠時間  
  
            int value = i; // 返回結果  
  
            // 向線程池提交任務  
            Future<Integer> future = executor.submit(new ReturnAfterSleepCallable(sleep, value));  
              
            // 保留每個任務的Future  
            futureList.add(future);  
        }  
          
        // 輪詢,獲取完成任務的返回結果  
        while(taskSize > 0)  
        {  
            for (Future<Integer> future : futureList)  
            {  
                Integer result = null;  
                  
                try  
                {  
                    result = future.get(0, TimeUnit.SECONDS);  
                } catch (InterruptedException e)  
                {  
                    e.printStackTrace();  
                } catch (ExecutionException e)  
                {  
                    e.printStackTrace();  
                } catch (TimeoutException e)  
                {  
                    // 超時異常需要忽略,因為我們設置了等待時間為0,只要任務沒有完成,就會報該異常  
                }  
                  
                // 任務已經完成  
                if(result != null)  
                {  
                    System.out.println("result=" + result);  
                      
                    // 從future列表中刪除已經完成的任務  
                    futureList.remove(future);    
                    taskSize--;  
                    //此處必須break,否則會拋出並發修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決)   
                    break; // 進行下一次while循環  
                }  
            }  
        }  
          
        // 所有任務已經完成,關閉線程池  
        System.out.println("all over.");  
        executor.shutdown();  
    }  
}  

可見輪詢future列表非常的復雜,而且還有很多異常需要處理,TimeOutException異常需要忽略;還要通過雙重循環和break,防止遍歷集合的過程中,出現並發修改異常。另外就是ExcutorService里面任務結果Future也得按照入隊列的順序來取,比如任務一先進隊列,那么取Future的時候也是先取到任務一的結果。那么如果十個任務中,第一個執行時間最長,后面的不能先取出,這也是一個問題。這么多需要考慮的細節,程序員很容易犯錯。

2、使用CompletionService

package net.aty.completeservice;  
  
import java.util.concurrent.CompletionService;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.ExecutorCompletionService;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
  
public class CompletionServiceTest  
{  
    public static void main(String[] args)  
    {  
        int taskSize = 5;  
  
        ExecutorService executor = Executors.newFixedThreadPool(taskSize);  
  
        // 構建完成服務  
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(  
                executor);  
  
        for (int i = 1; i <= taskSize; i++)  
        {  
            int sleep = taskSize - i; // 睡眠時間  
  
            int value = i; // 返回結果  
  
            // 向線程池提交任務  
            completionService  
                    .submit(new ReturnAfterSleepCallable(sleep, value));  
        }  
  
        // 按照完成順序,打印結果  
        for (int i = 0; i < taskSize; i++)  
        {  
            try  
            {  
                System.out.println(completionService.take().get());  
            } catch (InterruptedException e)  
            {  
                e.printStackTrace();  
            } catch (ExecutionException e)  
            {  
                e.printStackTrace();  
            }  
        }  
  
        // 所有任務已經完成,關閉線程池  
        System.out.println("all over.");  
        executor.shutdown();  
    }  
}  

可見使用CompletionService不會有TimeOutExeception的問題,不用遍歷future列表,不用擔心並發修改異常。

3、CompletionService和ExecutorCompletionService的實現

 JDK源碼中CompletionService的javadoc說明如下:

/** 
 * A service that decouples the production of new asynchronous tasks 
 * from the consumption of the results of completed tasks.  Producers 
 * <tt>submit</tt> tasks for execution. Consumers <tt>take</tt> 
 * completed tasks and process their results in the order they 
 * complete.  
 */ 
也就是說,CompletionService實現了生產者提交任務和消費者獲取結果的解耦,生產者和消費者都不用關心任務的完成順序,由CompletionService來保證,消費者一定是按照任務完成的先后順序來獲取執行結果。

ExecutorCompletionService是CompletionService的實現,融合了線程池Executor和阻塞隊列BlockingQueue的功能。
public ExecutorCompletionService(Executor executor) {  
       if (executor == null)  
           throw new NullPointerException();  
       this.executor = executor;  
       this.aes = (executor instanceof AbstractExecutorService) ?  
           (AbstractExecutorService) executor : null;  
       this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
   }
到這里可以推測,按照任務的完成順序獲取結果,就是通過阻塞隊列實現的,阻塞隊列剛好具有這樣的性質:阻塞和有序。
 
ExecutorCompletionService任務的提交和執行都是委托給Executor來完成。當提交某個任務時,該任務首先將被包裝為一個QueueingFuture
public Future<V> submit(Callable<V> task) {  
        if (task == null) throw new NullPointerException();  
        RunnableFuture<V> f = newTaskFor(task);  
        executor.execute(new QueueingFuture(f));  
        return f;  
} 

QueueingFuture是FutureTask的一個子類,通過改寫FutureTask類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。

private class QueueingFuture extends FutureTask<Void> {  
        QueueingFuture(RunnableFuture<V> task) {  
            super(task, null);  
            this.task = task;  
        }  
        protected void done() { completionQueue.add(task); }  
        private final Future<V> task;  
} 

這里簡單說明下:FutureTask.done(),這個方法默認什么都不做,就是一個回調,當提交的線程池中的任務完成時,會被自動調用。這也就說時候,當任務完成的時候,會自動執行QueueingFuture.done()方法,將返回結果加入到阻塞隊列中,加入的順序就是任務完成的先后順序。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM