Java線程池如何優雅地等待所有任務執行完


 https://blog.csdn.net/flycp/article/details/106337294

 

***Java多線程-線程池ThreadPoolExecutor的submit返回值Future     (要看)

https://blog.csdn.net/qq_25806863/article/details/71214033 

 

隨着項目的體量越來越大,對代碼的執行效率要求越來越高,在實際應用過程中我們會經常使用線程池。
那么如果線程池嵌入在業務代碼中,如何正確的等待線程池執行完,在執行后續操作呢?或者想要獲取執行結果有應該怎么處理呢?

下面走一下場景:

package com.example.demo1.entity;

/**
* create by c-pown on 2019-12-06
*/
public class Student {
private String name;
private Integer age;
private Integer heigh;
private String hoby;

public Student(String name, Integer age, Integer heigh, String hoby) {
this.name = name;
this.age = age;
this.heigh = heigh;
this.hoby = hoby;
}
static String getAllname(){
return "張三";
}
public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getAge() {
return age;
}

public void setAge(Integer age) {
this.age = age;
}

public Integer getHeigh() {
return heigh;
}

public void setHeigh(Integer heigh) {
this.heigh = heigh;
}

public String getHoby() {
return hoby;
}

public void setHoby(String hoby) {
this.hoby = hoby;
}

@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", age=" + age +
", heigh=" + heigh +
", hoby='" + hoby + '\'' +
'}';
}
}
 
package com.example.demo1.service.TestThreadPool;

import com.example.demo1.entity.Student;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* create by c-pown on 2020-05-25
*/
public class TestThreadPool {
/**
* 手動創建線程池
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20,25,100L,
TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());

public static void main(String[] args) {
Student student = null;
List<Student> students = new ArrayList<>();
//添加五十萬個學生元素
for (int i = 0; i < 500000; i++) {
student = new Student("name"+i,20,183,"玩");
students.add(student);
}
for (Student student1 : students) {
/**
* 給元素添加后綴
*/
executor.submit(()-> student1.setName(student1.getName()+"這是后綴"));
}
//查看添加情況
System.out.println("添加數量:"+students.stream().filter(x->x.getName().contains("這是后綴")).count());
}
}
 
我們給List里面添加500000個學生元素,然后使用線程池給name屬性添加后綴,看一下執行結果:

添加數量:475371

我們發現添加成功的數量是少了兩萬多,這是由於線程池中的子線程任務沒有執行完,而主線程已經開始執行業務代碼,導致成功數量變少。
下面我們修改一下代碼:

一、使用CountDownLatch
package com.example.demo1.service.TestThreadPool;

import com.example.demo1.entity.Student;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* create by c-pown on 2020-05-25
*/
public class TestThreadPool {
/**
* 手動創建線程池
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20,25,100L,
TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());

public static void main(String[] args) {
Student student = null;
List<Student> students = new ArrayList<>();
//添加五十萬個學生元素
for (int i = 0; i < 500000; i++) {
student = new Student("name"+i,20,183,"玩");
students.add(student);
}
CountDownLatch countDownLatch = new CountDownLatch(students.size());
for (Student student1 : students) {
/**
* 給元素添加后綴
*/
executor.submit(()-> {
try {
student1.setName(student1.getName()+"這是后綴");
} catch (Exception e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//查看添加情況
System.out.println("添加數量:"+students.stream().filter(x->x.getName().contains("這是后綴")).count());
}
}
 
結果:

添加數量:500000
1
這是一個計數器操作,在線程池執行之前,給計數器指定數值(與要執行代碼的次數一致)也就是students.size(),在線程池執行代碼體里面要加上countDownLatch.countDown();代表每執行一次數值減少一,最后在循環體外邊寫上countDownLatch.await();代表等待計數器歸零。
可以查看下源碼介紹:

/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
public void countDown() {
sync.releaseShared(1);
}
 
/**
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>If the current count is zero then this method returns immediately.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of two things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
 
介紹中寫到等待計數器數量減少直至為0為止。也可以給await()設置超時時間

countDownLatch.await(300,TimeUnit.SECONDS);
1
如果超過300s(也可以是時,分)則不再等待,直接執行下面代碼。

二、使用Future.get()
package com.example.demo1.service.TestThreadPool;

import com.example.demo1.entity.Student;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* create by c-pown on 2020-05-25
*/
public class TestThreadPool {
/**
* 手動創建線程池
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20,25,100L,
TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());

public static void main(String[] args) {
Student student = null;
List<Student> students = new ArrayList<>();
//添加五十萬個學生元素
for (int i = 0; i < 500000; i++) {
student = new Student("name"+i,20,183,"玩");
students.add(student);
}
List<Future> futures = new ArrayList<>();
for (Student student1 : students) {
/**
* 給元素添加后綴
*/
Future future = executor.submit(()-> {
try {
student1.setName(student1.getName()+"這是后綴");
} catch (Exception e) {
e.printStackTrace();
}
});
futures.add(future);
}
for (Future future : futures) {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
//查看添加情況
System.out.println("添加數量:"+students.stream().filter(x->x.getName().contains("這是后綴")).count());
}
}
 
結果:

添加數量:500000
1
Future.get()可以同步等待線程執行完成,並且可以監聽執行結果

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
 
源碼中可以看出方法是有返回值得,可以監聽線程池子線程執行狀態及執行結果。
直接return 結果 Future<?>添加泛型即可。
同樣的 Future.get()也是可以指定超時時間的,超過等待時間可以直接執行后續代碼。

最后 如果線程池是方法內部創建的,可以直接使用shutdown()也會等待線程池的執行結果。同時會關閉線程池資源。

executor.shutdown();
try {
executor.awaitTermination(300,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
—————————————————————————————————————
原文鏈接:https://blog.csdn.net/flycp/article/details/106337294


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM