從文件導數據到數據庫的性能優化思路(筆記)


概述

     最近公司一.NET項目需要對其日志Log入數據庫統計,寫這個腳本導入的任務便落到我身上了。采用了熟練的Java,這個日志也不是很大,一個文件大概幾兆,有上萬條數據,一天大概有7,8個文件需要這樣的導入處理。由於之前寫Web沒有這么去批處理這么多數據,所以沒有太注意性能,第一個版本程序導入速度慢的嚇人,一個文件導完可能需要10多分鍾,也就是說如果把每天的文件導完可能需要2個多小時的時間,聽聽就很蛋疼,最終經過優化后,一個文件導入也就幾秒,甚至可以更短。目標日志文件的信息都是按行存儲,所以程序中按行讀取后,然后進行相應的字符串截取入庫。下面則為思路分享以及主要代碼的分享。
 
優化思路
     1.程序流程:
       程序先讀取本地的文件到內存,然后把內存的數據批量Insert到數據庫。
     2.歸納:
     可以看出首先程序需要進行文件IO操作,然后則是數據JDBC操作,所以優化方向大致可以是以下幾個:
          a.文件IO優化
          b.JDBC操作優化
          c.使用多線程並行JDBC操作
 文件常見IO簡介
     Java的文件讀寫操作大概有這么幾種方式,但是我們應該注意幾種文件操作方式的區別,哪些操作方式適合不同的數據文件對象。
     1.(InputStream/OutputStream)    為字節輸入/輸出流,這種讀寫方式都是按一定字節量讀取數據。
     2. (FileInputStream/FileOutputStream) 此方法繼承自上面的(InputStream/OutpustStream),同樣按字節流輸入/輸出,用於讀取圖像之類的原始字節流
     3.(FileReader/FileWriter) 此方法適用於按字符流的文件操作
     4. (BufferedReader/BufferedWriter) 從字符輸入流中讀取文本,緩沖各個字符,從而實現字符、數組和行的高效讀取。
 
      注:更詳細的IO操作說明,請查看具體的JDK文檔。
   此處我采用的BufferedReader按行讀取, 代碼片段:
 1     public static List<String> getLogLinesByBuf(String filePath){
 2         
 3         List<String> items = new ArrayList<String>();
 4         File file = new File(filePath);    
 5         BufferedReader reader;
 6         if (file.exists()) {
 7             
 8             try {
 9                 reader = new BufferedReader(new FileReader(file));
10                 String temp = "";
11                 while((temp = reader.readLine()) != null) {
12                     items.add(temp);
13                 }            
14                 //close
15                 reader.close();
16             } catch (Exception e) {                
17                 e.printStackTrace();
18             }
19         } else {
20             System.out.println("該路徑文件不存在.");
21         }        
22         return items;
23     }

 

PreparedStatement和Statement 
        JDBC操作我們經常會用到PreparedStatement和Statement,PreparedStatement相對Statement來講,PreparedStatement擁有預編譯能力,性能更好,2者其它的優缺點比較可以查看相關的資料。另外,平常我們插入數據都是一條,2條,當完成成千上萬條數據插入操作的時候,你會看到性能是直線下降的,所以這里會采用sql批處理。
       代碼片段:
    
    public static void insertLogInfo(List<String> data) {
        
        String sql = "INSERT INTO log_info(date_time,s_sitename,s_ip,cs_method,cs_uri_stem,cs_uri_query,"
                + "s_port,cs_username,c_ip,cs_user_agent,sc_status,sc_substatus,sc_win32_status"
                + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)";
        Connection conn = DBSource.getConnection();
        int count = 0;
        try {
            conn.setAutoCommit(false);
            PreparedStatement prest = conn.prepareStatement(sql);  
            
            for(String str : data) {
                String[] arr = str.split(" ");
                prest.setString(1, arr[0]+" "+arr[1]);
                prest.setString(2, arr[2]);
                prest.setString(3, arr[3]);
                prest.setString(4, arr[4]);
                prest.setString(5, arr[5]);
                prest.setString(6, arr[6]);
                prest.setString(7, arr[7]);
                prest.setString(8, arr[8]);
                prest.setString(9, arr[9]);
                prest.setString(10, arr[10]);
                prest.setString(11, arr[11]);
                prest.setString(12, arr[12]);
                prest.setString(13, arr[13]);
                //添加到批處理
                prest.addBatch();    
            }

            int [] intarr = prest.executeBatch();
            conn.commit();  
            prest.clearBatch();                      
            prest.close();
            conn.close();
            for (int j = 0 ; j < intarr.length; j++) {
                if (intarr[j] > 0) {
                    count +=1;
                }
            }    
        } catch (Exception e) {
            System.out.println(new Date().toLocaleString()+":數據庫插入操作失敗"+e.getMessage());
        }      
        System.out.println("本次操作成功插入"+count+"行數據");    
    }
View Code

 

多線程並行處理
         例如本來1萬條數據是一個線程進行JDBC批量提交,現在啟用5個線程並行處理,每個線程2000條數據,甚至你可以根據數據量來分配更多線程來完成同步提交,性能提升會比較明顯。
       代碼片段:
  Thread類:  
  1 package com.xj.dbsource;
  2 
  3 import java.io.File;
  4 
  5 import java.sql.Connection;
  6 import java.sql.DriverManager;
  7 import java.sql.PreparedStatement;
  8 import java.sql.SQLException;
  9 import java.sql.Statement;
 10 import java.util.Date;
 11 import java.util.List;
 12 
 13 import com.json.utils.JsonFileUtils;
 14 import com.xj.iislog.bean.JDBCInfo;
 15 
 16 
 17 
 18 /**
 19  * 
 20  * @author Ziv
 21  * 數據操作源
 22  */
 23 public class DBSource extends Thread {
 24     
 25     //聲明對象
 26     private static Statement statement;
 27     //連接對象
 28     private static Connection conn;
 29     
 30     private List<String> data;
 31         
 32     public DBSource(List<String> data) {
 33         super();
 34         this.data = data;
 35     }
 36 
 37     public void run(){
 38         System.out.println(System.currentTimeMillis());
 39         DBSource.insertLogInfo(data);
 40         System.out.println(System.currentTimeMillis());
 41     }
 42     
 43     /**
 44      * 
 45      * @param sql
 46      * @return int
 47      */
 48     @SuppressWarnings("deprecation")
 49     public int insert(String sql) {
 50         
 51         int result = 0;
 52         try {
 53             conn = getConnection();
 54             statement = conn.createStatement();
 55             result = statement.executeUpdate(sql);        
 56             //關閉連接
 57             conn.close();
 58         } catch (SQLException e) {
 59             System.out.println(new Date().toLocaleString()+":數據庫插入操作失敗" +e.getMessage());
 60         }
 61         
 62         return result;
 63     }
 64     
 65     /**
 66      * prepared方式入庫
 67      * @param arr
 68      * @return
 69      * @throws SQLException
 70      */
 71     @SuppressWarnings("deprecation")
 72     public static void insertLogInfo(List<String> data) {
 73         
 74         String sql = "INSERT INTO log_info(date_time,s_sitename,s_ip,cs_method,cs_uri_stem,cs_uri_query,"
 75                 + "s_port,cs_username,c_ip,cs_user_agent,sc_status,sc_substatus,sc_win32_status"
 76                 + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)";
 77         Connection conn = DBSource.getConnection();
 78         int count = 0;
 79         try {
 80             conn.setAutoCommit(false);
 81             PreparedStatement prest = conn.prepareStatement(sql);  
 82             
 83             for(String str : data) {
 84                 String[] arr = str.split(" ");
 85                 prest.setString(1, arr[0]+" "+arr[1]);
 86                 prest.setString(2, arr[2]);
 87                 prest.setString(3, arr[3]);
 88                 prest.setString(4, arr[4]);
 89                 prest.setString(5, arr[5]);
 90                 prest.setString(6, arr[6]);
 91                 prest.setString(7, arr[7]);
 92                 prest.setString(8, arr[8]);
 93                 prest.setString(9, arr[9]);
 94                 prest.setString(10, arr[10]);
 95                 prest.setString(11, arr[11]);
 96                 prest.setString(12, arr[12]);
 97                 prest.setString(13, arr[13]);
 98                 //添加到批處理
 99                 prest.addBatch();    
100             }
101 
102             int [] intarr = prest.executeBatch();
103             conn.commit();  
104             prest.clearBatch();                      
105             prest.close();
106             conn.close();
107             for (int j = 0 ; j < intarr.length; j++) {
108                 if (intarr[j] > 0) {
109                     count +=1;
110                 }
111             }    
112         } catch (Exception e) {
113             System.out.println(new Date().toLocaleString()+":數據庫插入操作失敗"+e.getMessage());
114         }      
115         System.out.println("本次操作成功插入"+count+"行數據");    
116     }
117     
118     /**
119      * 創建連接池
120      * @return Connection
121      */
122     public static Connection getConnection() {
123         Connection con = null;
124         try {
125             //從配置文件中獲取jdbc config
126             JDBCInfo jdbc = JsonFileUtils.readJsonFile(new File("resource/config.json"), JDBCInfo.class);
127             if (jdbc != null) {
128                 //mysql驅動加載
129                 Class.forName(jdbc.getDriver());
130                 con = DriverManager.getConnection(jdbc.getUrl(),
131                         jdbc.getUser(), jdbc.getPassword());                            
132             }
133         } catch (Exception e) {
134             System.out.println("數據庫連接失敗" +e.getMessage());
135         }
136         return con;
137     }
138     
139     
140     /**
141      * 獲取Sql
142      * @param arr
143      * @return
144      */
145     public String getSql(String[] arr) {
146         
147         StringBuffer sql = new StringBuffer("INSERT INTO log_info (");
148         sql.append("date_time,");
149         sql.append("s_sitename,");
150         sql.append("s_ip,");
151         sql.append("cs_method,");
152         sql.append("cs_uri_stem,");
153         sql.append("cs_uri_query,");
154         sql.append("s_port,");
155         sql.append("cs_username,");
156         sql.append("c_ip,");
157         sql.append("cs_user_agent,");
158         sql.append("sc_status,");
159         sql.append("sc_substatus,");
160         sql.append("sc_win32_status");
161         sql.append(") VALUES ('");
162         sql.append(arr[0]+" "+arr[1]);
163         sql.append("','");
164         sql.append(arr[2]);
165         sql.append("','");
166         sql.append(arr[3]);
167         sql.append("','");
168         sql.append(arr[4]);
169         sql.append("','");
170         sql.append(arr[5]);
171         sql.append("','");
172         sql.append(arr[6]);
173         sql.append("','");
174         sql.append(arr[7]);
175         sql.append("','");
176         sql.append(arr[8]);
177         sql.append("','");
178         sql.append(arr[9]);
179         sql.append("','");
180         sql.append(arr[10]);
181         sql.append("','");
182         sql.append(arr[11]);
183         sql.append("','");
184         sql.append(arr[12]);
185         sql.append("','");
186         sql.append(arr[13]);
187         sql.append("')");
188         
189         return sql.toString();
190     }
191 }
View Code
  調用代碼:
 1     /**
 2      * 此方法采用遞歸操作,直至數據全部入庫寫入完畢
 3      * 同時調用5個線程進行入庫操作
 4      * @param data
 5      * @param start
 6      * @param end
 7      */
 8     public static void threadsHandle(List<String> data, int start, int end) {
 9         
10         int total = data.size();
11         int size = (int)data.size()/5;
12         //數據不越界
13         if (start < total) {
14             List<String> temp = null;
15             if (end < total) {
16                 temp = data.subList(start, end);                        
17             } else if (end >= total) {
18                 temp = data.subList(start, total);
19             }
20             //執行數據寫入
21             DBSource thread = new DBSource(temp);
22             thread.start();
23 
24             start = end;
25             end = start+size;
26             threadsHandle(data, start, end);
27         }
28     }

 

最終結果:
     原來的12分鍾,變成了6秒左右,效率大了一大截。其他朋友如果有更好的建議,可以跟我交流下0.0。下次再把數據弄的更大些。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM