MyBatis批量操作


源碼基於MyBatis 3.4.6

如何使用

MyBatis內部提供了批量執行SQL的功能,當然這也只是對JDBC的一個包裝。在介紹MyBatis中如何使用批量功能前,先來段原生的JDBC代碼,看看如何執行一個批量SQL。大多數使用批量執行功能時,大多數都是對同一條SQL語句反復執行插入、更新、刪除,只是傳遞的參數不一致。在接下來的代碼中我將向MySQL中批量插入100000條數據。

/**
 * 實體類
 */
@Getter
@Setter
public class Example {

    private Integer id;

    private String name;

    private Integer age;
}

/**
 * 測試類
 */
public class ExampleMapperTest {

	// 准備100000條數據
    private List<Example> prepareData() {
        List<Example> examples = new ArrayList<>();
        Random random = new Random();
        for(int i = 1; i <= 100000; i++) {
            Example example = new Example();
            example.setId(i);
            example.setName("example-" + i);
            example.setAge(random.nextInt(101));
            examples.add(example);
        }
        return examples;
    }

    // 執行批量插入
    @Test
    public void testJDBCBatch() throws SQLException {
        String url = "jdbc:mysql://127.0.0.1:3306/test";
        long start = System.currentTimeMillis();
        Connection conn = DriverManager.getConnection(url, "root", "123456");
        List<Example> examples = prepareData();
        try {
            // 開啟事務
            conn.setAutoCommit(false);
            PreparedStatement ps = conn.prepareStatement("INSERT INTO example(id, name, age) VALUES (?, ?, ?)");
            for(Example example : examples) {
                // 設置參數
                ps.setInt(1, example.getId());
                ps.setString(2, example.getName());
                ps.setInt(3, example.getAge());
                // 添加到批量語句集合中
                ps.addBatch();
            }
            // 執行批量操作
            int[] updateCounts = ps.executeBatch();
            // 提交事務
            conn.commit();
            long end = System.currentTimeMillis();
            System.out.println("批量執行耗時: " + (end - start) / 1000.0 + "s");
            for(int updateCount : updateCounts) {
                Assert.assertEquals(1, updateCount);
            }
        } catch (SQLException e) {
            conn.rollback();
            throw e;
        } finally {
            conn.close();
        }
    }
}

我對上述批量插入操作進行了一個簡單的測試,並且統計了執行時間,在MySQL5.6中,批量插入100000條數據大約需要10秒左右。

接下來看看如何在MyBatis中如何執行批量操作,上面用到的實體類就不貼了。

/**
 * 獲取SqlSession的工具類
 */
public class MyBatisUtils {

    private final static SqlSessionFactory sqlSessionFactory;

    static{
        try {
            Reader reader = Resources.getResourceAsReader("mybatis-config.xml");
            sqlSessionFactory = new SqlSessionFactoryBuilder().build(reader);
        } catch (IOException e) {
            throw new RuntimeException("sqlSessionFactory init fail.", e);
        }
    }

    public static SqlSession getSqlSession(ExecutorType executorType) {
        return sqlSessionFactory.openSession(executorType);
    }
}

/**
 * Mapper接口,為了方便直接使用注解的方式。
 */
public interface ExampleMapper {

    @Insert(value = "INSERT INTO example(id, name, age) VALUES (#{id}, #{name}, #{age})")
    int insert(Example example);
}

/**
 * 測試類
 */
public class ExampleMapperTest {

    @Test
    public void testBatch() {
        // 准備數據,上面已經貼過了。
        List<Example> examples = prepareData();
        // 關鍵,如果使用批量功能,需要使用BatchExecutor而不是默認的SimpleExecutor
        try (SqlSession session = MyBatisUtils.getSqlSession(ExecutorType.BATCH)) {
            ExampleMapper exampleMapper = session.getMapper(ExampleMapper.class);
            for(Example example : examples) {
                // 因為使用的Executor是BatchExecutor, 並不會真的執行
                // 內部調用的是statement.addBatch()
                exampleMapper.insert(example);
            }
            // 執行批量操作, 內部調用的是statement.executeBatch()
            // 如果不需要返回值可以不用顯示調用,commit方法內部會調用此方法
            List<BatchResult> results = session.flushStatements();
            session.commit();
            Assert.assertEquals(1, results.size());
            BatchResult result = results.get(0);
            for(int updateCount : result.getUpdateCounts()) {
                Assert.assertEquals(1, updateCount);
            }
        }
    }
}

這里可能需要說明的是flushStatements方法了,此方法定義在SqlSession接口中,簽名如下

List<BatchResult> flushStatements();

此方法的作用就是將前面所有執行過的INSERT、UPDATE、DELETE語句真正刷新到數據庫中。底層調用了JDBC的statement.executeBatch方法。這里有疑惑的肯定是這個方法的返回值了。通俗的說如果執行的是同一個方法並且執行的是同一條SQL,注意這里的SQL還沒有設置參數,也就是說SQL里的占位符'?'還沒有被處理成真正的參數,那么每次執行的結果共用一個BatchResult,真正的結果可以通過BatchResult中的getUpdateCounts方法獲取。

按照上面例子,執行的是ExampleMapper中的insert方法,執行的SQL語句是INSERT INTO example(id, name, age) VALUES (?, ?, ?)。這100000萬次執行的都是同一個方法同一條SQL語句。那么返回值中只有一個BatchResultgetUpdateCounts返回的數組大小是100000,代表着每一次執行的結果。

源碼分析

SqlSession這個接口操作數據庫的功能都是Executor接口實現的,而批量功能正是由上面提到過的BatchExecutor實現。SqlSession接口中的updateinsertdelete最終都會調用Executorupdate方法。

// 位於父類BaseExecutor中,BatchExecutor繼承了BaseExecutor
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    // 清空本地緩存
    clearLocalCache();
    // 對數據庫做INSERT、UPDATE、DELETE操作,由子類實現
    return doUpdate(ms, parameter);
}

接下來看BatchExecutor

public class BatchExecutor extends BaseExecutor {

  public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;
  /**
   * Statement集合,如果執行的方法不一樣或者SQL語句不同
   * 都會創建一個Statement
   */
  private final List<Statement> statementList = new ArrayList<Statement>();
  /**
   * 與上面一一對應,用來保存每一個statement的執行結果。
   */
  private final List<BatchResult> batchResultList = new ArrayList<BatchResult>();
  // 當前的SQL語句
  private String currentSql;
  // XML中的每一個<insert><update><delete><select>標簽最終被解析成
  // 一個個的MappedStatement對象,該對象的id就是名稱空間+標簽id
  // 也就是所說的接口完全限定名 + "." + 方法名字
  private MappedStatement currentStatement;

  public BatchExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  @Override
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    // 獲取需要執行的SQL,這里的SQL中參數都已經被解析成?占位符了
    // 就是#{id, jdbcType=INTEGER}  ->  ?
    final String sql = boundSql.getSql();
    final Statement stmt;
    // 如果SQL相同並且是同一個<update> | <insert> | <delete>標簽
    // 為什么還需要判斷<update> | <insert> | <delete>,因為這些標簽配置的屬性
    // 會影響具體的執行行為。
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      int last = statementList.size() - 1;
      stmt = statementList.get(last);
      applyTransactionTimeout(stmt);
      // 設置參數
      handler.parameterize(stmt);//fix Issues 322
      BatchResult batchResult = batchResultList.get(last);
      batchResult.addParameterObject(parameterObject);
    } else {
      // 初始化Statement
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection, transaction.getTimeout());
      handler.parameterize(stmt);    //fix Issues 322
      currentSql = sql;
      currentStatement = ms;
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    // 調用addBatch方法
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
  }
}

下面來看看flushStatements方法

// 位於父類BaseExecutor中,BatchExecutor繼承了BaseExecutor
@Override
public List<BatchResult> flushStatements() throws SQLException {
    return flushStatements(false);
}

/**
 * isRollBack: 如果事務發生回滾,此參數的值設置成true
 */
public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    return doFlushStatements(isRollBack);
}

@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
        List<BatchResult> results = new ArrayList<BatchResult>();
        // 直接返回
        if (isRollback) {
            return Collections.emptyList();
        }
        for (int i = 0, n = statementList.size(); i < n; i++) {
            Statement stmt = statementList.get(i);
            applyTransactionTimeout(stmt);
            BatchResult batchResult = batchResultList.get(i);
            try {
                // 執行批量操作,並將結果保存到batchResult中
                batchResult.setUpdateCounts(stmt.executeBatch());
                MappedStatement ms = batchResult.getMappedStatement();
                List<Object> parameterObjects = batchResult.getParameterObjects();
                // 以下僅針對insert語句,用於返回自增主鍵
                KeyGenerator keyGenerator = ms.getKeyGenerator();
                if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
                    Jdbc3KeyGenerator jdbc3KeyGenerator = 
                        (Jdbc3KeyGenerator) keyGenerator;
                    jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
                } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { 
                    for (Object parameter : parameterObjects) {
                        keyGenerator.processAfter(this, ms, stmt, parameter);
                    }
                }
                closeStatement(stmt);
            } catch (BatchUpdateException e) {
                // 省略異常處理
            }
            results.add(batchResult);
        }
        return results;
    } finally {
        for (Statement stmt : statementList) {
            closeStatement(stmt);
        }
        // 清空
        currentSql = null;
        statementList.clear();
        batchResultList.clear();
    }
}

總結

本文主要講解了如何在MyBatis中使用批量操作的功能,並且對源碼的關鍵位置進行了說明。下面主要總結下MyBatis中批量處理的行為。

  • 如果執行了SELECT操作,那么會將先前的UPDATE、INSERT、DELETE語句刷新到數據庫中。這一點去看BatchExecutor中的doQuery方法即可。
  • MyBatis會在commitrollback方法中調用flushStatements刷新語句。其中commit會調用flushStatements(),而rollback會調用flushStatements(true),也就是如果回滾那就不再執行批量操作了。因此即使沒有顯示調用flushStatements方法,MyBatis也會保證批量操作正常執行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM