FutureTask子任務取消執行的狀態判斷


示例代碼可以從github上獲取  https://github.com/git-simm/simm-framework.git
一、業務場景:
  系統中存在多種場景並發操作事務執行時互鎖的情況,導致任務積壓,系統崩潰。先做了各場景業務的性能調整,但是並發互鎖依然無法避免。於是開始考慮選取調用頻繁的同步功能作為死鎖的犧牲品,取消執行,釋放鎖。
 
二、處理方案:
  在這里優先選擇FutureTask.cancel方案。核心思想是 調用FutureTask的get方法時,設置超時時長。接收到超時異常后,調用cancel方法,中斷線程。當然,實際來看這個方案也滿足不了我的業務需要。它存在以下兩個局限:
  • cancel方法只是向子線程發起中斷請求,是否能夠中斷取決於子線程自身,不能確定子線程會在哪一步操作退出,加入啟用的有事務,這個事務可能回滾了,也可能提交成功了。因此,我們需要借用synchronized功能,讓父子線程通訊,來明確獲得子線程的運行狀態;
  • 子線程中執行數據庫操作,引起死鎖等待,這種情況下cancle操作是不能取消任務了,只能等到事務超時。這個問題由於cancel無法強制關閉線程,因此無法用FutureTask方案。

  以下實現依然圍繞FutureTask這個方案來將,只是添加父子線程通訊,明確獲取子線程狀態的實現。

三、代碼實現:

  3.1、創建一個FTaskEndFlag的線程同步標志。父線程等待子線程反饋執行結果后,再執行后續的邏輯;

package simm.framework.threadutils.interrupt;

import java.util.concurrent.TimeoutException;
/**
 * futuretask運行終止事件通知
 * 2018.09.22 by simm
 */
public class FTaskEndFlag {
    private volatile boolean isNormaled = false;
    private volatile boolean fired = false;
    private Exception exception =null;

    public boolean isNormaled() {
        return isNormaled;
    }

    /**
     * 獲取子線程異常信息
     * @return
     */
    public Exception getException() {
        return exception;
    }

    /**
     * 通知結束
     * @param result
     * @param result
     */
    public synchronized void notifyEnd(boolean result){
        isNormaled = result;
        fired = true;
        notifyAll();
    }

    /**
     * 通知結束
     * @param result
     * @param result
     */
    public synchronized void notifyEnd(boolean result,Exception ex){
        isNormaled = result;
        exception = ex;
        fired = true;
        notifyAll();
    }

    /**
     * 執行結束通知
     */
    public synchronized void waitForEnd() throws InterruptedException {
        while (!fired) {
            //子線程掛起,釋放synchronized同步塊
            wait();
        }
    }
   /**
     * 等待
     */
    private void waitFunc(long millis){
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  3.2、創建一個BaseFutureTask的抽象類,內置FTaskEndFlag線程同步標志;

package simm.framework.threadutils.interrupt;

import java.util.concurrent.Callable;

/**
 * 基礎任務
 * 2018.09.22 by simm
 */
public abstract class BaseFutureTask implements Callable<Boolean> {
    /**
     * futuretask 等待標志
     */
    private FTaskEndFlag flag = new FTaskEndFlag();

    public FTaskEndFlag getFlag() {
        return flag;
    }
}

  3.3、創建一個超時重試的工具類,對FutureTask的結果獲取設置超時時間;

package simm.framework.threadutils.interrupt;

import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.*;
/**
 * 方法超時重試工具
 * 2018.09.20  by simm
 */
public class RetryUtil {
    /**
     * 可緩存線程執行器(依jvm情況自行回收創建)
     */
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * 默認方法(3秒超時,重試3次)
     * @param callable
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    public static Boolean execute(BaseFutureTask callable) throws InterruptedException, ExecutionException, TimeoutException {
        return execute(callable,3000,1000,3);
    }

    /**
     * 方法超時控制
     * @param callable 方法體
     * @param timeout 超時時長
     * @param interval 間隔時長
     * @param retryTimes 重試次數
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     * @throws TimeoutException
     */
    public static Boolean execute(BaseFutureTask callable, long timeout,long interval, int retryTimes) throws ExecutionException, InterruptedException, TimeoutException {
        Boolean result = false;
        FutureTask<Boolean> futureTask = new FutureTask<>(callable);
        executorService.execute(futureTask);
        try {
            result = futureTask.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            futureTask.cancel(true);
            throw e;
        }catch(TimeoutException e){
            futureTask.cancel(true);
            callable.getFlag().waitForEnd();
            if(callable.getFlag().isNormaled()){
                return true;
            }
            e.printStackTrace();
            //超時重試
            retryTimes--;
            if(retryTimes > 0){
                Thread.sleep(interval);
                execute(callable,timeout,interval,retryTimes);
            }else{
                throw e;
            }
        }
        return result;
    }
}

四、給出一個調用代碼。實現一個繼承自BaseFutureTask的 FutureTask任務。依舊需要注意子線程涉及到spring的組件,最好是參數從主線程注入到子線程。

RetryUtil.execute(new SyncProductTask(productBiz,productInfo),timeout,interval,3);

 

參考文章

https://www.jianshu.com/p/55221d045f39


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM