從Oracle到Elasticsearch


自己寫的數據交換工具——從Oracle到Elasticsearch

 

先說說需求的背景,由於業務數據都在Oracle數據庫中,想要對它進行數據的分析會非常非常慢,用傳統的數據倉庫-->數據集市這種方式,集市層表會非常大,查詢的時候如果再做一些group的操作,一個訪問需要一分鍾甚至更久才能響應。

為了解決這個問題,就想把業務庫的數據遷移到Elasticsearch中,然后針對es再去做聚合查詢。

問題來了,數據庫中的數據量很大,如何導入到ES中呢?

Logstash JDBC

Logstash提供了一款JDBC的插件,可以在里面寫sql語句,自動查詢然后導入到ES中。這種方式比較簡單,需要注意的就是需要用戶自己下載jdbc的驅動jar包。

input {
    jdbc {
        jdbc_driver_library => "ojdbc14-10.2.0.3.0.jar" jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:test" jdbc_user => "test" jdbc_password => "test123" schedule => "* * * * *" statement => "select * from TARGET_TABLE" add_field => ["type","a"] } } output{ elasticsearch { hosts =>["10.10.1.205:9200"] index => "product" document_type => "%{type}" } }

不過,它的性能實在是太差了!我導了一天,才導了兩百多萬的數據。

因此,就考慮自己來導。

自己的數據交換工具

思路:

最后使用發現,自己寫的導入程序,比Logstash jdbc快5-6倍~~~~~~ 嗨皮!!!!

遇到的問題

  • 1 JDBC需要采用分頁的方式讀取全量數據
  • 2 要模仿bulk文件進行存儲
  • 3 由於bulk文件過大,導致curl內存溢出

程序開源

下面的代碼需要注意的就是

public class JDBCUtil { private static Connection conn = null; private static PreparedStatement sta=null; static{ try { Class.forName("oracle.jdbc.driver.OracleDriver"); conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:test", "test", "test123"); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } System.out.println("Database connection established"); } /**  * 把查到的數據格式化寫入到文件  *  * @param list 需要存儲的數據  * @param index 索引的名稱  * @param type 類型的名稱  * @param path 文件存儲的路徑  **/ public static void writeTable(List<Map> list,String index,String type,String path) throws SQLException, IOException { System.out.println("開始寫文件"); File file = new File(path); int count = 0; int size = list.size(); for(Map map : list){ FileUtils.write(file, "{ \"index\" : { \"_index\" : \""+index+"\", \"_type\" : \""+type+"\" } }\n","UTF-8",true); FileUtils.write(file, JSON.toJSONString(map)+"\n","UTF-8",true); // System.out.println("寫入了" + ((count++)+1) + "[" + size + "]"); } System.out.println("寫入完成"); } /**  * 讀取數據  * @param sql  * @return  * @throws SQLException  */ public static List<Map> readTable(String tablename,int start,int end) throws SQLException { System.out.println("開始讀數據庫"); //執行查詢 sta = conn.prepareStatement("select * from(select rownum as rn,t.* from "+tablename+" t )where rn >="+start+" and rn <"+end); ResultSet rs = sta.executeQuery(); //獲取數據列表 List<Map> data = new ArrayList(); List<String> columnLabels = getColumnLabels(rs); Map<String, Object> map = null; while(rs.next()){ map = new HashMap<String, Object>(); for (String columnLabel : columnLabels) { Object value = rs.getObject(columnLabel); map.put(columnLabel.toLowerCase(), value); } data.add(map); } sta.close(); System.out.println("數據讀取完畢"); return data; } /**  * 獲得列名  * @param resultSet  * @return  * @throws SQLException  */ private static List<String> getColumnLabels(ResultSet resultSet) throws SQLException { List<String> labels = new ArrayList<String>(); ResultSetMetaData rsmd = (ResultSetMetaData) resultSet.getMetaData(); for (int i = 0; i < rsmd.getColumnCount(); i++) { labels.add(rsmd.getColumnLabel(i + 1)); } return labels; } /**  * 獲得數據庫表的總數,方便進行分頁  *  * @param tablename 表名  */ public static int count(String tablename) throws SQLException { int count = 0; Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE); ResultSet rs = stmt.executeQuery("select count(1) from "+tablename); while (rs.next()) { count = rs.getInt(1); } System.out.println("Total Size = " + count); rs.close(); stmt.close(); return count; } /**  * 執行查詢,並持久化文件  *   * @param tablename 導出的表明  * @param page 分頁的大小  * @param path 文件的路徑  * @param index 索引的名稱  * @param type 類型的名稱  * @return  * @throws SQLException  */ public static void readDataByPage(String tablename,int page,String path,String index,String type) throws SQLException, IOException { int count = count(tablename); int i =0; for(i =0;i<count;){ List<Map> map = JDBCUtil.readTable(tablename,i,i+page); JDBCUtil.writeTable(map,index,type,path); i+=page; } } }

在main方法中傳入必要的參數即可:

public class Main { public static void main(String[] args) { try { JDBCUtil.readDataByPage("TABLE_NAME",1000,"D://data.json","index","type"); } catch (SQLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }

這樣得到bulk的數據后,就可以運行腳本分批導入了。

下面腳本的思路,就是每100000行左右的數據導入到一個目標文件,使用bulk命令導入到es中。注意一個細節就是不能隨意的切分文件,因為bulk的文件是兩行為一條數據的。

#!/bin/bash count=0 rm target.json touch target.json while read line;do ((count++)) { echo $line >> target.json if [ $count -gt 100000 ] && [ $((count%2)) -eq 0 ];then count=0 curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null rm target.json touch target.json fi } done < $1 echo 'last submit' curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null

最后執行腳本:

sh auto_bulk.sh data.json

自己測試最后要比logstasj jdbc快5-6倍。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM