前因
今天在調應用的時候,email-service服務無法收發郵件。
看到這種問題,我先去查了一下email-service的日志,看到了kafka的一個報錯:
Group coordinator lookup for group email-service failed
然后我就去查資料,解決這個問題。
解決
首先,我想先去清空一下這個email-service消費組的offset,但是它仍然報出了coordinator failed這種字樣的錯誤。那我就想了,kafka的這個coordinator(協調器)到底是什么東西,是怎么工作的。
害,先不管這個了,畢竟要先排掉錯誤嘛,原理放在后邊說。
於是我重啟了應用,再去查詢email-service的日志時,發現這個錯誤:
Group coordinator lookup for group email-service failed: The coordinator is not available
於是我去查詢kafka的配置文件,果然發現了問題。
在配置文件中(默認為config/server.properties), 將這幾項參數修改成下面的樣子:
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
然后重啟kafka,報錯消除。
解釋
之前的文章拋出了一個問題,什么是協調器(coordinator)。
顧名思義,協調器(coordinator)就是負責協調工作。簡單點說,就是消費者啟動后,到可以正常消費前,這個階段的初始化工作。消費者可以運行起來,全靠協調器的工作。
主要的協調器有以下兩種:
- 消費者協調器
- 組協調器
kafka引入協調器有其歷史過程,原來consumer信息依賴於zookeeper存儲,當代理或消費者發生變化時,引發消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通信,容易造成羊群效應和腦裂問題。
為了消除此問題,kafka引入了協調器。
-
服務端引入了組協調器(GroupCoordinator),消費者端引入消費者協調器(ConsumerCoordinator)。
-
每個broker啟動的時候,都會創建組協調器實例,管理部分消費組和組下每個消費者消費的偏移量。
-
每個消費者實例化時,同時實例化一個消費者協調器,負責同個消費組下各個消費者和服務端組協調器之間的通信。
由於zookeeper並不適合頻繁的寫入操作,從0.8.2版本開始kafka開始將consumer的位移信息寫入kafka內部的topic中,具體為"__consumer_offsets"。並且默認提供了kafka_consumer_groups.sh腳本,方便用戶查詢consumer group和consumer信息。
我們可以查看__consumer_offsets的內容,它包括三部分內容:
<Group ID,主題名,分區號>
PS: 這個__consumer_offsets是kafka內部的topic,外界無法直接讀取此topic里邊的信息。如果要讀取的話,需要先修改config/consumer.properties中的"exclude.internal.topics"參數,將其置為false,如下:
exclude.internal.topics=false
然后通過命令來查詢信息:
# 0.11.0.0之前版本
sh kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
# 0.11.0.0之后版本(含)
sh kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties