1. Kafka概述
kafka是最初由LinkedIn公司開發,是一個分布式、支持分區(partition)、多副本(replica),基於zookeeper協調的分布式消息系統,它的最大的特點就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、Strom和Spark流式處理引擎,web、ngiinx日志、訪問日志,消息服務等。用Scala語言編寫,LinkedIn於2010年貢獻給Apache基金會並成為頂級開源的項目。
1.1 Kafka的使用場景
- 日志收集:
- 消息系統
- 用戶活動跟蹤
- 運營指標
1.2 Kafka的基本概念
kafka是一個分布式的,分區的消息(官方稱之為commit log)服務。它提供一個消息系統應該具備的功能,但卻有着獨特的設計。可以這樣來說,Kafka借鑒了JMS規范的思想,但是卻並沒有完全遵守JMS規范。
名稱 | 解釋 |
---|---|
Broker | 消息中間件處理節點,一個kafka節點就是一個broker,一個或者多個Broker可以組成一個Kafka集群。 |
Topic | Kafka根據topic對消息進行歸類,發布到kafka集群的每條消息都需要指定一個tiopic |
Producer | 消息生產者,向Broker發送消息的客戶端 |
Consumer | 消息消費者,從Broker讀取消息的客戶端 |
Comsumer Group | 每個Consumer屬於一個特定的Consumer Group,一條消息可以被不同的Consumer Group消費,但是一個Consumer Group中只能有一個Consumer能夠消費該消息 |
Partition | 物理上的概念。一個Topic可以分為多個Partition,每個Partition內部消息是有序的 |
1.3 創建topic
- 通過Kafka命令向zk中創建一個主題
./kafka-topics.sh --create --zookeeper 192.168.21.107 --replication-factor 1 --partitions 1 --topic test
- 查看當前zk中所有的主題
./kafka-topics.sh --list --zookeeper 192.168.21.107:2181
- 查看某個具體topic的信息
./kafka-topics.sh --zookeeper 192.168.21.107:2181 --topic test --describe
- 發送消息
./kafka-console-producer.sh --broker-list 192.168.21.107:9092 --topic test
-
消費消息
方式一:從當前主題中的最后一條消息的offset(偏移量)+1開始消費
./kafka-console-consumer.sh --bootstrap-server 192.168.21.107:9092 --topic test
方式二:從當前主題中的第一條開始消費
./kafka-console-consumer.sh --bootstrap-server 192.168.21.107:9092 --from-beginning --topic test
1.4 關於消息的細節
- 生產端將消息發送給broker,broker會將消息保存在本地的日志文件中
/usr/local/kafka_2.12/kafka-logs/主題-分區/00000000000000000000.log
- 消費的保存是有序的,通過offset偏移量來描述消息的有序性
- 消費者消費消息時,也是通過offset來描述當前要消費的那條消息的位置。
1.5 單播消息
在同一個kafka的topic中,啟動兩個消費者,一個生產者,問:生產者發送消息,這條消息是否能夠同時被兩個消費者消費
答:如果多個消費者在同一個消費組,那么只有一個消費者可以收到訂閱的topic中的消息。換言之,同一個消費組中只能有一個消費者收到一個topic中的消息
./kafka-console-producer.sh --broker-list 192.168.21.107:9092 --topic test
啟動兩個消費者,但是屬於同一個消費者組
./kafka-console-consumer.sh --bootstrap-server 192.168.21.107:9092 --consumer-property group.id=testGroup1 --topic test
發送消息后只有消費者組中的一台實例接收到了消息。
1.6 多播消息
不同的消費組訂閱了同一個topic。那么不同的消費組中只有一個消費者能收到消息。實際上也是多個消費者中的多個消費者收到了同一個消息。
./kafka-console-consumer.sh --bootstrap-server 192.168.21.107:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.21.107:9092 --consumer-property group.id=testGroup2 --topic test
1.7 查看消費者組的詳細信息
./kafka-consumer-groups.sh --bootstrap-server 192.168.21.109:9092 --describe --group testGroup1
需要關注的幾個信息:
- current-offset:最后被消費的消息的偏移量
- Log-end-offset:消息總數(最后一條消息的偏移量)
- Lag: 積壓了多少條消息
2. 主題和分區
2.1 主題
主題(topic)在kafka中是一個邏輯的概念,kafka通過topic將消息進行分類。不同的topic會被訂閱該topic的消費者消費。
如果一個topic中的消息非常多,則該消息被保存到logr日志文件就會特別大。為了 解決這個文件大的問題,kafka提出partition分區的概念。
2.2分區
通過partition將一個topic中的消息分區來進行存儲,可以帶來如下好處
- 分區存儲,可以解決統一存儲文件過大的問題
- 提供了讀寫的吞吐量:讀和寫可以同時在多個分區中進行
創建多個分分區的主題
./kafka-topics.sh --create --zookeeper 192.168.21.107:2181 --replication-factor 1 --partitions 2 --topic test1
# 查看topic1的描述信息
./kafka-topics.sh --zookeeper 192.168.21.107:2181 --topic test1 --describe
2.3日志文件中的保存的內容
-
00000.log : 這個文件中保存的就是消息
-
_consumer_offsets-49:
kafka內部自己創建了_consumer_offsets主題包含了50個分區。這個主題用來存放消費者消費某個主題的偏移量。因為每個消費者都會自己維護着消費的主題的偏移量。也就是說每個消費者都會把消費的主題的偏移量自助上報給kafka中默認的主題:
consumer_offsets.因此kafka為了提升這個主題的並發性,默認設置了50個分區。
提交到哪個分區:通過hash函數:hash(consumerGroupId) % __consumer_offsets 主題的分區數 提交到該主題中的內容是:key是consumerGroupId+topic+分區號,value就是當前 offset的值
-
文件中保存的消息,默認保存是7天,7天后消息會被刪除。
3. 副本和集群消費
3.1 副本
副本是為了主題中的分區創建多個備份,多個副本在kafka集群的多個broker中,會有一個副本作為leader,其他都是follower
# 查看topic1的描述信息
./kafka-topics.sh --zookeeper 192.168.21.107:2181 --topic test1 --describe
- leader
Kafka的讀寫操作,都發生在leader上。Leader負責把數據同步給follower。當Leader掛掉后,經過主從選舉,會從多個follower中選舉一個新的Leader.
- follower
接收leader的同步的數據
- isr
可以同步和已同步的節點會被存入到isr集合中。這里有一個細節:如果isr中的節點性能 較差,會被移除isr集合
3.2 集群消費的細節
集群配置的server.properties文件
#0 1 2
broker.id=2
port=9092
host.name=192.168.21.107
log.dirs=/usr/local/kafka_2.12/kafka-logs
- 一個partition只能被一個消費者組的一個消費者消費,目的是為了保證消費的順序行,但是多個partition的多個消費者消費的總的順序是得不到保證的。
- partition的數量決定了消費者組中的消費者的數量。建議同一個消費組中的消費者的數量不要超過partition的數量,否則多的消費者會消費不到。
- 如果消費者掛了,會觸發rebalance機制。會讓其他消費者來消費該分區。