優雅的使用線程池---ListeningExecutorService的使用


參考: google guava中文教程 https://wizardforcel.gitbooks.io/guava-tutorial/content/16.html
https://blog.csdn.net/u010900754/article/details/90742576

並發編程是一個難題,但是一個強大而簡單的抽象可以顯著的簡化並發的編寫。出於這樣的考慮,Guava 定義了 ListenableFuture接口並繼承了JDK concurrent包下的Future 接口,ListenableFuture 允許你注冊回調方法(callbacks),在運算(多線程執行)完成的時候進行調用, 或者在運算(多線程執行)完成后立即執行。這樣簡單的改進,使得可以明顯的支持更多的操作,這樣的功能在JDK concurrent中的Future是不支持的。 在高並發並且需要大量Future對象的情況下,推薦盡量使用ListenableFuture來代替

使用異步編程接口獲取返回值的方式有兩種:

1.同步方式,也就是調用方主動獲取,但是這時可能還沒有返回結果,可能需要輪詢;

2.回調方式,調用者在提交任務時,注冊一個回調函數,任務執行完以后,自動觸發回調函數通知調用者;這種實現方式需要在執行框架里植入一個擴展點,用於觸發回調。

Java原生api里的Future屬於第一種,Java8提供的CompletableFuture屬於第二種;在Java8出來之前,guava也提供了基於回調的編程接口,也就是本次要說的ListenableFuture(其實看guava代碼,里面有大量這玩意兒,不搞懂不行。。。)。

futureTask

簡單demo:


public class FutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<Integer> call = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("waiting~~~~~~");
                Thread.sleep(3000);
                return 10086;
            }
        };
        FutureTask<Integer> futureTask = new FutureTask(call);
        Thread thread = new Thread(futureTask);
        thread.start();
        //do something
        System.out.println("other things");
        Integer o = futureTask.get();
        System.out.println(o);
    }
}

Future

FutureTask實現Future接口,它定義了5個方法:

簡單說明一下接口定義

boolean cancel(boolean mayInterruptInRunning); //取消一個任務,並返回取消結果。參數表示是否中斷線程。
boolean isCancelled(); //判斷任務是否被取消
Boolean isDone();    //判斷當前任務是否執行完畢,包括正常執行完畢、執行異常或者任務取消。
V get() ;//獲取任務執行結果,任務結束之前會阻塞。
V get(long timeout, TimeUnit unit); //在指定時間內嘗試獲取執行結果。若超時則拋出超時異常

FutureTask

  • volatile int state:表示對象狀態,volatile關鍵字保證了內存可見性。futureTask中定義了7種狀態,代表了7種不同的執行狀態

private static final int NEW          = 0; //任務新建和執行中
private static final int COMPLETING   = 1; //任務將要執行完畢
private static final int NORMAL       = 2; //任務正常執行結束
private static final int EXCEPTIONAL  = 3; //任務異常
private static final int CANCELLED    = 4; //任務取消
private static final int INTERRUPTING = 5; //任務線程即將被中斷
private static final int INTERRUPTED  = 6; //任務線程已中斷

Callable callable:被提交的任務
Object outcome:任務執行結果或者任務異常
volatile Thread runner:執行任務的線程
volatile WaitNode waiters:等待節點,關聯等待線程
long stateOffset:state字段的內存偏移量
long runnerOffset:runner字段的內存偏移量
long waitersOffset:waiters字段的內存偏移量

ListenableFuture


public class ListenableFutureTest {
    public static void main(String[] args) {

    }
    public static void testListenFuture() throws InterruptedException {
        ListenableFuture<String> submit = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1))
                .submit(() -> {
                    Thread.sleep(2000L);
                    return "aync result";
                });
        Futures.addCallback(submit, new FutureCallback<String>() {
            @Override
            public void onSuccess(@Nullable String result) {
                System.out.println("succeed, result: {}" + result);
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("failed, t: {}" + t);
            }
        }, Executors.newSingleThreadExecutor());
        Thread.sleep(100000);
    }
}

MoreExecutors.listeningDecorator就是包裝了一下ThreadPoolExecutor,目的是為了使用


private static class ListeningDecorator extends AbstractListeningExecutorService {
        private final ExecutorService delegate;

        ListeningDecorator(ExecutorService delegate) {
            this.delegate = (ExecutorService)Preconditions.checkNotNull(delegate);
        }

        public final void execute(Runnable command) {
            this.delegate.execute(command);
        }
    }

ListenableFuture 中的基礎方法是addListener(Runnable, Executor), 該方法會在多線程運算完的時候,在Executor中執行指定的Runnable。

這里的delegate就是ThreadPoolExecutor,雖然還重寫了execute,不過還是直接調用ThreadPoolExecutor里面的execute ListeningExecutorService pool=MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
這樣一個執行器就被new出來了,現在需要往里面放任務了
ListenableFuture future = pool.submit(task1);看看submit代碼


public <T> ListenableFuture<T> submit(Callable<T> task) {
        return (ListenableFuture)super.submit(task);
    }

然后調用父類的submit

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

這里會調用重寫的newTaskFor


protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return TrustedListenableFutureTask.create(callable);
    }

class TrustedListenableFutureTask<V> extends TrustedFuture<V> implements RunnableFuture<V> {
    private volatile InterruptibleTask<?> task;
    
    static <V> TrustedListenableFutureTask<V> create(Callable<V> callable) {
        return new TrustedListenableFutureTask(callable);
    }

    TrustedListenableFutureTask(Callable<V> callable) {
        this.task = new TrustedListenableFutureTask.TrustedFutureInterruptibleTask(callable);
    }

    public void run() {
        InterruptibleTask localTask = this.task;
        if (localTask != null) {
            localTask.run();
        }

        this.task = null;
    }
}    

創建了一個TrustedListenableFutureTask,里面有個task是TrustedFutureInterruptibleAsyncTask, 這里重寫了Runnable的run方法,調用的就是這個task得run方法(也就是我們真正的任務)


private final class TrustedFutureInterruptibleAsyncTask extends InterruptibleTask<ListenableFuture<V>> {
        private final AsyncCallable<V> callable;

        TrustedFutureInterruptibleAsyncTask(AsyncCallable<V> callable) {
            this.callable = (AsyncCallable)Preconditions.checkNotNull(callable);
        }
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM