線程池子線程超時(僵死)問題


簡介

線程池循環執行一些任務,某個線程執行超時,需要將超時的線程任務拋棄。

示例

修改前

當遇到超時的任務就涼涼,得重啟程序。

Task.java:

public class Task implements Runnable {

  private final int sleepTime;
  private final CountDownLatch countDownLatch;

  public Task(int sleepTime, CountDownLatch countDownLatch) {
    this.sleepTime = sleepTime;
    this.countDownLatch = countDownLatch;
  }

  @Override
  public void run() {
    Thread.sleep(sleepTime == Demo.THREAD_SIZE ? Demo.LONG_RUNNING_THREAD_TIME : Demo.SHORT_RUNNING_THREAD_TIME);
    System.out.println("任務 " + sleepTime + " 干完了");
    countDownLatch.countDown();
  }
}

Demo.java:

public class Demo {
  /**
   * 線程數量
   * 前三個任務執行 {@link Demo#SHORT_RUNNING_THREAD_TIME} ms
   * 最后一個線程運行 {@link Demo#LONG_RUNNING_THREAD_TIME} ms
   */
  public static final int THREAD_SIZE = 4;
  /**
   * 線程超時時間(ms)
   */
  public static final int THREAD_TIMEOUT = 3000;
  /**
   * 超時線程運行的時間(ms)
   */
  public static final int LONG_RUNNING_THREAD_TIME = 20000;
  /**
   * 正常線程運行的時間(ms)
   */
  public static final int SHORT_RUNNING_THREAD_TIME = 10;
  
  public static void main(String[] args) throws Exception {
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    while (true) {
      CountDownLatch countDownLatch = new CountDownLatch(THREAD_SIZE);
      System.out.println("開始");
      for (int i = 1; i <= THREAD_SIZE; i++) {
        executorService.execute(new Task(i, countDownLatch));
      }

      if (!countDownLatch.await(Demo.THREAD_TIMEOUT, TimeUnit.MILLISECONDS)) {
        throws new Exception("涼涼,重啟程序");
      }
      System.out.println("----寫數據開始----");
      System.out.println("----寫數據結束----");
      System.out.println("結束");
    }
  }
}

修改后

如果 countDownLatch.await(timeout, unit) 判斷超時未全部執行完,就遍歷線程池 submit 返回的所有 future,未執行完就中斷,最后再調用 await() 等待中斷的線程執行 countDownLatch.countDown() 完成所有任務,代碼如下:

Task.java:

public class Task implements Runnable {

  private final int sleepTime;
  private final CountDownLatch countDownLatch;

  public Task(int sleepTime, CountDownLatch countDownLatch) {
    this.sleepTime = sleepTime;
    this.countDownLatch = countDownLatch;
  }

  @Override
  public void run() {
    try {
      Thread.sleep(sleepTime == Demo.THREAD_SIZE ? Demo.LONG_RUNNING_THREAD_TIME : Demo.SHORT_RUNNING_THREAD_TIME);
      System.out.println("任務 " + sleepTime + " 干完了");
    } catch (InterruptedException ie) {
      System.out.println("任務 " + sleepTime + " 被中斷");
    } finally {
      countDownLatch.countDown();
    }
  }
}

Demo.java:

public class Demo {
  /**
   * 線程數量
   * 前三個任務執行 {@link Demo#SHORT_RUNNING_THREAD_TIME} ms
   * 最后一個線程運行 {@link Demo#LONG_RUNNING_THREAD_TIME} ms
   */
  public static final int THREAD_SIZE = 4;
  /**
   * 線程超時時間(ms)
   */
  public static final int THREAD_TIMEOUT = 3000;
  /**
   * 超時線程運行的時間(ms)
   */
  public static final int LONG_RUNNING_THREAD_TIME = 20000;
  /**
   * 正常線程運行的時間(ms)
   */
  public static final int SHORT_RUNNING_THREAD_TIME = 10;
  
  public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    while (true) {
      List<Future<Boolean>> futures = new ArrayList<>(THREAD_SIZE);
      CountDownLatch countDownLatch = new CountDownLatch(THREAD_SIZE);
      System.out.println("開始");
      for (int i = 1; i <= THREAD_SIZE; i++) {
        futures.add(executorService.submit(new Task(i, countDownLatch), true));
      }

      if (!countDownLatch.await(Demo.THREAD_TIMEOUT, TimeUnit.MILLISECONDS)) {
        for (Future<Boolean> future : futures) {
          if (!future.isDone()) {
            future.cancel(true);
          }
        }
        countDownLatch.await();
      }
      System.out.println("----寫數據開始----");
      System.out.println("----寫數據結束----");
      System.out.println("結束");
    }
  }
}

輸出結果:

開始
任務 3 干完了
任務 1 干完了
任務 2 干完了
任務 4 被中斷
----寫數據開始----
----寫數據結束----
結束
開始
任務 3 干完了
任務 1 干完了
任務 2 干完了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM