Logstash 是開源的服務器端數據處理管道,能夠同時從多個來源采集數據,轉換數據,然后將數據發送到您最喜歡的“存儲庫”中。個人認為這款插件是比較穩定,容易配置的使用Logstash之前,我們得明確自己的需求場景是什么,從哪種類型的數據源同步數據到哪種存儲庫。Logstash版本迭代較快,每個版本的插件都有點區別,比如6.3版本以后output到沒有jdbc的插件,然而你如果想使用output的jdbc插件就需要去安裝插件(logstash-output-jdbc),也就是說,如果你想用output的jdbc,你就必須使用6.3以下(最好5.x)的版本。這里以Logstash5.3.1版本為例。
1.下載安裝
Logstash不支持jdk1.10,建議使用1.8。Logstash版本要與Es版本保持一致。下載地址:https://www.elastic.co/cn/downloads/past-releases/logstash-5-3-1
上傳到目錄,例如:usr/local/src
解壓 tar -zxvf logstash-5.3.1.tar.gz
重命名 mv logstash-5.3.1 logstash
啟動驗證:
cd logstash
bin/logstash -e 'input{stdin{}} output{stdout{}}',輸出如下,代表啟動成功:
關閉
ps -ef | grep logstash
kill pid
2.配置目錄
配置文件目錄:logstash/config
新建配置文件test.conf
下載mysql-connector-java-5.1.30.jar,下載地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.30
logstash目錄下創建一個jar目錄,用來存放jar文件
mkdir jar
在logstash目錄下創建一個sql目錄,用來存放查詢sql文件test.sql
mkdir sql
3.配置文件
Logstash同步數據方式有全量同步和增量同步兩種,不同方式配置文件有細微不同。第一次同步時需要全量的數據,之后則需要定時去同步增量數據,使用logstash需要了解一下事項:
1.凡是SQL可以實現的logstash均可以實現(本就是通過sql查詢數據)
2.支持每次全量同步或按照特定字段(如自增ID、更新時間)增量同步
3.同步頻率可控,最快同步頻率每分鍾一次(如果對實效性要求較高,慎用)
4.不支持被物理刪除的數據同步物理刪除ES中的數據(可在表設計中增加邏輯刪除字段標識數據刪除)
全量同步
test.conf文件內容如下:
input { jdbc { # mysql 數據庫鏈接,test為數據庫名 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC" jdbc_user => "root" jdbc_password => "root" # 驅動路徑 jdbc_driver_library => "/usr/local/src/logstash/jar/mysql-connector-java-5.1.30-bin.jar" # 驅動類名 jdbc_driver_class => "com.mysql.jdbc.Driver" #是否分頁 jdbc_paging_enabled => "true" jdbc_page_size => "50000" #直接執行sql語句 #statement =>"select * from test" # 執行的sql 文件路徑+名稱 statement_filepath => "/usr/local/src/logstash/sql/test.sql" #設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新 schedule => "* * * * *" # 索引類型 #type => "jdbc" } } output { elasticsearch { #es的ip和端口 hosts => ["http://127.0.0.1:9200"] #ES索引名稱(自己定義的) index => "test_logstash" #文檔類型(自己定義的) document_type => "test" #設置es中數據的id為數據庫中的字段(一般設置為mysql中主鍵字段) document_id => "%{id}" } stdout { codec => json_lines } }
test.sql文件內容如下:
#sql查詢語句,mysql中怎樣寫,此處就怎樣寫
select * from table
指定配置文件啟動Logstash:
cd logstash
bin/logstash -f config/test.conf
會看到mysql中數據會在屏幕顯示,如果有報錯,一般為配置文件出錯。
后台啟動:nohup bin/logstash -f config/test.conf
增量同步
test.conf文件內容如下:
input { jdbc { # mysql 數據庫鏈接,test為數據庫名 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC" jdbc_user => "root" jdbc_password => "root" # 驅動路徑 jdbc_driver_library => "/usr/local/src/logstash/jar/mysql-connector-java-5.1.30-bin.jar" # 驅動類名 jdbc_driver_class => "com.mysql.jdbc.Driver" #是否分頁 jdbc_paging_enabled => "true" jdbc_page_size => "50000" record_last_run => true use_column_value => true tracking_column => "id" last_run_metadata_path => "/usr/local/src/logstash/last_run_record" #直接執行sql語句 statement =>"statement =>"select * from test where id >:sql_last_value"" # 執行的sql 文件路徑+名稱 #statement_filepath => "/usr/local/src/logstash/sql/test.sql" #設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新 schedule => "* * * * *" # 索引類型 #type => "jdbc" } } output { elasticsearch { #es的ip和端口 hosts => ["http://127.0.0.1:9200"] #ES索引名稱(自己定義的) index => "test_logstash" #文檔類型(自己定義的) document_type => "test" #設置es中數據的id為數據庫中的字段(一般設置為mysql中主鍵字段) document_id => "%{id}" } stdout { codec => json_lines } }
紅色部分是跟全量同步有區別的地方:
record_last_run:記錄最后運行結果
use_column_value:記錄字段
tracking_column:記錄字段名,這里就是id
last_run_metadata_path:記錄數據保存位置
sql_last_value:下次在執行同步的時候會將這個值,賦給sql_last_value
啟動Logstash后,以后就是根據查詢條件增量同步數據了。
到此,使用Logstash同步Mysql數據到Es就算完成了,各位如果覺得還有點意義,煩請點一下推薦,加個關注,互相交流,如果安裝過程有任何問題或者發現錯誤,都可以留言交流,共同進步!