logstash是一個非常靈活好用的數據采集框架工具,可以通過簡單的配置滿足絕大多數數據采集場景的需求。
采集數據一個非常典型的場景就是將數據先放到kafka隊列里削峰,然后從kafka隊列里讀取數據到mysql或其他存儲系統中進行保存。
本文通過一個簡單的示例來演示從syslog采集日志到kafka然后在從kafka寫到mysql數據庫中。
默認已經安裝好了kafka、mysql、logstash,並已經經過簡單的驗證。
准備logstash的環境
一、下載mysql的jdbc驅動包
下載地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.15
下載后放到logstash的安裝目錄的/vendor/jar/目錄下
二、安裝logstash插件
logstash默認安裝了kafka插件,但是mysql插件沒有默認安裝需要自己安裝。
具體安裝方法 /bin/logstash-plugin install logstash-output-jdbc ,這里應為要用到logstash寫入mysql數據庫,所以安裝的插件是logstash-output-jdbc,如果要用到從mysql讀數據,那么就要安裝logstash-input-jdbc。安裝方法類似。
因為安裝時需要訪問國外的源,安裝進度很慢很慢,還經常安裝不成功,所以需要更改國內的源。
也就是給 Ruby 換成國內的鏡像站:https://gems.ruby-china.com/,替代https://rubygems.org。請注意:國內的鏡像站從https://gems.ruby-china.org 換成了 https://gems.ruby-china.com !!! 現在很多網上的資料就都是寫的https://gems.ruby-china.org,導致很多人換了鏡像源也裝不上。
具體方法如下:
1. 安裝Gem並更新
# yum install -y gem
# gem -v
2.0.14.1
# gem update --system
# gem -v
2.7.7
2. 檢查並修改鏡像源
# gem sources -l
*** CURRENT SOURCES ***
https://rubygems.org/
# gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
https://gems.ruby-china.org/ added to sources
https://rubygems.org/ removed from sources
# cat ~/.gemrc
---
:backtrace: false
:bulk_threshold: 1000
:sources:
- https://gems.ruby-china.org/
:update_sources: true
:verbose: true
請注意:國內的鏡像站從https://gems.ruby-china.org 換成了 https://gems.ruby-china.com !!!現在很多網上的資料就都是寫的https://gems.ruby-china.org,導致很多人換了鏡像源也裝不上。
3. 修改 logstash的 gem 鏡像源
cd到logstach的安裝目錄,可以看到Gemfile文件
# vi Gemfile
# This is a Logstash generated Gemfile.
# If you modify this file manually all comments and formatting will be lost.
source "https://rubygems.org"
gem "logstash-core", :path => "./logstash-core"
......
更改默認的 https://rubygems.org 為https://gems.ruby-china.com
4. 安裝 logstash-output-jdbc
#/bin/logstash-plugin install logstash-output-jdbc
Validating logstash-output-jdbc
Installing logstash-output-jdbc
Installation successful
5.查看插件是否安裝成功
在logstash的bin目錄下執行./logstash-plugin list 可以查看已經安裝的插件,可以看到logstash-output-jdbc的插件已經裝好。
配置logstash
新建一個pipline.conf的配置文件
vi test-pipeline.conf
文件內容如下:
input {
stdin{ #用於測試標准控制台輸入的數據
type => "test-log"
}
syslog{ #用於接收來自syslog的日志
type => "test-log"
port => 514
}
kafka {
bootstrap_servers => "172.28.65.26:9092" #kafka服務器地址
topics => "test1" #kafka訂閱的topic主題
codec => "json" #寫入的時候使用json編碼,因為logstash收集后會轉換成json格式
consumer_threads => 1
decorate_events => true
add_field => {
"logsource" => "kafkalog"
}
}
}
output
{
if ([type]=="test-log" and "kafkalog" not in [logsource]) {
kafka {
codec => json
topic_id => "test1"
bootstrap_servers => "172.28.65.26:9092"
batch_size => 1
}
}
if ([type] == "test-log" and "kafkalog" in [logsource]) {
jdbc {
driver_jar_path => "/opt/elk/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-8.0.15.jar"
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://172.28.65.32:3306/testdb?user=yourdbuser&password=yourpassword"
statement => [ "INSERT INTO test_nginx_log (message) VALUES(?)", "message"]
}
}
stdout {
codec => rubydebug
}
}
這個邏輯就是從stdin或syslog接收數據output到kafka,然后從kafka中取出數據加入了一個logsource的字標識是從kafka過來的數據,然后又output到 jdbc寫到mysql中去。
如果沒有這幾個if的邏輯判斷,那么就會是個死循環。從kafka讀同樣的數據又寫到kafka中。如果在兩台機器上裝有logstash一台取數據放到kafka,一台從kafka中取數據放到mysql中就可以不用加這樣的判斷邏輯會單純簡單一些。
執行logstash並查看效果
通過在logstash安裝目錄下執行 bin/logstash -f test-pipeline.conf --config.test_and_exit 檢查配置文件是否有問題,沒有問題以后執行bin/logstash -f test-pipeline.conf --config.reload.automatic 運行logstash。
在控制台輸入
this is a test!
效果:
從控制台輸入信息,可以看到從stdin輸入output到stdout的沒有logsource標識,input從kafka訂閱過來的信息加了一個logsource=>kafkalog的標識。
用kafka tool工具看到kafka收到了從stdin發過來的信息。
在看MySQL表里的數據,已經通過logstash從kafka中將數據采集到了MySQL的表中。
再來看從syslog采集日志的效果
從控制台看到的信息效果
從kafka tool看到的效果
從mysql 表中看到的效果。
可以看到,logstash是一個非常靈活好用的數據采集框架工具,可以通過簡單的配置就能滿足絕大多數數據采集場景的需求。