大多數情況下我們的數據都存放在了數據庫中,但是elasticsearch它有自己的索引庫,那么如果我們在做搜索的是時候就需要將數據庫中的數據同步到elasticsearch中,在這里我們使用logstash的logstash-jdbc-input的插件進行與數據庫的同步,對於logstash與數據庫同步,我們可以設置elasticsearch與數據庫同步的時間,使用這種方式進行同步還是很方便的。
1、下載並安裝logstash
注意下載的版本要和你的elasticsearch的版本號一致,我的版本elasticsearch6.2.2
logstash下載地址:https://www.elastic.co/downloads/logstash
下載后之后,直接解壓就好
(elasticsearch的環境搭建可參考http://www.cnblogs.com/xuwenjin/p/8745624.html)
2、配置logstash
對於logstash5.x以上版本,它自身已經集成了這個插件,不需要我們去單獨安裝,直接使用即可。我這里說一下與mysql進行同步的簡單配置
在logstash文件目錄下,新建一個文件夾(命名隨意)。如:mysql
2.1 先把一個jdbc驅動放到這個文件夾下,用來連接mysql數據庫
使用maven項目的,可在pom文件中,加入以下依賴,然后將jar包拷貝出來就行
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.40</version> </dependency>
2.2 創建一個 .conf 的配置文件(命名隨意),用來將es與數據庫關聯。如mysql.conf
input { jdbc { # mysql 數據庫鏈接,shop為數據庫名 jdbc_connection_string => "jdbc:mysql://dev.yonyouccs.com:3001/test" # 用戶名和密碼 jdbc_user => "root" jdbc_password => "root" # 驅動 jdbc_driver_library => "D:/software/logstash-6.2.2/logstash-6.2.2/mysqletc/mysql-connector-java-5.1.40.jar" # 驅動類名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 執行的sql 文件路徑+名稱 statement_filepath => "D:/software/logstash-6.2.2/logstash-6.2.2/mysql/jdbc.sql" # 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新 schedule => "* * * * *" } } filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { # ES的IP地址及端口 hosts => ["localhost:9200"] # 索引名稱 index => "synces" # 需要關聯的數據庫中有有一個id字段,對應類型中的id document_id => "%{id}" # 索引名稱 document_type => "xwjUser" } stdout { # JSON格式輸出 codec => json_lines } }
2.3 創建一個sql文件,我這里命名為jdbc.sql和上邊的配置文件一致
SELECT id, last_name lastName, age, email FROM xwj_user
注意:sql不能有結束符,不然運行的時候會報錯(至於原因,后面會講到)
3、啟動logstash
在logstash的bin目錄下,使用cmd執行命令:logstash -f ../mysql/mysql.conf
會看到如下啟動信息:
可以看到在同步的過程中,執行了我們的腳本,並且是將其包起來的,所以sql腳本不能有結束符。
還會將同步的數據以json字符串的方式打印出來(默認將字段名稱全部轉為小寫了)
4、通過elasticsearch-head查看數據
可以看到數據已經全部同步到elasticsearch了。不過在該索引下,默認增加@version、@timestamp兩個字段
5、踩過的坑
1、logstash的配置文件一定得是UTF-8,不能是GBK,不然會報錯
2、logstash啟動報無法找到主類解決方案,可參考https://www.cnblogs.com/sbj-dawn/p/8549369.html
6、其它
1、新增或更新數據庫數據,我們會發現logstash會根據設定的時間(這里我設置的每分鍾,是最小頻率)自動將最新數據同步到es。
2、目前logstash只支持數據增量,表的增量,即不能同步已物理刪除的數據和表。這個問題的解決方案可參考https://blog.csdn.net/laoyang360/article/details/51747266
關於logstash的更多用法,可參考elastic的官網: