ListenableFuture in Guava


ListenableFuture的說明 

  並發編程是一個難題,但是一個強大而簡單的抽象可以顯著的簡化並發的編寫。出於這樣的考慮,Guava 定義了 ListenableFuture接口並繼承了JDK concurrent包下的Future 接口,ListenableFuture 允許你注冊回調方法(callbacks),在運算(多線程執行)完成的時候進行調用,  或者在運算(多線程執行)完成后立即執行。這樣簡單的改進,使得可以明顯的支持更多的操作,這樣的功能在JDK concurrent中的Future是不支持的。 在高並發並且需要大量Future對象的情況下,推薦盡量使用ListenableFuture來代替..

  ListenableFuture 中的基礎方法是addListener(Runnable, Executor), 該方法會在多線程運算完的時候,在Executor中執行指定的Runnable。

ListenableFuture的創建和使用

  對應JDK中的 ExecutorService.submit(Callable) 提交多線程異步運算的方式,Guava 提供了ListeningExecutorService 接口, 該接口返回 ListenableFuture, 而相應的ExecutorService 返回普通的 Future。將 ExecutorService 轉為 ListeningExecutorService,可以使用MoreExecutors.listeningDecorator(ExecutorService)進行裝飾。舉例說明:

  

ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

然后我們可以向這個ListeningExecutorService提交Callable任務

final ListenableFuture<String> future =  pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000*3);
                return     "Task done !";
            }
        });

然后我們添加Listener:

future.addListener(new Runnable() {
                @Override
                public void run() {
                    try {
                        final String contents = future.get();
                        System.out.println(contents);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }, MoreExecutors.sameThreadExecutor());

我們看看上面的代碼,確實不怎么優雅,我們需要處理拋出的異常,需要自己通過future.get()獲得前面計算的值。有沒有更加簡便的方法呢?當然有,Guava提供了一個簡便方法來替代上面的寫法:

Futures.addCallback(future, new FutureCallback<String>() {
                @Override
                public void onSuccess(String result) {
                    System.out.println(result);
                }

                @Override
                public void onFailure(Throwable t) {
                    t.printStackTrace();
                }
            });

完成代碼如下:

package concurrency;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * Created by hupeng on 2014/9/24.
 */
public class ListenableFutureTest {


    public static void main(String[] args) throws InterruptedException {
        ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

        final ListenableFuture<String> future = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000 * 2);
                return "Task done !";
            }
        });

//            future.addListener(new Runnable() {
//                @Override
//                public void run() {
//                    try {
//                        final String contents = future.get();
//                        System.out.println(contents);
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    } catch (ExecutionException e) {
//                        e.printStackTrace();
//                    }
//                }
//            }, MoreExecutors.sameThreadExecutor());

        Futures.addCallback(future, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.out.println(result);
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        });

        Thread.sleep(5 * 1000);  //wait for task done

        pool.shutdown();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM