大數據篇:Kafka
Kafka 是什么?
Kafka是一種高吞吐量的分布式發布、訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。
如果沒有Kafka
大數據領域的每秒數百萬以上的消息,消息的持久化無法處理;
傳統領域的發短信、郵件的等異步操作,削峰處理等等。(當然也可以使用RabbitMQ、ActiveMQ、RocketMQ等)
1 兩種消息處理模式
2 Kafka架構
-
Producer :消息生產者,kafka broker發消息的客戶端。
-
Consumer :消息消費者,kafka broker接收消息的客戶端
-
Topic :一個主題,用於說明生產消費使用的是什么主題,可以理解為一個隊列。(對數據進行分類)
-
Consumer Group (CG):kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復制(不是真的復制,是概念上的)到所有的CG,但每個partion只會把消息發給該CG中的一個consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。(如上圖TopicA發送給消費者組的消息,由ConsumerA接收了,那么ConsumerB就不能接收了,這時就實現了單播)(如上圖假設,3個consumer各為一個消費者組,TopicA由3個組共同接收了,這時就實現了廣播)(理論上,Topic的分區數對應消費者組中消費者數量性能最優)
-
Broker :一台kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
-
Partition:為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序。(kafka只能讀取Leader的partition中的信息)
-
Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件即可。當然第一個offset就是00000000000.kafka
-
zookeeper:用來保存kafka集群的配置等信息
3 監控安裝
3.1 Kafkatool
此軟件用來查看kafka生產者數據等信息,下載安裝即可。
-
百度網盤地址:https://pan.baidu.com/s/1N8BCXXgVydrDvJYXRSYxdQ 提取碼:calp
3.2 CMAK
CMAK(以前稱為Kafka Manager)是用於管理Kafka集群的工具,主要用來觀察消費者等信息。3.0.x以上需要java11以上,zookeeper3.5.x以上才可以運行
- 上傳文件包解壓
mkdir /usr/local/src/CMAK
cd /usr/local/src/CMAK
unzip cmak-3.0.0.4.zip
cd cmak-3.0.0.4
- 修改配置
vim conf/application.conf
zk集群,注意這里配置的需要和kafka配置的對應,否則web會找不到消費者組,如果找不到把IP換成hostname,或者把hostname換成IP。
老版本
kafka-manager.zkhosts="192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181"
新版本
cmak.zkhosts="192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181"
- 啟動
cmak 默認的端口是8080,可通過 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:
nohup bin/cmak -Dconfig.file=conf/application.conf -Dhttp.port=8080
4 命令操作
4.1 創建 topic
#創建top-test主題,2個分區,2個副本
kafka-topics --create --zookeeper cdh01.cm:2181,cdh02.cm:2181,cdh03.cm:2181 --topic top-test --partitions 2 --replication-factor 2
4.2 查看topic
kafka-topics --list --zookeeper cdh01.cm:2181,cdh02.cm:2181,cdh03.cm:2181
4.3 刪除topic
kafka-topics --delete --zookeeper cdh01.cm:2181,cdh02.cm:2181,cdh03.cm:2181 --topic top-test
4.4 查看topic詳情
kafka-topics --describe --zookeeper cdh01.cm:2181,cdh02.cm:2181,cdh03.cm:2181 --topic top-test
4.5 生產者-消費者
#生產者
kafka-console-producer --topic top-test --broker-list cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
#消費者(--from-beginning 從頭開始讀取)
kafka-console-consumer --topic top-test --bootstrap-server cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092 --from-beginning --group g1
- 首先在生產者端輸入多條數據,我這里輸入aa->ff,效果如下
5 Kafka工作流程
kafka每個分區的offset都是從0開始的,保證了區內有序,不能保證全局有序;
producer不在zk中注冊,消費者在zk中注冊。
topic是邏輯上的概念,而partition是物理上的概念,每個partition對應一個log文件,該log文件中儲存的就是生產的數據。生產者生產的數據會被不斷的追加到該log文件末端,且每條數據都有自己的offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset,以便出錯恢復時,從上次的位置繼續消費。如下圖:
5.1 生產消息過程
1步解釋:producer先從zookeeper的 "/brokers/.../state"節點找到該topic對應partition的leader;
通過4-6步保證數據可靠性,kafka選擇了必須全部ack發送成功才完成同步。
-
分區的原因
- 方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;
- 可以提高並發,因為可以以Partition為單位讀寫了。
-
分區原則
-
指定了patition,則直接使用;
-
未指定patition但指定key,通過對key的value進行hash出一個patition
-
patition和key都未指定,使用輪詢選出一個patition。
-
5.2 消費消息過程
消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分區,另外兩個分別讀取一個分區。某個消費者讀取某個分區,也可以叫做某個消費者是某個分區的擁有者。
在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分區。
- 消費方式
- consumer采用pull(拉)模式從broker中讀取數據。
- push(推)模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費消息。
- 對於Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
- pull模式不足之處是,如果kafka沒有數據,消費者可能會陷入循環中,一直等待數據到達。為了避免這種情況,我們在我們的拉請求中有參數,允許消費者請求在等待數據到達的“長輪詢”中進行阻塞(並且可選地等待到給定的字節數,以確保大的傳輸大小)。
- 消費者組的偏移量等信息存儲在zookeeper中的consumers節點中。
5.3 保存消息
由於生產者生產的消息會不斷的追加到log文件末尾,為防止log文件過大導致的數據定位效率低問題,Kafka采取分片和索引機制,將每個patition分為多個segment。
每個segment對應2個文件-->".index和.log"文件。這些文件位於一個patition文件夾下,其patition文件夾命名規則為:topic名稱-分區號。
".index和.log"文件以當前segment的第一條消息的offset命名,如下:
00000000000000000000.index
00000000000000000000.log
00000000000000135489.index
00000000000000135489.log
00000000000000268531.index
00000000000000268531.log
下面為".index和.log"文件結構示意圖:
6 Kafka壓測
6.1 Kafka Producer 壓力測試
- record-size 是一條信息有多大,單位是字節。
- num-records 是總共發送多少條信息。
- throughput 是每秒多少條信息,設成-1,表示不限流,可測出生產者最大吞吐量。
./kafka-producer-perf-test --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
100000 records sent, 95877.277085 records/sec (9.14 MB/sec),
187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms
95th, 423 ms 99th, 424 ms 99.9th
參數解析:一共寫入 10w 條消息,吞吐量為 9.14 MB/sec,每次寫入的平均延遲
為 187.68 毫秒,最大的延遲為 424.00 毫秒。
6.2 Kafka Consumer 壓力測試
-
zookeeper 指定 zookeeper 的鏈接信息
-
topic 指定 topic 的名稱
-
fetch-size 指定每次 fetch 的數據的大小
-
messages 總共要消費的消息個數
./kafka-consumer-perf-test.sh --zookeeper cdh01.cm:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153
開始測試時間,測試結束數據,共消費數據 9.5368MB,吞吐量 2.0714MB/s,共消費100010 條,平均每秒消費 21722.4153 條。
6.3 Kafka 機器數量計算
Kafka 機器數量(經驗公式)=2(峰值生產速度副本數/100)+1
先拿到峰值生產速度,再根據設定的副本數,就能預估出需要部署 Kafka 的數量。
比如我們的峰值生產速度是 50M/s。副本數為 2。
Kafka 機器數量=2(502/100)+ 1=3 台