logstash集成kafka,mysql實現數據采集


logstash是一個非常靈活好用的數據采集框架工具,可以通過簡單的配置滿足絕大多數數據采集場景的需求。
采集數據一個非常典型的場景就是將數據先放到kafka隊列里削峰,然后從kafka隊列里讀取數據到mysql或其他存儲系統中進行保存。
從syslog采集日志到kafka然后在從kafka寫到mysql數據庫中
本文通過一個簡單的示例來演示從syslog采集日志到kafka然后在從kafka寫到mysql數據庫中。
默認已經安裝好了kafka、mysql、logstash,並已經經過簡單的驗證。

准備logstash的環境

一、下載mysql的jdbc驅動包

下載地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.15
下載后放到logstash的安裝目錄的/vendor/jar/目錄下

二、安裝logstash插件

logstash默認安裝了kafka插件,但是mysql插件沒有默認安裝需要自己安裝。
具體安裝方法 /bin/logstash-plugin install logstash-output-jdbc ,這里應為要用到logstash寫入mysql數據庫,所以安裝的插件是logstash-output-jdbc,如果要用到從mysql讀數據,那么就要安裝logstash-input-jdbc。安裝方法類似。
因為安裝時需要訪問國外的源,安裝進度很慢很慢,還經常安裝不成功,所以需要更改國內的源。
也就是給 Ruby 換成國內的鏡像站:https://gems.ruby-china.com/,替代https://rubygems.org。請注意:國內的鏡像站從https://gems.ruby-china.org 換成了 https://gems.ruby-china.com !!! 現在很多網上的資料就都是寫的https://gems.ruby-china.org,導致很多人換了鏡像源也裝不上。
具體方法如下:

1. 安裝Gem並更新

# yum install -y gem
# gem -v
2.0.14.1
# gem update --system
# gem -v
2.7.7

2. 檢查並修改鏡像源

# gem sources -l
*** CURRENT SOURCES ***
 
https://rubygems.org/

# gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
https://gems.ruby-china.org/ added to sources
https://rubygems.org/ removed from sources

# cat ~/.gemrc 
---
:backtrace: false
:bulk_threshold: 1000
:sources:
- https://gems.ruby-china.org/
:update_sources: true
:verbose: true

請注意:國內的鏡像站從https://gems.ruby-china.org 換成了 https://gems.ruby-china.com !!!現在很多網上的資料就都是寫的https://gems.ruby-china.org,導致很多人換了鏡像源也裝不上。

3. 修改 logstash的 gem 鏡像源

cd到logstach的安裝目錄,可以看到Gemfile文件

# vi Gemfile
# This is a Logstash generated Gemfile.
# If you modify this file manually all comments and formatting will be lost.
 
source "https://rubygems.org"
gem "logstash-core", :path => "./logstash-core"
......

更改默認的 https://rubygems.org 為https://gems.ruby-china.com
更換國內鏡像源地址

4. 安裝 logstash-output-jdbc

#/bin/logstash-plugin install logstash-output-jdbc
Validating logstash-output-jdbc
Installing logstash-output-jdbc
Installation successful

5.查看插件是否安裝成功

在logstash的bin目錄下執行./logstash-plugin list 可以查看已經安裝的插件,可以看到logstash-output-jdbc的插件已經裝好。
檢查插件安裝

配置logstash

新建一個pipline.conf的配置文件

vi test-pipeline.conf

文件內容如下:

input {
    stdin{            #用於測試標准控制台輸入的數據
      type => "test-log"
    }
    syslog{           #用於接收來自syslog的日志
        type => "test-log"
        port => 514
    }
    kafka {
       bootstrap_servers => "172.28.65.26:9092" #kafka服務器地址
       topics => "test1"           #kafka訂閱的topic主題
       codec => "json" #寫入的時候使用json編碼,因為logstash收集后會轉換成json格式
       consumer_threads => 1
       decorate_events => true
       add_field => {
             "logsource" => "kafkalog"
       }
    }
}
output
{
    if ([type]=="test-log" and "kafkalog" not in [logsource]) {
       kafka {
            codec => json
            topic_id => "test1"
            bootstrap_servers => "172.28.65.26:9092"
            batch_size => 1
        }
    }
    if ([type] == "test-log" and "kafkalog" in [logsource]) {
        jdbc {
            driver_jar_path => "/opt/elk/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-8.0.15.jar"
            driver_class => "com.mysql.jdbc.Driver"
            connection_string => "jdbc:mysql://172.28.65.32:3306/testdb?user=yourdbuser&password=yourpassword"
            statement => [ "INSERT INTO test_nginx_log (message) VALUES(?)", "message"]
        }
    }
    stdout {
       codec => rubydebug
    }
}

這個邏輯就是從stdin或syslog接收數據output到kafka,然后從kafka中取出數據加入了一個logsource的字標識是從kafka過來的數據,然后又output到 jdbc寫到mysql中去。
如果沒有這幾個if的邏輯判斷,那么就會是個死循環。從kafka讀同樣的數據又寫到kafka中。如果在兩台機器上裝有logstash一台取數據放到kafka,一台從kafka中取數據放到mysql中就可以不用加這樣的判斷邏輯會單純簡單一些。

執行logstash並查看效果

通過在logstash安裝目錄下執行 bin/logstash -f test-pipeline.conf --config.test_and_exit 檢查配置文件是否有問題,沒有問題以后執行bin/logstash -f test-pipeline.conf --config.reload.automatic 運行logstash。
在控制台輸入

 this is a test!

效果:
從控制台輸入信息,可以看到從stdin輸入output到stdout的沒有logsource標識,input從kafka訂閱過來的信息加了一個logsource=>kafkalog的標識。
logsource=>kafkalog的標識
用kafka tool工具看到kafka收到了從stdin發過來的信息。
用kafka tool工具看到kafka收到了從stdin發過來的信息
在看MySQL表里的數據,已經通過logstash從kafka中將數據采集到了MySQL的表中。
MySQL的表的信息數據
再來看從syslog采集日志的效果
從控制台看到的信息效果
控制台看到的信息效果
從kafka tool看到的效果
kafka tool看到的效果
從mysql 表中看到的效果。
mysql 表中看到的效果
可以看到,logstash是一個非常靈活好用的數據采集框架工具,可以通過簡單的配置就能滿足絕大多數數據采集場景的需求。


作者博客:http://xiejava.gitee.io


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM