Kafka與Logstash的數據采集對接 —— 看圖說話,從運行機制到部署


基於Logstash跑通Kafka還是需要注意很多東西,最重要的就是理解Kafka的原理。

Logstash工作原理

由於Kafka采用解耦的設計思想,並非原始的發布訂閱,生產者負責產生消息,直接推送給消費者。而是在中間加入持久化層——broker,生產者把數據存放在broker中,消費者從broker中取數據。這樣就帶來了幾個好處:

  • 1 生產者的負載與消費者的負載解耦
  • 2 消費者按照自己的能力fetch數據
  • 3 消費者可以自定義消費的數量

另外,由於broker采用了主題topic-->分區的思想,使得某個分區內部的順序可以保證有序性,但是分區間的數據不保證有序性。這樣,消費者可以以分區為單位,自定義讀取的位置——offset。

Kafka采用zookeeper作為管理,記錄了producer到broker的信息,以及consumer與broker中partition的對應關系。因此,生產者可以直接把數據傳遞給broker,broker通過zookeeper進行leader-->followers的選舉管理;消費者通過zookeeper保存讀取的位置offset以及讀取的topic的partition分區信息。

由於上面的架構設計,使得生產者與broker相連;消費者與zookeeper相連。有了這樣的對應關系,就容易部署logstash-->kafka-->logstash的方案了。

接下來,按照下面的步驟就可以實現logstash與kafka的對接了。

啟動kafka

啟動zookeeper:

$zookeeper/bin/zkServer.sh start

啟動kafka:

$kafka/bin/kafka-server-start.sh $kafka/config/server.properties &

創建主題

創建主題:

$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1

查看主題:

$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe

測試環境

執行生產者腳本:

$kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello

執行消費者腳本,查看是否寫入:

$kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello

輸入測試

input{
	stdin{}
}
output{
	kafka{
		topic_id => "hello"
		bootstrap_servers => "192.168.0.4:9092" # kafka的地址
		batch_size => 5
	}
	stdout{
		codec => rubydebug
	}
}

讀取測試

logstash配置文件:

input{
    kafka {
        codec => "plain"
        group_id => "logstash1"
        auto_offset_reset => "smallest"
        reset_beginning => true
        topic_id => "hello"
        #white_list => ["hello"]
        #black_list => nil
        zk_connect => "192.168.0.5:2181" # zookeeper的地址
   }

}
output{
    stdout{
        codec => rubydebug
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM