Java 並發編程學習(五):批量並行執行任務的兩種方式


背景介紹

有時候我們需要執行一批相似的任務,並且要求這些任務能夠並行執行。通常,我們的需求會分為兩種情況:

  1. 並行執行一批任務,等待耗時最長的任務完成之后,再處理所有任務的結果。
  2. 並行執行一批任務,依次處理完成的任務結果(哪個任務先執行完就先處理哪個)。

這篇文章要介紹的兩種批量執行任務的方式,正好對應了上述兩種情況,下面分別介紹在Java中,如何使用並發包里面的API完成我們的需求。

使用ExecutorSevice#invokeAll()

通過向線程池提交一組任務,可以實現上述第一種批量執行的需求。下面來看具體例子。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 創建兩個任務
        Callable<Integer> task1 = () -> {
            TimeUnit.SECONDS.sleep(3);
            System.out.println("[" + Thread.currentThread().getName() + "]" + " task1 finished");
            return 1;
        };

        Callable<Integer> task2 = () -> {
            TimeUnit.SECONDS.sleep(10);
            System.out.println("[" + Thread.currentThread().getName() + "]" + " task2 finished");
            return 2;
        };

        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(task1);
        tasks.add(task2);

        // 創建線程池
        ExecutorService service = Executors.newFixedThreadPool(2);

        // 調用invokeAll方法,批量提交一組任務
        List<Future<Integer>> futures = service.invokeAll(tasks);

        for (Future<Integer> future : futures) {
            // 在這里獲取任務的返回值時,會等待所有任務都執行完才返回結果
            Integer result = future.get();
            System.out.println("[" + Thread.currentThread().getName() + "]" + " " + result);
        }

        // 關閉線程池
        service.shutdown();
    }
}

執行結果:

使用CompletionService

CompletionService也可以用來提交一組任務,讓這些任務並行執行,任何一個任務執行完之后別的線程可以立即獲得計算結果。看一下具體例子:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 創建兩個任務
        Callable<Integer> task1 = () -> {
            TimeUnit.SECONDS.sleep(3);
            System.out.println("[" + Thread.currentThread().getName() + "]" + " task1 finished");
            return 1;
        };

        Callable<Integer> task2 = () -> {
            try {
                TimeUnit.SECONDS.sleep(10);
                System.out.println("[" + Thread.currentThread().getName() + "]" + " task2 finished");
            }catch (InterruptedException e) {
                // 第二個任務耗時長,等第一個任務完成之后,手動取消第二個任務的執行,此時第二個任務可能會收到中斷。
                e.printStackTrace();
            }

            return 2;
        };

        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(task1);
        tasks.add(task2);

        // 創建線程池
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // 使用CompletionService批量提交任務
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
        List<Future<Integer>> futures = new ArrayList<>();
        for (Callable<Integer> task : tasks) {
            Future<Integer> f = cs.submit(task);
            futures.add(f);
        }

        // 獲取第一個執行完的任務結果
        Future<Integer> f = cs.take();
        System.out.println("[" + Thread.currentThread().getName() + "]" + f.get());
        futures.remove(f);
        
        // 取消其他任務
        for (Future<Integer> future : futures) {
            future.cancel(true);
        }

        // 關閉線程池
        pool.shutdown();
    }
}

運行結果:

第一個任務執行完之后,第二個任務還在運行中,但是我們已經不關心其計算結果了,手動取消這個任務的執行,因此第二個任務會收到一個中斷。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM