java mysql大數據量批量插入與流式讀取分析



總結下這周幫助客戶解決報表生成操作的mysql 驅動的使用上的一些問題,與解決方案。由於生成報表邏輯要從數據庫讀取大量數據並在內存中加工處理后在

生成大量的匯總數據然后寫入到數據庫。基本流程是 讀取->處理->寫入。

1 讀取操作開始遇到的問題是當sql查詢數據量比較大時候基本讀不出來。開始以為是server端處理太慢。但是在控制台是可以立即返回數據的。於是在應用

這邊抓包,發現也是發送sql后立即有數據返回。但是執行ResultSet的next方法確實阻塞的。查文檔翻代碼原來mysql驅動默認的行為是需要把整個結果全部讀取到

內存中才開始允許應用讀取結果。顯然與期望的行為不一致,期望的行為是流的方式讀取,當結果從myql服務端返回后立即還是讀取處理。這樣應用就不需要大量內存

來存儲這個結果集。正確的流式讀取方式代碼示例:

 

PreparedStatement ps = connection.prepareStatement("select .. from ..", 
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); 

//forward only read only也是mysql 驅動的默認值,所以不指定也是可以的 比如: PreparedStatement ps = connection.prepareStatement("select .. from .."); 

ps.setFetchSize(Integer.MIN_VALUE); //也可以修改jdbc url通過defaultFetchSize參數來設置,這樣默認所以的返回結果都是通過流方式讀取.
ResultSet rs = ps.executeQuery();

while (rs.next()) {
  System.out.println(rs.getString("fieldName"));
}

 


代碼分析:下面是mysql判斷是否開啟流式讀取結果的方法,有三個條件forward-only,read-only,fatch size是Integer.MIN_VALUE

/**
 * We only stream result sets when they are forward-only, read-only, and the
 * fetch size has been set to Integer.MIN_VALUE
 *
 * @return true if this result set should be streamed row at-a-time, rather
 * than read all at once.
 */
protected boolean createStreamingResultSet() {
    try { synchronized(checkClosed().getConnectionMutex()) { return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY) && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY) && (this.fetchSize == Integer.MIN_VALUE)); } } catch (SQLException e) { // we can't break the interface, having this be no-op in case of error is ok return false; } }

 

 

2 批量寫入問題。開始時應用程序是一條一條的執行insert來寫入報表結果。寫入也是比較慢的。主要原因是單條寫入時候需要應用於db之間大量的
請求響應交互。每個請求都是一個獨立的事務提交。這樣網絡延遲大的情況下多次請求會有大量的時間消耗的網絡延遲上。第二個是由於每個事務db都會
有刷新磁盤操作寫事務日志,保證事務的持久性。由於每個事務只是寫入一條數據 所以磁盤io利用率不高,因為對於磁盤io是按塊來的,所以連續寫入大量數據效率
更好。所以必須改成批量插入的方式,減少請求數與事務數。下面是批量插入的例子:還有jdbc連接串必須加下rewriteBatchedStatements=true

int batchSize = 1000;
PreparedStatement ps = connection.prepareStatement("insert into tb1 (c1,c2,c3...) values (?,?,?...)"); for (int i = 0; i < list.size(); i++) { ps.setXXX(list.get(i).getC1()); ps.setYYY(list.get(i).getC2()); ps.setZZZ(list.get(i).getC3()); ps.addBatch(); if ((i + 1) % batchSize == 0) { ps.executeBatch(); } } if (list.size() % batchSize != 0) { ps.executeBatch(); }

 

上面代碼示例是每1000條數據發送一次請求。mysql驅動內部在應用端會把多次addBatch()的參數合並成一條multi value的insert語句發送給db去執行
比如insert into tb1(c1,c2,c3) values (v1,v2,v3),(v4,v5,v6),(v7,v8,v9)...
這樣可以比每條一個insert 明顯少很多請求。減少了網絡延遲消耗時間與磁盤io時間,從而提高了tps。

代碼分析: 從代碼可以看出,
1 rewriteBatchedStatements=true,insert是參數化語句且不是insert ... select 或者 insert... on duplicate key update with an id=last_insert_id(...)的話會執行 
executeBatchedInserts,也就是muti value的方式

2 rewriteBatchedStatements=true 語句是都是參數化(沒有addbatch(sql)方式加入的)的而且mysql server版本在4.1以上 語句超過三條,則執行executePreparedBatchAsMultiStatement
就是將多個語句通過;分隔一次提交多條sql。比如 "insert into tb1(c1,c2,c3) values (v1,v2,v3);insert into tb1(c1,c2,c3) values (v1,v2,v3)..."

3 其余的執行executeBatchSerially,也就是還是一條條處理

public void addBatch(String sql)throws SQLException {
    synchronized(checkClosed().getConnectionMutex()) { this.batchHasPlainStatements = true; super.addBatch(sql); } } public int[] executeBatch()throws SQLException { //... if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) { if (canRewriteAsMultiValueInsertAtSqlLevel()) { return executeBatchedInserts(batchTimeout); } if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */ ) { return executePreparedBatchAsMultiStatement(batchTimeout); } } return executeBatchSerially(batchTimeout); //..... }

 


executeBatchedInserts相比executePreparedBatchAsMultiStatement的方式傳輸效率更好,因為一次請求只重復一次前面的insert table (c1,c2,c3)

mysql server 對請求報文的最大長度有限制,如果batch size 太大造成請求報文超過最大限制,mysql 驅動會內部按最大報文限制查分成多個報文。所以要真正減少提交次數

還要檢查下mysql server的max_allowed_packet 否則batch size 再大也沒用.

mysql> show VARIABLES like '%max_allowed_packet%';
+--------------------+-----------+
| Variable_name | Value |
+--------------------+-----------+
| max_allowed_packet | 167772160 |
+--------------------+-----------+
1 row in set (0.00 sec)

 

 要想驗證mysql 發送了正確的sql 有兩種方式

1 抓包,下圖是wireshark在 應用端抓包mysql的報文

 

2 另一個辦法是在mysql server端開啟general log 可以查看mysql收到的所有sql

 

3 在jdbc url上加上參數traceProtocol=true 或者profileSQL=true or autoGenerateTestcaseScript=true

 

性能測試對比

import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.SQLException; import com.alibaba.druid.pool.DruidDataSource; public class BatchInsert { public static void main(String[] args) throws SQLException { int batchSize = 1000; int insertCount = 1000; testDefault(batchSize, insertCount); testRewriteBatchedStatements(batchSize,insertCount); } private static void testDefault(int batchSize, int insertCount) throws SQLException { long start = System.currentTimeMillis(); doBatchedInsert(batchSize, insertCount,""); long end = System.currentTimeMillis(); System.out.println("default:" + (end -start) + "ms"); } private static void testRewriteBatchedStatements(int batchSize, int insertCount) throws SQLException { long start = System.currentTimeMillis(); doBatchedInsert(batchSize, insertCount, "rewriteBatchedStatements=true"); long end = System.currentTimeMillis(); System.out.println("rewriteBatchedStatements:" + (end -start) + "ms"); } private static void doBatchedInsert(int batchSize, int insertCount, String mysqlProperties) throws SQLException { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mysql://ip:3306/test?" + mysqlProperties); dataSource.setUsername("name"); dataSource.setPassword("password"); dataSource.init(); Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement("insert into Test (name,gmt_created,gmt_modified) values (?,now(),now())"); for (int i = 0; i < insertCount; i++) { preparedStatement.setString(1, i+" "); preparedStatement.addBatch(); if((i+1) % batchSize == 0) { preparedStatement.executeBatch(); } } preparedStatement.executeBatch(); connection.close(); dataSource.close(); } }
 
        

網絡環境ping測試延遲是35ms ,測試結果:

default:75525ms
rewriteBatchedStatements:914ms

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM