批量插入數據(基於Mybatis的實現-Oracle)


-----------------------------------

20170528 第二次編輯:主要是補充mysql相關內容。

-----------------------------------

 

mysql支持batch提交改進方案:聲明:mysql仍然沒有內部游標,讓數據庫支持executeBatch的方式處理。

MySql 的批量操作,要加rewriteBatchedStatements參數

引用“

MySql的JDBC連接的url中要加rewriteBatchedStatements參數,並保證5.1.13以上版本的驅動,才能實現高性能的批量插入。

 

例如: String connectionUrl="jdbc:mysql://192.168.1.100:3306/test?rewriteBatchedStatements=true" ; 

 

還要保證Mysql JDBC驅的版本。MySql的JDBC驅動的批量插入操作性能是很優秀的。

原文鏈接:http://elf8848.iteye.com/blog/770032  (建議去看看這個鏈接的評論區)

參考資料:https://www.oschina.net/question/2553117_2162171?sort=time 

20170528親測,插入26663條數據,

加上rewriteBatchedStatements后,耗時:3734毫秒
不加rewriteBatchedStatements前,耗時:672551毫秒

mysql裝在本機上,行字段數多,僅從本次測試看,性能提高了180倍。

 ---------------------------------------------------

第一版:20170526 原創

 ---------------------------------------------------

前言:做一個數據同步項目,要求:同步數據不丟失的情況下,提高插入性能。

項目DB框架:Mybatis。DataBase:Oracle。

----------------------------------------------------------------------------

批量插入數據方式:

一、Mybatis 全局設置批處理;

二、Mybatis 局部設置批處理;

三、Mybatis foreach批量插入:

①SELECT UNION ALL;

②BEGIN INSERT INTO ...;INSERT INTO...;...;END;

四、java自帶的批處理插入;

五、其他方式

-----------------------------------------------------------------------------

先說結論:Mybatis(全局/局部)批處理和java自帶的批處理 性能上差不多,屬於最優處理辦法,我這邊各種測試后,最后采用Mybatis局部批處理方式。

一、Mybatis 全局設置批處理

先上Spring-Mybatis.xml 配置信息

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
 5        xsi:schemaLocation="http://www.springframework.org/schema/beans
 6                            http://www.springframework.org/schema/beans/spring-beans.xsd
 7                            http://www.springframework.org/schema/context
 8                            http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
 9 
10     <!-- 自動掃描(自動注入) -->
11     <context:annotation-config/>
12     <context:component-scan base-package="com.company.dao"/>
13 
14     <!-- 動態數據源 -->
15     <bean id="dataSource" class="com.company.dao.datasource.DataSource">
16         <property name="myConfigFile" value="mySource.xml"/>
17     </bean>
18 
19     <!-- mybatis配置 -->
20     <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
21         <property name="dataSource" ref="dataSource"/>
22         <property name="mapperLocations" value="classpath*:mapper/*/*/*.xml"/>
23         <property name="configLocation" value="classpath:/mybatisConfig.xml"/>
24     </bean>
25 
26     <!-- 自動創建映射器,不用單獨為每個 mapper映射-->
27     <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
28         <property name="basePackage" value="com.company.dao.mapper"/>
29         <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
30     </bean>
31 
32     <!-- 事務管理器配置,單數據源事務 -->
33     <bean id="transactionManager"
34           class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
35         <property name="dataSource" ref="dataSource"/>
36     </bean>
37 
38     <tx:annotation-driven transaction-manager="transactionManager"/>
39 
40 </beans>
Spring-Mybatis.xml

再上mybatisConfig.xml(在本項目中,我沒有設置setting。最終采用的局部批處理,因此未設置全局批處理,具體原因后面再說。)

 1 <?mapper.xml version="1.0" encoding="UTF-8" ?>
 2 <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
 3 <configuration>
 4     
 5     <settings>
 6         <!-- 配置默認的執行器。SIMPLE 就是普通的執行器;REUSE 執行器會重用預處理語句(prepared statements); BATCH 執行器將重用語句並執行批量更新。-->
 7         <setting name="defaultExecutorType" value="BATCH"/>
 8         <!--詳見:http://www.mybatis.org/mybatis-3/zh/configuration.html-->
 9     </settings>
10     
11     <!-- 別名列表 -->
12     <typeAliases>
13        <!-- typeAliases 中的配置都是配置別名,在此就不貼出來了 -->
14     </typeAliases>
15 
16 </configuration>
mybatisConfig.xml

這樣子設置好后,在BaseService開放saveBatch(List<T> list)方法

 1 @Override
 2     public void save(List<T> list) {
 3         for (int i = 0;i < list.size();i++){
 4             mapper.insert(list.get(i));
 5         }
 6     }
 7 
 8     @Override
 9     public void saveBatch(List<T> list) {
10         int size = list.size();
11         int unitNum = 500;
12         int startIndex = 0;
13         int endIndex = 0;
14         while (size > 0){
15             if(size > unitNum){
16                 endIndex = startIndex+unitNum;
17             }else {
18                 endIndex = startIndex+size;
19             }
20             List<T> insertData = list.subList(startIndex,endIndex);
21             save(insertData);
22             size = size - unitNum;
23             startIndex = endIndex;
24         }
25     }
BaseService.saveBatch(List list)

 

雖然看上去是500條記錄,一次次INSERT INTO,但由於在全局已經設置Mybatis是批處理執行器,所以這500條INSERT INTO只會與Oracle數據庫通信一次。

 

全局設置批處理的局限性在哪里呢?

先附上mybatis官方的討論列表中最很關鍵的一句:“If the BATCH executor is in use, the update counts are being lost. ”

設置全局批處理后,DB里的insert、Update和delete方法,都無法返回進行DML影響DB_TABLE的行數。

1.insert 無法返回影響的行數,這個好解決,一個批處理放在一個事務里,記錄批處理失敗次數,總數-批處理失敗次數*單位批處理數據量,就能得到insert 影響DB_TABLE的行數;

2.但是update和delete就無法很簡單的去統計影響行數了,如果做反復查詢,反而降低了效率,得不償失。

 

雖現在的項目尚未有需要反饋影響DB_TABLE行數的需求,但是為了更靈活,我們放棄了全局批處理的方式。

!這里提個疑問:為什么Mybatis官方,不將批處理的選擇方式下沉到方法級別?方便開發者根據實際情況,靈活選擇。我覺得這是個可以改進的地方,如有機會,可看源碼去進行改進。

---------------------------------------------------------------------------------------------------------

二、Mybatis局部批處理方式

 

由於領導說全局批處理方式,不夠靈活,不適宜項目所需,要另想辦法支持。但是java自帶的批處理,因為項目代碼管理的要求,也不能采用。因此,在仔細閱讀官方文檔后,設想自己能否獲取SQLSession后openSession,將這個會話設置為批處理呢?

先看MyBatis官方網站(須FanQiang):http://www.mybatis.org/mybatis-3/zh/getting-started.html

 

1 SqlSession session = sqlSessionFactory.openSession();
2 try {
3   BlogMapper mapper = session.getMapper(BlogMapper.class);
4   // do work
5 } finally {
6   session.close();
7 }
官方建議的寫法

 

后查閱Mybatis java API(須FanQiang):  http://www.mybatis.org/mybatis-3/zh/java-api.html

現在你有一個 SqlSessionFactory,可以用來創建 SqlSession 實例。

SqlSessionFactory

SqlSessionFactory 有六個方法可以用來創建 SqlSession 實例。通常來說,如何決定是你 選擇下面這些方法時:

  • Transaction (事務): 你想為 session 使用事務或者使用自動提交(通常意味着很多 數據庫和/或 JDBC 驅動沒有事務)?
  • Connection (連接): 你想 MyBatis 獲得來自配置的數據源的連接還是提供你自己
  • Execution (執行): 你想 MyBatis 復用預處理語句和/或批量更新語句(包括插入和 刪除)

 

重載的 openSession()方法簽名設置允許你選擇這些可選中的任何一個組合。

1 SqlSession openSession()
2 SqlSession openSession(boolean autoCommit)
3 SqlSession openSession(Connection connection)
4 SqlSession openSession(TransactionIsolationLevel level)
5 SqlSession openSession(ExecutorType execType,TransactionIsolationLevel level)
6 SqlSession openSession(ExecutorType execType)
7 SqlSession openSession(ExecutorType execType, boolean autoCommit)
8 SqlSession openSession(ExecutorType execType, Connection connection)
9 Configuration getConfiguration();
官方提供的openSession方法

因此出來了局部批處理第一套代碼實現方式:

 1 public static void sqlSession(List<Student> data) throws IOException {
 2         String resource = "mybatis-dataSource.xml";
 3         InputStream inputStream = null;
 4         SqlSession batchSqlSession = null;
 5         try{
 6             inputStream = Resources.getResourceAsStream(resource);
 7             SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
 8             batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
 9             int batchCount = 500;//每批commit的個數
10             for(int index = 0; index < data.size();index++){
11                 Student stu = data.get(index);
12                 batchSqlSession.getMapper(Student.class).insert(stu);
13                 if(index !=0 && index%batchCount == 0){
14                     batchSqlSession.commit();
15                 }
16             }
17             batchSqlSession.commit();
18         }catch (Exception e){
19             e.printStackTrace();
20         }finally {
21             if(batchSqlSession != null){
22                 batchSqlSession.close();
23             }
24             if(inputStream != null){
25                 inputStream.close();
26             }
27         }
28     }
sqlSession(List data)
 1 <?xml version="1.0" encoding="UTF-8" ?>
 2 <!DOCTYPE configuration
 3   PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
 4   "http://mybatis.org/dtd/mybatis-3-config.dtd">
 5 <configuration>
 6   <environments default="development">
 7     <environment id="development">
 8       <transactionManager type="JDBC"/>
 9       <dataSource type="POOLED">
10         <property name="driver" value="${driver}"/>
11         <property name="url" value="${url}"/>
12         <property name="username" value="${username}"/>
13         <property name="password" value="${password}"/>
14       </dataSource>
15     </environment>
16   </environments>
17   <mappers>
18     <mapper resource="org/mybatis/example/Student.xml"/>
19   </mappers>
20 </configuration>
mybatis-dataSource.xml

已經在Spring-Mybatis.xml 中配置了SQLSessionFactory,那我為何還要自己去創建SQLSessionFactory呢?因此繼續改良代碼

 1 public static void mybatisSqlSession(List<Student> data){
 2         DefaultSqlSessionFactory sqlSessionFactory = (DefaultSqlSessionFactory) ServiceBeanConstant.CTX.getBean("sqlSessionFactory");
 3         SqlSession batchSqlSession = null;
 4         try{
 5             batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
 6             int batchCount = 500;//每批commit的個數
 7             for(int index = 0; index < data.size();index++){
 8                 Student stu = data.get(index);
 9                 batchSqlSession.getMapper(StudentMapper.class).insert(stu);
10                 if(index !=0 && index%batchCount == 0){
11                     batchSqlSession.commit();
12                 }
13             }
14             batchSqlSession.commit();
15         }catch (Exception e){
16             e.printStackTrace();
17         }finally {
18             if(batchSqlSession != null){
19                 batchSqlSession.close();
20             }
21         }
22     }
mybatisSqlSession(List data)

 

這個版本的局部批處理插入是比較滿意的,最終采用的方式也是這個版本。

 

下面放出在IService接口定義和Service的具體實現代碼:

IService接口定義

1 /**
2      * 批處理插入數據(方法內部定義500條為一個批次進行提交)
3      * 使用注意事項:必須在XxxMappper.xml中實現<insert id="insert" ...>....<insert/>的sql
4      * @param data 批量插入的數據
5      * @param mClass 調用的XxxMaperr.class
6      * @auth robin
7      * Created on 2016/3/14
8      */
9     void saveBatch(List<T> data,Class mClass);
saveBatch(List data,Class mClass)

Service實現

 1 @Override
 2     public void saveBatch(List<T> data,Class mClass) {
 3         DefaultSqlSessionFactory sqlSessionFactory = (DefaultSqlSessionFactory) ServiceBeanConstant.CTX.getBean("sqlSessionFactory");
 4         SqlSession batchSqlSession = null;
 5         try{
 6             batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
 7             int batchCount = 500;//每批commit的個數
 8             for(int index = 0; index < data.size();index++){
 9                 T t = data.get(index);
10                 ((BaseMapper)batchSqlSession.getMapper(mClass)).insert(t);
11                 if(index !=0 && index%batchCount == 0){
12                     batchSqlSession.commit();
13                 }
14             }
15             batchSqlSession.commit();
16         }catch (Exception e){
17             e.printStackTrace();
18         }finally {
19             if(batchSqlSession != null){
20                 batchSqlSession.close();
21             }
22         }
23     }
saveBatch(List data,Class mClass)

 

局部和全局批處理插入對比:局部批處理,可以對特定一類的方法,進行數據批處理,不會影響其他DML語句,其他DML語句,可以正常返回影響DB_TABLE的行數。

!這樣既能針對特殊需求(批處理)支持,也能支持未來需要返回影響數據行的要求。

 

注意:使用批處理方式進行DML操作,是無法反饋影響DB_TABLE行數的數據。無論是局部批處理還是java自帶的批處理方式,皆無法反饋DB_TABLE count。

 

補充完善:

在我的Service實現中,通過注入的方式,獲取mapper的實例

 

 1 public class BaseService<MAPPER extends BaseMapper, T, PK extends Serializable> implements IBaseService<T, PK> {
 2 
 3     protected T tt;
 4     /**
 5      * 實體操作的自動注入Mapper(隨初始化一同注入,必須用set方法)
 6      */
 7     protected MAPPER mapper;
 8 
 9     public MAPPER getMapper() {
10         return mapper;
11     }
12 
13     @Autowired
14     public void setMapper(MAPPER mapper) {
15         this.mapper = mapper;
16     }
17    //后續代碼略
18 }
Service

 

前面的Service saveBatch方法中,還需要傳入指定的Mapper.class.對本項目其他開發者來說,與之前的環境相比,多傳一個參數感覺別扭。

那么為何我不繼續封裝,外部無需傳入Mapper.class,而是通過內部注入的mapper實例獲取Mapper.class.
改良后的代碼:

 

 1 @Override
 2     public T saveBatch(List<T> data) {
 3         T back = null;
 4         DefaultSqlSessionFactory sqlSessionFactory = (DefaultSqlSessionFactory) ServiceBeanConstant.CTX.getBean("sqlSessionFactory");
 5         SqlSession batchSqlSession = null;
 6         try{
 7             batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
 8             int batchCount = 500;//每批commit的個數
 9             for(int index = 0; index < data.size();index++){
10                 T t = data.get(index);
11                 back = t;
12                 Class<?>[] interfaces=mapper.getClass().getInterfaces();
13                 Class clazz = null;
14                 for (int i=0;i<interfaces.length;i++){
15                     if(BaseMapper.class.isAssignableFrom(interfaces[i])){
16                         clazz = interfaces[i];
17                     }
18                 }
19                 if(clazz == null){
20                     throw new Exception("user-defined exception:mapper not implements interfaces com.company.dao.mapper.BaseMapper");
21                 }
22                 BaseMapper baseMapper = (BaseMapper) batchSqlSession.getMapper(clazz);
23                 baseMapper.insert(t);
24                 if(index !=0 && index%batchCount == 0){
25                     batchSqlSession.commit();
26                 }
27             }
28             batchSqlSession.commit();
29         }catch (Exception e){
30             e.printStackTrace();
31         }finally {
32             if(batchSqlSession != null){
33                 batchSqlSession.close();
34             }
35             return back;
36         }
37     }
saveBatch(List data)

 

這里對mapper實例進行一個簡短的說明:

1.mapper實例是通過java動態代理來實例化的;

2.mapper的SQLSession是使用mybatis統一的配置實例的;

3.mapper的默認執行器是SIMPLE(普通的執行器);

-------------------------------------------------------------------------------------

 三、Mybatis foreach批量插入

Mybatis foreach 批量插入,如果批量插入的數據量大,不得不說這真是一個非常糟糕的做法

無論是SELECT ** UNION ALL 還是BEGIN ...;END; ,相對而言后者比前者稍微好點。

放出DB和我測試的結果:

耗時 占當時整個數據庫CPU百分比 說明
15.5 98.33 union all方式拼接插入
16.4 97.75 begin end方式插入塊
1.54 64.81 java 自帶的batch方式插入

①foreach union all的批量插入,現已有大量的博客資源可供參考,我就不貼出自己的實現方式了。

如果有興趣可以參閱:http://blog.csdn.net/sanyuesan0000/article/details/19998727 (打開瀏覽器,復制url)

這篇博客。BEGIN END的方式,也是從這篇博客中得到啟發。只不過他是把BEGIN END用在update中。

②foreach begin end 語句塊

我的實現:

1 <insert id="insertBatch" parameterType="java.util.List">
2            BEGIN
3            <foreach collection="list" item="item" index="index" separator=";" >
4                INSERT INTO TABLE.STUDENT (ID,AGE,NAME,STU_ID) VALUES
5                ( DEMO.SEQ_EID.NEXTVAL,#{item.age},#{item.name},#{item.stuId} )
6            </foreach>
7            ;END ;
8        </insert>
insertBatch

 調用方式:

 1 @Override
 2     public void saveBatch(List<T> list) {
 3         int size = list.size();
 4         int unitNum = 500;
 5         int startIndex = 0;
 6         int endIndex = 0;
 7         while (size > 0){
 8             if(size > unitNum){
 9                 endIndex = startIndex+unitNum;
10             }else {
11                 endIndex = startIndex+size;
12             }
13             List<T> insertData = list.subList(startIndex,endIndex);
14             mapper.insertBatch(insertData);
15             size = size - unitNum;
16             startIndex = endIndex;
17         }
saveBatch(List list)

---------------------------------------------------------------------

四、java自帶的批處理方式

廢話不多說,直接上代碼

 1 package DB;
 2 
 3 import base.Student;
 4 
 5 import java.sql.Connection;
 6 import java.sql.DriverManager;
 7 import java.sql.PreparedStatement;
 8 import java.sql.SQLException;
 9 import java.util.ArrayList;
10 import java.util.List;
11 
12 /**
13  * Created by robin on 2016/5/23.
14  *
15  * @author robin
16  */
17 public class InsertTableDemo {
18 
19     public static void main(String args[]) throws SQLException {
20         Connection connection = null;
21         List<Student> dataList = getDataList(100000);
22         long startTime = 0;
23         try{
24             connection = getConn();
25             startTime=System.currentTimeMillis();
26             connection.setAutoCommit(false);
27             PreparedStatement statement = connection.prepareStatement("INSERT INTO STUDENT (ID,AGE,NAME,STU_ID) VALUES ( DEMO.SEQ_EID.NEXTVAL, ?,?,? ) ");
28             int num = 0;
29             for (int i = 0;i< dataList.size();i++){
30                 Student s = dataList.get(i);
31                 statement.setInt(1, s.getAge());
32                 statement.setString(2, s.getName());
33                 statement.setString(3, s.getStuId());
34                 statement.addBatch();
35                 num++;
36                 if(num !=0 && num%500 == 0){
37                     statement.executeBatch();
38                     connection.commit();
39                     num = 0;
40                 }
41             }
42             statement.executeBatch();
43             connection.commit();
44         }catch (Exception e){
45             e.printStackTrace();
46             connection.rollback();
47         }finally {
48             if(connection != null){
49                 connection.close();
50             }
51             long endTime=System.currentTimeMillis();
52             System.out.println("方法執行時間:"+(endTime-startTime)+"ms");
53         }
54 
55     }
56 
57     public static Connection getConn(){
58         String driver = "oracle.jdbc.driver.OracleDriver";
59         String url = "jdbc:oracle:thin:@//ip:port/DMEO"; //DMEO為數據庫名
60         String user = "user";
61         String password = "pwd";
62         try{
63             Class.forName(driver);
64             Connection conn = DriverManager.getConnection(url, user, password);
65             return conn;
66         } catch (ClassNotFoundException e) {
67             e.printStackTrace();
68         } catch (SQLException e) {
69             e.printStackTrace();
70         }
71         return null;
72     }
73     public static List<Student> getDataList(int f){
74         List<Student> data = new ArrayList<>();
75         for (int i =0;i<f;i++){
76             Student s = new Student(""+i,"小明" + i,i);
77             data.add(s);
78         }
79         return data;
80     }
81 
82 
83 }
JDBC BATCH

這種批量插入大量數據的方式,性能上最好。但是因為我們小組代碼管理所限制,因此這種方式不使用。

------------------------------------------------------------------------

五、其他方式

現在已經忘了,其他方式到底使用過哪些,但總歸是比以上四種效果都更差,所以沒什么印象了。

如果各位,還有什么其他更好的批量插入數據的方式,歡迎加入討論,集思廣益。 

 

以上就是這兩天,對在原項目基礎上不進行大變動的基礎上,提供批處理插入數據的所思所行。

-------------------------------------------------------------------------------

后記:

 

這里吐槽一句:希望大家不要把未經過自己驗證的東西,言之鑿鑿地寫到博客中去。

在我做批處理這件事的時候,領導也在參閱網上的博客。其中有一篇博客,說自己在oracle中批量插入數據,采用foreach insert into (field1,field2,...) values (v11,v12,...),(v21,v22,...) ,(v31,v32,...),...也可以。

雖然我明知不行,但是無可奈何還是要去演示給領導看,在oracle中,這種寫法確實不適用。

領導問我為何他說可以,我想我也只能回答:他抄別人的博客唄,抄來抄去都不自己實踐驗證,就想當然地寫到博客里。

 

所以,如果你看完了我這篇分享,希望您也能寫個demo驗證下,起碼可以加深自己的理解。

 

 感謝領導和DB同事,在此過程中的幫助。

以上內容,都經過本人實踐驗證過。若轉發,請在標題上標記[轉],並注明原文鏈接:http://www.cnblogs.com/robinjava77/p/5530681.html,作者名稱:robin。並在文章最后一行附上本句話。否則,作者保留追究的權利。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM