使用ibatis處理大數據量批量插入更新問題


近期項目中遇到一個問題,有一批數據,少則幾百條,多則上萬條,需要向數據庫中進行插入和更新操作,即:當數據庫中有數據時,更新之,沒有數據時,插入之。

解決問題的步驟如下:

首先想到的當然是用先根據條件select count(*) from table where "case",判斷select出的結果,如果結果>0則更新,等於0則插入。

 public Class intefaceImpl implements XXXService{
     //.......
     
     public Result insertOrUpdateXXDOBatch(List<XXDO> xxDos) throws DAOException{
         for(XXDO xxDo:xxDos){
             //IbatisDAO.queryTotalCount對應select count(*)語句
             //  xxDo.getCase()代表的是要更新的條件
             int count=(int)sqlMapClientTemplate.queryForObject("IbatisDAO.queryTotalCount", xxDo.getCase());
             if(count==0){
 sqlMapClientTemplate.update("IbatisDAO.update", xxDo);
         }else {
             sqlMapClientTemplate.insert("IbatisDAO.insert", xxDo);
         }   
   }
 } 

隨后發現這個有漏洞,這些漏洞也是這個問題的難點:

1,數據量大,容易引起大量的數據庫讀寫操作,對數據庫壓力大,而且在讀寫過程中通信消耗也大;

2,由於讀寫時整體放到一個服務接口中的,所以服務會超時。

現在需要一步步解決這些問題,如何解決呢?首先,對於數據量大造成的壓力大和通信消耗大的問題,可以想到的一種方法,就是批量執行,恰好,ibatis就提供了批量執行的方法startBatch/endBatch。

另外還要思考能否簡化數據庫的操作呢?主要的方法,就是不再使用select,方法有兩種:

1,因為我用的是mysql數據庫,所以,可以使用replace語句或on duplicate key update來做(具體語句寫法這里就不多寫了,隨便一搜都能查到);

2,先update一次,如果update的返回值為0,表示update了0行,則insert。

首先我考察了第一種方法,發現不適用,原因是無論是replace或on duplicate key update的原理都是:首先嘗試插入,如果出現主鍵和唯一索引沖突,則進行更新。但問題是,哥的這個變態需求里,查詢的條件既不是主鍵也不是唯一索引,於是這個方法1不行。

那么只能使用方法2了,使用spring的回調機制和ibatis,編碼如下(下面的編碼參考了http://www.cnblogs.com/sunwei2012/archive/2010/11/26/1888497.html):

public int insertOrUpdateXXDOBatch (final List< XXDO> xxDOs) throws DAOException {
       if (null == sqlMapClientTemplate.getSqlMapClient()) {
               throw new DAOException("未初始化好sqlMapClientTemplate,請檢查配置及init方法" );
       }
       // 執行回調
       sqlMapClientTemplate.execute( new SqlMapClientCallback() {
               // 實現回調接口
               public Object doInSqlMapClient( SqlMapExecutor executor)
                               throws SQLException {
                       // 開始批處理
                       executor. startBatch();
                       for (XXDO xxDO : xxDOs) {
                               if (null != xxDO && null != xxDO.getCase()) {
                                       int updateCount = executor.update("IbatisDAO.update", xxDO);
                                       if (updateCount == 0 ) {
                                        executor.update("IbatisDAO.insert", xxDO);// 這里是用update方法來執行insert語句,以便於批處理
                                       }
                               }
                       }
                       // 執行批處理
                       executor. executeBatch();
                       return null;
               }
       });
       return 0 ;
}

需要注意,在ibatis中,可以使用update方法來執行insert語句。

然而,在運行時,又出現了問題:既更新了一條,又插入了一條數據;

這個問題檢查一下代碼就能發現,這跟ibatis的executeBatch有關,在ibatis中,批量執行實質上是將一系列sql語句組合成一個幾個,然后一次性push到數據庫去執行。這樣既能提高執行效率,又能降低通信消耗;然而,在本代碼中if(updateCount==0)這個判斷其實是沒有起到任何作用的,因為executeBatch是不管這個判斷直接將update語句和insert語句給組合到一個集合中批量執行。

為了解決這個問題,我只能直接使用兩條update語句,第一條語句直接將原來的數據邏輯delete掉,第二條insert進一條數據,代碼如下:

public int insertOrUpdateXXDOBatch (final List< XXDO> xxDOs) throws DAOException {
       if (null == sqlMapClientTemplate.getSqlMapClient()) {
               throw new DAOException("未初始化好sqlMapClientTemplate,請檢查配置及init方法" );
       }
       // 執行回調
       sqlMapClientTemplate.execute( new SqlMapClientCallback() {
               // 實現回調接口
               public Object doInSqlMapClient( SqlMapExecutor executor)
                               throws SQLException {
                       // 開始批處理
                       executor. startBatch();
                       for (XXDO xxDO : xxDOs) {
                               if (null != xxDO && null != xxDO.getCase()) {
                      executor.update("IbatisDAO.logicDelete" , xxDO);
                      executor.update("IbatisDAO.insert", xxDO);// 這里是用update方法來執行insert語句,以便於批處理
                               }
                       }
                       // 執行批處理
                       executor. executeBatch();
                       return null;
               }
       });
       return 0 ;
}

這仍然存在一個問題:先delete,再insert,在delete和insert的瞬間如果有讀取,就會出現數據丟失。所以最好的辦法,是在這里加上一個鎖,不過兩個方面的原因沒有這么做:

業務上出現在delete之后立即讀取的可能性幾乎沒有

本人能力暫時還不足,對於鎖機制了解還不足,所以將在以后對這一方面了解的更加深入的時候,再進行優化。

至於服務超時的問題,解決方案很簡單,將上面的批量執行代碼寫在一個Runable的實現類中,並使用下面的代碼來實現一個異步線程,在服務中啟動這個線程,之后立即返回,異步線程將批量執行的結果寫入數據庫中的一個字段里,查看結果時,從數據庫中讀取該結果即可。

     ExecutorService exec = Executors.newSingleThreadExecutor();
     exec.execute(new BatchUpdateThread(xxDos));
     exec.shutdown();

水平有限,歡迎留言指正。

轉載請注明出處,含鏈接:http://www.cnblogs.com/zhguang/archive/2013/05/23/3094615.html

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM