使用logstash同步MySQL數據到ES
概述:
在生成業務常有將MySQL數據同步到ES的需求,如果需要很高的定制化,往往需要開發同步程序用於處理數據。但沒有特殊業務需求,官方提供的logstash就很有優勢了。
在使用logstash我們贏先了解其特性,再決定是否使用:
- 無需開發,僅需安裝配置logstash即可;
- 凡是SQL可以實現的logstash均可以實現(本就是通過sql查詢數據)
- 支持每次全量同步或按照特定字段(如遞增ID、修改時間)增量同步;
- 同步頻率可控,最快同步頻率每分鍾一次(如果對實效性要求較高,慎用);
- 不支持被物理刪除的數據同步物理刪除ES中的數據(可在表設計中增加邏輯刪除字段IsDelete標識數據刪除)。
1、安裝
前往官網下載logstash,下載地址https://www.elastic.co/downloads/logstash,zip壓縮包大約160M;
程序目錄:【windows】G:\ELK\logstash-6.5.4;【linux】/tomcat/logstash/logstash-6.5.4。
下文統一以【程序目錄】表示不同環境的安裝目錄。
2、配置
2.1、新建目錄存放配置文件及mysql依賴包
在【程序目錄】目錄(\bin同級)新建mysql目錄,將下載好的mysql-connector-java-5.1.34.jar放入此目錄;
在【程序目錄】\mysql目錄新建jdbc.conf文件,此文件將配置數據庫連接信息、查詢數據sql、分頁信息、同步頻率等核心信息。
注意事項請查看注釋信息。
2.2、單表同步配置
-
input {
-
stdin {}
-
jdbc {
-
type => "jdbc"
-
# 數據庫連接地址
-
jdbc_connection_string => "jdbc:mysql://192.168.1.1:3306/TestDB?characterEncoding=UTF-8&autoReconnect=true""
-
# 數據庫連接賬號密碼;
-
jdbc_user => "username"
-
jdbc_password => "pwd"
-
# MySQL依賴包路徑;
-
jdbc_driver_library => "mysql/mysql-connector-java-5.1.34.jar"
-
# the name of the driver class for mysql
-
jdbc_driver_class => "com.mysql.jdbc.Driver"
-
# 數據庫重連嘗試次數
-
connection_retry_attempts => "3"
-
# 判斷數據庫連接是否可用,默認false不開啟
-
jdbc_validate_connection => "true"
-
# 數據庫連接可用校驗超時時間,默認3600S
-
jdbc_validation_timeout => "3600"
-
# 開啟分頁查詢(默認false不開啟);
-
jdbc_paging_enabled => "true"
-
# 單次分頁查詢條數(默認100000,若字段較多且更新頻率較高,建議調低此值);
-
jdbc_page_size => "500"
-
# statement為查詢數據sql,如果sql較復雜,建議配通過statement_filepath配置sql文件的存放路徑;
-
# sql_last_value為內置的變量,存放上次查詢結果中最后一條數據tracking_column的值,此處即為ModifyTime;
-
# statement_filepath => "mysql/jdbc.sql"
-
statement => " SELECT KeyId,TradeTime,OrderUserName,ModifyTime FROM `DetailTab` WHERE ModifyTime>= :sql_last_value order by ModifyTime asc"
-
# 是否將字段名轉換為小寫,默認true(如果有數據序列化、反序列化需求,建議改為false);
-
lowercase_column_names => false
-
# Value can be any of: fatal,error,warn,info,debug,默認info;
-
sql_log_level => warn
-
#
-
# 是否記錄上次執行結果,true表示會將上次執行結果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
-
record_last_run => true
-
# 需要記錄查詢結果某字段的值時,此字段為true,否則默認tracking_column為timestamp的值;
-
use_column_value => true
-
# 需要記錄的字段,用於增量同步,需是數據庫字段
-
tracking_column => "ModifyTime"
-
# Value can be any of: numeric,timestamp,Default value is "numeric"
-
tracking_column_type => timestamp
-
# record_last_run上次數據存放位置;
-
last_run_metadata_path => "mysql/last_id.txt"
-
# 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false;
-
clean_run => false
-
#
-
# 同步頻率(分 時 天 月 年),默認每分鍾同步一次;如果是每10分鍾執行一下 */10 即可 https://www.cnblogs.com/superman66/p/4565723.html
-
schedule => "* * * * *"
-
}
-
}
-
-
filter {
-
json {
-
source => "message"
-
remove_field => ["message"]
-
}
-
# convert 字段類型轉換,將字段TotalMoney數據類型改為float;
-
mutate {
-
convert => {
-
"TotalMoney" => "float"
-
}
-
}
-
}
-
output {
-
elasticsearch {
-
# host => "192.168.1.1"
-
# port => "9200"
-
# 配置ES集群地址
-
hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
-
# 索引名字,必須小寫
-
index => "consumption"
-
# 數據唯一索引(建議使用數據庫KeyID)
-
document_id => "%{KeyId}"
-
}
-
stdout {
-
codec => json_lines
-
}
-
}
2.3、多表同步
多表配置和單表配置的區別在於input模塊的jdbc模塊有幾個type,output模塊就需對應有幾個type;
-
input {
-
stdin {}
-
jdbc {
-
# 多表同步時,表類型區分,建議命名為“庫名_表名”,每個jdbc模塊需對應一個type;
-
type => "TestDB_DetailTab"
-
-
# 其他配置此處省略,參考單表配置
-
# ...
-
# ...
-
# record_last_run上次數據存放位置;
-
last_run_metadata_path => "mysql\last_id.txt"
-
# 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false;
-
clean_run => false
-
#
-
# 同步頻率(分 時 天 月 年),默認每分鍾同步一次;
-
schedule => "* * * * *"
-
}
-
jdbc {
-
# 多表同步時,表類型區分,建議命名為“庫名_表名”,每個jdbc模塊需對應一個type;
-
type => "TestDB_Tab2"
-
# 多表同步時,last_run_metadata_path配置的路徑應不一致,避免有影響;
-
# 其他配置此處省略
-
# ...
-
# ...
-
}
-
}
-
-
filter {
-
json {
-
source => "message"
-
remove_field => [ "message"]
-
}
-
}
-
-
output {
-
# output模塊的type需和jdbc模塊的type一致
-
if [type] == "TestDB_DetailTab" {
-
elasticsearch {
-
# host => "192.168.1.1"
-
# port => "9200"
-
# 配置ES集群地址
-
hosts => [ "192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
-
# 索引名字,必須小寫
-
index => "detailtab1"
-
# 數據唯一索引(建議使用數據庫KeyID)
-
document_id => "%{KeyId}"
-
}
-
}
-
if [type] == "TestDB_Tab2" {
-
elasticsearch {
-
# host => "192.168.1.1"
-
# port => "9200"
-
# 配置ES集群地址
-
hosts => [ "192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
-
# 索引名字,必須小寫
-
index => "detailtab2"
-
# 數據唯一索引(建議使用數據庫KeyID)
-
document_id => "%{KeyId}"
-
}
-
}
-
stdout {
-
codec => json_lines
-
}
-
}
3、啟動運行
在【程序目錄】目錄執行以下命令啟動:
-
【windows】bin\logstash.bat -f mysql\jdbc.conf
-
【linux】nohup ./bin/logstash -f mysql/jdbc_jx_moretable.conf &
可新建腳本配置好啟動命令,后期直接運行即可。
在【程序目錄】\logs目錄會有運行日志。
Note:
6.X版本需要jdk8支持,如果默認jdk版本不是jdk8,那么需要在logstash或logstash.lib.sh的行首位置添加兩個環境變量:
-
export JAVA_CMD="/usr/tools/jdk1.8.0_162/bin"
-
export JAVA_HOME="/usr/tools/jdk1.8.0_162/"
開機自啟動:
windows開機自啟:
- 方案1:使用windows自帶的任務計划;
- 方案2:nssm注冊windows服務,https://blog.csdn.net/u010887744/article/details/53957713
linux開機自啟:
- CentOS 7將linux服務加入系統啟動 systemd service,https://blog.csdn.net/u010887744/article/details/53957647
4、問題及解決方案
4.1、數據同步后,ES沒有數據
output.elasticsearch模塊的index必須是全小寫;
4.2、增量同步后last_run_metadata_path文件內容不改變
如果lowercase_column_names配置的不是false,那么tracking_column字段配置的必須是全小寫。
4.3、提示找不到jdbc_driver_library
2032 com.mysql.jdbc.Driver not loaded. Are you sure you've included the correct jdbc driver in :jdbc_driver_library?
檢測配置的地址是否正確,如果是linux環境,注意路徑分隔符是“/”,而不是“\”。
4.4、數據丟失
statement配置的sql中,如果比較字段使用的是大於“>”,可能存在數據丟失。
假設當同步完成后last_run_metadata_path存放的時間為2019-01-30 20:45:30,而這時候新入庫一條數據的更新時間也為2019-01-30 20:45:30,那么這條數據將無法同步。
解決方案:將比較字段使用 大於等於“>=”。
4.5、數據重復更新
上一個問題“數據丟失”提供的解決方案是比較字段使用“大於等於”,但這時又會產生新的問題。
假設當同步完成后last_run_metadata_path存放的時間為2019-01-30 20:45:30,而數據庫中更新時間最大值也為2019-01-30 20:45:30,那么這些數據將重復更新,直到有更新時間更大的數據出現。
當上述特殊數據很多,且長期沒有新的數據更新時,會導致大量的數據重復同步到ES。
何時會出現以上情況呢:①比較字段非“自增”;②比較字段是程序生成插入。
解決方案:
①比較字段自增保證不重復或重復概率極小(比如使用自增ID或者數據庫的timestamp),這樣就能避免大部分異常情況了;
②如果確實存在大量程序插入的數據,其更新時間相同,且可能長期無數據更新,可考慮定期更新數據庫中的一條測試數據,避免最大值有大量數據。
4.6、容災
logstash本身無法集群,我們常使用的組合ELK是通過kafka集群變相實現集群的。
可供選擇的處理方式:①使用任務程序推送數據到kafaka,由kafka同步數據到ES,但任務程序本身也需要容災,並需要考慮重復推送的問題;②將logstash加入守護程序,並輔以第三方監控其運行狀態。具體如何選擇,需要結合自身的應用場景了。
4.7、海量數據同步
為什么會慢?logstash分頁查詢使用臨時表分頁,每條分頁SQL都是將全集查詢出來當作臨時表,再在臨時表上分頁查詢。這樣導致每次分頁查詢都要對主表進行一次全表掃描。
SELECT * FROM (SELECT * FROM `ImageCN1` WHERE ModifyTime>= '1970-01-01 08:00:00' order by ModifyTime asc) AS `t1` LIMIT 5000 OFFSET 10000000;
數據量太大,首次同步如何安全過渡同步?
可考慮在statement對應的sql中加上分頁條件,比如ID在什么范圍,修改時間在什么區間,將單詞同步的數據總量減少。先少量數據同步測試驗證,再根據測試情況修改區間條件啟動logstash完成同步。比如將SQL修改為:
SELECT * FROM `ImageCN1` WHERE ModifyTime<'2018-10-10 10:10:10' AND ModifyTime>= '1970-01-01 08:00:00' order by ModifyTime asc
這樣需要每次同步后就修改sql,線上運營比較繁瑣,是否可以不修改sql,同時保證同步效率呢?SQL我們可以再修改下:
SELECT * FROM `ImageCN1` WHERE ModifyTime>= '1970-01-01 08:00:00' order by ModifyTime asc limit 100000
這樣就能保證每次子查詢的數據量不超過10W條,實際測試發現,數據量很大時效果很明顯。
-
[SQL] USE XXXDataDB;
-
受影響的行: 0
-
時間: 0.001s
-
-
[SQL]
-
SELECT * FROM (SELECT * FROM `ImageCN1` WHERE ModifyTime>= '1970-01-01 08:00:00' order by ModifyTime asc ) AS `t1` LIMIT 5000 OFFSET 900000;
-
受影響的行: 0
-
時間: 7.229s
-
-
[SQL]
-
-
SELECT * FROM (SELECT * FROM `ImageCN1` WHERE ModifyTime>= '2018-07-18 19:35:10' order by ModifyTime asc limit 100000) AS `t1` LIMIT 5000 OFFSET 90000
-
-
受影響的行: 0
-
時間: 1.778s
測試可以看出,SQL不加limit 10W時,越往后分頁查詢越慢,耗時達到8S,而加了limit條件的SQL耗時穩定在2S以內。
-
歡迎個人轉載,但須在文章頁面明顯位置給出原文連接;
-
未經作者同意必須保留此段聲明、不得隨意修改原文、不得用於商業用途,否則保留追究法律責任的權利。
-
-
【 CSDN 】:csdn.zxiaofan.com
-
【GitHub】:github.zxiaofan.com
-
-
如有任何問題,歡迎留言。祝君好運!
-
Life is all about choices!
-
將來的你一定會感激現在拼命的自己!