0318 guava並發工具


image.png


並發是一個難題,但是可以通過使用強力簡單的抽象來顯著的簡化,為了簡化問題,guava擴展了Future接口,即 ListenableFuture (可以監聽的Future)。
我強烈建議你在你的所有代碼里使用ListenableFuture去替代Future,原因如下:

  • 很多的Futures 類的方法需要它。(Futures工具類使用)
  • 它比后來改造為ListenableFutrue更簡單。(早點使用比重構更簡單)
  • 工具方法的提供者不需要提供Future和ListenableFuture方法的變體。(不需要兼容兩套)

接口

一個傳統的Futrue代表一個異步計算的結果:一個可能完成也可能沒有完成輸出結果的計算。
一個Future可以用在進度計算,或者說是 一個提供給我們結果的服務的承諾。


一個ListenableFuture允許注冊當你在計算完成的時候的回調,或者計算已經完成了。
這個簡單的增強讓高效支持多種操作成為可能。而Future接口並不能支持。


ListenbleFuture中添加的基本操作是
addListener(Runnable , Executor ),
它指出了當未來計算完成時,指定的Runnable會在指定的Executor中運行。


增加回調

很多用戶喜歡使用 Futures.addCallback(ListenableFuture,FutureCallback,Executor)方法。
FutureCallback實現了下面兩個方法:


  • onSuccess(v) 當未來成功執行的動作,基於計算結果
  • onFailure(Throwable) 當未來失敗執行的動作,基於失敗

創建

相較於jdk提供的 ExecutorService.submit(Callable)方法來初始化一個異步計算。它返回一個常規的Future,
guava提供了ListeningExecutorService接口,它返回ListenableFuture。
把ExecutorService轉換為ListenableExecutorService
使用:MoreExecutors.listeningDecorator(ExecutorService)

基礎用法如下:


/**
 * 說明:使用例子代碼
 * @author carter
 * 創建時間: 2020年03月19日 9:54 上午
 **/

@Slf4j
public class ListenableFutureUtils {

    public static void main(String[] args) {

ListeningExecutorService service = MoreExecutors.listeningDecorator(
    Executors.newFixedThreadPool(10));


        final ListenableFuture<AResult> listenableFuture = service.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new AResult(30, "male", 1);

        });


        Futures.addCallback(listenableFuture,
                new FutureCallback<AResult>() {
                    @Override
                    public void onSuccess(AResult aResult) {
                        log.info("計算成功,{}",aResult);
                    }

                    @Override
                    public void onFailure(Throwable throwable) {

                        log.error("計算錯誤",throwable);
                        
                    }
                },service);

    }
    
    @Data
    @AllArgsConstructor
    public static class AResult{
        
        private Integer age;
        
        private String sex;
        
        private Integer id;
        
        
    }
    
}

相對的,如果你想從基於FutureTask的API轉換過來,
Guava提供了
ListenableFutureTask.create(Callable)

ListenableFutureTask.create(Runnable)
不同於jdk,ListenableFutureTask並不是直接擴展的。

如果你喜歡抽象的設置future的值,而不是實現一個方法然后計算值,可以考慮使用AbstractFuture或使用SettableFuture ;

如果你必須轉換Future為ListenableFuture,你別無選擇,必須使用 JdkFutureAdapters.listenInPoolThread(Future)來轉換Future為ListenableFuture
任何時候只要可能,推薦你修改源碼讓它返回一個 ListenableFuture

應用

使用ListenablFuture最重要的原因是可以使用鏈式異步操作。

代碼如下:

package com.xxx.demo;

import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * 說明:異步操作鏈
 * @author carter
 * 創建時間: 2020年03月19日 10:11 上午
 **/

public class ApplicationUtils {


    public static void main(String[] args) {

        Query query = new Query(30);

        ListenableFuture<RowKey> rowKeyFuture = lookUp(query);

        AsyncFunction<RowKey, QueryResult> queryFun = rowKey -> readData(rowKey);

        final ListenableFuture<QueryResult> queryResultListenableFuture = 
            Futures.transformAsync(rowKeyFuture, queryFun);

    }

    private static ListenableFuture<QueryResult> readData(RowKey rowKey) {
        return null;
    }

    private static ListenableFuture<RowKey> lookUp(Query query) {
        return null;
    }


    @Data
    @AllArgsConstructor
    public static class RowKey {

        private String id;

    }

    @Data
    @AllArgsConstructor
    public static class Query {

        private Integer age;

    }


    @Data
    @AllArgsConstructor
    public static class QueryResult {

        private String id;
        private String age;

    }


}

很多其他高效支持的操作ListenableFuture提供,而Future不提供。
不同的操作可以被不同的線程池執行,一個簡單的ListenableFuture可以有多個操作去等待。

只要一個操作開始,其他多個操作應該開始,fan-out, 千帆競發。

ListenableFuture可以實現這樣的操作:它觸發了所有請求的回調。

通過少量的工作,我們可以 fan-in.

觸發一個ListenableFuture 來獲得計算結果,當其他的Future結束的時候。

Futures.allAsList是一個例子。

方法介紹:

方法 描述
transformAsync(ListenableFuture , AsyncFunction , Executor) 返回一個新的ListenableFuture,它的結果是執行異步函數的返回,函數入參是ListenableFuture的返回結果;
transform(ListenableFuture , Function , Executor) 返回一個新的ListenableFuture,它的結果是執行函數的返回,函數入參是ListenableFuture的返回結果;
allAsList(Iterable ) 返回一個ListenableFuture,它的結果是一個list,包含每一個列表中的ListenableFuture的執行結果,任何一個ListenableFuture執行失敗或者取消,最后的返回結果取消
successfullAsList(Iterable ) 返回一個ListenableFuture,它的結果是一個list,包含每一個列表中的ListenableFuture的執行結果,成功的是結果,失敗或者取消的值使用null替代

AsyncFunction<A,B> 提供了一個方法 , ListenableFuture apply(A inpunt),它可以用來異步的轉換值。

代碼如下:

package com.xxx.demo;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * 說明:成功執行結果匯集
 * @author carter
 * 創建時間: 2020年03月19日 10:34 上午
 **/
@Slf4j
public class Test3 {

    public static void main(String[] args) {

        List<ListenableFuture<QueryResult>> querys = Lists.newLinkedList();
        final ListenableFuture<List<QueryResult>> successfulAsList =
            Futures.successfulAsList(querys);
        
        Futures.addCallback(successfulAsList, new FutureCallback<List<QueryResult>>() {
            @Override
            public void onSuccess(List<QueryResult> queryResults) {
                log.info("執行結果列表:{}",queryResults);
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("執行失敗",throwable);
            }
        });


    }

    @Data
    @AllArgsConstructor
    public static class QueryResult{
        
        
      private  Integer age;
        
    }
    

}

嵌套的Future

你的代碼調用一個通用接口並返回一個Future,很可能最終返回一個嵌套的Future.

package com.xxx.demo;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * 說明:嵌套的ListenableFuture
 * @author carter
 * 創建時間: 2020年03月19日 10:43 上午
 **/

public class Test4 {

    public static void main(String[] args) {


        final ListeningExecutorService executorService = MoreExecutors
            .listeningDecorator(Executors.newFixedThreadPool(2));
        final ListeningExecutorService otherExecutorService = MoreExecutors
            .listeningDecorator(Executors.newFixedThreadPool(2));


        Callable<Foo> otherCallback =  ()->new Foo("aaa");


        final ListenableFuture<ListenableFuture<Foo>> submit = 
                executorService.submit(() -> otherExecutorService.submit(otherCallback));


    }
    
    @Data
    @AllArgsConstructor
    public static class Foo{
        
        private String name;
    }
    
}

例子最后返回的是: ListenableFuture<ListenableFuture > ,
這個代碼不對,因為當外層的Future 取消的時候,無法傳播到內層的Future,
這也是一個 使用get()檢查別的Future或者Listnener的常規的錯誤,

但是,除非特別關注 否則 otherCallback拋出的異常會被壓制。
為了避免這種情況,所有的guava的Future處理方法(有些從jdk來),有 *Async版本來安全的解開這個嵌套。

比如:transform,transformAsyn, submit, submitAsync方法。

深入研究

原創不易,轉載請注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM