kafka快速入門到精通


目錄

中文文檔

https://kafka.apachecn.org/

1. 消息隊列兩種模式

1.1 消息隊列作用

消息隊列主要作用:
 1.解偶
 2.削峰,緩沖
 3.異步
 4.占存數據

kafka機器可用動態添加減少機器

1.2 點對點模式(一對一,消費者主動拉取數據,消息收到后消息刪除)

消息生產者生產消息發送到Queue 中,然后消息消費者從Queue 中取出並且消費消息。消息被消費以后,queue 中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。Queue 支持存在多個消費者, 但是對一個消息而言, 只會有一個消費者可以消費。

eg:rabbitMQ簡單模式

1.3 發布/訂閱模式(一對多,消費數據之后不會刪除消息)

消息生產者(發布)將消息發布到 topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到 topic 的消息會被所有訂閱者消費。

eg:kafka

1.4 kafka框架

1)	Producer :消息生產者,就是向 kafka broker 發消息的客戶端;
2)	Consumer :消息消費者,向 kafka broker 取消息的客戶端;
3)	Consumer Group (CG):消費者組,由多個 consumer 組成。消費者組內每個消費者負責消費不同分區的數據,一個分區只能由一個組內消費者消費;消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。
4)	Broker :一台 kafka 服務器就是一個 broker。一個集群由多個 broker 組成。一個 broker
可以容納多個topic。
5)	Topic :可以理解為一個隊列,生產者和消費者面向的都是一個 topic;
6)	Partition:為了實現擴展性,一個非常大的 topic 可以分布到多個broker(即服務器)上,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列;
7)	Replica:副本,為保證集群中的某個節點發生故障時,該節點上的 partition 數據不丟失,且kafka 仍然能夠繼續工作,kafka 提供了副本機制,一個 topic 的每個分區都有若干個副本,一個 leader 和若干個 follower。
8)	leader:每個分區多個副本的“主”,生產者發送數據的對象,以及消費者消費數據的對象都是 leader。
9)	follower:每個分區多個副本中的“從”,實時從 leader 中同步數據,保持和 leader 數據的同步。leader 發生故障時,某個 follower 會成為新的 leader。

1.5 kafka工作流程

2. 安裝部署

2.1 下載安裝

下載官網:https://kafka.apache.org/downloads.html
使用:0.11.0.0版本
下載:wget https://archive.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
解壓:tar -xf kafka_2.11-0.11.0.0.tgz -C /opt/module/
改名:mv kafka_2.11-0.11.0.0 kafka

2.2 配置hosts文件

cat >> /etc/hosts << EOF
192.168.0.215 sg-15
192.168.0.216 sg-16
192.168.0.217 sg-17
EOF

2.3 kafka配置文件(日志數據分離)

日志數據分離:
需要手動創建目錄:/opt/module/kafka/data  //數據目錄
會自動創建目錄:/opt/module/kafka/logs  //日志目錄

[root@sg-15 config]# cd /opt/module/kafka/config/
[root@sg-15 config]# vi server.properties //修改配置文件

輸入以下內容:
#broker 的全局唯一編號,不能重復。其他機器kafka需要修改
broker.id=0
#刪除 topic 功能使能
delete.topic.enable=true 
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤 IO 的現成數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400 
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400 
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600 
#kafka 暫存數據路徑
log.dirs=/opt/module/kafka/data 
#topic 在當前 broker 上的分區個數
num.partitions=1
#用來恢復和清理 data 下數據的線程數量
num.recovery.threads.per.data.dir=1 
#一個數據文件最大大小。默認1G,/opt/module/kafka/data/00000000000000000000.log
log.segment.bytes=1073741824
#segment 文件保留的最長時間,超時將被刪除。單位小時
log.retention.hours=168 
#配置連接Zookeeper 集群地址
zookeeper.connect=192.168.0.215:2181,192.168.0.216:2181,192.168.0.217:2181


配置完之后分發到其他kafka服務器,然后把配置文件broker.id改了broker.id=1、broker.id=2

2.4 配置環境變量

配置kafka命令,想配就配。不然就到kafaka/bin里執行命令

[root@sg-15 kafka]# sudo vi /etc/profile

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin

[atguigu@hadoop102 module]$ source /etc/profile

2.5 啟動集群/bin下腳本說明

bin目錄下常用文件介紹:
kafka-server-start.sh      //啟動kafka服務
kafka-server-stop.sh       //停止kafka服務
kafka-console-consumer.sh  //控制台的消費者-測試環境使用
kafka-console-producer.sh  //控制台的生產者-測試環境使用
kafka-topics.sh            //關於topic的操作-增刪改查
kafka-producer-perf-test.sh //生產者壓力測試-測試集群負載能力
kafka-consumer-perf-test.sh //消費者壓力測試-測試集群負載能力

//啟動kafka,需要指定配置文件
[root@sg-15 kafka]# bin/kafka-server-start.sh -daemon config/server.properties  
[root@sg-15 kafka]# jps // 查看
29735 Jps
29658 Kafka
24235 QuorumPeerMain

2.6 啟動kafka集群shell腳本

**:先做免密登陸
[root@sg-15 bin]# vi kafka.sh
[root@sg-15 script]# chmod 777 kafka.sh //加權限

case $1 in 
"start"){
	for i in 192.168.0.215 192.168.0.216 192.168.0.217
	do
		echo "*******kafka--$i**********"
		ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
		done
};;
"stop"){
	for i in 192.168.0.215 192.168.0.216 192.168.0.217
	do
		echo "*******kafka--$i**********"
		ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
		done
};;
esac

2.7 kafka啟動日志文件

如果jps查看沒有kafka進程,到這個日志查原因

// 日志文件
[root@sg-15 logs]# cat /opt/module/kafka/logs/server.log

3. topic(隊列)增刪改查

3.1 增-create

--create
//增加一個主題(隊列)
[root@sg-15 bin]# ./kafka-topics.sh --create --zookeeper 192.168.0.215:2181 --topic topicName --partitions 2 --replication-factor 2

Created topic "topicName".

參數:
--topic:定義主題名稱
--partitions:定義分區數
--replication-factor:定義副本數,副本數小於等於kafka節點數(機器數)

3.2 刪-delete

--delete
//刪除topic
[root@sg-15 bin]# ./kafka-topics.sh --delete --zookeeper 192.168.0.215:2181 --topic topic_name

Topic topic_name is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

注意:配置文件必須把delete.topic.enable=true,刪除功能打開才能成功。

3.3 修改topic分區數

--alter
//修改分區數
[root@sg-15 bin]# ./kafka-topics.sh --zookeeper 192.168.0.215:2181 --topic topic_name --alter --partitions 4

3.4 查topic列表-list

--list
//查詢主題列表(隊列),需要連接zookeeper
[root@sg-15 bin]# ./kafka-topics.sh --list --zookeeper 192.168.0.215:2181

3.5 查topic詳情-describe

--describe
// 查詢詳情,副本數,分區數等
[root@sg-15 bin]# ./kafka-topics.sh --describe --topic frist --zookeeper 192.168.0.215:2181
Topic:frist	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: frist	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

[root@sg-15 bin]# ./kafka-topics.sh --describe --topic frist2 --zookeeper 192.168.0.215:2181
Topic:frist2	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: frist2	Partition: 0	Leader: 1	Replicas: 1,0	Isr: 1,0
	Topic: frist2	Partition: 1	Leader: 2	Replicas: 2,1	Isr: 2

4. 終端開啟測試生產者消費者

4.1 開啟生產者

[root@sg-15 bin]# ./kafka-console-producer.sh --topic topicName -broker-list 192.168.0.215:9092
>aaa  // 生產消息
>123456  // 生產消息

4.2 開啟消費者

// 開啟消費者方式1:通過老方式依賴--zookeeper方式(有一堆提示)
[root@sg-17 bin]# ./kafka-console-consumer.sh --topic topicName --zookeeper 192.168.0.215:2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
123456  //收到消息

// 開啟消費者方式2:通過新版本方式--bootstrap-server(無提示)
--from-beginning 從頭消費
[root@sg-17 bin]# ./kafka-console-consumer.sh --topic topicName --bootstrap-server 192.168.0.215:9092 --from-beginning
123456  //收到消息

// --from-beginning 從頭開始消費7天內數據,最大保留7天
[root@sg-17 bin]# ./kafka-console-consumer.sh --topic topicName --bootstrap-server 192.168.0.215:9092 --from-beginning
aaa  //收到消息
123456  //收到消息

4.3 新版和老版對比zookeeper/bootstrap-server

老版:通過--zookeeper 方式開啟消費者。zookeeper中主要存offset偏移量,依賴於zookeeper,需要頻繁和zookeeper通信。
新版:通過--bootstrap-server offset偏移量存在了kafka自己本身中,kafka自默認有50個分區1個副本,50個分區以輪訓的方式分布在kafka所有機器中,50個分區存系統主題

4.4 數據目錄kafka/data

數據目錄:/opt/module/kafka/data
創建topic時指定了兩個副本,生產者生產數據時默認輪訓往副本總寫入寫消息
00000000000000000000.log:存放數據文件,超過最大文件大小,將產生一個新的文件
log.segment.bytes=1073741824 參數配置00000000000000000000.log文件大小

“.index”文件存儲大量的索引信息
“.log”文件存儲大量的數據,索引文件中的元數據指向對應數據文件中message的物理偏移地址。

兩個問題:
 1.怎么快速定位到消費的位置,.index索引文件
 2.超過文件大小時,新文件名稱怎么命名

通過index文件快速定位到消費的位置:
第一步:通過二分查找發定位到.index文件
第二步:掃描index文件,找到數據在.log文件中具體的物理偏移量

5. kafka架構深入

5.1 kafka工作流程

topic可以理解為:隊列
1.Kafka中消息是以topic進行分類的,生產者生產消息,消費者消費消息,都是面向topic的。

2.topic是邏輯上的概念,而partition是物理上的概念,每個partition對應於一個00000000000000000000.log數據文件,該log文件中存儲的就是producer生產的數據。Producer生產的數據會被不斷追加到該log文件末端,且每條數據都有自己的offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset,以便出錯恢復時,從上次的位置繼續消費。

5.2 kafka存儲機制

  由於生產者生產的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數據定位效率低下,Kafka 采取了分片和索引機制,將每個 partition 分為多個 segment。每個 segment 對應兩個文件“.index”索引文件和“.log”數據文件。這些文件位於一個文件夾下,該文件夾的命名規則為:topic 名稱+分區序號。

“.index”文件存儲大量的索引信息
“.log”文件存儲大量的數據,索引文件中的元數據指向對應數據文件中message的物理偏移地址。

5.3 kafka生產者

5.3.1 分區策略/分區原因/分區原則

分區策略

一個消費者組中有多個消費者,一個topic有多個分區,所以必然會涉及到partition的分配問題,即確定那個partition 由哪個consumer來消費。
兩種方式:1)RoundRobin 2)Range
Range:面向topic處理,分配給訂閱了該主題的人
RoundRobin:面向消費者組處理,分配給某個消費者組

分區原因

(1)方便在集群中擴展,每個Partition 可以通過調整以適應它所在的機器,而一個topic又可以有多個 Partition 組成,因此整個集群就可以適應任意大小的數據了;

(2)	可以提高並發,因為可以以Partition 為單位讀寫了。

分區原則

(1)	指明partition的情況下,直接將指明的值直接作為partiton值;
(2)	沒有指明partition值但有key的情況下,將key的hash值與topic的partition數進行取余得到partition值;
(3)	既沒有partition值又沒有key值的情況下,第一次調用時隨機選擇一個partition,后面就輪尋。

5.3.2 數據可靠性

  為保證 producer 發送的數據,能可靠的發送到指定的 topic,topic的每個 partition 收到producer 發送的數據后,都需要向 producer 發送 ack(acknowledgement 確認收到),如果producer 收到 ack,就會進行下一輪的發送,否則重新發送數據。

什么時候回復ack?
 確保有follower與leader同步完成,leader再發送ack,這樣才能保證leader 掛掉之后,能在follower中選舉出新的leader

5.3.3 kafka數據同步策略/leader回復ack時機

kafka同步策略:全部完成同步,才發送ack回復
Kafka 選擇了第二種方案,原因如下:
  1.同樣為了容忍 n 台節點的故障,第一種方案需要 2n+1 個副本,而第二種方案只需要 n+1
個副本,而Kafka 的每個分區都有大量的數據,第一種方案會造成大量數據的冗余。
  2.雖然第二種方案的網絡延遲會比較高,但網絡延遲對 Kafka 的影響較小。
方案 優點 缺點
半數以上完成同步,就發送 ack 延遲低 選舉新的 leader 時,容忍 n 台節點的故障,需要 2n+1 個副 本
全部完成同步,才發送 ack 選舉新的 leader 時,容忍 n 台節點的故障,需要 n+1 個副 本 延遲高

5.3.4 ISR保持同步的follower集合

  采用第二種方案之后(ack=-1),設想以下情景:leader收到數據,所有follower都開始同步數據,但有一個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能發送ack。這個問題怎么解決呢?

  Leader維護了一個動態的in-sync replica set (ISR),意為和leader保持同步的follower集合。當ISR中的follower完成數據的同步之后,leader就會給follower發送ack。如果follower長時間未向leader同步數據,則該follower將被踢出ISR,該時間閾值由:
replica.lag.time.max.ms參數設定。Leader發生故障之后,就會從ISR中選舉新的leader。

5.3.5 leader故障選舉新leader(通過ISR選舉)

leader:通過isr選舉:
 0.9版本之前(老版本):2個條件,與之前leader同步時間最短,同步消息數最多的就是新leader。
 0.9版本之后(新版本):1個條件,與之前leader同步時間最短的就是新leader。

5.3.6 ack應答機制/ack配置

ack:生產者發送數據確認收到,控制數據丟失
HW:控制數據所有副本一致性

  對於某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等ISR中的follower全部接收成功。
  所以Kafka為用戶提供了三種可靠性級別,用戶根據對可靠性和延遲的要求進行權衡,選擇以下的配置。

acks參數配置:0/1/-1(all)
 0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁盤就已經返回,當broker故障時有可能丟失數據;
 1:producer等待broker的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前leader故障,那么將會丟失數據;
 -1(all):producer等待broker的ack,partition的leader和follower全部落盤成功后才返回ack。但是如果在follower同步完成后,broker發送ack之前,leader發生故障,那么會造成數據重復。
 
acks:大白話
0:生產者不接收返回值(leader掛或不掛都可能丟失數據)
1:leader收到消息后,返回ack(leader掛了會丟失數據)
-1:等待ISR集合中全部收到數據,返回ack。(leader掛了,可能重復數據)

5.3.7 故障處理細節(follower故障,leader故障,HW,LEO)

log文件中的HW,LEO
LEO:指的是每個副本最大的 offset;
HW:指的是消費者能見到的最大的 offset,ISR 隊列中最小的 LEO。
HW控制所有副本數據一致性

(1)follower 故障
  follower 發生故障后會被臨時踢出 ISR,待該 follower 恢復后,follower 會讀取本地磁盤記錄的上次的HW,並將 log 文件高於 HW 的部分截取掉,從 HW 開始向 leader 進行同步。等該 follower 的 LEO 大於等於該Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

(2)leader 故障
  leader 發生故障之后,會從 ISR 中選出一個新的 leader,之后,為保證多個副本之間的數據一致性,其余的follower 會先將各自的 log 文件高於 HW 的部分截掉,然后從新的 leader同步數據。

注意:這只能保證副本之間的數據一致性,並不能保證數據不丟失或者不重復。

5.3.8 Exactly Once(既不重復也不丟失)語義/啟動冪等性

acks:0  At Most Once(最多一次),生產者發送數據最多發送一次,不接受ack。可以保證數據不重復,但是不能保證數據不丟失。

acks:-1 At least Once(最少一次),生產者發送數據最少一次,接受到ack為止,可能多次發送,造成數據重復。可以保證不丟數據,但是不能保證數據不重復。

Exactly Once(既不重復也不丟失)語義:0.11版本以前kafka對此無能為力,只能保證數據不丟失,然后在下游(消費者)對數據做全局去重,對性能造成很大的影響。

  0.11 版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指Producer 不論向 Server 發送多少次重復數據,Server 端都只會持久化一條。冪等性結合 At Least Once 語義,就構成了Kafka 的Exactly Once 語義。
即:At Least Once + 冪等性 = Exactly Once

啟動冪等性:將 Producer 的參數中 enable.idompotence 設置為 true 即可

kafka冪等性實現方式:將原來下游需要做的去重放在了數據上游。開啟冪等性的 Producer 在初始化的時候會被分配一個 PID,發往同一 Partition 的消息會附帶 Sequence Number。而Broker 端會對<PID, Partition, SeqNumber>做緩存,當具有相同主鍵的消息提交時,Broker只會持久化一條。
但是PID 重啟就會變化,同時不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區跨會話的Exactly Once。

5.4消費者

5.4.1 消費方式(pull拉,timeout)

consumer 采用 pull(拉)模式從 broker 中讀取數據。
因為push(推)模式很難適應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費消息。

pull 模式不足之處是, 如果 kafka 沒有數據,消費者可能會陷入循環中,一直返回空數據。針對這一點,Kafka 的消費者在消費數據時會傳入一個時長參數 timeout,如果當前沒有數據可供消費,consumer 會等待一段時間之后再返回,這段時長即為timeout。

5.4.2 offset的維護

  由於consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復后,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復后繼續消費。

//offset存在zookeeper的位置
[zk: localhost:2181(CONNECTED) 10] get /consumers/atguigu/offsets/topicName/0
18

Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中,從 0.9 版本開始, consumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic 為 consumer_offsets。

1)修改配置文件consumer.properties
exclude.internal.topics=false

5.4.3 消費者組

  一個消費者組中有多個消費者,一個topic(隊列)有多個partition(分區),所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。
  注意:一個消費者組,同時只有一個消費者消費

配置文件:vi /opt/module/kafka/config/consumer.properties
默認:group.id=test-consumer-group 每創建一個消費者隨機創建一個組
修改:group.id=bsBike  修改任意組名,創建的消費者都在這個組里面

// 必須指定--consumer.config剛剛修改的配置文件
創建2個同組消費者:[root@sg-16 bin]# ./kafka-console-consumer.sh --topic topicName --zookeeper 192.168.0.215:2181 --consumer.config ../config/consumer.properties

啟動生產者:[root@sg-15 config]# ./kafka-console-producer.sh --topic topicName -broker-list 192.168.0.215:9092

驗證:同一時刻只有一個消費者消費數據。

5.5 kafka為什么高效讀寫數據

kafka高效的原因有:
1.分布式的(有分區概率,可以並發讀寫,但是單機器效率也很高)
2.順序讀寫磁盤
3.零復制技術(零拷貝技術)

順序讀寫磁盤

  Kafka 的 producer 生產數據,要寫入到log數據文件中,寫的過程是一直追加到文件末端,為順序寫。官網有數據表明,同樣的磁盤,順序寫能到 600M/s,而隨機寫只有 100K/s。這與磁盤的機械機構有關,順序寫之所以快,是因為其省去了大量磁頭尋址的時間。

 順序寫:相當於開辟一塊空間,每次都寫在磁盤的同一個扇區
 隨機寫:每次寫在磁盤的不同扇區,需要花費大量尋址時間

零拷貝技術

a.txt修改為b.txt:
普通情況(圖1):用戶通過代碼調用c語言,讀取a.txt文件,再通過網絡寫入b.txt文件
零拷貝技術(圖2):用戶發送給操作系統指令,由操作系統直接把a.txt修改為b.txt

圖1

圖2

5.6 kafka事務

  Kafka 從 0.11 版本開始引入了事務支持。事務可以保證 Kafka 在Exactly Once 語義的基礎上,生產和消費可以跨分區和會話,要么全部成功,要么全部失敗。

5.6.1 Producer 事務

  為了實現跨分區跨會話的事務,需要引入一個全局唯一的 Transaction ID,並將 Producer 獲得的PID 和Transaction ID 綁定。這樣當Producer 重啟后就可以通過正在進行的Transaction ID 獲得原來的PID。
  為了管理Transaction,Kafka 引入了一個新的組件Transaction Coordinator。Producer 就是通過和 Transaction Coordinator 交互獲得 Transaction ID 對應的任務狀態。Transaction Coordinator 還負責將事務所有寫入 Kafka 的一個內部 Topic,這樣即使整個服務重啟,由於事務狀態得到保存,進行中的事務狀態可以得到恢復,從而繼續進行。

5.6.2 Consumer 事務

   上述事務機制主要是從Producer 方面考慮,對於 Consumer 而言,事務的保證就會相對較弱,尤其時無法保證 Commit 的信息被精確消費。這是由於 Consumer 可以通過 offset 訪問任意信息,而且不同的 Segment File 生命周期不同,同一事務的消息可能會出現重啟后被刪除的情況。

6. kafka API

6.1 消息發送流程(攔截器->序列化器->分區器)

  Kafka 的 Producer 發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了兩個線程——main線程和Sender線程,以及一個線程共享變量——RecordAccumulator。main 線程將消息發送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發送到Kafka broker。

相關參數:
batch.size:只有數據積累到 batch.size 之后,sender 才會發送數據。
linger.ms:如果數據遲遲未達到 batch.size,sender 等待 linger.time 之后就會發送數據。

分區器:hash計算分區分區號,決定寫入的分區
序列化器:生產者把對象轉換成字節數組才能通過網絡發送給Kafka,消費者需要反序列化
攔截器:在消息發送前做一些准備工作,比如按照某個規則過濾不符合要求的消息、修改消息的內容等
RecordAccumulator:數據共享區

處理順序:攔截器-->序列化器-->分區器

6.2 生產者API

電腦本地添加hosts內容:sudo vi /etc/hosts
192.168.0.215 sg-15
192.168.0.216 sg-16
192.168.0.217 sg-17

生產者API

官網:https://www.topgoer.cn/docs/golang/chapter10-5-4
Go語言中連接kafka使用第三方庫: github.com/Shopify/sarama。使用v1.19.0版本
下載安裝:go get github.com/Shopify/sarama@v1.19.0

連接kafka發送消息

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
  // 回復ack模式
	config.Producer.RequiredAcks = sarama.WaitForAll          // 發送完數據需要leader和follow都確認
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition
	config.Producer.Return.Successes = true                   // 成功交付的消息將在success channel返回

	// 構造一個消息
	msg := sarama.ProducerMessage{}
	msg.Topic = "topicName"
	msg.Value = sarama.StringEncoder("this is a test log2")
	// 連接kafka
	client, err := sarama.NewSyncProducer([]string{"192.168.0.216:9092"}, config)
	if err != nil {
		fmt.Println("producer closed",err)
		return
	}
	defer client.Close()
	// 發送消息
	pid, offset, err := client.SendMessage(&msg)
	if err != nil {
		fmt.Println("send msg failed:",err)
		return
	}
	fmt.Println("pid:",pid)
	fmt.Println("offset:",offset)
}

6.2 消費者API

6.2.1 創建所有分區消費

demo:

package main

import (
	"encoding/json"
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

// kafka 消費者
func main() {
	var wg sync.WaitGroup
	consumer, err := sarama.NewConsumer([]string{"192.168.0.215:9092"}, nil)
	topicName := "topicName"
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions(topicName) // 根據topic取到所有的分區
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // 遍歷所有的分區
		// 針對每個分區創建一個對應的分區消費者
		pc, err := consumer.ConsumePartition(topicName, int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 異步從每個分區消費信息
		go func(sarama.PartitionConsumer) {
			wg.Add(1)
			for msg := range pc.Messages() {
				a,_:=json.Marshal(msg)
				fmt.Println(string(a))
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)
	}
	wg.Wait() //等待
}
結果:
[0 1]
{"Key":null,"Value":"dGhpcyBpcyBhIHRlc3QgbG9nMg==","Topic":"topicName","Partition":1,"Offset":34,"Timestamp":"0001-01-01T00:00:00Z","BlockTimestamp":"0001-01-01T00:00:00Z","Headers":null}
Partition:1 Offset:34 Key:[] Value:this is a test log2

6.2.2 創建指定分區消費

demo:

package main

import (
	"encoding/json"
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

// kafka consumer

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	consumer, err := sarama.NewConsumer([]string{"192.168.0.215:9092"}, nil)
	topicName := "topicName"
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions(topicName) // 根據topic取到所有的分區
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList) //[0 1]
	// 針對其中一個分區創建一個對應的分區消費者
	partition := partitionList[0]
	pc, err := consumer.ConsumePartition(topicName, int32(partition), sarama.OffsetNewest)
	if err != nil {
		fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
		return
	}
	defer pc.AsyncClose()
	// 異步從每個分區消費信息
	go func(sarama.PartitionConsumer) {
		for msg := range pc.Messages() {
			a, _ := json.Marshal(msg)
			fmt.Println(string(a))
			fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
		}
	}(pc)
	wg.Wait() //等待
}
結果:
[0 1]
{"Key":null,"Value":"dGhpcyBpcyBhIHRlc3QgbG9nMg==","Topic":"topicName","Partition":0,"Offset":29,"Timestamp":"0001-01-01T00:00:00Z","BlockTimestamp":"0001-01-01T00:00:00Z","Headers":null}
Partition:0 Offset:29 Key:[] Value:this is a test log2

6.2.3 創建消費者組

同一個組中,同時只有一個消費者消費同一個topic的數據。避免消費者重復消費數據

一個分區只能被同一個消費組的一個消費者消費,如果想要一個分區被多個消費者消費,可以使用多個消費者組
經測試:一個消費者組下的消費者數量<=topic分區數

demo:

package main

import (
	"fmt"
	"os"
	"os/signal"
	_ "regexp"

	cluster "github.com/bsm/sarama-cluster"
)

var Address = []string{"192.168.0.215:9092"}
var Topic = "topicName"

func main() {
  // 消費者組名:demo1
	go syncConsumer("demo1")
	select {}
}

//消費者組
func syncConsumer(groupName string) {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	//可以訂閱多個主題
	topics := []string{Topic}
	consumer, err := cluster.NewConsumer(Address, groupName, topics, config)
	if err != nil {
		panic(err)
	}
	//這里需要注意的是defer 一定要在panic 之后才能關閉連接
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err = range consumer.Errors() {
			fmt.Println("err:",err)
		}
	}()

	// consume notifications
	go func() {
		for ntf := range consumer.Notifications() {
			fmt.Println("消費通知:",ntf)
		}
	}()

	// 循環從通道中獲取message
	//msg.Topic 消息主題
	//msg.Partition  消息分區
	//msg.Offset
	//msg.Key
	//msg.Value 消息值
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Printf("%s receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", groupName, msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
				consumer.MarkOffset(msg, "") // 上報offset
			}
		case err := <-consumer.Errors():
			{
				fmt.Println(fmt.Sprintf("consumer error:%v", err))
			}
		case <-signals:
			return
		}
	}
}

7 . kafka-eagle監控

7.1 下載安裝

http://www.kafka-eagle.org/articles/docs/changelog/changelog.html
此版本下載1.3.7
下載到/opt/software/目錄中

//解壓
解壓:tar -xf kafka-eagle-bin-1.3.7
再解壓(因為包了兩層):
cd /opt/software/kafka-eagle-bin-1.3.7
tar -xf kafka-eagle-web-1.3.7-bin.tar.gz -C /opt/module/

//改名為eagle
cd /opt/module
mv kafka-eagle-web-1.3.7 eagle

7.2 修改配置

//修改kafka啟動命令,開啟jmx端口,幫助eagle抓取kafka中的數據,修改kafka中bin/kafka-server-start.sh文件
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export JMX_PORT="9999"  //添加
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 
fi

注意:kafka所有機器的啟動文件都要修改,java環境變量也要配置
//添加環境變量,因為kafka-eagle啟動會讀環境變量
export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin
[root@sg-15 eagle]# source /etc/profile  //刷新環境變量

//添加執行權限
[root@sg-15 bin]# chmod 777 /opt/module/eagle/bin/ke.sh

// conf目錄下文件:
log4j.properties //配置日志文件
system-config.properties //系統配置

// 系統配置,修改/opt/module/eagle/conf/system-config.properties文件
注意:eagle框架可以支持多套kafka集群,用","隔開即可
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.0.215:2181,192.168.0.216:2181,192.168.0.217:2181
cluster2.zk.list刪掉 //刪掉這行
kafka.eagle.webui.port=8048 //ui端口,如果和其他的程序沖突就改
cluster1.kafka.eagle.offset.storage=kafka //保留,因為0.11版本之后offset寫在kafka本地
cluster2.kafka.eagle.offset.storage=zookeeper //刪除這行,如果是監控的0.11版本之前的kafka集群刪除上面一行

kafka.eagle.metrics.charts=true //改為true,監控圖表,否則頁面看不見圖表

//監控異常,發送的郵箱。需要即配置
kafka.eagle.mail.enable=false
kafka.eagle.mail.sa=jeff@163.com
kafka.eagle.mail.username=alert_sa@163.com
kafka.eagle.mail.password=mqslimczkdqabbbh
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25

//存一些元數據信息,一般存mysql中,需要即配置
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.0.215:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456

注意:會在mysql中自動創建ke庫

7.3 啟動kafka-eagle

[root@sg-15 bin]# ./ke.sh start //啟動
[root@sg-15 bin]# ./ke.sh status //查看狀態

ke.sh [start|status|stop|restart|stats]

如果啟動失敗,查看日志:/opt/module/eagle/logs

啟動成功

web頁面:

7.4 kafka-eagle刪除需要的token

//在配置文件中/opt/module/eagle/conf/system-config.properties
kafka.eagle.topic.token=keadmin


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM