java實現高性能的數據同步


最近在做一個銀行的生產數據脫敏系統,今天寫代碼時遇到了一個“瓶頸”:脫敏系統需要將生產環境上Informix里的數據原封不動地Copy到另一台Oracle數據庫服務器上,然后對Copy后的數據作些漂白處理。為了將人為干預的因素降到最低,在系統設計時采用Java代碼對數據作Copy,思路如下:


    首先在代碼與生產庫間建立一個Connection,將讀取到的數據放在ResultSet對象,然后再與開發庫建立一個Connection。從ResultSet取出數據后通過開發庫的Connection插入到開發庫,以此來實現Copy。代碼寫完后運行程序,速度太慢了,一秒鍾只能Copy一千條數據,生產庫上有上億條數據,按照這個速度同步完要到猴年馬月呀,用PreparedStatement批處理速度也沒有提高多少。我想能不能用多線程處理,多個人干活總比一個人干活速度要快。
    假設生產庫有1萬條數據,我開5個線程,每個線程分2000條數據,同時向開發庫里插數據,Oracle支持高並發,這樣的話速度至少會提高好多倍。按照這個思路重新進行了編碼,批處理設置為1萬條一提交,統計插入數量的變量使用java.util.concurrent.atomic.AtomicLong。程序一運行,傳輸速度飛快,CPU利用率在70%~90%,現在一秒鍾可以拷貝50萬條記錄,沒過幾分鍾上億條數據一條不落地全部Copy到目標庫。

在查詢的時候我用了如下語句
  1. String queryStr = "SELECT * FROM xx";  
  2. ResultSet coreRs = PreparedStatement.executeQuery(queryStr);  

實習生問:如果xx表里有上千萬條記錄,你全部查詢出來放到ResultSet,那內存不溢出了么?Java在設計的時候已經考慮到這個問題了,並沒有查詢出所有的數據,而是只查詢了一部分數據放到ResultSet,數據“用完”它會自動查詢下一批數據,你可以用setFetchSize(int rows)方法設置一個建議值給ResultSet,告訴它每次從數據庫Fetch多少條數據。但我不贊成,因為JDBC驅動會根據實際情況自動調整Fetch的數量。另外性能也與網線的帶寬有直接的關系。
相關代碼
package com.dlbank.domain;  
   
 import java.sql.Connection;  
 import java.sql.PreparedStatement;  
 import java.sql.ResultSet;  
 import java.sql.Statement;  
 import java.util.List;  
 import java.util.concurrent.atomic.AtomicLong;  
   
 import org.apache.log4j.Logger;  
   
 /**
  * <p>
  * title: 數據同步類
  * </p>
  * <p>
  * Description: 該類用於將生產核心庫數據同步到開發庫
  * </p>
  * 
  * @author Tank Zhang
  */  
 public class CoreDataSyncImpl implements CoreDataSync {  
       
     private List<String> coreTBNames; // 要同步的核心庫表名
     private ConnectionFactory connectionFactory;  
     private Logger log = Logger.getLogger(getClass());  
       
     private AtomicLong currentSynCount = new AtomicLong(0L); // 當前已同步的條數
       
     private int syncThreadNum;  // 同步的線程數
   
     @Override  
     public void syncData(int businessType) throws Exception {  
           
         for (String tmpTBName : coreTBNames) {  
             log.info("開始同步核心庫" + tmpTBName + "表數據");  
             // 獲得核心庫連接
             Connection coreConnection = connectionFactory.getDMSConnection(4);  
             Statement coreStmt = coreConnection.createStatement();  
             // 為每個線程分配結果集
             ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);  
             coreRs.next();  
             // 總共處理的數量
             long totalNum = coreRs.getLong(1);  
             // 每個線程處理的數量
             long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));   
             log.info("共需要同步的數據量:"+totalNum);  
             log.info("同步線程數量:"+syncThreadNum);  
             log.info("每個線程可處理的數量:"+ownerRecordNum);  
             // 開啟五個線程向目標庫同步數據
             for(int i=0; i < syncThreadNum; i ++){  
                 StringBuilder sqlBuilder = new StringBuilder();  
                 // 拼裝后SQL示例
                 // Select * From dms_core_ds Where id between 1 And 657398
                 // Select * From dms_core_ds Where id between 657399 And
     // 1314796
                 // Select * From dms_core_ds Where id between 1314797 And
     // 1972194
                 // Select * From dms_core_ds Where id between 1972195 And
     // 2629592
                 // Select * From dms_core_ds Where id between 2629593 And
     // 3286990
                 // ..
                 sqlBuilder.append("Select * From ").append(tmpTBName)  
                         .append(" Where id between " ).append(i * ownerRecordNum +1)  
                         .append( " And ")  
                         .append((i * ownerRecordNum + ownerRecordNum));  
                 Thread workThread = new Thread(  
                         new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));  
                 workThread.setName("SyncThread-"+i);  
                 workThread.start();  
             }  
             while (currentSynCount.get() < totalNum);  
             // 休眠一會兒讓數據庫有機會commit剩余的批處理(只針對JUnit單元測試,因為單元測試完成后會關閉虛擬器,使線程里的代碼沒有機會作提交操作);
             // Thread.sleep(1000 * 3);
             log.info( "核心庫"+tmpTBName+"表數據同步完成,共同步了" + currentSynCount.get() + "條數據");  
         }  
     }// end for loop
       
     public void setCoreTBNames(List<String> coreTBNames) {  
         this.coreTBNames = coreTBNames;  
     }  
   
     public void setConnectionFactory(ConnectionFactory connectionFactory) {  
         this.connectionFactory = connectionFactory;  
     }  
       
     public void setSyncThreadNum(int syncThreadNum) {  
         this.syncThreadNum = syncThreadNum;  
     }  
       
     // 數據同步線程
     final class WorkerHandler implements Runnable {  
         ResultSet coreRs;  
         String queryStr;  
         int businessType;  
         String targetTBName;  
         public WorkerHandler(String queryStr,int businessType,String targetTBName) {  
             this.queryStr = queryStr;  
             this.businessType = businessType;  
             this.targetTBName = targetTBName;  
         }  
         @Override  
         public void run() {  
             try {  
                 // 開始同步
                 launchSyncData();  
             } catch(Exception e){  
                 log.error(e);  
                 e.printStackTrace();  
             }  
         }  
         // 同步數據方法
         void launchSyncData() throws Exception{  
             // 獲得核心庫連接
             Connection coreConnection = connectionFactory.getDMSConnection(4);  
             Statement coreStmt = coreConnection.createStatement();  
             // 獲得目標庫連接
             Connection targetConn = connectionFactory.getDMSConnection(businessType);  
             targetConn.setAutoCommit(false);// 設置手動提交
             PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");  
             ResultSet coreRs = coreStmt.executeQuery(queryStr);  
             log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);  
             int batchCounter = 0; // 累加的批處理數量
             while (coreRs.next()) {  
                 targetPstmt.setString(1, coreRs.getString(2));  
                 targetPstmt.setString(2, coreRs.getString(3));  
                 targetPstmt.setString(3, coreRs.getString(4));  
                 targetPstmt.setString(4, coreRs.getString(5));  
                 targetPstmt.setString(5, coreRs.getString(6));  
                 targetPstmt.addBatch();  
                 batchCounter++;  
                 currentSynCount.incrementAndGet();// 遞增
                 if (batchCounter % 10000 == 0) { // 1萬條數據一提交
                     targetPstmt.executeBatch();  
                     targetPstmt.clearBatch();  
                     targetConn.commit();  
                 }  
             }  
             // 提交剩余的批處理
             targetPstmt.executeBatch();  
             targetPstmt.clearBatch();  
             targetConn.commit();  
             // 釋放連接
             connectionFactory.release(targetConn, targetPstmt,coreRs);  
         }  
     }  
 }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM