1、數據同步方式
全量同步與增量同步
全量同步是指全部將數據同步到es,通常是剛建立es,第一次同步時使用。增量同步是指將后續的更新、插入記錄同步到es。
2、常用的一些ES同步方法
1)、 elasticsearch-jdbc : 嚴格意義上它已經不是第三方插件。已經成為獨立的第三方工具。不支持5.5.1。。。
2)、elasticsearch-river-mysql插件: https://github.com/scharron/elasticsearch-river-mysql
3)、go-mysql-elasticsearch(國內作者siddontang) : https://github.com/siddontang/go-mysql-elasticsearch
4)、python-mysql-replication: github地址 https://github.com/noplay/python-mysql-replication
5)、MySQL Binlog: 通過 MySQL binlog 將 MySQL 的數據同步給 ES, 只能使用 row 模式的 binlog。
6)、Logstash-input-jdbc: github地址 https://github.com/logstash-plugins/logstash-input-jdbc
3、Logstash-input-jdbc安裝
由於我用的ES版本是5.5.1,elasticsearch-jdbc不支持,只支持2.3.4,這就尷尬了。

所用這里用Logstash-input-jdbc來同步數據,logstash-input-jdbc插件是logstash 的一個個插件,使用ruby語言開發。所以要先安裝ruby,也是為了好使用ruby中的gem安裝插件,下載地址: https://rubyinstaller.org/downloads/

下載下來之后,進行安裝


安裝好之后試下是否安裝成功,打開CMD輸入:

OK,然后修改gem的源,使用以下命令查看gem源
gem sources -l

刪除默認的源
gem sources --remove https://rubygems.org/

添加新的源
gem sources -a http://gems.ruby-china.org/ gem sources -l

更改成功,還的修改Gemfile的數據源地址。步驟如下:
gem install bundler bundle config mirror.https://rubygems.org https://gems.ruby-china.org

然后就是安裝logstash-input-jdbc,在logstash-5.5.1/bin目錄下

執行安裝命令
.\logstash-plugin.bat install logstash-input-jdbc
靜等一會兒,成功之后提示如下

4、Logstash-input-jdbc使用
官方文檔地址
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
首先在bin目錄下新建一個mysql目錄,里面包含jdbc.conf,jdbc.sql文件,加入mysql的驅動

jdbc.conf配置如下
input {
stdin {
}
jdbc {
# mysql 數據庫鏈接,test為數據庫名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
# 用戶名和密碼
jdbc_user => "root"
jdbc_password => "root"
# 驅動
jdbc_driver_library => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\mysql-connector-java-5.1.9.jar"
# 驅動類名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 執行的sql 文件路徑+名稱
statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"
# 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新
schedule => "* * * * *"
# 索引類型
type => "jdbc"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名稱
index => "article"
# 自增ID 需要關聯的數據庫中有有一個id字段,對應索引的id號
document_id => "%{id}"
}
stdout {
# JSON格式輸出
codec => json_lines
}
}
各數據庫對應的鏈接如下:
Driver ="path/to/jdbc-drivers/mysql-connector-java-5.1.35-bin.jar" //驅動程序
Class ="com.mysql.jdbc.Driver";
URL ="jdbc:mysql://localhost:3306/db_name"; //連接的URL,db_name為數據庫名
Driver ="path/to/jdbc-drivers/sqljdbc4.jar"
Class ="com.microsoft.jdbc.sqlserver.SQLServerDriver";
URL ="jdbc:microsoft:sqlserver://localhost:1433;DatabaseName=db_name"; //db_name為數據庫名
Driver ="path/to/jdbc-drivers/ojdbc6-12.1.0.2.jar"
Class ="oracle.jdbc.driver.OracleDriver";
URL ="jdbc:oracle:thin:@loaclhost:1521:orcl"; //orcl為數據庫的SID
//連接具有DB2客戶端的Provider實例
Driver ="path/to/jdbc-drivers/jt400.jar"
Class ="com.ibm.db2.jdbc.app.DB2.Driver";
URL ="jdbc:db2://localhost:5000/db_name"; //db_name為數據可名
Driver ="path/to/jdbc-drivers/postgresql-9.4.1201.jdbc4.jar"
Class ="org.postgresql.Driver"; //連接數據庫的方法
URL ="jdbc:postgresql://localhost/db_name"; //db_name為數據可名
jdbc.sql配置如下:
select * from person
就一條查詢語句對應的表數據如下:

注意:這里的jdbc.sql和jdbc.conf文件編碼都必須是ANSI
先啟動ES,然后通過sense創建article索引
UT http://localhost:9200/article

然后通過以下命令啟動logstash
.\logstash.bat -f .\mysql\jdbc.conf

過一會他就會自動的往ES里添加數據,輸出的日志如下:

執行了SQL查詢。查看下article索引會發現多出來了很多文檔

我們在數據庫增加一條數據,看他是否自動同步到ES中

靜等一會,發現logstash的日志

查詢了一篇,ES中的數據會多出剛剛插入的那條

下面使用 增量 來新增數據,需要在jdbc.conf配置文件中做如下修改:
input {
stdin {
}
jdbc {
# mysql 數據庫鏈接,test為數據庫名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
# 用戶名和密碼
jdbc_user => "root"
jdbc_password => "root"
# 驅動
jdbc_driver_library => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\mysql-connector-java-5.1.9.jar"
# 驅動類名
jdbc_driver_class => "com.mysql.jdbc.Driver"
#處理中文亂碼問題
codec => plain { charset => "UTF-8"}
#使用其它字段追蹤,而不是用時間
use_column_value => true
#追蹤的字段
tracking_column => id
record_last_run => true
#上一個sql_last_value值的存放文件路徑, 必須要在文件中指定字段的初始值
last_run_metadata_path => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\station_parameter.txt"
#開啟分頁查詢
jdbc_paging_enabled => true
jdbc_page_size => 300
# 執行的sql 文件路徑+名稱
statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"
# 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新
schedule => "* * * * *"
# 索引類型
type => "jdbc"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名稱
index => "article"
# 自增ID
document_id => "%{id}"
}
stdout {
# JSON格式輸出
codec => json_lines
}
}
參數介紹:
//是否記錄上次執行結果, 如果為真,將會把上次執行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中 record_last_run => true //是否需要記錄某個column 的值,如果 record_last_run 為真,可以自定義我們需要 track 的 column 名稱,此時該參數就要為 true. 否則默認 track 的是 timestamp 的值. use_column_value => true //如果 use_column_value 為真,需配置此參數. track 的數據庫 column 名,該 column 必須是遞增的.比如:ID. tracking_column => MY_ID //指定文件,來記錄上次執行到的 tracking_column 字段的值 //比如上次數據庫有 10000 條記錄,查詢完后該文件中就會有數字 10000 這樣的記錄,下次執行 SQL 查詢可以從 10001 條處開始. //我們只需要在 SQL 語句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是該文件中的值(10000). last_run_metadata_path => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\station_parameter.txt" //是否清除 last_run_metadata_path 的記錄,如果為真那么每次都相當於從頭開始查詢所有的數據庫記錄 clean_run => false //是否將 column 名稱轉小寫 lowercase_column_names => false //存放需要執行的 SQL 語句的文件位置 statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"
這里使用webmagic爬蟲來爬取數據,導入到數據庫中,先運行爬蟲,爬取一些數據

這里爬取到了277條,然后啟動logstash,通過logstash導入到ES中去

打開mysql目錄下的station_parameter.txt文件


這個文件里記錄上次執行到的 tracking_column 字段的值,比如上次數據庫有 10000 條記錄,查詢完后該文件中就會有數字 10000 這樣的記錄,下次執行 SQL 查詢可以從 10001 條處開始,我們只需要在 SQL 語句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是該文件中的值。
然后開啟爬蟲,爬取數據,往數據庫里插,logstash會自動的識別到更新,然后導入到ES中!!
