kafka集群介紹+部署Filebeat+Kafka+ELK


一、消息隊列

1、為什么需要消息隊列(MQ)

2、使用消息隊列的好處

3、消息隊列的兩種模式

二、Kafka

1、Kafka 定義

2、Kafka 簡介

3、Kafka 的特性

4、Kafka 系統架構

三、部署kafka集群(在之前部署的3台zookeeper上)

1、下載安裝包

2、安裝 Kafka

3、修改配置文件

4、修改環境變量

5、配置 Zookeeper 啟動腳本

6、Kafka 命令行操作

四、Kafka 架構深入

1、Kafka 工作流程及文件存儲機制 

2、數據可靠性保證

3、數據一致性問題

4、ack 應答機制 

五、部署Filebeat+Kafka+ELK  

1、服務器准備

2、部署Zookeeper+Kafka集群

3、部署 Filebeat 

4、部署 ELK,在 Logstash 組件所在節點上新建一個 Logstash 配置文件

5、瀏覽器訪問測試

 

一、消息隊列

1、為什么需要消息隊列(MQ)

主要原因是由於在高並發環境下,同步請求來不及處理,請求往往會發生阻塞。比如大量的請求並發訪問數據庫,導致行鎖表鎖,最后請求線程會堆積過多,從而觸發 too many connection 錯誤,引發雪崩效應。
我們使用消息隊列,通過異步處理請求,從而緩解系統的壓力。消息隊列常應用於異步處理,流量削峰,應用解耦,消息通訊等場景。

當前比較常見的 MQ 中間件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。

2、使用消息隊列的好處

(1)解耦
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

(2)可恢復性
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。

(3)緩沖
有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。

(4)靈活性 & 峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

(5)異步通信
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

3、消息隊列的兩種模式

(1)點對點模式(一對一,消費者主動拉取數據,消息收到后消息清除)
消息生產者生產消息發送到消息隊列中,然后消息消費者從消息隊列中取出並且消費消息。消息被消費以后,消息隊列中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。消息隊列支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費。

(2)發布/訂閱模式(一對多,又叫觀察者模式,消費者消費數據之后不會清除消息)
消息生產者(發布)將消息發布到 topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到 topic 的消息會被所有訂閱者消費。
發布/訂閱模式是定義對象間一種一對多的依賴關系,使得每當一個對象(目標對象)的狀態發生改變,則所有依賴於它的對象(觀察者對象)都會得到通知並自動更新。

二、Kafka

1、Kafka 定義

Kafka 是一個分布式的基於發布/訂閱模式的消息隊列(MQ,Message Queue),主要應用於大數據實時處理領域。

2、Kafka 簡介

Kafka 是最初由 Linkedin 公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基於 Zookeeper 協調的分布式消息中間件系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景,比如基於 hadoop 的批處理系統、低延遲的實時系統、Spark/Flink 流式處理引擎,nginx 訪問日志,消息服務等等,用 scala 語言編寫,
Linkedin 於 2010 年貢獻給了 Apache 基金會並成為頂級開源項目。

3、Kafka 的特性

(1)高吞吐量、低延遲
Kafka 每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒。每個 topic 可以分多個 Partition,Consumer Group 對 Partition 進行消費操作,提高負載均衡能力和消費能力。

(2)可擴展性
kafka 集群支持熱擴展

(3)持久性、可靠性
消息被持久化到本地磁盤,並且支持數據備份防止數據丟失

(4)容錯性
允許集群中節點失敗(多副本情況下,若副本數量為 n,則允許 n-1 個節點失敗)

(5)高並發
支持數千個客戶端同時讀寫

4、Kafka 系統架構

(1)Broker
一台 kafka 服務器就是一個 broker。一個集群由多個 broker 組成。一個 broker 可以容納多個 topic。

(2)Topic
可以理解為一個隊列,生產者和消費者面向的都是一個 topic。
類似於數據庫的表名或者 ES 的 index
物理上不同 topic 的消息分開存儲

(3)Partition
為了實現擴展性,一個非常大的 topic 可以分布到多個 broker(即服務器)上,一個 topic 可以分割為一個或多個 partition,每個 partition 是一個有序的隊列。Kafka 只保證 partition 內的記錄是有序的,而不保證 topic 中不同 partition 的順序。

每個 topic 至少有一個 partition,當生產者產生數據的時候,會根據分配策略選擇分區,然后將消息追加到指定的分區的隊列末尾。

Partation 數據路由規則:

1.指定了 patition,則直接使用;
2.未指定 patition 但指定 key(相當於消息中某個屬性),通過對 key 的 value 進行 hash 取模,選出一個 patition;
3.patition 和 key 都未指定,使用輪詢選出一個 patition。

每條消息都會有一個自增的編號,用於標識消息的偏移量,標識順序從 0 開始。

每個 partition 中的數據使用多個 segment 文件存儲。

如果 topic 有多個 partition,消費數據時就不能保證數據的順序。嚴格保證消息的消費順序的場景下(例如商品秒殺、 搶紅包),需要將 partition 數目設為 1。

●broker 存儲 topic 的數據。如果某 topic 有 N 個 partition,集群有 N 個 broker,那么每個 broker 存儲該 topic 的一個 partition。
●如果某 topic 有 N 個 partition,集群有 (N+M) 個 broker,那么其中有 N 個 broker 存儲 topic 的一個 partition, 剩下的 M 個 broker 不存儲該 topic 的 partition 數據。
●如果某 topic 有 N 個 partition,集群中 broker 數目少於 N 個,那么一個 broker 存儲該 topic 的一個或多個 partition。在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致 Kafka 集群數據不均衡。

分區的原因
●方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;
●可以提高並發,因為可以以Partition為單位讀寫了。

(4)Replica
副本,為保證集群中的某個節點發生故障時,該節點上的 partition 數據不丟失,且 kafka 仍然能夠繼續工作,kafka 提供了副本機制,一個 topic 的每個分區都有若干個副本,一個 leader 和若干個 follower。

(5)Leader
每個 partition 有多個副本,其中有且僅有一個作為 Leader,Leader 是當前負責數據的讀寫的 partition。

(6)Follower
Follower 跟隨 Leader,所有寫請求都通過 Leader 路由,數據變更會廣播給所有 Follower,Follower 與 Leader 保持數據同步。Follower 只負責備份,不負責數據的讀寫。
如果 Leader 故障,則從 Follower 中選舉出一個新的 Leader。
當 Follower 掛掉、卡住或者同步太慢,Leader 會把這個 Follower 從 ISR(Leader 維護的一個和 Leader 保持同步的 Follower 集合) 列表中刪除,重新創建一個 Follower。

(7)Producer
生產者即數據的發布者,該角色將消息發布到 Kafka 的 topic 中。
broker 接收到生產者發送的消息后,broker 將該消息追加到當前用於追加數據的 segment 文件中。
生產者發送的消息,存儲到一個 partition 中,生產者也可以指定數據存儲的 partition。

(8)Consumer
消費者可以從 broker 中讀取數據。消費者可以消費多個 topic 中的數據。

(9)Consumer Group(CG)
消費者組,由多個 consumer 組成。
所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。可為每個消費者指定組名,若不指定組名則屬於默認的組。
將多個消費者集中到一起去處理某一個 Topic 的數據,可以更快的提高數據的消費能力。
消費者組內每個消費者負責消費不同分區的數據,一個分區只能由一個組內消費者消費,防止數據被重復讀取。
消費者組之間互不影響。

(10)offset 偏移量
可以唯一的標識一條消息。
偏移量決定讀取數據的位置,不會有線程安全的問題,消費者通過偏移量來決定下次讀取的消息(即消費位置)。
消息被消費之后,並不被馬上刪除,這樣多個業務就可以重復使用 Kafka 的消息。
某一個業務也可以通過修改偏移量達到重新讀取消息的目的,偏移量由用戶控制。
消息最終還是會被刪除的,默認生命周期為 1 周(7*24小時)。

(11)Zookeeper
Kafka 通過 Zookeeper 來存儲集群的 meta 信息。

由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復后,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復后繼續消費。
Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中;從 0.9 版本開始,consumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic 為 __consumer_offsets。

三、部署kafka集群(在之前部署的3台zookeeper上)

1、下載安裝包

官方下載地址:http://kafka.apache.org/downloads.html

2、安裝 Kafka 

cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka

 

3、修改配置文件

cd /usr/local/kafka/config/
cp server.properties{,.bak}

vim server.properties
broker.id=0    #21行,broker的全局唯一編號,每個broker不能重復,因此要在其他機器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.142.3:9092    #31行,指定監聽的IP和端口,如果修改每個broker的IP需區分開來,也可保持默認配置不用修改
num.network.threads=3    #42行,broker 處理網絡請求的線程數量,一般情況下不需要去修改
num.io.threads=8         #45行,用來處理磁盤IO的線程數量,數值應該大於硬盤數
socket.send.buffer.bytes=102400       #48行,發送套接字的緩沖區大小
socket.receive.buffer.bytes=102400    #51行,接收套接字的緩沖區大小
socket.request.max.bytes=104857600    #54行,請求套接字的緩沖區大小
log.dirs=/usr/local/kafka/logs        #60行,kafka運行日志存放的路徑,也是數據存放的路徑
num.partitions=1    #65行,topic在當前broker上的默認分區個數,會被topic創建時的指定參數覆蓋
num.recovery.threads.per.data.dir=1    #69行,用來恢復和清理data下數據的線程數量
log.retention.hours=168    #103行,segment文件(數據文件)保留的最長時間,單位為小時,默認為7天,超時將被刪除
log.segment.bytes=1073741824    #110行,一個segment文件最大的大小,默認為 1G,超出將新建一個新的segment文件
zookeeper.connect=192.168.142.3:2181,192.168.142.4:2181,192.168.142.5:2181    ●123行,配置連接Zookeeper集群地址

 

 

 

 

4、修改環境變量  

vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
echo $PATH

 

5、配置 Zookeeper 啟動腳本

vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
	echo "---------- Kafka 啟動 ------------"
	${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
	echo "---------- Kafka 停止 ------------"
	${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
	$0 stop
	$0 start
;;
status)
	echo "---------- Kafka 狀態 ------------"
	count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
	if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

//設置開機自啟
chmod +x /etc/init.d/kafka
chkconfig --add kafka

//分別啟動 Kafka
service kafka start

 

 

6、Kafka 命令行操作

(1)創建topic  

kafka-topics.sh --create --zookeeper 192.168.142.3:2181,192.168.142.4:2181,192.168.142.5:2181 --replication-factor 2 --partitions 3 --topic test (zk集群的ip)
----------------------------------------------------------
(1)zookeeper:定義zookeeper集群服務器地址,如果有多個IP地址使用逗號分割,一般使用一個IP即可
(2)replication-factor:定義分區副本數,1代表單副本,建議為2 
(3)partitions:定義分區數 
(4)topic:定義topic名稱

 

(2)查看當前服務器中的所有 topic

kafka-topics.sh --list --zookeeper 192.168.142.3:2181,192.168.142.4:2181,192.168.142.5:2181 

 

(3)查看某個 topic 的詳情

kafka-topics.sh  --describe --zookeeper 192.168.142.3:2181,192.168.142.4:2181,192.168.142.5:2181 

 

(4)發布消息

kafka-console-producer.sh --broker-list 192.168.142.3:9092,192.168.142.4:9092,192.168.142.5:9092  --topic test

 

(5)消費消息  

kafka-console-consumer.sh --bootstrap-server 192.168.142.3:9092,192.168.142.4:9092,192.168.142.5:9092 --topic test --from-beginning

--from-beginning:會把主題中以往所有的數據都讀取出來

 

(6)修改分區數

kafka-topics.sh --zookeeper 192.168.142.3:2181,192.168.142.4:2181,192.168.142.5:2181 --alter --topic test --partitions 6

 

(7)刪除 topic

kafka-topics.sh --delete --zookeeper 192.168.142.3:2181,192.168.142.4:2181,192.168.142.5:2181 --topic test

 

四、Kafka 架構深入

1、Kafka 工作流程及文件存儲機制  

Kafka 中消息是以 topic 進行分類的,生產者生產消息,消費者消費消息,都是面向 topic 的。

topic 是邏輯上的概念,而 partition 是物理上的概念,每個 partition 對應於一個 log 文件,該 log 文件中存儲的就是 producer 生產的數據。Producer 生產的數據會被不斷追加到該 log 文件末端,且每條數據都有自己的 offset。 消費者組中的每個消費者,都會實時記錄自己消費到了哪個 offset,以便出錯恢復時,從上次的位置繼續消費。

由於生產者生產的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數據定位效率低下,Kafka 采取了分片和索引機制,將每個 partition 分為多個 segment。每個 segment 對應兩個文件:“.index” 文件和 “.log” 文件。這些文件位於一個文件夾下,該文件夾的命名規則為:topic名稱+分區序號。例如,test 這個 topic 有三個分區, 則其對應的文件夾為 test-0、test-1、test-2。

index 和 log 文件以當前 segment 的第一條消息的 offset 命名。

“.index” 文件存儲大量的索引信息,“.log” 文件存儲大量的數據,索引文件中的元數據指向對應數據文件中 message 的物理偏移地址。

2、數據可靠性保證

為保證 producer 發送的數據,能可靠的發送到指定的 topic,topic 的每個 partition 收到 producer 發送的數據后, 都需要向 producer 發送 ack(acknowledgement 確認收到),如果 producer 收到 ack,就會進行下一輪的發送,否則重新發送數據。  

3、數據一致性問題  

LEO:指的是每個副本最大的 offset;
HW:指的是消費者能見到的最大的 offset,所有副本中最小的 LEO。

(1)follower 故障
follower 發生故障后會被臨時踢出 ISR(Leader 維護的一個和 Leader 保持同步的 Follower 集合),待該 follower 恢復后,follower 會讀取本地磁盤記錄的上次的 HW,並將 log 文件高於 HW 的部分截取掉,從 HW 開始向 leader 進行同步。等該 follower 的 LEO 大於等於該 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

(2)leader 故障
leader 發生故障之后,會從 ISR 中選出一個新的 leader, 之后,為保證多個副本之間的數據一致性,其余的 follower 會先將各自的 log 文件高於 HW 的部分截掉,然后從新的 leader 同步數據。

:這只能保證副本之間的數據一致性,並不能保證數據不丟失或者不重復。

4、ack 應答機制 

對於某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等 ISR 中的 follower 全部接收成功。所以 Kafka 為用戶提供了三種可靠性級別,用戶根據對可靠性和延遲的要求進行權衡選擇。

當 producer 向 leader 發送數據時,可以通過 request.required.acks 參數來設置數據可靠性的級別:
●0:這意味着producer無需等待來自broker的確認而繼續發送下一批消息。這種情況下數據傳輸效率最高,但是數據可靠性確是最低的。當broker故障時有可能丟失數據。

●1(默認配置):這意味着producer在ISR中的leader已成功收到的數據並得到確認后發送下一條message。如果在follower同步成功之前leader故障,那么將會丟失數據。

●-1(或者是all):producer需要等待ISR中的所有follower都確認接收到數據后才算一次發送完成,可靠性最高。但是如果在 follower 同步完成后,broker 發送ack 之前,leader 發生故障,那么會造成數據重復。

三種機制性能依次遞減,數據可靠性依次遞增。

:在 0.11 版本以前的Kafka,對此是無能為力的,只能保證數據不丟失,再在下游消費者對數據做全局去重。在 0.11 及以后版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指 Producer 不論向 Server 發送多少次重復數據, Server 端都只會持久化一條。

五、部署Filebeat+Kafka+ELK  

1、服務器准備 

主機ip 服務
192.168.142.10 Elasticsearch 、Kibana、Filebeat
192.168.142.20 Elasticsearch
192.168.142.6 Logstash、Apache
192.168.142.3 Zookeeper、Kafka
192.168.142.4 Zookeeper、Kafka
192.168.142.5 Zookeeper、Kafka

2、部署Zookeeper+Kafka集群

詳情請見前面博客

3、部署 Filebeat (192.168.142.10)

cd /usr/local/filebeat

vim filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/messages
    - /var/log/*.log
......
#添加輸出到Kafka的配置
output.kafka:
  enabled: true
  hosts: ["192.168.142.3:9092","192.168.142.5:9092","192.168.142.6:9092"]    #指定 Kafka 集群配置
  topic: "kafka_test"    #指定 Kafka 的 topic
  
#啟動 filebeat
./filebeat -e -c filebeat.yml

 

 

4、部署 ELK,在 Logstash 組件所在節點上新建一個 Logstash 配置文件 

cd /etc/logstash/conf.d/

vim filebeat.conf
input {
    kafka {
        bootstrap_servers => "192.168.142.3:9092,192.168.142.4:9092,192.168.142.5:9092"
        topics  => "filebeat_test"
        group_id => "test123"
        auto_offset_reset => "earliest"
    }
}

output {
    elasticsearch {
        hosts => ["192.168.142.10:9200"]
        index => "filebeat_test-%{+YYYY.MM.dd}"
    }
    stdout {
        codec => rubydebug
    }
}

#啟動 logstash
logstash -f filebeat.conf

 

5、瀏覽器訪問測試

瀏覽器訪問 http://192.168.142.10:5601 登錄 Kibana,單擊“Create Index Pattern”按鈕添加索引“filebeat_test-*”,單擊 “create” 按鈕創建,單擊 “Discover” 按鈕可查看圖表信息及日志信息。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM