為什么要拒絕使用大事務進行處理任務?


  前話: 不要迷戀事務,大事務會拖垮你的用戶!

  相信很多應用都需要進行一些后台任務的處理,這時候應對的,往往是大批量的數據。比如:對數據進行匯總結算,需要全表掃描,更新; 對用戶訂單狀態進行更新,需要全表掃描,進行更新; 對用戶的會員有效期處理,也需要全表掃描,更新!
  應對這樣的場景,就是定時任務job的職責范疇了。
  那么問題來了,這樣的場景需要進行事務控制嗎? 我覺得這個得看業務需求,比如這個狀態不是很重要,那么可以不用進行事務控制。但是更多時候,我們希望是有事務的,因為往往更新不會是單表的。
  在spring中,有一個簡單的注解,即可以幫忙實現事務的控制。 

@Transactional(readOnly = false, rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)

 

  一個事務搞定!但是問題來了,這里報錯了。

java.lang.Exception:
### Error updating database.  Cause: java.sql.SQLException: Lock wait timeout exceeded; try restarting transaction
### The error may involve defaultParameterMap
### The error occurred while setting parameters
### Cause: java.sql.SQLException: Lock wait timeout exceeded; try restarting transaction
; SQL []; Lock wait timeout exceeded; try restarting transaction; nested exception is java.sql.SQLException: Lock wait timeout exceeded; try restarting transaction

      究其原因,就是mysql 鎖等待超時。這里的等待超時有兩種情況,

          一是該定時任務后執行,是在等待別的客戶端釋放鎖,而遲遲未得到從而超時。

          二是其他客戶端在操作時,由於被該定時任務長時間的占有鎖,從而導致其等待超時。

     當然,更多的可能是第二種。為什么呢?  因為定時任務往往需要處理大量的數據,這時,如果使用了一個最外圍的事務,那么相當於整個腳本都是運行在該事務中,只要該腳本還未運行完成,那么事務就不會提交,也就不會釋放他占有的鎖資源。所以,問題就在這里了。所以,我們得避免進行大事務的形成就很有必要了。

  事實上,事務的目的是為了保證數據的原子性,准確性,那么也就是說,只要你需要保證的數據做到了,就可以進行事務提交了。所以,可以將大事務拆小,即保證最小事務的執行即可。如:更新一個用戶的會員狀態,那么只需要查出相關信息,更改狀態,寫入相應記錄,該事務即可提交。

  將大事務拆小后,就可以做到快速釋放鎖的作用,從而避免了其他客戶端的鎖等待超時問題了。

 

樣例: 更新用戶的賬單狀態,步驟為: 查詢出所有需要更新的賬單數量 >> 定稿job執行開始記錄 >> 更新每個訂單狀態 >> 寫入job執行結束標識 >> 完成!

  該過程,主要耗時是在對每個用戶的賬單更新,因此,可以將該處作為事務拆小的依據,具體代碼如下:

  主事務進行總體控制

 // 使用新線程進行具體執行功能,需另起事務控制或接收原有事務
    @Override
    public Integer updateBorrowStatus(JobParamBean jobParamBean) {
        String method = "updateBorrowStatus";
        logger.info("enter {} method, jobParamBean:{}", method, jobParamBean);
        // 更新數據庫
        Integer result = null;
        Map<String, Object> cond = new HashMap<>();
        //borrowApplyTimeStart, borrowApplyTimeEnd, borrowStatusList, pageStart, perPageNum
//        cond.put("borrowApplyTimeStart", jobParamBean.getStartTime());
//        cond.put("borrowApplyTimeEnd", jobParamBean.getEndTime());
        cond.put("shouldRepayTimeStart", jobParamBean.getStartTime());
        cond.put("shouldRepayTimeEnd", jobParamBean.getEndTime());

        List<String> borrowStatusList = new ArrayList<>();
        borrowStatusList.add("3");
        cond.put("borrowStatusList", borrowStatusList);

        Integer totalUpdate = borrowMapper.countUsersBorrowListByMap(cond);
        String dubboConsumerIp = jobParamBean.getDubboConsumerIp();
        String myServerIp = "127.0.0.1";
        try {
            myServerIp = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            logger.error("get local server ip error:{}", e);
        }

        Date now = new Date();
        JobExecRecordEntity recordEntity = new JobExecRecordEntity();
        BeanUtils.copyProperties(jobParamBean, recordEntity);
        recordEntity.setJobName("autoUpdateBorrowStatus");
        recordEntity.setExecDateStart(jobParamBean.getEndTime());           // 結束時間為最近時間
        recordEntity.setExecDateEnd(jobParamBean.getStartTime());
        recordEntity.setTotalAffectNum(totalUpdate);
        recordEntity.setReqParams(JSONObject.toJSONString(jobParamBean));
        recordEntity.setJobParams(JSONObject.toJSONString(cond));
        recordEntity.setExecServerIp(myServerIp);
        recordEntity.setReqServerIp(dubboConsumerIp);
        recordEntity.setJobEndTime(DateUtils.convert(now, "yyyy-MM-dd HH:mm:ss.S"));
        Integer recordAffectNum = jobExecRecordMapper.addJobExecRecord(recordEntity);
        Long recordId = recordEntity.getId();

        Integer realUpdateNum = 0;
        if(totalUpdate != null && totalUpdate > 0) {
            Integer pageStart = 0;
            Integer perPageNum = 10;
            String nowDateStr = DateUtils.convert(now, "yyyy-MM-dd");      //當前時間
            for(int i = 0; i < totalUpdate; i += perPageNum) {
                pageStart = i;
                cond.put("pageStart", pageStart);
                cond.put("perPageNum", perPageNum);
                Integer thisAffectNum = platformAssistantBusiness.pieceUpdateBorrowStatus(cond, nowDateStr);      // 使用輔助類進行小事務的拆分
                realUpdateNum += thisAffectNum;
                recordEntity.setCurrentAffectNum(perPageNum);
                recordEntity.setRealAffectNum(thisAffectNum);
                recordEntity.setStatus("1");
                jobExecRecordMapper.updateJobExecRecord(recordEntity);
            }
        }
        recordEntity.setStatus("5");
        recordEntity.setCurrentAffectNum(0);
        recordEntity.setRealAffectNum(0);
        String jobEndTime = DateUtils.convert(new Date(), "yyyy-MM-dd HH:mm:ss.S");
        recordEntity.setJobEndTime(jobEndTime);
        jobExecRecordMapper.updateJobExecRecord(recordEntity);

        result = totalUpdate;
        logger.info("exit {} method, result:{}", method, result);
        return result;
    }

  小事務放在另一方法中,以確保事務生效!

    @Override
    @Transactional(readOnly = false, rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
    public Integer pieceUpdateBorrowStatus(Map<String, Object> cond, String nowDateStr) {
        String shouldRepayDate = "";
        Long lateDays = 0L;
        String borrowStatus;
        Integer thisAffectNum = 0;
        List<UsersBorrowEntity> pageList1 = borrowMapper.getUsersBorrowListByMap(cond);
        for(UsersBorrowEntity borrowEntity1 : pageList1) {
            UsersBorrowEntity updateBorrowEntity = new UsersBorrowEntity();
            updateBorrowEntity.setId(borrowEntity1.getId());
            updateBorrowEntity.setUserId(borrowEntity1.getUserId());
            shouldRepayDate = borrowEntity1.getShouldRepayTime();
            lateDays = DateUtils.getDayDiff(shouldRepayDate, nowDateStr);
            if(lateDays != null && lateDays >= 0 && !"8".equals(borrowEntity1.getBorrowStatus())) {
                if(lateDays == 0) {
                    borrowStatus = "5";
                } else {
                    borrowStatus = "6";
                }
                updateBorrowEntity.setBorrowStatus(borrowStatus);
                updateBorrowEntity.setRepayStatus(CommonUtil.getRepayStatusByBorrowStatus(borrowStatus));
                updateBorrowEntity.setLateDays(lateDays.intValue());
                Integer affectNum = usersBorrowMapper.updateUsersBorrow(updateBorrowEntity);
                thisAffectNum += affectNum;
            }
        }
        return thisAffectNum;
    }

  這樣就保證了,執行的完整性,然后,每10個小事務就進行提交一次。從而解決鎖超時問題了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM