源碼基於MyBatis 3.4.6
如何使用
MyBatis內部提供了批量執行SQL的功能,當然這也只是對JDBC的一個包裝。在介紹MyBatis中如何使用批量功能前,先來段原生的JDBC代碼,看看如何執行一個批量SQL。大多數使用批量執行功能時,大多數都是對同一條SQL語句反復執行插入、更新、刪除,只是傳遞的參數不一致。在接下來的代碼中我將向MySQL中批量插入100000條數據。
/**
* 實體類
*/
@Getter
@Setter
public class Example {
private Integer id;
private String name;
private Integer age;
}
/**
* 測試類
*/
public class ExampleMapperTest {
// 准備100000條數據
private List<Example> prepareData() {
List<Example> examples = new ArrayList<>();
Random random = new Random();
for(int i = 1; i <= 100000; i++) {
Example example = new Example();
example.setId(i);
example.setName("example-" + i);
example.setAge(random.nextInt(101));
examples.add(example);
}
return examples;
}
// 執行批量插入
@Test
public void testJDBCBatch() throws SQLException {
String url = "jdbc:mysql://127.0.0.1:3306/test";
long start = System.currentTimeMillis();
Connection conn = DriverManager.getConnection(url, "root", "123456");
List<Example> examples = prepareData();
try {
// 開啟事務
conn.setAutoCommit(false);
PreparedStatement ps = conn.prepareStatement("INSERT INTO example(id, name, age) VALUES (?, ?, ?)");
for(Example example : examples) {
// 設置參數
ps.setInt(1, example.getId());
ps.setString(2, example.getName());
ps.setInt(3, example.getAge());
// 添加到批量語句集合中
ps.addBatch();
}
// 執行批量操作
int[] updateCounts = ps.executeBatch();
// 提交事務
conn.commit();
long end = System.currentTimeMillis();
System.out.println("批量執行耗時: " + (end - start) / 1000.0 + "s");
for(int updateCount : updateCounts) {
Assert.assertEquals(1, updateCount);
}
} catch (SQLException e) {
conn.rollback();
throw e;
} finally {
conn.close();
}
}
}
我對上述批量插入操作進行了一個簡單的測試,並且統計了執行時間,在MySQL5.6中,批量插入100000條數據大約需要10秒左右。
接下來看看如何在MyBatis中如何執行批量操作,上面用到的實體類就不貼了。
/**
* 獲取SqlSession的工具類
*/
public class MyBatisUtils {
private final static SqlSessionFactory sqlSessionFactory;
static{
try {
Reader reader = Resources.getResourceAsReader("mybatis-config.xml");
sqlSessionFactory = new SqlSessionFactoryBuilder().build(reader);
} catch (IOException e) {
throw new RuntimeException("sqlSessionFactory init fail.", e);
}
}
public static SqlSession getSqlSession(ExecutorType executorType) {
return sqlSessionFactory.openSession(executorType);
}
}
/**
* Mapper接口,為了方便直接使用注解的方式。
*/
public interface ExampleMapper {
@Insert(value = "INSERT INTO example(id, name, age) VALUES (#{id}, #{name}, #{age})")
int insert(Example example);
}
/**
* 測試類
*/
public class ExampleMapperTest {
@Test
public void testBatch() {
// 准備數據,上面已經貼過了。
List<Example> examples = prepareData();
// 關鍵,如果使用批量功能,需要使用BatchExecutor而不是默認的SimpleExecutor
try (SqlSession session = MyBatisUtils.getSqlSession(ExecutorType.BATCH)) {
ExampleMapper exampleMapper = session.getMapper(ExampleMapper.class);
for(Example example : examples) {
// 因為使用的Executor是BatchExecutor, 並不會真的執行
// 內部調用的是statement.addBatch()
exampleMapper.insert(example);
}
// 執行批量操作, 內部調用的是statement.executeBatch()
// 如果不需要返回值可以不用顯示調用,commit方法內部會調用此方法
List<BatchResult> results = session.flushStatements();
session.commit();
Assert.assertEquals(1, results.size());
BatchResult result = results.get(0);
for(int updateCount : result.getUpdateCounts()) {
Assert.assertEquals(1, updateCount);
}
}
}
}
這里可能需要說明的是flushStatements
方法了,此方法定義在SqlSession
接口中,簽名如下
List<BatchResult> flushStatements();
此方法的作用就是將前面所有執行過的INSERT、UPDATE、DELETE語句真正刷新到數據庫中。底層調用了JDBC的statement.executeBatch
方法。這里有疑惑的肯定是這個方法的返回值了。通俗的說如果執行的是同一個方法並且執行的是同一條SQL,注意這里的SQL還沒有設置參數,也就是說SQL里的占位符'?'還沒有被處理成真正的參數,那么每次執行的結果共用一個BatchResult
,真正的結果可以通過BatchResult
中的getUpdateCounts
方法獲取。
按照上面例子,執行的是ExampleMapper
中的insert
方法,執行的SQL語句是INSERT INTO example(id, name, age) VALUES (?, ?, ?)
。這100000萬次執行的都是同一個方法同一條SQL語句。那么返回值中只有一個BatchResult
,getUpdateCounts
返回的數組大小是100000,代表着每一次執行的結果。
源碼分析
SqlSession
這個接口操作數據庫的功能都是Executor
接口實現的,而批量功能正是由上面提到過的BatchExecutor
實現。SqlSession
接口中的update
、insert
、delete
最終都會調用Executor
的update
方法。
// 位於父類BaseExecutor中,BatchExecutor繼承了BaseExecutor
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
// 清空本地緩存
clearLocalCache();
// 對數據庫做INSERT、UPDATE、DELETE操作,由子類實現
return doUpdate(ms, parameter);
}
接下來看BatchExecutor
public class BatchExecutor extends BaseExecutor {
public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;
/**
* Statement集合,如果執行的方法不一樣或者SQL語句不同
* 都會創建一個Statement
*/
private final List<Statement> statementList = new ArrayList<Statement>();
/**
* 與上面一一對應,用來保存每一個statement的執行結果。
*/
private final List<BatchResult> batchResultList = new ArrayList<BatchResult>();
// 當前的SQL語句
private String currentSql;
// XML中的每一個<insert><update><delete><select>標簽最終被解析成
// 一個個的MappedStatement對象,該對象的id就是名稱空間+標簽id
// 也就是所說的接口完全限定名 + "." + 方法名字
private MappedStatement currentStatement;
public BatchExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
final Configuration configuration = ms.getConfiguration();
final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
final BoundSql boundSql = handler.getBoundSql();
// 獲取需要執行的SQL,這里的SQL中參數都已經被解析成?占位符了
// 就是#{id, jdbcType=INTEGER} -> ?
final String sql = boundSql.getSql();
final Statement stmt;
// 如果SQL相同並且是同一個<update> | <insert> | <delete>標簽
// 為什么還需要判斷<update> | <insert> | <delete>,因為這些標簽配置的屬性
// 會影響具體的執行行為。
if (sql.equals(currentSql) && ms.equals(currentStatement)) {
int last = statementList.size() - 1;
stmt = statementList.get(last);
applyTransactionTimeout(stmt);
// 設置參數
handler.parameterize(stmt);//fix Issues 322
BatchResult batchResult = batchResultList.get(last);
batchResult.addParameterObject(parameterObject);
} else {
// 初始化Statement
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt); //fix Issues 322
currentSql = sql;
currentStatement = ms;
statementList.add(stmt);
batchResultList.add(new BatchResult(ms, sql, parameterObject));
}
// 調用addBatch方法
handler.batch(stmt);
return BATCH_UPDATE_RETURN_VALUE;
}
}
下面來看看flushStatements
方法
// 位於父類BaseExecutor中,BatchExecutor繼承了BaseExecutor
@Override
public List<BatchResult> flushStatements() throws SQLException {
return flushStatements(false);
}
/**
* isRollBack: 如果事務發生回滾,此參數的值設置成true
*/
public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
return doFlushStatements(isRollBack);
}
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
try {
List<BatchResult> results = new ArrayList<BatchResult>();
// 直接返回
if (isRollback) {
return Collections.emptyList();
}
for (int i = 0, n = statementList.size(); i < n; i++) {
Statement stmt = statementList.get(i);
applyTransactionTimeout(stmt);
BatchResult batchResult = batchResultList.get(i);
try {
// 執行批量操作,並將結果保存到batchResult中
batchResult.setUpdateCounts(stmt.executeBatch());
MappedStatement ms = batchResult.getMappedStatement();
List<Object> parameterObjects = batchResult.getParameterObjects();
// 以下僅針對insert語句,用於返回自增主鍵
KeyGenerator keyGenerator = ms.getKeyGenerator();
if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
Jdbc3KeyGenerator jdbc3KeyGenerator =
(Jdbc3KeyGenerator) keyGenerator;
jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
} else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) {
for (Object parameter : parameterObjects) {
keyGenerator.processAfter(this, ms, stmt, parameter);
}
}
closeStatement(stmt);
} catch (BatchUpdateException e) {
// 省略異常處理
}
results.add(batchResult);
}
return results;
} finally {
for (Statement stmt : statementList) {
closeStatement(stmt);
}
// 清空
currentSql = null;
statementList.clear();
batchResultList.clear();
}
}
總結
本文主要講解了如何在MyBatis中使用批量操作的功能,並且對源碼的關鍵位置進行了說明。下面主要總結下MyBatis中批量處理的行為。
- 如果執行了SELECT操作,那么會將先前的UPDATE、INSERT、DELETE語句刷新到數據庫中。這一點去看
BatchExecutor
中的doQuery
方法即可。 - MyBatis會在
commit
或rollback
方法中調用flushStatements
刷新語句。其中commit
會調用flushStatements()
,而rollback
會調用flushStatements(true)
,也就是如果回滾那就不再執行批量操作了。因此即使沒有顯示調用flushStatements
方法,MyBatis也會保證批量操作正常執行。