示例代碼可以從github上獲取
https://github.com/git-simm/simm-framework.git
一、業務場景:
系統中存在多種場景並發操作事務執行時互鎖的情況,導致任務積壓,系統崩潰。先做了各場景業務的性能調整,但是並發互鎖依然無法避免。於是開始考慮選取調用頻繁的同步功能作為死鎖的犧牲品,取消執行,釋放鎖。
二、處理方案:
在這里優先選擇FutureTask.cancel方案。核心思想是 調用FutureTask的get方法時,設置超時時長。接收到超時異常后,調用cancel方法,中斷線程。當然,實際來看這個方案也滿足不了我的業務需要。它存在以下兩個局限:
- cancel方法只是向子線程發起中斷請求,是否能夠中斷取決於子線程自身,不能確定子線程會在哪一步操作退出,加入啟用的有事務,這個事務可能回滾了,也可能提交成功了。因此,我們需要借用synchronized功能,讓父子線程通訊,來明確獲得子線程的運行狀態;
- 子線程中執行數據庫操作,引起死鎖等待,這種情況下cancle操作是不能取消任務了,只能等到事務超時。這個問題由於cancel無法強制關閉線程,因此無法用FutureTask方案。
以下實現依然圍繞FutureTask這個方案來將,只是添加父子線程通訊,明確獲取子線程狀態的實現。
三、代碼實現:
3.1、創建一個FTaskEndFlag的線程同步標志。父線程等待子線程反饋執行結果后,再執行后續的邏輯;
package simm.framework.threadutils.interrupt; import java.util.concurrent.TimeoutException; /** * futuretask運行終止事件通知 * 2018.09.22 by simm */ public class FTaskEndFlag { private volatile boolean isNormaled = false; private volatile boolean fired = false; private Exception exception =null; public boolean isNormaled() { return isNormaled; } /** * 獲取子線程異常信息 * @return */ public Exception getException() { return exception; } /** * 通知結束 * @param result * @param result */ public synchronized void notifyEnd(boolean result){ isNormaled = result; fired = true; notifyAll(); } /** * 通知結束 * @param result * @param result */ public synchronized void notifyEnd(boolean result,Exception ex){ isNormaled = result; exception = ex; fired = true; notifyAll(); } /** * 執行結束通知 */ public synchronized void waitForEnd() throws InterruptedException { while (!fired) { //子線程掛起,釋放synchronized同步塊 wait(); } } /** * 等待 */ private void waitFunc(long millis){ try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.2、創建一個BaseFutureTask的抽象類,內置FTaskEndFlag線程同步標志;
package simm.framework.threadutils.interrupt; import java.util.concurrent.Callable; /** * 基礎任務 * 2018.09.22 by simm */ public abstract class BaseFutureTask implements Callable<Boolean> { /** * futuretask 等待標志 */ private FTaskEndFlag flag = new FTaskEndFlag(); public FTaskEndFlag getFlag() { return flag; } }
3.3、創建一個超時重試的工具類,對FutureTask的結果獲取設置超時時間;
package simm.framework.threadutils.interrupt; import java.lang.reflect.Constructor; import java.util.List; import java.util.concurrent.*; /** * 方法超時重試工具 * 2018.09.20 by simm */ public class RetryUtil { /** * 可緩存線程執行器(依jvm情況自行回收創建) */ private static ExecutorService executorService = Executors.newCachedThreadPool(); /** * 默認方法(3秒超時,重試3次) * @param callable * @return * @throws InterruptedException * @throws ExecutionException * @throws TimeoutException */ public static Boolean execute(BaseFutureTask callable) throws InterruptedException, ExecutionException, TimeoutException { return execute(callable,3000,1000,3); } /** * 方法超時控制 * @param callable 方法體 * @param timeout 超時時長 * @param interval 間隔時長 * @param retryTimes 重試次數 * @return * @throws ExecutionException * @throws InterruptedException * @throws TimeoutException */ public static Boolean execute(BaseFutureTask callable, long timeout,long interval, int retryTimes) throws ExecutionException, InterruptedException, TimeoutException { Boolean result = false; FutureTask<Boolean> futureTask = new FutureTask<>(callable); executorService.execute(futureTask); try { result = futureTask.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); futureTask.cancel(true); throw e; }catch(TimeoutException e){ futureTask.cancel(true); callable.getFlag().waitForEnd(); if(callable.getFlag().isNormaled()){ return true; } e.printStackTrace(); //超時重試 retryTimes--; if(retryTimes > 0){ Thread.sleep(interval); execute(callable,timeout,interval,retryTimes); }else{ throw e; } } return result; } }
四、給出一個調用代碼。實現一個繼承自BaseFutureTask的 FutureTask任務。依舊需要注意子線程涉及到spring的組件,最好是參數從主線程注入到子線程。
RetryUtil.execute(new SyncProductTask(productBiz,productInfo),timeout,interval,3);
參考文章
https://www.jianshu.com/p/55221d045f39