自己寫的數據交換工具——從Oracle到Elasticsearch
先說說需求的背景,由於業務數據都在Oracle數據庫中,想要對它進行數據的分析會非常非常慢,用傳統的數據倉庫-->數據集市這種方式,集市層表會非常大,查詢的時候如果再做一些group的操作,一個訪問需要一分鍾甚至更久才能響應。
為了解決這個問題,就想把業務庫的數據遷移到Elasticsearch中,然后針對es再去做聚合查詢。
問題來了,數據庫中的數據量很大,如何導入到ES中呢?
Logstash JDBC
Logstash提供了一款JDBC的插件,可以在里面寫sql語句,自動查詢然后導入到ES中。這種方式比較簡單,需要注意的就是需要用戶自己下載jdbc的驅動jar包。
input {
jdbc {
jdbc_driver_library => "ojdbc14-10.2.0.3.0.jar" jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:test" jdbc_user => "test" jdbc_password => "test123" schedule => "* * * * *" statement => "select * from TARGET_TABLE" add_field => ["type","a"] } } output{ elasticsearch { hosts =>["10.10.1.205:9200"] index => "product" document_type => "%{type}" } }
不過,它的性能實在是太差了!我導了一天,才導了兩百多萬的數據。
因此,就考慮自己來導。
自己的數據交換工具
思路:
- 1 采用JDBC的方式,通過分頁讀取數據庫的全部數據。
- 2 數據庫讀取的數據存儲成bulk形式的數據,關於bulk需要的文件格式,可以參考這里
- 3 利用bulk命令分批導入到es中
最后使用發現,自己寫的導入程序,比Logstash jdbc快5-6倍~~~~~~ 嗨皮!!!!
遇到的問題
- 1 JDBC需要采用分頁的方式讀取全量數據
- 2 要模仿bulk文件進行存儲
- 3 由於bulk文件過大,導致curl內存溢出
程序開源
下面的代碼需要注意的就是
public class JDBCUtil { private static Connection conn = null; private static PreparedStatement sta=null; static{ try { Class.forName("oracle.jdbc.driver.OracleDriver"); conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:test", "test", "test123"); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } System.out.println("Database connection established"); } /** * 把查到的數據格式化寫入到文件 * * @param list 需要存儲的數據 * @param index 索引的名稱 * @param type 類型的名稱 * @param path 文件存儲的路徑 **/ public static void writeTable(List<Map> list,String index,String type,String path) throws SQLException, IOException { System.out.println("開始寫文件"); File file = new File(path); int count = 0; int size = list.size(); for(Map map : list){ FileUtils.write(file, "{ \"index\" : { \"_index\" : \""+index+"\", \"_type\" : \""+type+"\" } }\n","UTF-8",true); FileUtils.write(file, JSON.toJSONString(map)+"\n","UTF-8",true); // System.out.println("寫入了" + ((count++)+1) + "[" + size + "]"); } System.out.println("寫入完成"); } /** * 讀取數據 * @param sql * @return * @throws SQLException */ public static List<Map> readTable(String tablename,int start,int end) throws