先說說需求的背景,由於業務數據都在Oracle數據庫中,想要對它進行數據的分析會非常非常慢,用傳統的數據倉庫-->數據集市這種方式,集市層表會非常大,查詢的時候如果再做一些group的操作,一個訪問需要一分鍾甚至更久才能響應。
為了解決這個問題,就想把業務庫的數據遷移到Elasticsearch中,然后針對es再去做聚合查詢。
問題來了,數據庫中的數據量很大,如何導入到ES中呢?
Logstash JDBC
Logstash提供了一款JDBC的插件,可以在里面寫sql語句,自動查詢然后導入到ES中。這種方式比較簡單,需要注意的就是需要用戶自己下載jdbc的驅動jar包。
input {
jdbc {
jdbc_driver_library => "ojdbc14-10.2.0.3.0.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@localhost:1521:test"
jdbc_user => "test"
jdbc_password => "test123"
schedule => "* * * * *"
statement => "select * from TARGET_TABLE"
add_field => ["type","a"]
}
}
output{
elasticsearch {
hosts =>["10.10.1.205:9200"]
index => "product"
document_type => "%{type}"
}
}
不過,它的性能實在是太差了!我導了一天,才導了兩百多萬的數據。
因此,就考慮自己來導。
自己的數據交換工具
思路:
- 1 采用JDBC的方式,通過分頁讀取數據庫的全部數據。
- 2 數據庫讀取的數據存儲成bulk形式的數據,關於bulk需要的文件格式,可以參考這里
- 3 利用bulk命令分批導入到es中
最后使用發現,自己寫的導入程序,比Logstash jdbc快5-6倍~~~~~~ 嗨皮!!!!
遇到的問題
- 1 JDBC需要采用分頁的方式讀取全量數據
- 2 要模仿bulk文件進行存儲
- 3 由於bulk文件過大,導致curl內存溢出
程序開源
下面的代碼需要注意的就是
public class JDBCUtil {
private static Connection conn = null;
private static PreparedStatement sta=null;
static{
try {
Class.forName("oracle.jdbc.driver.OracleDriver");
conn = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:test", "test", "test123");
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
System.out.println("Database connection established");
}
/**
* 把查到的數據格式化寫入到文件
*
* @param list 需要存儲的數據
* @param index 索引的名稱
* @param type 類型的名稱
* @param path 文件存儲的路徑
**/
public static void writeTable(List<Map> list,String index,String type,String path) throws SQLException, IOException {
System.out.println("開始寫文件");
File file = new File(path);
int count = 0;
int size = list.size();
for(Map map : list){
FileUtils.write(file, "{ \"index\" : { \"_index\" : \""+index+"\", \"_type\" : \""+type+"\" } }\n","UTF-8",true);
FileUtils.write(file, JSON.toJSONString(map)+"\n","UTF-8",true);
// System.out.println("寫入了" + ((count++)+1) + "[" + size + "]");
}
System.out.println("寫入完成");
}
/**
* 讀取數據
* @param sql
* @return
* @throws SQLException
*/
public static List<Map> readTable(String tablename,int start,int end) throws SQLException {
System.out.println("開始讀數據庫");
//執行查詢
sta = conn.prepareStatement("select * from(select rownum as rn,t.* from "+tablename+" t )where rn >="+start+" and rn <"+end);
ResultSet rs = sta.executeQuery();
//獲取數據列表
List<Map> data = new ArrayList();
List<String> columnLabels = getColumnLabels(rs);
Map<String, Object> map = null;
while(rs.next()){
map = new HashMap<String, Object>();
for (String columnLabel : columnLabels) {
Object value = rs.getObject(columnLabel);
map.put(columnLabel.toLowerCase(), value);
}
data.add(map);
}
sta.close();
System.out.println("數據讀取完畢");
return data;
}
/**
* 獲得列名
* @param resultSet
* @return
* @throws SQLException
*/
private static List<String> getColumnLabels(ResultSet resultSet)
throws SQLException {
List<String> labels = new ArrayList<String>();
ResultSetMetaData rsmd = (ResultSetMetaData) resultSet.getMetaData();
for (int i = 0; i < rsmd.getColumnCount(); i++) {
labels.add(rsmd.getColumnLabel(i + 1));
}
return labels;
}
/**
* 獲得數據庫表的總數,方便進行分頁
*
* @param tablename 表名
*/
public static int count(String tablename) throws SQLException {
int count = 0;
Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("select count(1) from "+tablename);
while (rs.next()) {
count = rs.getInt(1);
}
System.out.println("Total Size = " + count);
rs.close();
stmt.close();
return count;
}
/**
* 執行查詢,並持久化文件
*
* @param tablename 導出的表明
* @param page 分頁的大小
* @param path 文件的路徑
* @param index 索引的名稱
* @param type 類型的名稱
* @return
* @throws SQLException
*/
public static void readDataByPage(String tablename,int page,String path,String index,String type) throws SQLException, IOException {
int count = count(tablename);
int i =0;
for(i =0;i<count;){
List<Map> map = JDBCUtil.readTable(tablename,i,i+page);
JDBCUtil.writeTable(map,index,type,path);
i+=page;
}
}
}
在main方法中傳入必要的參數即可:
public class Main {
public static void main(String[] args) {
try {
JDBCUtil.readDataByPage("TABLE_NAME",1000,"D://data.json","index","type");
} catch (SQLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
這樣得到bulk的數據后,就可以運行腳本分批導入了。
下面腳本的思路,就是每100000行左右的數據導入到一個目標文件,使用bulk命令導入到es中。注意一個細節就是不能隨意的切分文件,因為bulk的文件是兩行為一條數據的。
#!/bin/bash
count=0
rm target.json
touch target.json
while read line;do
((count++))
{
echo $line >> target.json
if [ $count -gt 100000 ] && [ $((count%2)) -eq 0 ];then
count=0
curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null
rm target.json
touch target.json
fi
}
done < $1
echo 'last submit'
curl -XPOST localhost:9200/_bulk --data-binary @target.json > /dev/null
最后執行腳本:
sh auto_bulk.sh data.json
自己測試最后要比logstasj jdbc快5-6倍。