CompleteableFuture常用方法解析



package com.xunqi.gulimall.search.thread;

import java.util.concurrent.*;

/**
 * @Description:
 * @Created: with IntelliJ IDEA.
 * @author: 阿倫啊
 * @createTime: 2021-06-18 11:16
 **/
public class ThreadTest {

    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // System.out.println("main......start.....");
        // Thread thread = new Thread01();
        // thread.start();
        // System.out.println("main......end.....");

        // Runable01 runable01 = new Runable01();
        // new Thread(runable01).start();

        // FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
        // new Thread(futureTask).start();
        // System.out.println(futureTask.get());

        // service.execute(new Runable01());
        // Future<Integer> submit = service.submit(new Callable01());
        // submit.get();

        System.out.println("main......start.....");
        // CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        //     System.out.println("當前線程:" + Thread.currentThread().getId());
        //     int i = 10 / 2;
        //     System.out.println("運行結果:" + i);
        // }, executor);

        /**
         * 方法完成后的處理
         */
        // CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        //     System.out.println("當前線程:" + Thread.currentThread().getId());
        //     int i = 10 / 0;
        //     System.out.println("運行結果:" + i);
        //     return i;
        // }, executor).whenComplete((res,exception) -> {
        //     //雖然能得到異常信息,但是沒法修改返回數據
        //     System.out.println("異步任務成功完成了...結果是:" + res + "異常是:" + exception);
        // }).exceptionally(throwable -> {
        //     //可以感知異常,同時返回默認值
        //     return 10;
        // });

        /**
         * 方法執行完后端處理
         */
        // CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        //     System.out.println("當前線程:" + Thread.currentThread().getId());
        //     int i = 10 / 2;
        //     System.out.println("運行結果:" + i);
        //     return i;
        // }, executor).handle((result,thr) -> {
        //     if (result != null) {
        //         return result * 2;
        //     }
        //     if (thr != null) {
        //         System.out.println("異步任務成功完成了...結果是:" + result + "異常是:" + thr);
        //         return 0;
        //     }
        //     return 0;
        // });


        /**
         * 兩個任務都完成
         */
//        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
//            System.out.println("任務一線程:" + Thread.currentThread().getId());
//            int i = 10 / 4;
//            System.out.println("任務一結束:");
//            return i;
//        }, executor);
//
//        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
//            System.out.println("任務二線程:" + Thread.currentThread().getId());
//            try {
//                Thread.sleep(1000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            System.out.println("任務二結束:");
//            return "Hello ";
//        }, executor);

        //不能接受返回值
//        future01.runAfterBothAsync(future02,()->{
//            System.out.println("任務三開始");
//        },executor);

        //能獲取前兩個任務的返回值
//        future01.thenAcceptBothAsync(future02,(f1, f2)->{
//            System.out.println("任務三開始。。。之前的結果:"+f1+"----->"+f2);
//        },executor);

        //能獲取到前兩個任務的返回值,並且自己也能有返回值
//        future01.thenCombineAsync(future02,(f1,f2)->{
//            return f1 + ": "+ f2 + " Haha";
//        },executor);

        /**
         * 兩個任務,只要有一個完成,就執行任務3
         * runAfterEitherAsync;不感知結果,自己也無返回值
         * acceptEitherAsync;可以接受上次的結果,但是沒有返回值
         * applyToEitherAsync;可感知結果,且有返回值
         */
//        future01.runAfterEitherAsync(future02,()->{
//            System.out.println("任務三開始");
//        },executor);

        //可以接受上次的結果,但是沒有返回值
//        future01.acceptEitherAsync(future02,(res)->{
//            System.out.println("任務三開始...之前的結果是" + res);
//        },executor);

        //可感知結果,且有返回值
//        CompletableFuture<String> future = future01.applyToEitherAsync(future02, res -> {
//            return res.toString() + "->哈哈";
//        }, executor);
//        System.out.println(future.get());

        /**
         * 多任務組合
         */
        CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(()->{
            System.out.println("查詢商品的圖片信息");
            return "hello.jpg";
        },executor);

        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(()->{
            System.out.println("查詢商品的屬性");
            return "黑色+256G";
        },executor);

        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(()->{
            System.out.println("查詢商品的介紹");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "華為";
        },executor);
//        futureImg.get();futureAttr.get();futureDesc.get();//阻塞式等待,太麻煩,代碼冗余

//        CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
//        allOf.get();//等待所有結果完成
//        System.out.println("main...end..."+futureImg.get() + futureAttr.get() + futureDesc.get());

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
        anyOf.get();//任意一個返回即可
        System.out.println("main...end..."+anyOf.get());

        /**
         * 線程串行化
         * 1、thenRunL:不能獲取上一步的執行結果
         * 2、thenAcceptAsync:能接受上一步結果,但是無返回值
         * 3、thenApplyAsync:能接受上一步結果,有返回值
         *
         */
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運行結果:" + i);
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("任務2啟動了..." + res);
            return "Hello" + res;
        }, executor);
        System.out.println("main......end....." + future.get());

    }

    private static void threadPool() {

        ExecutorService threadPool = new ThreadPoolExecutor(
                200,
                10,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(10000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        //定時任務的線程池
        ExecutorService service = Executors.newScheduledThreadPool(2);
    }


    public static class Thread01 extends Thread {
        @Override
        public void run() {
            System.out.println("當前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運行結果:" + i);
        }
    }


    public static class Runable01 implements Runnable {
        @Override
        public void run() {
            System.out.println("當前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運行結果:" + i);
        }
    }


    public static class Callable01 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("當前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運行結果:" + i);
            return i;
        }
    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM