Using the Futures utility class


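Futures is a utility class provided by Guava; its fully qualified name is com.google.common.util.concurrent.Futures. It works especially well together with MoreExecutors.

The main methods are as follows:

1. The addCallback() method:

public static void addCallback(ListenableFuture<V> future, FutureCallback<? super V> callback, Executor executor): registers a callback on a ListenableFuture instance. It serves the same purpose as calling addListener(Runnable listener, Executor executor) on the ListenableFuture instance, except that the callback is handed the result in onSuccess() or the failure in onFailure(), whereas a plain listener has to call get() itself.

Example: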

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
        ListenableFuture<Integer> future = listeningExecutorService.submit(() -> {
            try {
                Thread.sleep(4000);
                System.out.println(Thread.currentThread().getName() + "@666");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1);
        Futures.addCallback(future, new FutureCallback<Integer>() {

            @Override
            public void onSuccess(Integer result) {
                System.out.println(Thread.currentThread().getName() + "@" + result);
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println(Thread.currentThread().getName() + "@" + t.getMessage());
            }
        }, threadPoolExecutor);
        System.out.println(Thread.currentThread().getName() + "@888");
    }

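ExecutorService pairs with Future, and ListeningExecutorService pairs with ListenableFuture: submit() on the former returns a Future, submit() on the latter returns a ListenableFuture.

2. The two overloads of allAsList():

public static ListenableFuture<List<V>> allAsList(ListenableFuture<V>... futures)

public static ListenableFuture<List<V>> allAsList(Iterable<ListenableFuture<V>> futures)

Example: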

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
        ListenableFuture<List<Integer>> mergedListenableFuture = Futures.allAsList(
                Lists.newArrayList(
                        listeningExecutorService.submit(() -> {
                            try {
                                Thread.sleep(4000);
                                System.out.println(Thread.currentThread().getName() + "@666");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }, 1),
                        listeningExecutorService.submit(() -> {
                            try {
                                Thread.sleep(2000);
                                System.out.println(Thread.currentThread().getName() + "@888");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }, 2)
                )
        );
        try {
            List<Integer> resultList = mergedListenableFuture.get();
            System.out.println(resultList);
        } catch (Exception e) {
            e.printStackTrace();
        }

        Futures.addCallback(mergedListenableFuture,
                new FutureCallback<List<Integer>>() {
                    @Override
                    public void onSuccess(List<Integer> result) {
                        try {
                            Thread.sleep(1000);
                            System.out.println(Thread.currentThread().getName() + ", success callback");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        try {
                            Thread.sleep(1000);
                            System.out.println(Thread.currentThread().getName() + ", " + t.getMessage());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },
                threadPoolExecutor);
    }

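allAsList() merges several ListenableFuture instances into a single ListenableFuture. The get() method of the combined future returns a list whose elements are the get() results of the individual futures, in the same order as the futures passed to allAsList(). A success callback added to the combined future therefore runs only after every input future has completed. If the task behind any input future throws, the combined future fails as well: its get() throws an ExecutionException and onFailure() is invoked instead of onSuccess().

3. The two overloads of successfulAsList():

public static ListenableFuture<List<V>> successfulAsList(ListenableFuture<V>... futures)

public static ListenableFuture<List<V>> successfulAsList(Iterable<ListenableFuture<V>> futures)

successfulAsList() differs from allAsList() in one respect: get() on the combined ListenableFuture does not throw just because one of the input tasks threw. If a task throws (or its future is cancelled), the corresponding position in the returned list holds null. In the extreme case, get() returns a list that consists entirely of null.

Example: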

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
        ListenableFuture<List<Integer>> mergedListenableFuture = Futures.successfulAsList(
                Lists.newArrayList(
                        listeningExecutorService.submit(() -> {
                            try {
                                Thread.sleep(4000);
                                System.out.println(Thread.currentThread().getName() + "@666");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            // Deliberately fails: index 3 is out of bounds for a two-element list.
                            System.out.println(Lists.newArrayList(1, 2).get(3));
                        }, 1),
                        listeningExecutorService.submit(() -> {
                            try {
                                Thread.sleep(2000);
                                System.out.println(Thread.currentThread().getName() + "@888");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            // Deliberately fails with StringIndexOutOfBoundsException.
                            System.out.println("".substring(2));
                        }, 2)
                )
        );
        try {
            List<Integer> resultList = mergedListenableFuture.get();
            System.out.println(resultList);
        } catch (Exception e) {
            e.printStackTrace();
        }

        Futures.addCallback(mergedListenableFuture,
                new FutureCallback<List<Integer>>() {
                    @Override
                    public void onSuccess(List<Integer> result) {
                        try {
                            Thread.sleep(1000);
                            System.out.println(Thread.currentThread().getName() + ", success callback");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        try {
                            Thread.sleep(1000);
                            System.out.println(Thread.currentThread().getName() + ", " + t.getMessage());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },
                threadPoolExecutor);
    }

4. The two overloads of whenAllComplete():

public static FutureCombiner<V> whenAllComplete(ListenableFuture<V>... futures)

public static FutureCombiner<V> whenAllComplete(Iterable<ListenableFuture<V>> futures)

Runs some action once all of the ListenableFuture instances have completed. It does not matter if some of the input tasks throw; that does not prevent the follow-up action from running.

The returned FutureCombiner has three instance methods, all of which return a ListenableFuture. This makes it possible to chain asynchronous steps: async step 2 runs after async step 1 finishes, async step 3 runs after step 2, and the chain can go on for as long as needed.

Commonly used FutureCombiner instance methods:

public ListenableFuture<C> call(Callable<C> combiner, Executor executor)

public ListenableFuture<C> callAsync(AsyncCallable<C> combiner, Executor executor)

public ListenableFuture<?> run(Runnable combiner, Executor executor)

Example:

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
        ListenableFuture<Integer> listenableFuture1 = Futures.whenAllComplete(
                listeningExecutorService.submit(() -> {
                    try {
                        Thread.sleep(4000);
                        System.out.println(Thread.currentThread().getName() + "@666");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // Deliberately fails: index 3 is out of bounds for a two-element list.
                    System.out.println(Lists.newArrayList(1, 2).get(3));
                }, 1),
                listeningExecutorService.submit(() -> {
                    try {
                        Thread.sleep(2000);
                        System.out.println(Thread.currentThread().getName() + "@888");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // Deliberately fails with StringIndexOutOfBoundsException.
                    System.out.println("".substring(2));
                }, 2)
        ).call(() -> {
            System.out.println(123);
            return 1;
        }, threadPoolExecutor);
        Futures.whenAllComplete(listenableFuture1).call(() -> {
            System.out.println(456);
            return 2;
        }, threadPoolExecutor);
    }

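5. The two overloads of whenAllSucceed():

public static FutureCombiner<V> whenAllSucceed(ListenableFuture<V>... futures)

public static FutureCombiner<V> whenAllSucceed(Iterable<ListenableFuture<V>> futures)

whenAllSucceed() differs from whenAllComplete() in one respect: if the task behind any input future throws, the task passed to call() or run() on the returned FutureCombiner is not executed. The call() or run() invocation itself does not throw; instead, the ListenableFuture it returns fails with the input's exception, which only becomes visible if you call get() on that future or attach a callback to it.

Example: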

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
        Futures.FutureCombiner<Integer> futureCombiner = Futures.whenAllSucceed(
                listeningExecutorService.submit(() -> {
                    try {
                        Thread.sleep(4000);
                        System.out.println(Thread.currentThread().getName() + "@666");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }, 1),
                listeningExecutorService.submit(() -> {
                    try {
                        Thread.sleep(2000);
                        System.out.println(Thread.currentThread().getName() + "@888");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // Deliberately fails with StringIndexOutOfBoundsException.
                    System.out.println("".substring(2));
                }, 2)
        );
        futureCombiner.call(() -> {
            System.out.println(123);
            return 1;
        }, threadPoolExecutor);

        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(456);
    }

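In this example the task behind the second ListenableFuture passed to whenAllSucceed() throws, so the task passed to call() on the FutureCombiner never runs and 123 is not printed. The example discards the future returned by call(), which is why no exception is seen.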
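To make the failure visible, you can keep the future returned by call() and inspect it. The following is a minimal sketch, not part of the original example: the class name WhenAllSucceedFailureSketch, the helper combinedFailure() and the "boom" message are made up for illustration, and a small fixed thread pool stands in for the pool used above.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; only the Guava and JDK calls are real API.
class WhenAllSucceedFailureSketch {

    /** Returns the cause reported by the combined future, or null if it succeeded. */
    static Throwable combinedFailure(AtomicBoolean combinerRan) {
        ListeningExecutorService pool =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
        try {
            Callable<Integer> succeeding = () -> 1;
            Callable<Integer> failing = () -> {
                throw new IllegalStateException("boom");
            };
            ListenableFuture<Integer> ok = pool.submit(succeeding);
            ListenableFuture<Integer> broken = pool.submit(failing);

            // The combiner is skipped because one input fails.
            ListenableFuture<Integer> combined = Futures.whenAllSucceed(ok, broken)
                    .call(() -> {
                        combinerRan.set(true);
                        return 123;
                    }, pool);
            try {
                combined.get();
                return null;
            } catch (ExecutionException e) {
                // The input's exception is carried by the combined future.
                return e.getCause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return e;
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicBoolean combinerRan = new AtomicBoolean(false);
        Throwable failure = combinedFailure(combinerRan);
        System.out.println("combiner ran: " + combinerRan.get());
        System.out.println("failure: " + failure);
    }
}
```

Here get() on the combined future throws an ExecutionException whose cause is the IllegalStateException from the failing input, and the combiner flag stays false.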

6. The two methods whose names start with catching:

public static ListenableFuture<V> catching(ListenableFuture<V> input, Class<X> exceptionType, Function<X, V> fallback, Executor executor)

Note that Function here is not the JDK's java.util.function.Function but Guava's Function from the base subpackage; its fully qualified name is com.google.common.base.Function.

public static ListenableFuture<V> catchingAsync(ListenableFuture<V> input, Class<X> exceptionType, AsyncFunction<X, V> fallback, Executor executor)

When the task behind a ListenableFuture throws and the exception is of the specified type, a plan B (the fallback) can be run instead.

Example:

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
        ListenableFuture<List<Integer>> mergedListenableFuture = Futures.allAsList(
                Lists.newArrayList(
                        listeningExecutorService.submit(() -> {
                            try {
                                Thread.sleep(4000);
                                System.out.println(Thread.currentThread().getName() + "@666");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }, 1),
                        listeningExecutorService.submit(() -> {
                            try {
                                Thread.sleep(2000);
                                System.out.println(Thread.currentThread().getName() + "@888");
                                // "".substring(2) deliberately throws StringIndexOutOfBoundsException.
                                System.out.println(Thread.currentThread().getName() + "".substring(2));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }, 2)
                )
        );
        ListenableFuture<List<Integer>> withFallbackListenableFuture = Futures.catching(mergedListenableFuture,
                StringIndexOutOfBoundsException.class,
                input -> getBackUpList(),
                threadPoolExecutor
        );
        try {
            System.out.println(withFallbackListenableFuture.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static List<Integer> getBackUpList() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new ArrayList<>();
    }

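The fallback runs only when input fails with the specified exception type; for any other exception type the returned future fails with the original exception. If the fallback itself throws, get() on the resulting ListenableFuture throws.

The third parameter of catchingAsync() is an AsyncFunction. AsyncFunction is also a functional interface, but its method must return a ListenableFuture, so it is less convenient to use than catching().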
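For comparison, here is a small sketch of catchingAsync() in which the fallback is itself an asynchronous task. The class name CatchingAsyncSketch, the helper fallbackAsync() and the placeholder value -1 are assumptions made for illustration; they do not come from the example above.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

// Hypothetical demo class; only the Guava and JDK calls are real API.
class CatchingAsyncSketch {

    /** Asynchronous plan B: submits a second task that yields a placeholder list. */
    static ListenableFuture<List<Integer>> fallbackAsync(ListeningExecutorService pool) {
        Callable<List<Integer>> backup = () -> Collections.singletonList(-1);
        return pool.submit(backup);
    }

    static List<Integer> loadWithAsyncFallback() {
        ListeningExecutorService pool =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
        try {
            Callable<List<Integer>> primary = () -> {
                throw new IllegalStateException("primary source down");
            };
            ListenableFuture<List<Integer>> input = pool.submit(primary);

            // The AsyncFunction returns a ListenableFuture rather than a plain value.
            ListenableFuture<List<Integer>> withFallback = Futures.catchingAsync(
                    input,
                    IllegalStateException.class,
                    e -> fallbackAsync(pool),
                    pool);

            // getUnchecked() blocks like get() but throws only unchecked exceptions.
            return Futures.getUnchecked(withFallback);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadWithAsyncFallback()); // prints [-1]
    }
}
```

The extra ceremony pays off only when the fallback really is asynchronous, for example a second remote call; for an in-memory default, catching() is simpler.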

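7. public static ImmutableList<ListenableFuture<T>> inCompletionOrder(Iterable<ListenableFuture<T>> futures)

Returns an immutable list of ListenableFuture instances ordered by completion: the first element completes with the result of whichever input future finishes first, the second element with the next one, and so on.

Example: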

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
        ImmutableList<ListenableFuture<Integer>> listenableFutureList = Futures.inCompletionOrder(Lists.newArrayList(
                listeningExecutorService.submit(() -> {
                    try {
                        Thread.sleep(4000);
                        System.out.println(Thread.currentThread().getName() + "@666");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }, 1),
                listeningExecutorService.submit(() -> {
                    try {
                        Thread.sleep(2000);
                        System.out.println(Thread.currentThread().getName() + "@888");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }, 2)
                )
        );
        listenableFutureList.forEach(p -> {
            try {
                System.out.println(Thread.currentThread().getName() + p.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

8. The three transform()-related methods:

public static ListenableFuture<O> transform(ListenableFuture<I> input, Function<I, O> function, Executor executor)

public static ListenableFuture<O> transformAsync(ListenableFuture<I> input, AsyncFunction<I, O> function, Executor executor)

public static Future<O> lazyTransform(Future<I> input, Function<I, O> function)

transform() and transformAsync() return a ListenableFuture whose result is computed by applying the given Function (or AsyncFunction) to the result of the input ListenableFuture. If the task behind the input future throws, the returned future fails with the same exception and the function is not executed. If the input future is cancelled, the returned future is cancelled as well; conversely, cancelling the returned future attempts to cancel the input future.

lazyTransform() is a special case: the Function is not applied eagerly. It runs only when get() is called on the returned Future (and again on every further call to get()), and it runs on the calling thread, which blocks until it finishes. For that reason this method is not very practical.

Example:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(10, 20, 0, TimeUnit.MINUTES, new LinkedBlockingQueue<>(3000), threadFactory);
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
        ListenableFuture<Integer> oriListenableFuture = listeningExecutorService.submit(() -> {
            try {
                Thread.sleep(4000);
                System.out.println(Thread.currentThread().getName() + "@666");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1);
        Future<String> future = Futures.lazyTransform(oriListenableFuture, input -> {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "@888");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return String.valueOf(input);
        });
        future.get();
        System.out.println(456);
    }
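The example above covers only lazyTransform(). As a complement, transform() and transformAsync() could be chained roughly as follows. This is a sketch under assumptions: the class name TransformSketch, the method transformChain() and the sample values are invented for illustration.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

// Hypothetical demo class; only the Guava and JDK calls are real API.
class TransformSketch {

    static String transformChain() {
        ListeningExecutorService pool =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
        try {
            Callable<Integer> source = () -> 21;
            ListenableFuture<Integer> input = pool.submit(source);

            // transform(): a plain function, run on the pool once input succeeds.
            ListenableFuture<Integer> doubled = Futures.transform(input, n -> n * 2, pool);

            // transformAsync(): the function itself returns a ListenableFuture.
            ListenableFuture<String> labelled = Futures.transformAsync(
                    doubled,
                    n -> Futures.immediateFuture("value=" + n),
                    pool);

            // Blocks until the whole chain has completed.
            return Futures.getUnchecked(labelled);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(transformChain()); // prints value=42
    }
}
```

Unlike lazyTransform(), both functions here run on the supplied executor as soon as their input completes, so the caller is blocked only by the final getUnchecked() call.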

9. The two overloads of scheduleAsync(): run a task after the specified delay; the task runs only once.

public static ListenableFuture<O> scheduleAsync(AsyncCallable<O> callable, long delay, TimeUnit timeUnit, ScheduledExecutorService executorService)

public static ListenableFuture<O> scheduleAsync(AsyncCallable<O> callable, Duration delay, ScheduledExecutorService executorService)

AsyncCallable is also a functional interface: it takes no arguments and returns a ListenableFuture.

Example:

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("async-pool-%d").build();
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(5, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
        Futures.scheduleAsync(() -> {
            ListenableFuture sf = MoreExecutors.listeningDecorator(scheduledExecutorService).submit(() -> {
                System.out.println(Thread.currentThread().getName() + "@" + System.currentTimeMillis());
            });
            return sf;
        }, 5, TimeUnit.SECONDS, scheduledExecutorService);
    }

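In the example above, the line is printed once after 5 s, and only once.

How, then, would you write a truly periodic task, for example one that prints every 5 s?

Example: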
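One way to do this, sketched here under the assumption that plain JDK scheduling is acceptable, is ScheduledExecutorService.scheduleAtFixedRate(), which reruns the task at a fixed period until it is cancelled. The class name PeriodicTaskSketch, the helper runPeriodically() and the choice to stop after three runs are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class built on JDK classes only.
class PeriodicTaskSketch {

    /** Runs a print task every periodMillis and returns after it has run `times` times. */
    static int runPeriodically(long periodMillis, int times) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);

        // First run after one period, then once per period.
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            // Single scheduler thread, so this check-then-act is not racy.
            if (done.getCount() > 0) {
                runs.incrementAndGet();
                System.out.println(Thread.currentThread().getName() + "@" + System.currentTimeMillis());
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Without cancelling, the task would keep running forever.
            handle.cancel(false);
            scheduler.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        // Prints one line every 5 s, three times in total.
        runPeriodically(5000, 3);
    }
}
```

In a real service you would normally keep the ScheduledFuture and cancel it on shutdown rather than after a fixed number of runs.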

 

10. The two overloads of withTimeout():
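As far as I know from the Guava API, and written in the same simplified form as the signatures above, the two overloads are:

public static ListenableFuture<V> withTimeout(ListenableFuture<V> delegate, long time, TimeUnit unit, ScheduledExecutorService scheduledExecutor)

public static ListenableFuture<V> withTimeout(ListenableFuture<V> delegate, Duration time, ScheduledExecutorService scheduledExecutor)

The returned future delegates to the input future but fails with a TimeoutException (wrapped in an ExecutionException) if the input has not completed within the given time; the input future is then cancelled. A minimal sketch follows, in which the class name WithTimeoutSketch, the helper timesOut() and the durations are assumptions chosen for illustration:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; only the Guava and JDK calls are real API.
class WithTimeoutSketch {

    /** Returns true if a task taking taskMillis is cut off by a timeout of timeoutMillis. */
    static boolean timesOut(long taskMillis, long timeoutMillis) {
        ListeningExecutorService pool =
                MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        // withTimeout() needs a scheduler to fire the timeout.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            Callable<Integer> slow = () -> {
                Thread.sleep(taskMillis);
                return 1;
            };
            ListenableFuture<Integer> delegate = pool.submit(slow);
            ListenableFuture<Integer> bounded =
                    Futures.withTimeout(delegate, timeoutMillis, TimeUnit.MILLISECONDS, timer);
            try {
                bounded.get();
                return false; // finished in time
            } catch (ExecutionException e) {
                return e.getCause() instanceof TimeoutException;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        } finally {
            timer.shutdownNow();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("slow task timed out: " + timesOut(2000, 100));
        System.out.println("fast task timed out: " + timesOut(10, 2000));
    }
}
```

Note that the timeout is reported through the returned future, not through a blocking get(timeout) call, so it also works with callbacks and with the combinators described earlier.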

 

