-----------------------------------
20170528 第二次編輯:主要是補充mysql相關內容。
-----------------------------------
mysql支持batch提交改進方案:聲明:mysql仍然沒有內部游標,讓數據庫支持executeBatch的方式處理。
MySql 的批量操作,要加rewriteBatchedStatements參數
引用“
MySql的JDBC連接的url中要加rewriteBatchedStatements參數,並保證5.1.13以上版本的驅動,才能實現高性能的批量插入。
例如: String connectionUrl="jdbc:mysql://192.168.1.100:3306/test?rewriteBatchedStatements=true" ;
還要保證Mysql JDBC驅的版本。MySql的JDBC驅動的批量插入操作性能是很優秀的。
”
原文鏈接:http://elf8848.iteye.com/blog/770032 (建議去看看這個鏈接的評論區)
參考資料:https://www.oschina.net/question/2553117_2162171?sort=time
20170528親測,插入26663條數據,
加上rewriteBatchedStatements后,耗時:3734毫秒
不加rewriteBatchedStatements前,耗時:672551毫秒
mysql裝在本機上,行字段數多,僅從本次測試看,性能提高了180倍。
---------------------------------------------------
第一版:20170526 原創
---------------------------------------------------
前言:做一個數據同步項目,要求:同步數據不丟失的情況下,提高插入性能。
項目DB框架:Mybatis。DataBase:Oracle。
----------------------------------------------------------------------------
批量插入數據方式:
一、Mybatis 全局設置批處理;
二、Mybatis 局部設置批處理;
三、Mybatis foreach批量插入:
①SELECT UNION ALL;
②BEGIN INSERT INTO ...;INSERT INTO...;...;END;
四、java自帶的批處理插入;
五、其他方式
-----------------------------------------------------------------------------
先說結論:Mybatis(全局/局部)批處理和java自帶的批處理 性能上差不多,屬於最優處理辦法,我這邊各種測試后,最后采用Mybatis局部批處理方式。
一、Mybatis 全局設置批處理
先上Spring-Mybatis.xml 配置信息

1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans.xsd 7 http://www.springframework.org/schema/context 8 http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"> 9 10 <!-- 自動掃描(自動注入) --> 11 <context:annotation-config/> 12 <context:component-scan base-package="com.company.dao"/> 13 14 <!-- 動態數據源 --> 15 <bean id="dataSource" class="com.company.dao.datasource.DataSource"> 16 <property name="myConfigFile" value="mySource.xml"/> 17 </bean> 18 19 <!-- mybatis配置 --> 20 <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> 21 <property name="dataSource" ref="dataSource"/> 22 <property name="mapperLocations" value="classpath*:mapper/*/*/*.xml"/> 23 <property name="configLocation" value="classpath:/mybatisConfig.xml"/> 24 </bean> 25 26 <!-- 自動創建映射器,不用單獨為每個 mapper映射--> 27 <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"> 28 <property name="basePackage" value="com.company.dao.mapper"/> 29 <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/> 30 </bean> 31 32 <!-- 事務管理器配置,單數據源事務 --> 33 <bean id="transactionManager" 34 class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> 35 <property name="dataSource" ref="dataSource"/> 36 </bean> 37 38 <tx:annotation-driven transaction-manager="transactionManager"/> 39 40 </beans>
再上mybatisConfig.xml(在本項目中,我沒有設置setting。最終采用的局部批處理,因此未設置全局批處理,具體原因后面再說。)

1 <?mapper.xml version="1.0" encoding="UTF-8" ?> 2 <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd"> 3 <configuration> 4 5 <settings> 6 <!-- 配置默認的執行器。SIMPLE 就是普通的執行器;REUSE 執行器會重用預處理語句(prepared statements); BATCH 執行器將重用語句並執行批量更新。--> 7 <setting name="defaultExecutorType" value="BATCH"/> 8 <!--詳見:http://www.mybatis.org/mybatis-3/zh/configuration.html--> 9 </settings> 10 11 <!-- 別名列表 --> 12 <typeAliases> 13 <!-- typeAliases 中的配置都是配置別名,在此就不貼出來了 --> 14 </typeAliases> 15 16 </configuration>
這樣子設置好后,在BaseService開放saveBatch(List<T> list)方法

1 @Override 2 public void save(List<T> list) { 3 for (int i = 0;i < list.size();i++){ 4 mapper.insert(list.get(i)); 5 } 6 } 7 8 @Override 9 public void saveBatch(List<T> list) { 10 int size = list.size(); 11 int unitNum = 500; 12 int startIndex = 0; 13 int endIndex = 0; 14 while (size > 0){ 15 if(size > unitNum){ 16 endIndex = startIndex+unitNum; 17 }else { 18 endIndex = startIndex+size; 19 } 20 List<T> insertData = list.subList(startIndex,endIndex); 21 save(insertData); 22 size = size - unitNum; 23 startIndex = endIndex; 24 } 25 }
雖然看上去是500條記錄,一次次INSERT INTO,但由於在全局已經設置Mybatis是批處理執行器,所以這500條INSERT INTO只會與Oracle數據庫通信一次。
全局設置批處理的局限性在哪里呢?
先附上mybatis官方的討論列表中最很關鍵的一句:“If the BATCH executor is in use, the update counts are being lost. ”
設置全局批處理后,DB里的insert、Update和delete方法,都無法返回進行DML影響DB_TABLE的行數。
1.insert 無法返回影響的行數,這個好解決,一個批處理放在一個事務里,記錄批處理失敗次數,總數-批處理失敗次數*單位批處理數據量,就能得到insert 影響DB_TABLE的行數;
2.但是update和delete就無法很簡單的去統計影響行數了,如果做反復查詢,反而降低了效率,得不償失。
雖現在的項目尚未有需要反饋影響DB_TABLE行數的需求,但是為了更靈活,我們放棄了全局批處理的方式。
!這里提個疑問:為什么Mybatis官方,不將批處理的選擇方式下沉到方法級別?方便開發者根據實際情況,靈活選擇。我覺得這是個可以改進的地方,如有機會,可看源碼去進行改進。
---------------------------------------------------------------------------------------------------------
二、Mybatis局部批處理方式
由於領導說全局批處理方式,不夠靈活,不適宜項目所需,要另想辦法支持。但是java自帶的批處理,因為項目代碼管理的要求,也不能采用。因此,在仔細閱讀官方文檔后,設想自己能否獲取SQLSession后openSession,將這個會話設置為批處理呢?
先看MyBatis官方網站(須FanQiang):http://www.mybatis.org/mybatis-3/zh/getting-started.html

1 SqlSession session = sqlSessionFactory.openSession(); 2 try { 3 BlogMapper mapper = session.getMapper(BlogMapper.class); 4 // do work 5 } finally { 6 session.close(); 7 }
后查閱Mybatis java API(須FanQiang): http://www.mybatis.org/mybatis-3/zh/java-api.html
現在你有一個 SqlSessionFactory,可以用來創建 SqlSession 實例。
SqlSessionFactory
SqlSessionFactory 有六個方法可以用來創建 SqlSession 實例。通常來說,如何決定是你 選擇下面這些方法時:
- Transaction (事務): 你想為 session 使用事務或者使用自動提交(通常意味着很多 數據庫和/或 JDBC 驅動沒有事務)?
- Connection (連接): 你想 MyBatis 獲得來自配置的數據源的連接還是提供你自己
- Execution (執行): 你想 MyBatis 復用預處理語句和/或批量更新語句(包括插入和 刪除)
重載的 openSession()方法簽名設置允許你選擇這些可選中的任何一個組合。

1 SqlSession openSession() 2 SqlSession openSession(boolean autoCommit) 3 SqlSession openSession(Connection connection) 4 SqlSession openSession(TransactionIsolationLevel level) 5 SqlSession openSession(ExecutorType execType,TransactionIsolationLevel level) 6 SqlSession openSession(ExecutorType execType) 7 SqlSession openSession(ExecutorType execType, boolean autoCommit) 8 SqlSession openSession(ExecutorType execType, Connection connection) 9 Configuration getConfiguration();
因此出來了局部批處理第一套代碼實現方式:

1 public static void sqlSession(List<Student> data) throws IOException { 2 String resource = "mybatis-dataSource.xml"; 3 InputStream inputStream = null; 4 SqlSession batchSqlSession = null; 5 try{ 6 inputStream = Resources.getResourceAsStream(resource); 7 SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream); 8 batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false); 9 int batchCount = 500;//每批commit的個數 10 for(int index = 0; index < data.size();index++){ 11 Student stu = data.get(index); 12 batchSqlSession.getMapper(Student.class).insert(stu); 13 if(index !=0 && index%batchCount == 0){ 14 batchSqlSession.commit(); 15 } 16 } 17 batchSqlSession.commit(); 18 }catch (Exception e){ 19 e.printStackTrace(); 20 }finally { 21 if(batchSqlSession != null){ 22 batchSqlSession.close(); 23 } 24 if(inputStream != null){ 25 inputStream.close(); 26 } 27 } 28 }

1 <?xml version="1.0" encoding="UTF-8" ?> 2 <!DOCTYPE configuration 3 PUBLIC "-//mybatis.org//DTD Config 3.0//EN" 4 "http://mybatis.org/dtd/mybatis-3-config.dtd"> 5 <configuration> 6 <environments default="development"> 7 <environment id="development"> 8 <transactionManager type="JDBC"/> 9 <dataSource type="POOLED"> 10 <property name="driver" value="${driver}"/> 11 <property name="url" value="${url}"/> 12 <property name="username" value="${username}"/> 13 <property name="password" value="${password}"/> 14 </dataSource> 15 </environment> 16 </environments> 17 <mappers> 18 <mapper resource="org/mybatis/example/Student.xml"/> 19 </mappers> 20 </configuration>
已經在Spring-Mybatis.xml 中配置了SQLSessionFactory,那我為何還要自己去創建SQLSessionFactory呢?因此繼續改良代碼

1 public static void mybatisSqlSession(List<Student> data){ 2 DefaultSqlSessionFactory sqlSessionFactory = (DefaultSqlSessionFactory) ServiceBeanConstant.CTX.getBean("sqlSessionFactory"); 3 SqlSession batchSqlSession = null; 4 try{ 5 batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false); 6 int batchCount = 500;//每批commit的個數 7 for(int index = 0; index < data.size();index++){ 8 Student stu = data.get(index); 9 batchSqlSession.getMapper(StudentMapper.class).insert(stu); 10 if(index !=0 && index%batchCount == 0){ 11 batchSqlSession.commit(); 12 } 13 } 14 batchSqlSession.commit(); 15 }catch (Exception e){ 16 e.printStackTrace(); 17 }finally { 18 if(batchSqlSession != null){ 19 batchSqlSession.close(); 20 } 21 } 22 }
這個版本的局部批處理插入是比較滿意的,最終采用的方式也是這個版本。
下面放出在IService接口定義和Service的具體實現代碼:
IService接口定義

1 /** 2 * 批處理插入數據(方法內部定義500條為一個批次進行提交) 3 * 使用注意事項:必須在XxxMappper.xml中實現<insert id="insert" ...>....<insert/>的sql 4 * @param data 批量插入的數據 5 * @param mClass 調用的XxxMaperr.class 6 * @auth robin 7 * Created on 2016/3/14 8 */ 9 void saveBatch(List<T> data,Class mClass);
Service實現

1 @Override 2 public void saveBatch(List<T> data,Class mClass) { 3 DefaultSqlSessionFactory sqlSessionFactory = (DefaultSqlSessionFactory) ServiceBeanConstant.CTX.getBean("sqlSessionFactory"); 4 SqlSession batchSqlSession = null; 5 try{ 6 batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false); 7 int batchCount = 500;//每批commit的個數 8 for(int index = 0; index < data.size();index++){ 9 T t = data.get(index); 10 ((BaseMapper)batchSqlSession.getMapper(mClass)).insert(t); 11 if(index !=0 && index%batchCount == 0){ 12 batchSqlSession.commit(); 13 } 14 } 15 batchSqlSession.commit(); 16 }catch (Exception e){ 17 e.printStackTrace(); 18 }finally { 19 if(batchSqlSession != null){ 20 batchSqlSession.close(); 21 } 22 } 23 }
局部和全局批處理插入對比:局部批處理,可以對特定一類的方法,進行數據批處理,不會影響其他DML語句,其他DML語句,可以正常返回影響DB_TABLE的行數。
!這樣既能針對特殊需求(批處理)支持,也能支持未來需要返回影響數據行的要求。
注意:使用批處理方式進行DML操作,是無法反饋影響DB_TABLE行數的數據。無論是局部批處理還是java自帶的批處理方式,皆無法反饋DB_TABLE count。
補充完善:
在我的Service實現中,通過注入的方式,獲取mapper的實例

1 public class BaseService<MAPPER extends BaseMapper, T, PK extends Serializable> implements IBaseService<T, PK> { 2 3 protected T tt; 4 /** 5 * 實體操作的自動注入Mapper(隨初始化一同注入,必須用set方法) 6 */ 7 protected MAPPER mapper; 8 9 public MAPPER getMapper() { 10 return mapper; 11 } 12 13 @Autowired 14 public void setMapper(MAPPER mapper) { 15 this.mapper = mapper; 16 } 17 //后續代碼略 18 }
前面的Service saveBatch方法中,還需要傳入指定的Mapper.class.對本項目其他開發者來說,與之前的環境相比,多傳一個參數感覺別扭。
那么為何我不繼續封裝,外部無需傳入Mapper.class,而是通過內部注入的mapper實例獲取Mapper.class.
改良后的代碼:

1 @Override 2 public T saveBatch(List<T> data) { 3 T back = null; 4 DefaultSqlSessionFactory sqlSessionFactory = (DefaultSqlSessionFactory) ServiceBeanConstant.CTX.getBean("sqlSessionFactory"); 5 SqlSession batchSqlSession = null; 6 try{ 7 batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false); 8 int batchCount = 500;//每批commit的個數 9 for(int index = 0; index < data.size();index++){ 10 T t = data.get(index); 11 back = t; 12 Class<?>[] interfaces=mapper.getClass().getInterfaces(); 13 Class clazz = null; 14 for (int i=0;i<interfaces.length;i++){ 15 if(BaseMapper.class.isAssignableFrom(interfaces[i])){ 16 clazz = interfaces[i]; 17 } 18 } 19 if(clazz == null){ 20 throw new Exception("user-defined exception:mapper not implements interfaces com.company.dao.mapper.BaseMapper"); 21 } 22 BaseMapper baseMapper = (BaseMapper) batchSqlSession.getMapper(clazz); 23 baseMapper.insert(t); 24 if(index !=0 && index%batchCount == 0){ 25 batchSqlSession.commit(); 26 } 27 } 28 batchSqlSession.commit(); 29 }catch (Exception e){ 30 e.printStackTrace(); 31 }finally { 32 if(batchSqlSession != null){ 33 batchSqlSession.close(); 34 } 35 return back; 36 } 37 }
這里對mapper實例進行一個簡短的說明:
1.mapper實例是通過java動態代理來實例化的;
2.mapper的SQLSession是使用mybatis統一的配置實例的;
3.mapper的默認執行器是SIMPLE(普通的執行器);
-------------------------------------------------------------------------------------
三、Mybatis foreach批量插入
Mybatis foreach 批量插入,如果批量插入的數據量大,不得不說這真是一個非常糟糕的做法。
無論是SELECT ** UNION ALL 還是BEGIN ...;END; ,相對而言后者比前者稍微好點。
放出DB和我測試的結果:
耗時 | 占當時整個數據庫CPU百分比 | 說明 |
15.5 | 98.33 | union all方式拼接插入 |
16.4 | 97.75 | begin end方式插入塊 |
1.54 | 64.81 | java 自帶的batch方式插入 |
①foreach union all的批量插入,現已有大量的博客資源可供參考,我就不貼出自己的實現方式了。
如果有興趣可以參閱:http://blog.csdn.net/sanyuesan0000/article/details/19998727 (打開瀏覽器,復制url)
這篇博客。BEGIN END的方式,也是從這篇博客中得到啟發。只不過他是把BEGIN END用在update中。
②foreach begin end 語句塊
我的實現:

1 <insert id="insertBatch" parameterType="java.util.List"> 2 BEGIN 3 <foreach collection="list" item="item" index="index" separator=";" > 4 INSERT INTO TABLE.STUDENT (ID,AGE,NAME,STU_ID) VALUES 5 ( DEMO.SEQ_EID.NEXTVAL,#{item.age},#{item.name},#{item.stuId} ) 6 </foreach> 7 ;END ; 8 </insert>
調用方式:

1 @Override 2 public void saveBatch(List<T> list) { 3 int size = list.size(); 4 int unitNum = 500; 5 int startIndex = 0; 6 int endIndex = 0; 7 while (size > 0){ 8 if(size > unitNum){ 9 endIndex = startIndex+unitNum; 10 }else { 11 endIndex = startIndex+size; 12 } 13 List<T> insertData = list.subList(startIndex,endIndex); 14 mapper.insertBatch(insertData); 15 size = size - unitNum; 16 startIndex = endIndex; 17 }
---------------------------------------------------------------------
四、java自帶的批處理方式
廢話不多說,直接上代碼

1 package DB; 2 3 import base.Student; 4 5 import java.sql.Connection; 6 import java.sql.DriverManager; 7 import java.sql.PreparedStatement; 8 import java.sql.SQLException; 9 import java.util.ArrayList; 10 import java.util.List; 11 12 /** 13 * Created by robin on 2016/5/23. 14 * 15 * @author robin 16 */ 17 public class InsertTableDemo { 18 19 public static void main(String args[]) throws SQLException { 20 Connection connection = null; 21 List<Student> dataList = getDataList(100000); 22 long startTime = 0; 23 try{ 24 connection = getConn(); 25 startTime=System.currentTimeMillis(); 26 connection.setAutoCommit(false); 27 PreparedStatement statement = connection.prepareStatement("INSERT INTO STUDENT (ID,AGE,NAME,STU_ID) VALUES ( DEMO.SEQ_EID.NEXTVAL, ?,?,? ) "); 28 int num = 0; 29 for (int i = 0;i< dataList.size();i++){ 30 Student s = dataList.get(i); 31 statement.setInt(1, s.getAge()); 32 statement.setString(2, s.getName()); 33 statement.setString(3, s.getStuId()); 34 statement.addBatch(); 35 num++; 36 if(num !=0 && num%500 == 0){ 37 statement.executeBatch(); 38 connection.commit(); 39 num = 0; 40 } 41 } 42 statement.executeBatch(); 43 connection.commit(); 44 }catch (Exception e){ 45 e.printStackTrace(); 46 connection.rollback(); 47 }finally { 48 if(connection != null){ 49 connection.close(); 50 } 51 long endTime=System.currentTimeMillis(); 52 System.out.println("方法執行時間:"+(endTime-startTime)+"ms"); 53 } 54 55 } 56 57 public static Connection getConn(){ 58 String driver = "oracle.jdbc.driver.OracleDriver"; 59 String url = "jdbc:oracle:thin:@//ip:port/DMEO"; //DMEO為數據庫名 60 String user = "user"; 61 String password = "pwd"; 62 try{ 63 Class.forName(driver); 64 Connection conn = DriverManager.getConnection(url, user, password); 65 return conn; 66 } catch (ClassNotFoundException e) { 67 e.printStackTrace(); 68 } catch (SQLException e) { 69 e.printStackTrace(); 70 } 71 return null; 72 } 73 public static List<Student> getDataList(int f){ 74 List<Student> data = new ArrayList<>(); 75 for (int i =0;i<f;i++){ 76 Student s = new Student(""+i,"小明" + i,i); 77 data.add(s); 78 } 79 return data; 80 } 81 82 83 }
這種批量插入大量數據的方式,性能上最好。但是因為我們小組代碼管理所限制,因此這種方式不使用。
------------------------------------------------------------------------
五、其他方式
現在已經忘了,其他方式到底使用過哪些,但總歸是比以上四種效果都更差,所以沒什么印象了。
如果各位,還有什么其他更好的批量插入數據的方式,歡迎加入討論,集思廣益。
以上就是這兩天,對在原項目基礎上不進行大變動的基礎上,提供批處理插入數據的所思所行。
-------------------------------------------------------------------------------
后記:
這里吐槽一句:希望大家不要把未經過自己驗證的東西,言之鑿鑿地寫到博客中去。
在我做批處理這件事的時候,領導也在參閱網上的博客。其中有一篇博客,說自己在oracle中批量插入數據,采用foreach insert into (field1,field2,...) values (v11,v12,...),(v21,v22,...) ,(v31,v32,...),...也可以。
雖然我明知不行,但是無可奈何還是要去演示給領導看,在oracle中,這種寫法確實不適用。
領導問我為何他說可以,我想我也只能回答:他抄別人的博客唄,抄來抄去都不自己實踐驗證,就想當然地寫到博客里。
所以,如果你看完了我這篇分享,希望您也能寫個demo驗證下,起碼可以加深自己的理解。
感謝領導和DB同事,在此過程中的幫助。
以上內容,都經過本人實踐驗證過。若轉發,請在標題上標記[轉],並注明原文鏈接:http://www.cnblogs.com/robinjava77/p/5530681.html,作者名稱:robin。並在文章最后一行附上本句話。否則,作者保留追究的權利。