概述
最近公司一.NET項目需要對其日志Log入數據庫統計,寫這個腳本導入的任務便落到我身上了。采用了熟練的Java,這個日志也不是很大,一個文件大概幾兆,有上萬條數據,一天大概有7,8個文件需要這樣的導入處理。由於之前寫Web沒有這么去批處理這么多數據,所以沒有太注意性能,第一個版本程序導入速度慢的嚇人,一個文件導完可能需要10多分鍾,也就是說如果把每天的文件導完可能需要2個多小時的時間,聽聽就很蛋疼,最終經過優化后,一個文件導入也就幾秒,甚至可以更短。目標日志文件的信息都是按行存儲,所以程序中按行讀取后,然后進行相應的字符串截取入庫。下面則為思路分享以及主要代碼的分享。
優化思路
1.程序流程:
程序先讀取本地的文件到內存,然后把內存的數據批量Insert到數據庫。
2.歸納:
可以看出首先程序需要進行文件IO操作,然后則是數據JDBC操作,所以優化方向大致可以是以下幾個:
a.文件IO優化
b.JDBC操作優化
c.使用多線程並行JDBC操作
文件常見IO簡介
Java的文件讀寫操作大概有這么幾種方式,但是我們應該注意幾種文件操作方式的區別,哪些操作方式適合不同的數據文件對象。
1.(InputStream/OutputStream) 為字節輸入/輸出流,這種讀寫方式都是按一定字節量讀取數據。
2. (FileInputStream/FileOutputStream) 此方法繼承自上面的(InputStream/OutpustStream),同樣按字節流輸入/輸出,用於讀取圖像之類的原始字節流
3.(FileReader/FileWriter) 此方法適用於按字符流的文件操作
4. (BufferedReader/BufferedWriter) 從字符輸入流中讀取文本,緩沖各個字符,從而實現字符、數組和行的高效讀取。
注:更詳細的IO操作說明,請查看具體的JDK文檔。
此處我采用的BufferedReader按行讀取,
代碼片段:
1 public static List<String> getLogLinesByBuf(String filePath){ 2 3 List<String> items = new ArrayList<String>(); 4 File file = new File(filePath); 5 BufferedReader reader; 6 if (file.exists()) { 7 8 try { 9 reader = new BufferedReader(new FileReader(file)); 10 String temp = ""; 11 while((temp = reader.readLine()) != null) { 12 items.add(temp); 13 } 14 //close 15 reader.close(); 16 } catch (Exception e) { 17 e.printStackTrace(); 18 } 19 } else { 20 System.out.println("該路徑文件不存在."); 21 } 22 return items; 23 }
PreparedStatement和Statement
JDBC操作我們經常會用到PreparedStatement和Statement,PreparedStatement相對Statement來講,PreparedStatement擁有預編譯能力,性能更好,2者其它的優缺點比較可以查看相關的資料。另外,平常我們插入數據都是一條,2條,當完成成千上萬條數據插入操作的時候,你會看到性能是直線下降的,所以這里會采用sql批處理。
代碼片段:

public static void insertLogInfo(List<String> data) { String sql = "INSERT INTO log_info(date_time,s_sitename,s_ip,cs_method,cs_uri_stem,cs_uri_query," + "s_port,cs_username,c_ip,cs_user_agent,sc_status,sc_substatus,sc_win32_status" + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"; Connection conn = DBSource.getConnection(); int count = 0; try { conn.setAutoCommit(false); PreparedStatement prest = conn.prepareStatement(sql); for(String str : data) { String[] arr = str.split(" "); prest.setString(1, arr[0]+" "+arr[1]); prest.setString(2, arr[2]); prest.setString(3, arr[3]); prest.setString(4, arr[4]); prest.setString(5, arr[5]); prest.setString(6, arr[6]); prest.setString(7, arr[7]); prest.setString(8, arr[8]); prest.setString(9, arr[9]); prest.setString(10, arr[10]); prest.setString(11, arr[11]); prest.setString(12, arr[12]); prest.setString(13, arr[13]); //添加到批處理 prest.addBatch(); } int [] intarr = prest.executeBatch(); conn.commit(); prest.clearBatch(); prest.close(); conn.close(); for (int j = 0 ; j < intarr.length; j++) { if (intarr[j] > 0) { count +=1; } } } catch (Exception e) { System.out.println(new Date().toLocaleString()+":數據庫插入操作失敗"+e.getMessage()); } System.out.println("本次操作成功插入"+count+"行數據"); }
多線程並行處理
例如本來1萬條數據是一個線程進行JDBC批量提交,現在啟用5個線程並行處理,每個線程2000條數據,甚至你可以根據數據量來分配更多線程來完成同步提交,性能提升會比較明顯。
代碼片段:
Thread類:

1 package com.xj.dbsource; 2 3 import java.io.File; 4 5 import java.sql.Connection; 6 import java.sql.DriverManager; 7 import java.sql.PreparedStatement; 8 import java.sql.SQLException; 9 import java.sql.Statement; 10 import java.util.Date; 11 import java.util.List; 12 13 import com.json.utils.JsonFileUtils; 14 import com.xj.iislog.bean.JDBCInfo; 15 16 17 18 /** 19 * 20 * @author Ziv 21 * 數據操作源 22 */ 23 public class DBSource extends Thread { 24 25 //聲明對象 26 private static Statement statement; 27 //連接對象 28 private static Connection conn; 29 30 private List<String> data; 31 32 public DBSource(List<String> data) { 33 super(); 34 this.data = data; 35 } 36 37 public void run(){ 38 System.out.println(System.currentTimeMillis()); 39 DBSource.insertLogInfo(data); 40 System.out.println(System.currentTimeMillis()); 41 } 42 43 /** 44 * 45 * @param sql 46 * @return int 47 */ 48 @SuppressWarnings("deprecation") 49 public int insert(String sql) { 50 51 int result = 0; 52 try { 53 conn = getConnection(); 54 statement = conn.createStatement(); 55 result = statement.executeUpdate(sql); 56 //關閉連接 57 conn.close(); 58 } catch (SQLException e) { 59 System.out.println(new Date().toLocaleString()+":數據庫插入操作失敗" +e.getMessage()); 60 } 61 62 return result; 63 } 64 65 /** 66 * prepared方式入庫 67 * @param arr 68 * @return 69 * @throws SQLException 70 */ 71 @SuppressWarnings("deprecation") 72 public static void insertLogInfo(List<String> data) { 73 74 String sql = "INSERT INTO log_info(date_time,s_sitename,s_ip,cs_method,cs_uri_stem,cs_uri_query," 75 + "s_port,cs_username,c_ip,cs_user_agent,sc_status,sc_substatus,sc_win32_status" 76 + ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"; 77 Connection conn = DBSource.getConnection(); 78 int count = 0; 79 try { 80 conn.setAutoCommit(false); 81 PreparedStatement prest = conn.prepareStatement(sql); 82 83 for(String str : data) { 84 String[] arr = str.split(" "); 85 prest.setString(1, arr[0]+" "+arr[1]); 86 prest.setString(2, arr[2]); 87 prest.setString(3, arr[3]); 88 prest.setString(4, arr[4]); 89 prest.setString(5, arr[5]); 90 prest.setString(6, arr[6]); 91 prest.setString(7, arr[7]); 92 prest.setString(8, arr[8]); 93 prest.setString(9, arr[9]); 94 prest.setString(10, arr[10]); 95 prest.setString(11, arr[11]); 96 prest.setString(12, arr[12]); 97 prest.setString(13, arr[13]); 98 //添加到批處理 99 prest.addBatch(); 100 } 101 102 int [] intarr = prest.executeBatch(); 103 conn.commit(); 104 prest.clearBatch(); 105 prest.close(); 106 conn.close(); 107 for (int j = 0 ; j < intarr.length; j++) { 108 if (intarr[j] > 0) { 109 count +=1; 110 } 111 } 112 } catch (Exception e) { 113 System.out.println(new Date().toLocaleString()+":數據庫插入操作失敗"+e.getMessage()); 114 } 115 System.out.println("本次操作成功插入"+count+"行數據"); 116 } 117 118 /** 119 * 創建連接池 120 * @return Connection 121 */ 122 public static Connection getConnection() { 123 Connection con = null; 124 try { 125 //從配置文件中獲取jdbc config 126 JDBCInfo jdbc = JsonFileUtils.readJsonFile(new File("resource/config.json"), JDBCInfo.class); 127 if (jdbc != null) { 128 //mysql驅動加載 129 Class.forName(jdbc.getDriver()); 130 con = DriverManager.getConnection(jdbc.getUrl(), 131 jdbc.getUser(), jdbc.getPassword()); 132 } 133 } catch (Exception e) { 134 System.out.println("數據庫連接失敗" +e.getMessage()); 135 } 136 return con; 137 } 138 139 140 /** 141 * 獲取Sql 142 * @param arr 143 * @return 144 */ 145 public String getSql(String[] arr) { 146 147 StringBuffer sql = new StringBuffer("INSERT INTO log_info ("); 148 sql.append("date_time,"); 149 sql.append("s_sitename,"); 150 sql.append("s_ip,"); 151 sql.append("cs_method,"); 152 sql.append("cs_uri_stem,"); 153 sql.append("cs_uri_query,"); 154 sql.append("s_port,"); 155 sql.append("cs_username,"); 156 sql.append("c_ip,"); 157 sql.append("cs_user_agent,"); 158 sql.append("sc_status,"); 159 sql.append("sc_substatus,"); 160 sql.append("sc_win32_status"); 161 sql.append(") VALUES ('"); 162 sql.append(arr[0]+" "+arr[1]); 163 sql.append("','"); 164 sql.append(arr[2]); 165 sql.append("','"); 166 sql.append(arr[3]); 167 sql.append("','"); 168 sql.append(arr[4]); 169 sql.append("','"); 170 sql.append(arr[5]); 171 sql.append("','"); 172 sql.append(arr[6]); 173 sql.append("','"); 174 sql.append(arr[7]); 175 sql.append("','"); 176 sql.append(arr[8]); 177 sql.append("','"); 178 sql.append(arr[9]); 179 sql.append("','"); 180 sql.append(arr[10]); 181 sql.append("','"); 182 sql.append(arr[11]); 183 sql.append("','"); 184 sql.append(arr[12]); 185 sql.append("','"); 186 sql.append(arr[13]); 187 sql.append("')"); 188 189 return sql.toString(); 190 } 191 }
調用代碼:
1 /** 2 * 此方法采用遞歸操作,直至數據全部入庫寫入完畢 3 * 同時調用5個線程進行入庫操作 4 * @param data 5 * @param start 6 * @param end 7 */ 8 public static void threadsHandle(List<String> data, int start, int end) { 9 10 int total = data.size(); 11 int size = (int)data.size()/5; 12 //數據不越界 13 if (start < total) { 14 List<String> temp = null; 15 if (end < total) { 16 temp = data.subList(start, end); 17 } else if (end >= total) { 18 temp = data.subList(start, total); 19 } 20 //執行數據寫入 21 DBSource thread = new DBSource(temp); 22 thread.start(); 23 24 start = end; 25 end = start+size; 26 threadsHandle(data, start, end); 27 } 28 }
最終結果:
原來的12分鍾,變成了6秒左右,效率大了一大截。其他朋友如果有更好的建議,可以跟我交流下0.0。下次再把數據弄的更大些。