Logstash:把MySQL數據導入到Elasticsearch中
前提條件
需要安裝好Elasticsearch及Kibana。
MySQL安裝
根據不同的操作系統我們分別對MySQL進行安裝。我們可以訪問網頁來對MySQL進行安裝。等我們安裝完我們的MySQL后,在我們的terminal中,打入如下的命令來檢查MySQL的版本:
$ /usr/local/mysql/bin/mysql -V
/usr/local/mysql/bin/mysql Ver 8.0.17 for macos10.14 on x86_64 (MySQL Community Server - GPL)
Logstash安裝
在上一步中,已經知道了mysql的版本信息。需要下載相應的JDBC connector。在地址https://dev.mysql.com/downloads/connector/j/
下載最新的Connector。下載完這個Connector后,把這個connector存入到Logstash安裝目錄下的如下子目錄中。
$ ls logstash-core/lib/jars/mysql-connector-java-8.0.17.jar
logstash-core/lib/jars/mysql-connector-java-8.0.17.jar
這樣我們的安裝就完成了。
准備練習數據
采用把一個CSV文件導入到MySQL中的辦法來形成一個MySQL的數據庫。CSV文件下載地址:
https://github.com/liu-xiao-guo/sample_csv
在上面的sample_csv中,有一個SalesJan2009.csv文件。通過MySQL的前端工具把這個導入到MySQL數據庫中。
這樣MySQL的數據庫data里含有一個叫做SalesJan2009的數據就建立好了。
Logstash 配置
對Logstash做如下的配置sales.conf:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
jdbc_user => "root"
jdbc_password => "YourMyQLPassword"
jdbc_validate_connection => true
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
parameters => { "Product_id" => "Product1" }
statement => "SELECT * FROM SalesJan2009 WHERE Product = :Product_id"
}
}
filter {
mutate {
rename => {
"longitude" => "[location][lon]"
"latitude" => "[location][lat]"
}
}
}
output {
stdout {
}
elasticsearch {
index => "sales"
hosts => "localhost:9200"
document_type => "_doc"
}
}
在這里,必須替換jdbc_user和jdbc_password為自己的MySQL賬號的用戶名及密碼。特別值得指出的是jdbc_driver_library按elastic的文檔是可以放入JDBC驅動的路徑及驅動名稱。實踐證明如果這個驅動不在JAVA的classpath里,也是不能被正確地加載。正因為這樣的原因,在上一步里把驅動mysql-connector-java-8.0.17.jar放入到Logstash的jar目錄里,所以這里就直接填入空字符串。
運行Logstash加載數據
接下來我們運行Logstash來加載我們的MySQL里的數據到Elasticsearch中:
./bin/logstash --debug -f ~/data/sales.conf
在這里把sales.conf置於用戶home目錄下的data子目錄中。
我們可以在Kibana中查看到最新的導入到Elasticsearch中的數據
這里顯示在sales索引中有847個文檔。一旦數據進入到我們的Elastic,我們可以對數據進行分析: