从Oracle到Elasticsearch


自己写的数据交换工具——从Oracle到Elasticsearch

 

先说说需求的背景,由于业务数据都在Oracle数据库中,想要对它进行数据的分析会非常非常慢,用传统的数据仓库-->数据集市这种方式,集市层表会非常大,查询的时候如果再做一些group的操作,一个访问需要一分钟甚至更久才能响应。

为了解决这个问题,就想把业务库的数据迁移到Elasticsearch中,然后针对es再去做聚合查询。

问题来了,数据库中的数据量很大,如何导入到ES中呢?

Logstash JDBC

Logstash提供了一款JDBC的插件,可以在里面写sql语句,自动查询然后导入到ES中。这种方式比较简单,需要注意的就是需要用户自己下载jdbc的驱动jar包。

input {
    jdbc {
        jdbc_driver_library => "ojdbc14-10.2.0.3.0.jar" jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:test" jdbc_user => "test" jdbc_password => "test123" schedule => "* * * * *" statement => "select * from TARGET_TABLE" add_field => ["type","a"] } } output{ elasticsearch { hosts =>["10.10.1.205:9200"] index => "product" document_type => "%{type}" } }

不过,它的性能实在是太差了!我导了一天,才导了两百多万的数据。

因此,就考虑自己来导。

自己的数据交换工具

思路:

最后使用发现,自己写的导入程序,比Logstash jdbc快5-6倍~~~~~~ 嗨皮!!!!

遇到的问题

  • 1 JDBC需要采用分页的方式读取全量数据
  • 2 要模仿bulk文件进行存储
  • 3 由于bulk文件过大,导致curl内存溢出

程序开源

下面的代码需要注意的就是

public class JDBCUtil { private static Connection conn = null; private static PreparedStatement sta=null; static{ try { Class.forName("oracle.jdbc.driver.OracleDriver"); conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:test", "test", "test123"); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (SQLException e) { e.printStackTrace(); } System.out.println("Database connection established"); } /**  * 把查到的数据格式化写入到文件  *  * @param list 需要存储的数据  * @param index 索引的名称  * @param type 类型的名称  * @param path 文件存储的路径  **/ public static void writeTable(List<Map> list,String index,String type,String path) throws SQLException, IOException { System.out.println("开始写文件"); File file = new File(path); int count = 0; int size = list.size(); for(Map map : list){ FileUtils.write(file, "{ \"index\" : { \"_index\" : \""+index+"\", \"_type\" : \""+type+"\" } }\n","UTF-8",true); FileUtils.write(file, JSON.toJSONString(map)+"\n","UTF-8",true); // System.out.println("写入了" + ((count++)+1) + "[" + size + "]"); } System.out.println("写入完成"); } /**  * 读取数据  * @param sql  * @return  * @throws SQLException  */ public static List<Map> readTable(String tablename,int start,int end) throws SQLException { System.out.println("开始读数据库"); //执行查询 sta = conn.prepareStatement("select * from(select rownum as rn,t.* from "+tablename+" t )where rn >="+start+" and rn <"+end); ResultSet rs = sta.executeQuery(); //获取数据列表 List<Map> data = new ArrayList(); List<String> columnLabels = getColumnLabels(rs); Map<String, Object> map = null; while(rs.next()){ map = new HashMap<String, Object>(); for (String columnLabel : columnLabels) { Object value = rs.getObject(columnLabel); map.put(columnLabel.toLowerCase(), value); } data.add(map); } sta.close(); System.out.println("数据读取完毕"); return data; } /**  * 获得列名  * @param resultSet  * @return  * @throws SQLException  */ private static List<String> getColumnLabels(ResultSet resultSet) throws SQLException { List<String> labels = new ArrayList<String>(); ResultSetMetaData rsmd = (ResultSetMetaData) resultSet.getMetaData(); for (int i = 0; i < rsmd.getColumnCount(); i++) { labels.add(rsmd.getColumnLabel(i + 1)); } return labels; } /**  * 获得数据库表的总数,方便进行分页  *  * @param tablename 表名  */ public static int count(String tablename) throws SQLException { int count = 0; Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE); ResultSet rs = stmt.executeQuery("select count(1) from "+tablename); while (rs.next()) { count = rs.getInt(1); } System.out.println("Total Size = " + count); rs.close(); stmt.close(); return count; } /**  * 执行查询,并持久化文件  *   * @param tablename 导出的表明  * @param page 分页的大小  * @param path 文件的路径  * @param index 索引的名称  * @param type 类型的名称  * @return  * @throws SQLException  */ public static void readDataByPage(String tablename,int page,String path,String index,String type) throws SQLException, IOException { int count = count(tablename); int i =0; for(i =0;i<count;){ List<Map> map = JDBCUtil.readTable(tablename,i,i+page); JDBCUtil.writeTable(map,index,type,path); i+=page; } } }

在main方法中传入必要的参数即可:

public class Main { public static void main(String[] args) { try { JDBCUtil.readDataByPage("TABLE_NAME",1000,"D://data.json","index","type"); } catch (SQLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }

这样得到bulk的数据后,就可以运行脚本分批导入了。

下面脚本的思路,就是每100000行左右的数据导入到一个目标文件,使用bulk命令导入到es中。注意一个细节就是不能随意的切分文件,因为bulk的文件是两行为一条数据的。

#!/bin/bash count=0 rm target.json touch target.json while read line;do ((count++)) { echo $line >> target.json if [ $count -gt 100000 ] && [ $((count%2)) -eq 0 ];then count=0 curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null rm target.json touch target.json fi } done < $1 echo 'last submit' curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null

最后执行脚本:

sh auto_bulk.sh data.json

自己测试最后要比logstasj jdbc快5-6倍。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM