從Java Future到Guava ListenableFuture實現異步調用


原文地址: http://blog.csdn.net/pistolove/article/details/51232004

Java Future

    通過Executors可以創建不同類似的線程池,常見的大概有下表幾種類型,還有些可能為被列出。在實際應用中,個人感覺主要使用newCachedThreadPook和newFixedThreadPool來創建線程池。

Executors創建線程池源碼

//調用newCachedThreadPool方法,可以創建一個緩沖型線程池,而在改方法中通過傳參創建一個ThreadPoolExecutor,也許你會很奇怪明明返回的是一個ExecutorService,怎么會創建了一個ThreadPoolExecutor呢?
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, 
                   TimeUnit.SECONDS, new SynchronousQueue<Runnable());
}

// ThreadPoolExecutor繼承了抽象的service類AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {}

//AbstractExecutorService實現了ExecutorService接口
public abstract class AbstractExecutorService implements ExecutorService {}

//所以ExecutorService其實是ThreadPoolExecutor的基類,這也就解釋清楚了

ExecutorService(線程池)

ExecutorService是一個接口,它繼承了Executor,在原有execute方法的基礎上新增了submit方法,傳入一個任務,該方法能夠返回一個Future對象,可以獲取異步計算結果。

//ExecutorService繼承了Executor,並擴展了新方法。
public interface ExecutorService extends Executor { }

//Executor中的方法
void execute(Runnable command);

//增加了submit方法,該方法傳任務來獲取Future對象,而Future對象中可以獲取任務的執行結果
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);

Future(獲取異步計算結果)

Future接口中有下表所示方法,可以獲取當前正在執行的任務相關信息。

FutureTask

Executor框架利用FutureTask來完成異步任務,並可以用來進行任何潛在的耗時計算,一般FutureTask多用於耗時的計算,主線程可以在完成自己任務后,再去獲取結果。

FutureTask包裝了Callable和Runnable接口對象,提供了對Future接口的基本實現,開始、取消計算、查詢結果是否完成、獲取計算結果。僅當計算完成時才能檢索結果,當計算沒有完成時,該方法會一直阻塞直到任務轉入完成狀態。一旦完成計算,不能夠重新開始或取消計算。通過Excutor(線程池)來執行,也可傳遞給Thread對象執行。如果在主線程中需要執行比較耗時的操作時,但又不想阻塞主線程時,可以把這些作業交給Future對象在后台完成,當主線程將來需要時,就可以通過Future對象獲得后台作業的計算結果或者執行狀態。

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestFuture {
    // 創建線程池
    final static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Long t1 = System.currentTimeMillis();

        // 任務1
        Future<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return true;
            }
        });

        while (true) {
            if (booleanTask.isDone() && !booleanTask.isCancelled()) {
                //模擬耗時
                Thread.sleep(500);
                Boolean result = booleanTask.get();
                System.err.println("BooleanTask: " + result);
                break;
            }
        }

        // 任務2
        Future<String> stringTask = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello World";
            }
        });

        while (true) {
            if (stringTask.isDone() && !stringTask.isCancelled()) {
                String result = stringTask.get();
                System.err.println("StringTask: " + result);
                break;
            }
        }



        // 任務3
        Future<Integer> integerTask = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });

        while (true) {
            if (integerTask.isDone() && !integerTask.isCancelled()) {
                Integer result = integerTask.get();
                System.err.println("IntegerTask: " + result);
                break;
            }
        }

        // 執行時間
        System.err.println("time: " + (System.currentTimeMillis() - t1));
    }

}

Guava Future

ListenableFuture是可以監聽的Future,它是對Java原始Future的擴展增強。Future表示一個異步計算任務,當任務完成時可以得到計算結果。如果希望計算完成時馬上就拿到結果展示給用戶或者做另外的計算,就必須使用另一個線程不斷的查詢計算狀態。這樣做會使得代碼復雜,且效率低下。如果使用ListenableFuture,Guava會幫助檢測Future是否完成了,如果完成就自動調用回調函數,這樣可以減少並發編程的復雜度。

常用API

1. MoreExecutors

該類是final類型的工具類,提供了很多靜態方法。比如ListeningDecorator方法初始化ListeningExecutorService方法,使用此實例submit方法即可初始化ListenableFuture對象。

2. ListeningExecutorService

該類是對ExecutorService的擴展,重新ExecutorService類中的submit方法,返回ListenableFuture對象。

3. ListenableFuture

該接口擴展了Future接口,增加了addListener方法,該方法在給定的executor上注冊一個監聽器,當計算完成時會馬上調用該監聽器。不能夠確保監聽器執行的順序,但可以在計算完成時確保馬上被調用。

4. FutureCallback

該接口提供了OnSuccess和OnFailure方法。獲取異步計算的結果並回調。

5. Futures

該類提供了很多實用的靜態方法以供實用。

6. ListenableFutureTask

該類擴展了FutureTask類並實現了ListenableFuture接口,增加了addListener方法。

7.

public class TestListenableFuture2 {
    // 創建線程池
    final static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    public static void main(String[] args) throws Exception {
        Long t1 = System.currentTimeMillis();
        // 任務1
        ListenableFuture<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return true;
            }
        });

        Futures.addCallback(booleanTask, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(Boolean result) {
                System.err.println("BooleanTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 任務2
        ListenableFuture<String> stringTask = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello World";
            }
        });

        Futures.addCallback(stringTask, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.err.println("StringTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 任務3
        ListenableFuture<Integer> integerTask = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });

        Futures.addCallback(integerTask, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.err.println("IntegerTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 執行時間
        System.err.println("time: " + (System.currentTimeMillis() - t1));
    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM