Version overview
The latest version at the time of writing is 2.3 (updated 2019-08-08).
From a usage perspective, 0.9 is the dividing line: starting with 0.9 there is no longer a separate high-level consumer API (comparable to MySQL binlog's GTID — you only deal with topics, while the server manages offsets and load balancing automatically) and low-level consumer API (comparable to binlog file + position — you work directly with partitions and offsets).
From a compatibility perspective, 0.8.x is the dividing line: 0.8.x is not compatible with earlier versions.
Overall topology
From the diagram above we can see:
0. A Kafka cluster consists of N brokers (at least one, i.e. standalone), one of which acts as the controller (its responsibilities are described below). The cluster holds many topics; a topic can be partitioned, each partition can be configured with multiple replicas, and each replica lives on a different broker. One replica is the leader (handles writes) and the others are followers, which can serve reads (the same model as Couchbase).
1. Producers do not need to talk to ZooKeeper. (In 0.8.x the Kafka consumer connected directly to ZooKeeper to obtain offsets; later versions fetch them from the cluster instead, which is why the two API generations are incompatible. The diagram above shows the 0.8.x layout and is slightly inaccurate for 0.9.x and later.)
2. Consumers fetch messages from, and producers publish messages to, the leader of a partition, never a follower (replication is coordinated between brokers, not between broker and client).
3. As in RocketMQ, each topic is split into partitions to scale throughput linearly (the same idea as database sharding: transparent to the business, a technical rather than a business decision; records can be assigned to partitions round-robin or by applying a partitioning algorithm to the message key). Each partition can be consumed by only one member of a given consumer group, so if messages in a topic do not have to be consumed in strict order, partitioning is the key mechanism for scaling under heavy traffic. The number of partitions of a topic (default 1) can be changed with ./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 5 --topic userService, the result can be inspected under /tmp/kafka-logs, and guidance on choosing a reasonable value can be found at https://blog.csdn.net/kwengelie/article/details/51150114.
4. Kafka 0.8.x stored each consumer group's position in every topic partition in ZooKeeper (every message has an offset, monotonically increasing within its partition). From 0.9 onward the positions are stored in a dedicated topic named "__consumer_offsets", so consumer group + topic identifies the position and consumption can be resumed at any time. That topic uses log compaction, i.e. only the latest value of each key is kept rather than the full history.
5. Each topic partition has a local log on the broker, which the broker keeps appending to sequentially.
6. A message may or may not carry a key. If it does, the key determines which partition the message goes to; otherwise round-robin is used (the Java client hashes the key; in practice a key is usually set because of duplicate-consumption concerns). Records within a partition are guaranteed to be ordered, so choosing a good key can turn serial processing into parallel processing. This requires a solid understanding of the business requirements: strict global ordering is often unnecessary (especially in OLTP — ordering per product, customer, merchant, or even per campaign is usually enough), it is just the simplest thing to implement. Remember: it is the producer, not the broker, that decides which partition a record goes to (see the producer sketch after this list).
7. In replicated mode, consistency follows an all-replica acknowledgement model rather than a majority quorum, as shown below:
ISRs are described further down. Different partitions of the same topic can have their leaders on different brokers, which improves performance because the leader handles both reads and writes. The position up to which records are committed is also called the high watermark. Although users do not deal with partitions directly, partitions are where HA and scalability are actually realized.
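To make point 6 concrete, here is a minimal producer sketch (the broker address and the topic name "orders" are illustrative, not from the original notes): records with the same key land in the same partition, and the choice happens on the producer side.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: a local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key (here: a customer id) hash to the same partition,
            // so per-customer ordering is preserved while different customers run in parallel.
            producer.send(new ProducerRecord<>("orders", "customer-42", "order created"));
            producer.send(new ProducerRecord<>("orders", "customer-42", "order paid"));
            // No key: the default partitioner spreads records across partitions.
            producer.send(new ProducerRecord<>("orders", null, "heartbeat"));
        }
    }
}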
Kafka's overall internal architecture
Worth highlighting here is the controller: one broker is elected controller and handles cluster-wide events, including leader election, topic changes, tracking partition replicas, and broker membership changes, mostly by talking to ZooKeeper. This is not the same thing as a ZooKeeper leader; it is more like Hadoop's NameNode, responsible only for global metadata management.
The Kafka controller architecture is shown below:
Its responsibilities are listed at https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals. For an analysis of how the controller works, see https://www.cnblogs.com/huxi2b/p/6980045.html, which is a fairly thorough summary.
Putting it together, Kafka's core concepts are: brokers (every Kafka process is a broker), topics, logs, partitions, controller, message, cluster.
Broker internals
Message send path
How does the client know which broker is the leader? Every broker caches the cluster metadata, so when the connection is first established the client can fetch the metadata of any topic from any broker, as follows:
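As a rough code-level illustration (a sketch only; the topic name userService is reused from the earlier example), the per-partition leader can be read from the metadata that any broker serves, here via the AdminClient:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class LeaderLookupDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // any reachable broker works
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singletonList("userService"))
                                         .all().get().get("userService");
            // Each partition reports its current leader and ISR from the cluster metadata.
            desc.partitions().forEach(p ->
                System.out.printf("partition %d leader=%s isr=%s%n", p.partition(), p.leader(), p.isr()));
        }
    }
}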
Message consumption path
Core high-level API (the user has no control over the consumer-broker interaction):
- ConsumerConnector
- KafkaStream
- ConsumerConfig
The low-level API, by contrast, lets you control each step of the interaction, e.g. where to start reading and maintaining offsets on the client side. RocketMQ's implementation effectively combines the high-level and low-level approaches, which is also what Kafka's merged API looks like from 0.9 onward.
The main interface of the low-level API is SimpleConsumer.
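In recent clients SimpleConsumer has been removed; the equivalent low-level control (direct partition assignment, choosing where to start reading, managing offsets yourself) is available on the unified KafkaConsumer through assign() and seek(). A minimal sketch, with an illustrative topic, partition and starting offset:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualOffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "false");   // offsets are managed by the application
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("userService", 0);
            consumer.assign(Arrays.asList(tp));     // no group coordination, direct assignment
            consumer.seek(tp, 42L);                 // start reading from an offset we chose
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1)))
                System.out.println(record.offset() + ": " + record.value());
        }
    }
}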
Relationship between consumer groups and partitions
Each consumer group records its own consumption progress in every partition (the information lives in the dedicated offsets topic described above). A partition can be consumed by only one member of a given consumer group at a time. Since microservices are usually deployed as clusters, this means the topic listeners on N-1 of the N service instances sit idle, which is worth being aware of. If the instance currently consuming dies, Kafka automatically hands the partition to one of the remaining consumers; but if a message was already processed and the acknowledgement never reached Kafka, the consumer that takes over will consume it again, so idempotency matters. To have a topic consumed in parallel by the members of a consumer group, configure at least as many partitions as there are instances. In short, the management granularity is consumer group (what other MQs call a subscriber) plus topic, while the underlying delivery granularity is partition plus consumer.
Not only clustered microservices benefit from multiple partitions; a single JVM can benefit too, by starting several independent threads, each acting as a consumer of the topic, which is mainly useful on SMP servers. So whenever message processing takes noticeable time or the message TPS is high, use multiple partitions.
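A minimal sketch of the single-JVM case (all names are illustrative): KafkaConsumer is not thread-safe, so each thread owns its own consumer instance, and because they share one group.id the coordinator spreads the partitions across them (useful only if the topic has at least as many partitions as threads).

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ParallelConsumerDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {                         // 3 threads -> needs >= 3 partitions
            new Thread(() -> {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                props.put("group.id", "order-service");       // same group: partitions are shared out
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("orders"));
                    while (true)
                        consumer.poll(Duration.ofSeconds(1))
                                .forEach(r -> System.out.println(Thread.currentThread().getName() + " " + r.value()));
                }
            }, "consumer-" + i).start();
        }
    }
}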
Key message offsets
Message backlog is one manifestation of consumer lag: the difference between the messages retained on the broker and the messages already consumed is the backlog, also called the lag. In Kafka, messages are sent to a topic, which is divided into partitions, and each partition has a write-ahead log file. Although a partition is further divided into segment files, the upper layers can treat the partition as the smallest storage unit (one "giant file" stitched together from multiple segments). Each partition is an ordered, immutable sequence of messages that are continuously appended to it.
The figure above involves four concepts:
1. LogStartOffset: the starting offset of a partition, initially 0; as messages are added and the log-cleanup policy takes effect, this value increases in steps.
2. ConsumerOffset: the consumed offset, i.e. the position a given consumer has reached in the partition.
3. HighWatermark (HW): the highest log offset that consumers can "see" in the partition; HW is greater than or equal to ConsumerOffset.
4. LogEndOffset (LEO): the highest log offset of the partition; the part beyond HW is invisible to consumers. For example, with an ISR (In-Sync Replicas) of size 3 (as in the figure below), a message sent to Leader A advances A's LEO; Follower B and Follower C continuously pull from Leader A to catch up, and HW is the offset that A, B and C have all reached, i.e. the minimum LEO of the three. Because B and C pull from A with some delay (which is where delivery reliability comes in, see below), HW is not always equal to the leader's LEO; in general LEO >= HW. So the actual picture may look like this:
Computing the lag of a consumer in Kafka is simple: look at which topics it consumes, then compute the lag of every partition of each topic. The per-partition lag is trivial, see the figure below:
As the figure shows, Lag = HW - ConsumerOffset.
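The same calculation can be done programmatically. A sketch using the AdminClient plus a probe consumer (the group id and broker address are illustrative): the group's committed offset is subtracted from each partition's end offset, which is the HW as seen by the client.

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LagDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (AdminClient admin = AdminClient.create(props);
             KafkaConsumer<byte[], byte[]> probe = new KafkaConsumer<>(props)) {
            // Committed offsets of the group (ConsumerOffset in the figure above).
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets("order-service").partitionsToOffsetAndMetadata().get();
            // End offsets, i.e. the HW from the consumer's point of view.
            Map<TopicPartition, Long> end = probe.endOffsets(committed.keySet());
            committed.forEach((tp, om) ->
                System.out.printf("%s lag=%d%n", tp, end.get(tp) - om.offset()));
        }
    }
}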
ISRs
ISR refers to the current set of in-sync replicas. If the leader fails, one of the ISR members is elected as the new leader.
Topic Log Compaction
A topic log grows indefinitely, so to stay healthy it has to be cleaned up periodically. There are two cases with different strategies, depending on whether message keys repeat. Log compaction is meant for keys that do repeat, i.e. the key is not something like a UUID, otherwise compaction is pointless. Based on retention time or file size, it removes the historical values of a key and keeps the latest, as shown below:
As you can see, the historical versions are gone. With compaction enabled, the topic log is divided into a head and a tail; only the tail is compacted, and actual deletion additionally depends on other settings, as follows.
The parameter min.compaction.lag.ms controls how long a message must age before it can be compacted, delete.retention.ms controls how long delete markers (tombstones) are retained, and log.cleanup.policy=compact turns compaction on. A consumer that reads within these windows is therefore guaranteed to see at least the latest record per key (the producer may have written again in the meantime, so it sees at least the latest, possibly more).
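As an illustration (the topic name and values are made up), a compacted topic with these settings can be created through the AdminClient; note that at the topic level the key is cleanup.policy rather than log.cleanup.policy.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CompactedTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", "compact");          // topic-level name of log.cleanup.policy
            configs.put("min.compaction.lag.ms", "600000");    // records younger than 10 min stay in the head
            configs.put("delete.retention.ms", "86400000");    // tombstones kept for one day
            NewTopic topic = new NewTopic("user-profile", 3, (short) 1).configs(configs);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}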
Message retention
Each topic can declare how long messages are kept, based on time or on the size of the topic log, controlled by the following parameters:
Property | Meaning | Default
---|---|---
log.cleanup.policy | log cleanup policy; only delete and compact are supported | delete
log.retention.hours | how long logs are kept; hours, minutes and ms variants are available | 168 (7 days)
log.retention.bytes | maximum size a log may reach before deletion (whichever of the time or size limit is reached first triggers deletion) | -1
log.segment.delete.delay.ms | how long a log file is kept before it is actually deleted | 60000
log.cleanup.interval.mins | how often the cleanup step is invoked | 10
log.retention.check.interval.ms | how often to check whether any log qualifies for deletion (used by newer versions) | 300000
ACK consistency levels
The producer (a standard interview question nowadays: how do you make sure a sent message is not lost) can state its consistency requirement through acks, much like MySQL's replication settings: acks=0 (no ACK, at-most-once), acks=all (the leader and all in-sync followers must have written the record), acks=1 (success on the leader is enough; this is the default in the client versions discussed here).
This can be set in the producer properties, as follows:
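A minimal sketch (the topic name is illustrative); only the acks line is the point here:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AckDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");   // "0": no ack, "1": leader only, "all": leader + all in-sync replicas
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("userService", "key", "value"));
        }
    }
}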
Early producer versions had no notion of "exactly once". Kafka 0.11.0 introduced exactly-once delivery, implemented through producer message idempotence plus atomic transactions; see https://dzone.com/articles/exactly-once-semantics-with-apache-kafka-1.
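A sketch of the 0.11+ exactly-once send path, combining idempotence and transactions (the topic name and transactional.id are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");          // no duplicates from internal retries
        props.put("transactional.id", "order-tx-1");      // stable id so the transaction survives restarts
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("orders", "customer-42", "order created"));
                producer.send(new ProducerRecord<>("orders", "customer-42", "order paid"));
                producer.commitTransaction();             // both records become visible atomically
            } catch (Exception e) {
                producer.abortTransaction();              // neither record becomes visible
                throw e;
            }
        }
    }
}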
On the consumer side, Kafka supports two modes: at-most-once and at-least-once.
To implement "at-most-once" the consumer reads a message, then saves its offset in the partition by sending it to the broker, and finally processes the message. The issue with "at-most-once" is that the consumer could die after saving its position but before processing the message. Then the consumer that takes over or gets restarted would leave off at the last position and the message in question is never processed.
To implement "at-least-once" the consumer reads a message, processes it, and finally saves the offset to the broker. The issue with "at-least-once" is that the consumer could crash after processing a message but before saving the last offset position. Then if the consumer is restarted or another consumer takes over, it could receive a message that was already processed. "At-least-once" is the most common setup for messaging, and it is your responsibility to make the processing idempotent, which means getting the same message twice will not cause a problem (e.g. two debits).
To implement "exactly once" on the consumer side, the consumer would need a two-phase commit between the storage for the consumer position and the storage of the consumer's message processing output. Or, the consumer could store the message processing output in the same location as the last offset.
Kafka only gives you the first two consumer acknowledgement modes out of the box; the third has to be implemented by the application. In practice most people use the second mode plus idempotent processing, i.e. the consumer's own consistency is guaranteed by idempotency + ACK, so it is not elaborated further here.
Manual management of offset commits can be enforced as follows:
props.put("enable.auto.commit", "false");
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(record.offset() + ": " + record.value());
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // application specific failure handling
        }
    }
} finally {
    consumer.close();
}
In auto-commit mode the commit interval is controlled by auto.commit.interval.ms. The use of the various commit modes is covered in https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/.
The Kafka ecosystem
MirrorMaker is the component for replication between Kafka clusters; it is essentially a consumer plus a producer, as shown below:
As shown above, it is now at V2 (which requires Kafka 2.4). The most important improvement of V2 over V1 is automatic translation of consumer offsets between the two clusters, instead of deriving them from Kafka timestamps (this step is critical; without it a DR switchover is expensive and error-prone). The internal architecture looks like this:
The diagram above is not quite how real deployments look: the consumers (the applications) are of course deployed in both data centers, active-active. That raises the question of which arrives first, __consumer_offsets or the business topic itself. If the business topic arrives first and __consumer_offsets later, the application in the DR site reads a stale offset and consumes duplicates. If __consumer_offsets arrives first and points beyond the actual log end, it is reset to the actual offset, and the business records that arrive later are consumed twice as well. So consumer idempotency is critical here, and for the topic designer a unique message key matters just as much. If the replication delay is large, the consequence can be a large volume of duplicate messages to discard, which in turn hurts the RTO.
Beyond that, Kafka — like every MQ — has another serious problem in DR scenarios: the eventual consistency built on top of the MQ can be broken. Relying on an MQ for eventual consistency normally brings large gains in reliability and performance and decouples microservices, but if messages are lost that eventual consistency is broken and you have to fall back on reconciliation between the microservices.
Kafka REST Proxy and Confluent Schema Registry
Kafka's data in ZooKeeper
For clarity I set the chroot to localKafka, as shown below:
The meaning of each ZooKeeper node is shown in the diagram below (older Kafka versions kept consumer offsets in ZooKeeper as well). Here kafka01 is the chroot, configured by appending it to zookeeper.connect in Kafka's server.properties, e.g. zookeeper.connect=localhost:2181/localKafka.
Kafka monitoring and management
The mainstream Kafka monitoring tools are:
- 1. Kafka Web Console: build it from source at https://github.com/claudemamo/kafka-web-console, or download a pre-built package from https://pan.baidu.com/s/1cJ2OefPqlxfWzTHElG6AjA (extraction code: f8xz). For testing and development it is recommended because it makes troubleshooting easy; its shortcomings are described at https://blog.csdn.net/qq_33314107/article/details/81099091.
[root@node1 bin]# sh kafka-web-console
Play server process ID is 4154
[info] play - database [default] connected at jdbc:h2:file:play
[warn] play - Your production database [default] needs evolutions!
INSERT INTO settings (key_, value) VALUES ('PURGE_SCHEDULE', '0 0 0 ? * SUN *');
INSERT INTO settings (key_, value) VALUES ('OFFSET_FETCH_INTERVAL', '30');
[warn] play - Run with -DapplyEvolutions.default=true if you want to run them automatically (be careful)
Oops, cannot start the server.
@74e0p173o: Database 'default' needs evolution!
at play.api.db.evolutions.EvolutionsPlugin$$anonfun$onStart$1$$anonfun$apply$1.apply$mcV$sp(Evolutions.scala:484)
at play.api.db.evolutions.EvolutionsPlugin.withLock(Evolutions.scala:507)
at play.api.db.evolutions.EvolutionsPlugin$$anonfun$onStart$1.apply(Evolutions.scala:461)
at play.api.db.evolutions.EvolutionsPlugin$$anonfun$onStart$1.apply(Evolutions.scala:459)
at scala.collection.immutable.List.foreach(List.scala:318)
at play.api.db.evolutions.EvolutionsPlugin.onStart(Evolutions.scala:459)
at play.api.Play$$anonfun$start$1$$anonfun$apply$mcV$sp$1.apply(Play.scala:88)
at play.api.Play$$anonfun$start$1$$anonfun$apply$mcV$sp$1.apply(Play.scala:88)
at scala.collection.immutable.List.foreach(List.scala:318)
at play.api.Play$$anonfun$start$1.apply$mcV$sp(Play.scala:88)
at play.api.Play$$anonfun$start$1.apply(Play.scala:88)
at play.api.Play$$anonfun$start$1.apply(Play.scala:88)
at play.utils.Threads$.withContextClassLoader(Threads.scala:18)
at play.api.Play$.start(Play.scala:87)
at play.core.StaticApplication.<init>(ApplicationProvider.scala:52)
at play.core.server.NettyServer$.createServer(NettyServer.scala:243)
at play.core.server.NettyServer$$anonfun$main$3.apply(NettyServer.scala:279)
at play.core.server.NettyServer$$anonfun$main$3.apply(NettyServer.scala:274)
at scala.Option.map(Option.scala:145)
at play.core.server.NettyServer$.main(NettyServer.scala:274)
at play.core.server.NettyServer.main(NettyServer.scala)
On first startup an extra flag is needed:
./kafka-web-console -DapplyEvolutions.default=true
- 2. Kafka Manager: also decent, developed by Yahoo, can be used to monitor traffic.
Link: https://pan.baidu.com/s/1YMj-9HzoJLKDEY5C47aOlQ
Extraction code: hhhr
- 3. KafkaOffsetMonitor
- 4. Kafdrop: newer, see https://github.com/HomeAdvisor/Kafdrop.
Take KafkaOffsetMonitor, which we generally use in production, as the example. It is a client-side consumption monitoring tool that shows in real time the consumers of a Kafka service and their offsets in the partitions they consume: you can browse the current consumer groups, and the consumption status of every partition of every topic is visible at a glance. It is hosted on GitHub and can be downloaded from https://github.com/quantifind/KafkaOffsetMonitor/releases, or from the Baidu netdisk (use the latter on an intranet, otherwise the JavaScript loaded from a CDN will be missing).
It can be started with java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.20.30.10:2181 --port 8088 --refresh 10.seconds --retain 2.days; the meaning of each option is documented on GitHub.
Kafka rebalance mechanism (important)
Common problems
How do I get all topics through the Java API?
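A minimal sketch using the AdminClient (the broker address is illustrative; internal topics are excluded unless requested):

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;

public class ListTopicsDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.30.17:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> topics = admin.listTopics().names().get();   // internal topics excluded by default
            topics.forEach(System.out::println);
        }
    }
}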
How does a consumer subscribe to multiple topics at once?
This is suitable when all the topics share the same semantic scope; multiple topics can be subscribed in one call via this.consumer.subscribe(Arrays.asList(topic)). For example, a cache usually requires every application instance to consume, so ip+port+app.group+app.version makes a good consumer group id; otherwise app.group+app.version is more appropriate, as sketched below.
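A sketch of that idea (all property values and topic names are illustrative):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MultiTopicDemo {
    public static void main(String[] args) {
        String groupId = "10.20.30.16:8080-cache-service-1.0";   // ip+port+app.group+app.version: every instance consumes
        // String groupId = "cache-service-1.0";                 // app.group+app.version: instances share the work
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("cache-invalidate", "config-change"));  // same semantic scope
            while (true)
                consumer.poll(Duration.ofSeconds(1)).forEach(r -> System.out.println(r.topic() + ": " + r.value()));
        }
    }
}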
How do I list all topics?
[root@hs-test-10-20-30-16 bin]# ./kafka-topics.sh --list --zookeeper 10.20.30.17:2191
__consumer_offsets
hs_ta_channel_Metadata
hsta_channelprocess20100513_aop-YOGA - marked for deletion
hsta_channelprocess20100514_aop-TA4 - marked for deletion
hsta_channelprocess20100530_aop-TA4
hsta_channelprocess20200118_aop-YOGA
hsta_channelprocessnull - marked for deletion
The result differs from what --bootstrap-server returns, as below:
[root@ta5host bin]# ./kafka-topics.sh --list --bootstrap-server 10.20.30.17:9092
__consumer_offsets
hs_ta_channel_Metadata
uft_individual_1
uft_individual_1_oracle
uft_inst_1
uft_inst_1_oracle
uft_spliter_1_oracle
uft_spliter_2_oracle
uft_splitter_1
uft_trade_1
uft_trade_1_bar
uft_trade_1_oracle
uft_trade_1_oracle_bar
uft_trade_2_oracle
uft_trade_2_oracle_bar
Is something wrong here???
The former matches what kafka-manager shows, since they connect to the same ZooKeeper.
To find out which Kafka clusters connect to this ZooKeeper, run netstat -ano | grep 2191 and look at the clients.
How do I view a specific topic's configuration?
[root@hs-test-10-20-30-11 kafka]# bin/kafka-topics.sh --zookeeper 10.20.30.10:2181 --topic global --describe
Topic:global PartitionCount:1 ReplicationFactor:1 Configs:
Topic: global Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[root@hs-test-10-20-30-10 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092 --topic hs_ta_channel_Metadata --describe
Topic:hs_ta_channel_Metadata  PartitionCount:1  ReplicationFactor:1  Configs:min.insync.replicas=1,segment.bytes=1073741824,max.message.bytes=10485760
    Topic: hs_ta_channel_Metadata  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
How do I delete a topic?
Deleting a Kafka topic and its data is, strictly speaking, not difficult, yet it causes Kafka users plenty of trouble. Our team has worked with several developers, and all of them occasionally failed to delete a topic completely. This section collects the common scenarios and lays out a standard procedure for deleting a Kafka topic.
Step 1:
If the topic to be deleted is currently being produced to or consumed from, those producers and consumers must be stopped first.
As long as any program is producing to or consuming from the topic, its offset information keeps being updated on the broker, and the kafka delete command cannot remove it.
Also set auto.create.topics.enable = false (the default is true). When it is true, producing to or fetching from a non-existent topic automatically recreates it, which causes all kinds of unexpected problems during deletion.
So this step matters: set auto.create.topics.enable = false and make sure all producers and consumers are completely stopped.
Step 2:
Set delete.topic.enable=true in server.properties.
Without delete.topic.enable=true, the kafka delete command does not actually remove the topic; it only shows it as (marked for deletion).
Step 3:
Run the delete command:
./bin/kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】
Step 4:
Delete the topic's data directories under the Kafka storage directory (the log.dirs setting in server.properties, typically "/data/kafka-logs").
Note: if the cluster has multiple brokers, each broker has multiple data disks (e.g. /data/kafka-logs, /data1/kafka-logs, ...), and the topic has multiple partitions and replicas, you have to scan every data disk on every broker and remove all of the topic's partition data.
Normally the four steps above are enough to delete a topic and its data. If they are not, the topic's metadata in ZooKeeper has to be removed as well, as follows:
(Note: the steps below assume Kafka's default node layout in ZooKeeper; if your deployment changed it, adjust the paths accordingly.)
Step 5:
On a machine with ZooKeeper installed, run:
bin/zkCli.sh -server 【zookeeper server:port】
to enter the zk shell, locate the topic with ls /brokers/topics, then run:
rmr /brokers/topics/【topic name】
and the topic is gone for good.
If the topic is marked for deletion, find it with ls /admin/delete_topics and run:
rmr /admin/delete_topics/【topic name】
Note:
Many other articles also say you have to delete the topic's consumer and config nodes in ZooKeeper, e.g.:
rmr /consumers/【consumer-group】
rmr /config/topics/【topic name】
Normally these two operations are not needed; when they are, it is because of an earlier mistake, e.g. step 1 (stopping producers and consumers) was skipped or step 2 was not configured. In other words, strictly following steps 1 through 5 always deletes a topic cleanly.
Step 6:
When finished, run:
./bin/kafka-topics.sh --list --zookeeper 【zookeeper server:port】
to list the current topics. Normally the deleted topic no longer shows up.
If it still does, restart ZooKeeper and Kafka.
How do I list all consumer groups?
The new way, i.e. not using the ZooKeeper-based client (kafka.consumer.Consumer.createJavaConsumerConnector) but the bootstrap-server based one:
[root@hs-test-10-20-30-11 kafka]# bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.20.30.11:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
The old way, using the ZooKeeper-based client (kafka.javaapi.consumer.ZookeeperConsumerConnector, now deprecated):
[root@hs-test-10-20-30-11 kafka]# bin/kafka-consumer-groups.sh --zookeeper 10.20.30.10:2181 --list
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).
AAA
TA50-Aggr-Logger-ConsumerGroup
console-consumer-23104
console-consumer-37858
Viewing a consumer group's offsets
[root@hs-test-10-20-30-10 bin]# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-topics --group tabase-service-10.20.30.10:8383
TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST          CLIENT-ID
hs_ta_channel_Metadata  0          4               4               0    consumer-1-b8c4c0c5-dbf3-4a55-8a9d-fda9d7b00c17  /10.20.30.10  consumer-1

[root@hs-test-10-20-30-10 bin]# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-topics --group tabase-10.20.30.10:8080
TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST          CLIENT-ID
hs_ta_channel_Metadata  0          4               4               0    consumer-1-1681831e-8c97-48bc-9c8d-21f2f7667aa9  /10.20.30.10  consumer-1
[root@hs-test-10-20-30-10 bin]# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-topics --group tabase-10.20.30.10:8080 --verbose --state
COORDINATOR (ID)      ASSIGNMENT-STRATEGY  STATE   #MEMBERS
10.20.30.10:9092 (0)  range                Stable  1
The producer reports the following error when connecting
WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
There are two possible causes: 1. Kafka is not running; 2. the connection string does not match the listeners value in conf/server.properties.
log4j-kafka configuration
Add the jar dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>0.11.0.3</version>
</dependency>
Configure log4j2.xml as follows:
Reference the kafka appender from the logger:
<Root level="INFO" additivity="false">
    <AppenderRef ref="Console"/>
    <AppenderRef ref="KAFKA"/>
    <AppenderRef ref="app_error"/>
</Root>
Add the kafka appender:
<Appenders>
    <!-- send error logs to Kafka -->
    <Kafka name="KAFKA" topic="bomp">
        <ThresholdFilter level="error" onMatch="DENY" onMismatch="NEUTRAL"/>
        <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/>
        <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}:%4p %t (%F:%L) - %m%n"/>
        <Property name="bootstrap.servers">10.20.30.11:9092</Property>
    </Kafka>
</Appenders>
That completes the log4j-to-Kafka setup. For C++ there is the librdkafka library, https://docs.confluent.io/2.0.0/clients/librdkafka/index.html; a dedicated article will follow.
Related exceptions
The consumer reports:
2018-09-17 14:10:07.768 WARN 130400 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,10.20.30.11,9092)] failed
java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.producer.SyncProducer.doSend(SyncProducer.scala:79) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:61) [kafka_2.12-0.11.0.3.jar:na]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96) [kafka_2.12-0.11.0.3.jar:na]
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:72) [kafka_2.12-0.11.0.3.jar:na]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) [kafka_2.12-0.11.0.3.jar:na]
Fix: set advertised.host.name in server.properties and restart. See https://stackoverflow.com/questions/30606447/kafka-consumer-fetching-metadata-for-topics-failed
The zk log reports:
2018-10-08 14:13:28,297 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100147743c10000 type:setData cxid:0xc8 zxid:0x53 txntype:-1 reqpath:n/a Error Path:/config/topics/uft_trade Error:KeeperErrorCode = NoNode for /config/topics/uft_trade
2018-10-08 14:13:28,302 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100147743c10000 type:create cxid:0xc9 zxid:0x54 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
Fix: still to be investigated.
A Spring Boot Kafka client on a certain virtual machine (never observed on physical machines) would, after running for a while, suddenly drive system CPU to 80-90%, with a flood of logs like the following:
2018-10-09 13:54:57,713 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2682ms for sessionid 0x100175687960002 2018-10-09 13:54:57,904 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2672ms for sessionid 0x100175687960004 2018-10-09 13:54:58,621 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960003 2018-10-09 13:54:57,232 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2700ms for sessionid 0x100175687960007 2018-10-09 13:55:09,812 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2672ms for sessionid 0x100175687960004, closing socket connection and attempting reconn ect 2018-10-09 13:55:02,942 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2702ms for sessionid 0x100175687960008 2018-10-09 13:55:09,755 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960003, closing socket connection and attempting reconn ect 2018-10-09 13:55:09,789 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2682ms for sessionid 0x100175687960002, closing socket connection and attempting reconn ect 2018-10-09 13:55:18,677 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960005 2018-10-09 13:55:11,752 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 20016ms for sessionid 0x100175687960001 2018-10-09 13:55:17,709 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2678ms for sessionid 0x100175687960006 2018-10-09 13:55:12,779 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2700ms for sessionid 0x100175687960007, closing socket connection and attempting reconn ect 2018-10-09 13:55:20,634 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2702ms for sessionid 0x100175687960008, closing socket connection and attempting reconn ect 2018-10-09 13:55:22,178 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 20016ms for sessionid 0x100175687960001, closing socket connection and attempting recon nect 2018-10-09 13:58:10,244 INFO ZkClient:713 - zookeeper state changed (Disconnected) 2018-10-09 13:58:10,240 INFO ZkClient:713 - zookeeper state changed (Disconnected) 2018-10-09 13:58:10,241 INFO ZkClient:713 - zookeeper state changed (Disconnected) 2018-10-09 13:58:10,240 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960005, closing socket connection and attempting reconn ect 2018-10-09 13:58:10,243 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2678ms for sessionid 0x100175687960006, closing socket connection and attempting reconn ect 2018-10-09 13:58:11,107 INFO ZkClient:713 - zookeeper state changed (Disconnected) 2018-10-09 13:58:40,384 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 13:58:40,383 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 13:58:40,379 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. 
Will not attempt to authenticate using SASL (unknown error) 2018-10-09 13:58:40,378 INFO ZkClient:713 - zookeeper state changed (Disconnected) 2018-10-09 13:58:40,378 INFO ZkClient:713 - zookeeper state changed (Disconnected) 2018-10-09 13:58:40,377 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 13:59:22,082 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 13:59:22,084 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 13:59:22,099 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 13:59:22,108 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 13:59:22,130 INFO ZkClient:713 - zookeeper state changed (Disconnected) 2018-10-09 13:59:23,382 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 13:59:23,412 INFO ZkClient:713 - zookeeper state changed (Expired) 2018-10-09 13:59:23,412 INFO ZkClient:713 - zookeeper state changed (Expired) 2018-10-09 13:59:23,443 INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@8646db9 2018-10-09 13:59:23,411 WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960001 has expired 2018-10-09 13:59:32,474 INFO ZkClient:713 - zookeeper state changed (Disconnected) 2018-10-09 13:59:23,404 WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960007 has expired 2018-10-09 13:59:23,390 INFO ZkClient:713 - zookeeper state changed (Expired) 2018-10-09 13:59:32,477 INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@4671e53b 2018-10-09 13:59:23,390 WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960008 has expired 2018-10-09 13:59:23,390 INFO ZkClient:713 - zookeeper state changed (Expired) 2018-10-09 13:59:32,477 INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@6a1aab78 2018-10-09 13:59:23,389 WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960004 has expired 2018-10-09 13:59:32,417 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 13:59:23,380 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. 
Will not attempt to authenticate using SASL (unknown error) 2018-10-09 13:59:23,446 INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@dc24521 2018-10-09 13:59:41,829 INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960004 has expired, closing socket connection 2018-10-09 13:59:41,832 INFO ZkClient:936 - Waiting for keeper state SyncConnected 2018-10-09 13:59:41,829 INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960008 has expired, closing socket connection 2018-10-09 13:59:41,831 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 13:59:41,830 INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960007 has expired, closing socket connection 2018-10-09 13:59:41,830 INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960001 has expired, closing socket connection 2018-10-09 13:59:41,860 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 13:59:42,585 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 13:59:42,810 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 13:59:42,835 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:31,813 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 48978ms for sessionid 0x100175687960002 2018-10-09 14:00:31,825 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 49644ms for sessionid 0x100175687960005 2018-10-09 14:00:31,825 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 49644ms for sessionid 0x100175687960005, closing socket connection and attempting recon nect 2018-10-09 14:00:31,827 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 49968ms for sessionid 0x100175687960006 2018-10-09 14:00:31,827 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 49968ms for sessionid 0x100175687960006, closing socket connection and attempting recon nect 2018-10-09 14:00:31,842 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 50011ms for sessionid 0x100175687960003 2018-10-09 14:00:31,868 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 50011ms for sessionid 0x100175687960003, closing socket connection and attempting recon nect 2018-10-09 14:00:31,853 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 48978ms for sessionid 0x100175687960002, closing socket connection and attempting recon nect 2018-10-09 14:00:31,885 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:31,886 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:31,887 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. 
Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:31,887 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:31,907 INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960001 2018-10-09 14:00:31,907 INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960008 2018-10-09 14:00:31,908 INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960004 2018-10-09 14:00:31,944 INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960007 2018-10-09 14:00:33,391 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:33,396 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:33,424 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 1336ms for sessionid 0x0 2018-10-09 14:00:33,430 INFO ClientCnxn:1299 - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10017568796000b, negotiated timeout = 30000 2018-10-09 14:00:33,517 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:33,516 INFO ZkClient:713 - zookeeper state changed (SyncConnected) 2018-10-09 14:00:34,399 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:34,354 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 1336ms for sessionid 0x0, closing socket connection and attempting reconnect 2018-10-09 14:00:34,433 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:34,475 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:34,476 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:34,485 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 968ms for sessionid 0x0 2018-10-09 14:00:34,488 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 968ms for sessionid 0x0, closing socket connection and attempting reconnect 2018-10-09 14:00:37,472 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:37,484 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:37,487 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:37,488 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. 
Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:37,489 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:37,479 WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960006 has expired 2018-10-09 14:00:37,495 INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960006 has expired, closing socket connection 2018-10-09 14:00:37,447 INFO ZkClient:713 - zookeeper state changed (Expired) 2018-10-09 14:00:37,479 INFO ZkClient:713 - zookeeper state changed (Expired) 2018-10-09 14:00:37,519 INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@69b0fd6f 2018-10-09 14:00:37,519 INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@4a87761d 2018-10-09 14:00:37,446 WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960005 has expired 2018-10-09 14:00:37,519 INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960005 has expired, closing socket connection 2018-10-09 14:00:37,765 INFO ZkClient:713 - zookeeper state changed (Expired) 2018-10-09 14:00:37,780 INFO ZkClient:713 - zookeeper state changed (Expired) 2018-10-09 14:00:37,780 WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960003 has expired 2018-10-09 14:00:37,791 INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960003 has expired, closing socket connection 2018-10-09 14:00:38,194 INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@3aeaafa6 2018-10-09 14:00:37,995 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 507ms for sessionid 0x0 2018-10-09 14:00:52,148 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 507ms for sessionid 0x0, closing socket connection and attempting reconnect 2018-10-09 14:00:38,198 INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@491cc5c9 2018-10-09 14:00:52,141 INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960006 2018-10-09 14:00:52,128 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:52,154 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:52,126 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:00:52,179 INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-10-09 14:00:38,010 WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960002 has expired 2018-10-09 14:00:52,231 INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960002 has expired, closing socket connection 2018-10-09 14:00:52,683 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 504ms for sessionid 0x0 2018-10-09 14:05:12,238 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. 
Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:05:12,176 INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 2018-10-09 14:08:21,078 INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960002 2018-10-09 14:05:12,113 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 259911ms for sessionid 0x10017568796000b 2018-10-09 14:08:21,107 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 259911ms for sessionid 0x10017568796000b, closing socket connection and attempting reco nnect 2018-10-09 14:05:12,098 INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960003 2018-10-09 14:00:52,677 WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 501ms for sessionid 0x0 2018-10-09 14:08:21,107 INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 501ms for sessionid 0x0, closing socket connection and attempting reconnect
After skimming https://blog.csdn.net/xjping0794/article/details/77784171, I checked the system I/O for that period; it was indeed very high, up to 50%, as below:
14時00分28秒  sda      3062.38   922268.58  670.77  301.38   5.17  1.71  0.16  49.44
14時00分28秒  ol-root  3111.77   922266.41  495.79  296.54   5.29  1.70  0.16  49.43
14時00分28秒  ol-swap    22.04        2.09  174.24    8.00   0.13  5.80  0.15   0.33
14時11分16秒  sda      5432.75  1537105.34  768.61  283.07  19.06  3.53  0.17  91.53
14時11分16秒  ol-root  5513.26  1537106.56  731.82  278.93  19.55  3.54  0.17  91.52
14時11分16秒  ol-swap     5.07        4.68   35.87    8.00   0.01  2.27  0.19   0.10
14時11分16秒  DEV      tps       rd_sec/s   wr_sec/s  avgrq-sz  avgqu-sz  await  svctm  %util
14時20分01秒  sda      2784.00   795332.59  462.60  285.85  10.89  3.93  0.18  50.09
14時20分01秒  ol-root  2827.44   795311.85  414.30  281.43  11.18  3.95  0.18  50.07
14時20分01秒  ol-swap     6.96       12.98   42.72    8.00   0.05  7.80  0.18   0.12
14時30分01秒  sda         3.13       12.42   59.59   23.04   0.00  0.57  0.44   0.14
But nothing in particular was running during that window, which is odd; unfortunately I did not think to run iotop at the time to see which process was responsible. The points made in that post are:
On where to keep the ZK transaction log, the official documentation recommends:
Having a dedicated log device has a large impact on throughput and stable latencies. It is highly recommended to dedicate a log device and set dataLogDir to point to a directory on that device, and then make sure to point dataDir to a directory not residing on that device.
Add to zoo.cfg:
forceSync=no
It is on by default: to avoid losing updates, ZK syncs incoming state to the on-disk transaction log immediately and only replies once the sync completes. With it turned off, client requests get fast responses (this matters less on servers with a battery-backed write cache).
Looking at the zk server log, large numbers of CancelledKeyException start to appear around the same time:
2018-10-09 13:56:36,712 [myid:] - INFO [SyncThread:0:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:14926 which had sessionid 0x100175687960008 2018-10-09 13:56:43,857 [myid:] - INFO [SyncThread:0:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:14924 which had sessionid 0x100175687960006 2018-10-09 13:56:49,783 [myid:] - INFO [SyncThread:0:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:14919 which had sessionid 0x100175687960001 2018-10-09 13:56:49,816 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception java.nio.channels.CancelledKeyException at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205) at java.lang.Thread.run(Thread.java:748) 2018-10-09 13:58:54,331 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23459 2018-10-09 13:58:54,377 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23459, probably expired 2018-10-09 13:58:54,401 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23485 2018-10-09 13:58:54,441 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23494 2018-10-09 13:58:56,314 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23459 which had sessionid 0x10017 5687960000 2018-10-09 13:58:56,336 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23485 2018-10-09 13:58:56,392 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23485, probably expired 2018-10-09 13:58:57,890 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23497 2018-10-09 13:58:59,480 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23485 which had sessionid 0x10017 5687960000 2018-10-09 13:59:00,383 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23494 2018-10-09 13:59:00,910 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23494, probably expired 2018-10-09 13:59:02,140 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23507 2018-10-09 13:59:03,286 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23497 2018-10-09 13:59:03,671 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23494 which had sessionid 0x10017 5687960000 2018-10-09 13:59:03,905 
[myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23497, probably expired 2018-10-09 13:59:05,341 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception java.nio.channels.CancelledKeyException at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205) at java.lang.Thread.run(Thread.java:748) 2018-10-09 13:59:06,862 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23511 2018-10-09 13:59:10,044 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23507 2018-10-09 13:59:10,267 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23497 which had sessionid 0x10017 5687960000 2018-10-09 13:59:10,285 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23507, probably expired 2018-10-09 13:59:10,286 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception java.nio.channels.CancelledKeyException at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205) at java.lang.Thread.run(Thread.java:748) 2018-10-09 13:59:10,287 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23507 which had sessionid 0x10017 5687960000 2018-10-09 13:59:10,287 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23511 2018-10-09 13:59:10,287 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23511, probably expired 2018-10-09 13:59:10,313 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23519 2018-10-09 13:59:10,313 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23511 which had sessionid 0x10017 5687960000 2018-10-09 13:59:10,314 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23524 2018-10-09 13:59:10,314 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23519 2018-10-09 13:59:10,314 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23519, probably expired 2018-10-09 13:59:10,315 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23524 2018-10-09 13:59:10,315 [myid:] - INFO 
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23519 which had sessionid 0x10017 5687960000 2018-10-09 13:59:10,316 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23524, probably expired 2018-10-09 13:59:10,321 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception java.nio.channels.CancelledKeyException at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87) at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205) at java.lang.Thread.run(Thread.java:748)
The post says this was fixed in 3.4.8, but we are on 3.4.12. Digging further, some reports mention very high log-write latency, e.g. "fsync-ing the write ahead log in SyncThread:0 took 8001ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide", but that warning does not appear in our logs. We decided to try adding forceSync=no, following https://www.jianshu.com/p/73eec030db86.
The timeouts in the log vary in length; that is related to tickTime, is explainable, and is not detailed here.
The zk log is full of messages like the following:
id:0x9d zxid:0x42 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics 2018-10-09 12:01:07,918 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx id:0xa5 zxid:0x45 txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_individual/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/uft_individual/partitions/0 2018-10-09 12:01:07,921 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx id:0xa6 zxid:0x46 txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_individual/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/uft_individual/partitions 2018-10-09 12:01:17,740 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:setData c xid:0xaf zxid:0x4a txntype:-1 reqpath:n/a Error Path:/config/topics/uft_splitter Error:KeeperErrorCode = NoNode for /config/topics/uft_splitter 2018-10-09 12:01:17,741 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx id:0xb0 zxid:0x4b txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics 2018-10-09 12:01:17,753 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx id:0xb8 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_splitter/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/uft_splitter/partitions/0 2018-10-09 12:01:17,754 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx id:0xb9 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_splitter/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/uft_splitter/partitions 2018-10-09 12:01:35,671 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:setData c xid:0xc2 zxid:0x53 txntype:-1 reqpath:n/a Error Path:/config/topics/cres_global Error:KeeperErrorCode = NoNode for /config/topics/cres_global
See https://github.com/mesos/kafka/issues/136 — yet the Kafka service had started and stayed up the whole time (the startup log confirms it). https://stackoverflow.com/questions/34393837/zookeeper-kafka-error-keepererrorcode-nodeexists mentions another cause, stale ZK data that was never deleted, but ours was a fresh install and the messages still appeared after a short while. Finally https://stackoverflow.com/questions/43559328/got-user-level-keeperexception-when-processing explains it, as follows:
The message you see is not an error yet. It is a potential exception raised by Zookeeper that the original object making a request has to handle.
When you start a fresh Kafka, it gets a bunch of NoNode messages. It's normal because some paths don't exist yet. At the same time, you also get NodeExists messages as the path exists already.
Example: Error:KeeperErrorCode = NoNode for /config/topics/test
It's because Kafka sends a request to Zookeeper for this path. But it doesn't exist. That's OK, because you are trying to create it. So, you see "INFO" from Zookeeper but no error from Kafka. Once Kafka gets this message, it tries to create your topic. To do so, it needs to access a path in Zookeeper for topics. So, it sends a request and gets an error NodeExists for /config/topics. Again, it's normal and Kafka ignores the message.
Long story short, these are all non-issue messages and you should skip them. If it bothers you, change the logging configuration of Zookeeper (it's not recommended though).
In short, these are informational messages only; Kafka simply ignores them and so can you.
Compilation succeeds, but startup fails with:
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe
Cause: the kafka client version used at compile time differs from the one at runtime, e.g. 0.9.1 vs 0.11.0, typically through a transitive dependency; compare the compile-time version with the version printed at runtime.
Client warning:
[WARN] consumer-0-C-1 org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:846) Error while fetching metadata with correlation id 105 : {hs:ta:channel:Metadata=INVALID_TOPIC_EXCEPTION}
Fix: topic names must not contain the ':' character.
Client error
Exception in thread "CDC-Kafka-[hsta.cdcRecord_F6]-Consumer-1" Exception in thread "CDC-Kafka-[hsta.cdcRecord_F7]-Consumer-1" org.apache.kafka.common.errors.WakeupException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:487)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
Fix:
Client error
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'correlation_id': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:53)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:435)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
First check the configuration: make sure you are connecting to the expected Kafka version, and that the client version is not newer than the server's.
Client error:
[] 2020-01-09 13:35:19 [32026] [ERROR] consumer-0-C-1 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:692) Container exception
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [hs:ta:channel:Metadata]
Fix: topic names must not contain the ':' character.
Starting Kafka in the background
By default, running
./kafka-server-start.sh ../config/server.properties keeps the process in the foreground, which means Kafka stops as soon as the console is closed. Add the -daemon option to start it in background mode, as follows:
./kafka-server-start.sh -daemon ../config/server.properties
Commit failures
- 1. Make sure the consumer processes messages fast enough
- 2. Make the consumer logic idempotent wherever possible
- 3. Reduce duplicate consumption through configuration tuning
- 4. Monitor not only the message backlog but also the consumption speed (compare two successive offsets)
The Kafka API we use is KafkaConsumer.poll, which calls pollOnce, which calls ConsumerCoordinator.poll, and the last thing that method does is the automatic offset commit. The crucial point is that this only happens during poll, so if processing the fetched data takes longer than the maximum poll interval, you get the error this section started with and the offset cannot be committed.
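A sketch of the usual mitigation (all values are illustrative): fetch smaller batches, widen the allowed processing window, and commit manually only after the batch is processed.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowProcessingDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "slow-worker");
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "50");           // fewer records per poll -> each batch finishes sooner
        props.put("max.poll.interval.ms", "600000");   // allow up to 10 min between polls before a rebalance
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1)))
                    process(record);                   // must finish well within max.poll.interval.ms
                consumer.commitSync();                 // commit only after the whole batch is processed
            }
        }
    }
    private static void process(ConsumerRecord<String, String> record) { /* business logic */ }
}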
Assorted slow-consumption problems
https://blog.csdn.net/qq_16681169/article/details/101081656
https://blog.csdn.net/qq_40384985/article/details/90675986
https://blog.csdn.net/qq_34894188/article/details/80554570
Problem
[2018-10-14 21:20:28,860] INFO [Consumer clientId=consumer-1, groupId=console-consumer-28334] Discovered group coordinator wxoddc2nn1:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Fix: see https://www.jianshu.com/p/0e7e38c98d13
Kafka: why do people say Kafka's use of disk is faster than memory? (Strictly speaking that conclusion is wrong. Why? Because Kafka's access pattern is mostly sequential reads and writes, so it simply hands the work to the OS page cache; MongoDB does the same. You can argue that decision is no worse than managing it yourself, but saying disk is faster than memory goes too far.)
Kafka's core idea is to use the disk rather than memory. Like everyone else, I assumed memory must be faster than disk.
After reading Kafka's design notes, checking the relevant material and testing it myself, it turns out that sequential disk read/write throughput (Cassandra, LevelDB and RocksDB follow the same strategy) is on par with memory.
Linux also has plenty of optimizations for disk access, including read-ahead, write-behind and the disk cache.
Doing the same work in heap memory has two costs: the per-object overhead of Java is high, and as heap data grows, GC pauses get long. By leaning on the OS page cache, most of the GC cost disappears (could JNI achieve a similar effect??? At least a good part of Netty's ByteBuffer and Unsafe usage works that way).
Working against the disk has the following advantages:
- The disk cache is maintained by the Linux kernel, which saves the Java programmer a lot of work.
- Sequential disk reads and writes beat random memory access.
- JVM GC is inefficient and the memory footprint is large.
- After a cold restart, the disk cache is still usable.
Optimizations
- For message deletion, mark records (tombstones) instead of deleting them immediately.
- For sending, batch messages instead of sending them one by one.
- On the JVM side, Kafka defaults to the CMS collector; consider the G1 collector instead: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
Other key practices
Kafka uses tombstones instead of deleting records right away.
❖ Kafka producers support record batching, by the size of records, auto-flushed based on time.
❖ Batching is good for network IO throughput.
❖ Batching speeds up throughput drastically.
❖ With Kafka, consumers pull data from brokers. When a follower falls behind the leader for longer than replica.lag.time.max.ms, the leader removes it from the ISR.
If all replicas of a partition are down, Kafka chooses the first replica that comes back alive (not necessarily a member of the ISR) as the leader.
❖ This is the behavior with unclean.leader.election.enable=true.
❖ With unclean.leader.election.enable=false (the default since 0.11), Kafka instead waits for an ISR member to come back alive and become the new leader.
Outside of using a single ensemble (the coordination service, i.e. ZooKeeper) for multiple Kafka clusters, it is not recommended to share the ensemble with other applications, if it can be avoided. Kafka is sensitive to Zookeeper latency and timeouts, and an interruption in communications with the ensemble will cause the brokers to behave unpredictably. This can easily cause multiple brokers to go offline at the same time, should they lose Zookeeper connections, which will result in offline partitions. It also puts stress on the cluster controller, which can show up as subtle errors long after the interruption has passed, such as when trying to perform a controlled shutdown of a broker. Other applications that can put stress on the Zookeeper ensemble, either through heavy usage or improper operations, should be segregated to their own ensemble.
Kafka and MQTT
Unlike RabbitMQ and ActiveMQ, Kafka does not support the MQTT protocol out of the box. If you want existing applications that talk to RabbitMQ over MQTT to switch over seamlessly, you either write your own gateway or use a third-party connector; the more established options are https://www.confluent.io/connector/kafka-connect-mqtt/ and https://www.infoq.cn/article/fdbcrh6I*9ajCWLvippC
References
- https://my.oschina.net/u/3853586/blog/3138825 (a decent summary of the related concepts)
- https://kafka.apache.org/documentation (single-page mode)
- http://cloudurable.com/blog/kafka-architecture/index.html
- https://cwiki.apache.org/confluence/display/KAFKA/
- http://cloudurable.com/ppt/4-kafka-detailed-architecture.pdf
- Learning Apache Kafka, Second Edition (covers 0.8.x)
- Kafka: The Definitive Guide (covers 0.9.x)
- https://cwiki.apache.org/confluence/display/KAFKA/Kafka+papers+and+presentations
- https://www.iteblog.com/archives/2605.html (32 common Kafka interview questions)
- https://www.confluent.io/resources/?_ga=2.259543491.831483530.1575092374-1512510254.1571457212 (if Percona Server is the main commercial vendor built around MySQL, Confluent is the commercial company built around Kafka)
- https://www.jianshu.com/p/d2cbaae38014 (Kafka ops pitfalls; somewhat dated, 0.9.1-era, but the server-side issues are still worth reading)
Related best-practice references:
- https://dzone.com/articles/20-best-practices-for-working-with-apache-kafka-at