Kafka Study Guide (Summary Edition)


Version Overview

  As of this update (2019-08-08), the latest release is 2.3.

  From a usage perspective, 0.9 is the dividing line: starting with 0.9 there is no longer a separate high-level consumer API (analogous to MySQL binlog GTID — you only deal with topics, while the server manages offsets and load balancing automatically) versus low-level consumer API (analogous to MySQL binlog file + position — you work directly with partitions and offsets).

  From a compatibility perspective, 0.8.x is the dividing line: 0.8.x is not compatible with earlier versions.

Overall Topology

From the diagram above we can see:

0. A Kafka cluster consists of N brokers, at least one (standalone). One of the brokers acts as the controller (its responsibilities are described below). The cluster contains many topics; a topic can be partitioned, each partition can be configured with multiple replicas, and each replica always lives on a different broker. One replica is the leader (it takes the writes, and by default the reads as well) and the others are followers used for replication (similar in spirit to the Couchbase architecture).

1. Producers do not need to talk to ZooKeeper (in 0.8.x the Kafka consumer connected to ZooKeeper directly to obtain offset information; later versions fetch it from the cluster itself, which is why the two APIs are incompatible. The diagram above shows the 0.8.x layout and is slightly inaccurate for 0.9.x and later).

2. Consumers fetch messages from, and producers publish messages to, the leader replica only, never a follower (replication is coordinated between brokers, not between brokers and clients).

3. As with RocketMQ, each topic is split into partitions to scale performance linearly (the same idea as database sharding: transparent to the business, a technical rather than a business decision; records can be distributed round-robin or by applying a partitioning algorithm to the message key). Each partition can be consumed by only one member of a given consumer group, so if the messages in a topic do not need to be consumed in strict order, partitioning is the key mechanism for raising throughput under heavy traffic. The number of partitions per topic (default 1) can be changed with ./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 5 --topic userService, and the result can be inspected under /tmp/kafka-logs. For guidance on choosing a reasonable value see https://blog.csdn.net/kwengelie/article/details/51150114.

4. Kafka 0.8.x stores each consumer group's position in each partition of each topic in ZooKeeper (every message has an offset that increases monotonically within its partition). Starting with 0.9 these positions are stored in a dedicated topic named "__consumer_offsets", so consumer group + topic identifies the position and consumption can be resumed at any time. That topic uses log compaction, i.e. only the latest value for each key is kept rather than the full history.

5. Each topic partition is backed by a local log that the broker continuously appends to in sequential order.

6. A message may or may not carry a key. If it has one, the key determines which partition the message goes to; otherwise partitions are chosen round-robin. In the Java client the key is hashed (in practice a key is usually set to help deal with duplicate consumption). Records are ordered only within a partition, so choosing a good key lets you turn serial processing into parallel processing. This requires a solid understanding of the business logic: strict global ordering is often unnecessary (especially in OLTP — ordering per product, per customer, per merchant, or even per campaign is often enough); it is just simpler to implement. Remember: it is the producer, not the broker, that decides which partition a record goes to (a short producer sketch follows after this list).

7. With replication, consistency follows an all-in-sync-replicas acknowledgment model rather than a majority-quorum model, as shown below:

ISRs are described below. Different partitions of the same topic can have their leaders on different brokers, which improves performance because the leader handles both reads and writes. The boundary up to which records are committed is also called the high watermark. Although users rarely deal with partitions directly, they are where HA and scalability are actually realized.
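To make point 6 above concrete, here is a minimal producer sketch (the broker address and the topic name "orders" are illustrative assumptions): a record with a key is routed by hashing the key, while a record without a key is spread across partitions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: adjust to your cluster
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keyed record: all records with the same customer id land in the same
            // partition, so they stay ordered relative to each other.
            producer.send(new ProducerRecord<>("orders", "customer-42", "order created"));

            // Unkeyed record: the partitioner spreads these across partitions,
            // so there is no ordering guarantee beyond a single partition.
            producer.send(new ProducerRecord<>("orders", "order without key"));
        } // close() flushes any pending records
    }
}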

Kafka Internal Architecture

  Worth mentioning here is the controller. One broker in the cluster is elected controller; it handles cluster-wide events such as leader election, topic changes, tracking partition replica counts, and broker membership changes, and it communicates mainly with ZooKeeper. This is not the same thing as a ZooKeeper leader node; it is closer to Hadoop's NameNode, responsible only for global metadata management.

  The Kafka controller architecture is as follows:

   Its responsibilities are documented at https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals. For an analysis of how the controller works, see https://www.cnblogs.com/huxi2b/p/6980045.html, which sums it up quite well.

Putting it all together, Kafka's core elements are: brokers (every Kafka process is a broker), topics, logs, partitions, the controller, messages, and the cluster.

Broker Internals

Message Production Flow

So how does a client know which broker is the leader? Every broker caches the cluster metadata, so when the connection is first established the client can obtain the metadata for each topic from any broker, as shown below:
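As a small illustration of that metadata round trip (a sketch; the broker address is illustrative and the topic userService is the one created earlier), any client can ask whichever broker it bootstrapped from for a topic's partitions and their current leaders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class MetadataDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // any reachable broker will do
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The broker answers from its metadata cache: partitions, leader, replicas, ISR
            for (PartitionInfo p : producer.partitionsFor("userService")) {
                System.out.printf("partition=%d leader=%s replicas=%d isr=%d%n",
                        p.partition(), p.leader(), p.replicas().length, p.inSyncReplicas().length);
            }
        }
    }
}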

 

Message Consumption Flow

 

Core high-level API (does not allow the user to control how the consumer interacts with the broker)

  • ConsumerConnector
  • KafkaStream
  • ConsumerConfig

  The low-level API, by contrast, lets you control each interaction, for example where to start reading and maintaining offsets on the client side. RocketMQ's implementation effectively combines the high-level and low-level styles, which is also what Kafka's unified API looks like from 0.9 onward.

The main entry point of the low-level API is SimpleConsumer.

Consumer Groups and Partitions

  Each consumer group records its own consumption progress in every partition (this information is stored in the dedicated offsets topic described above). A partition can be consumed by only one member of a given consumer group. Since microservices are usually deployed as a cluster, this means that for a single-partition topic the listeners on N-1 of the service instances sit idle, which is worth keeping in mind. If the service hosting the current consumer dies, Kafka automatically hands the partition to one of the remaining consumers; but if a message was already processed and its acknowledgment never reached Kafka, the consumer that takes over will consume it again, so make processing idempotent. If you want a topic to be consumed in parallel by the members of a consumer group, configure at least as many partitions as there are instances. In short, the management granularity is the consumer group (called a subscriber in other MQs) plus the topic, while the underlying delivery granularity is the partition plus the consumer.

  Not only clustered microservices benefit from multiple partitions; a single JVM can too. Simply start several independent threads, each acting as a consumer of the topic, and they will process messages concurrently. This is mainly useful on SMP servers, so whenever message processing takes noticeable time or the message TPS is high, use multiple partitions. A sketch of this threading model follows.
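A minimal sketch of that threading model (the broker address, group id, and the topic "orders" are illustrative; the topic is assumed to have at least three partitions). Each thread owns its own KafkaConsumer instance, because KafkaConsumer is not thread-safe:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ParallelConsumerDemo {
    public static void main(String[] args) {
        // One consumer per thread; all threads share the same group.id,
        // so Kafka assigns each of them a disjoint subset of the partitions.
        for (int i = 0; i < 3; i++) {
            new Thread(ParallelConsumerDemo::runConsumer, "consumer-" + i).start();
        }
    }

    private static void runConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: adjust to your cluster
        props.put("group.id", "order-service");             // same group for all threads
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("%s p%d@%d %s%n", Thread.currentThread().getName(),
                            record.partition(), record.offset(), record.value());
            }
        }
    }
}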

Several Key Offsets

  Message backlog is one manifestation of consumer lag: the difference between the messages retained on the broker and the messages already consumed is the backlog, also called the lag. In Kafka, messages are sent to a topic, which is divided into partitions, and every partition has a write-ahead log file. Although a partition is further split into segment files, the upper layers can treat the partition as the smallest storage unit (a "giant file" made of concatenated segments). Each partition consists of a series of ordered, immutable messages that are continuously appended to it.

 

 The figure above involves four concepts:

1. LogStartOffset: the starting offset of a partition. It begins at 0, but as messages are appended and the log cleanup policy kicks in, this value increases in steps.
2. ConsumerOffset: the consumption offset, i.e. the position a particular consumer has reached in the partition.
3. HighWatermark: HW for short, the highest log offset that consumers can "see" in the partition; HW is greater than or equal to ConsumerOffset.
4. LogEndOffset: LEO for short, the highest log offset of the partition; it is not visible to consumers. For example, with an ISR (In-Sync Replicas) size of 3 (as in the figure below), a message sent to Leader A first advances A's LEO; Followers B and C continuously pull messages from A to update themselves. HW is the offset that A, B, and C have all reached, i.e. the minimum LEO among the three. Because B and C pull from A with some delay (which touches on delivery reliability, see below), HW will not always equal the leader's LEO; rather LEO >= HW. So in practice it may look like this:

 

Computing a consumer's lag in Kafka is straightforward: first determine which topics it consumes, then for each topic compute the lag of every partition; the per-partition calculation itself is trivial, as shown below:

 

 

As the figure shows, consumer Lag = HW - ConsumerOffset.
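A minimal sketch of that calculation, assuming the kafka-clients 2.0+ AdminClient (the broker address and group name are illustrative): endOffsets() returns the high watermark for each partition, and listConsumerGroupOffsets() returns the committed ConsumerOffset.

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LagChecker {
    public static void main(String[] args) throws Exception {
        String bootstrap = "localhost:9092";   // assumption: adjust to your cluster
        String group = "my-group";             // assumption: the group to inspect

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets (ConsumerOffset) for every partition owned by the group
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(group)
                         .partitionsToOffsetAndMetadata().get();

            Properties cprops = new Properties();
            cprops.put("bootstrap.servers", bootstrap);
            cprops.put("group.id", group + "-lag-checker");   // separate group, used only for lookups
            cprops.put("key.deserializer", StringDeserializer.class.getName());
            cprops.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops)) {
                // endOffsets() returns the high watermark (HW) of each partition
                Map<TopicPartition, Long> hw = consumer.endOffsets(committed.keySet());
                committed.forEach((tp, om) ->
                        System.out.printf("%s lag=%d%n", tp, hw.get(tp) - om.offset()));
            }
        }
    }
}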

ISRs

  ISR refers to the members of the current in-sync replica set. If the leader fails, one of the ISR members is elected as the new leader.

Topic Log Compaction

  A topic log grows continuously, so to stay stable it must be reclaimed periodically. There are two cases with different strategies, depending on whether message keys repeat. Log compaction is mainly for the case where keys do repeat (i.e. the key is not a UUID, otherwise compaction would be pointless). Its mechanism deletes the historical values of a repeated key based on retention time or file size, as shown below:

  As you can see, the historical versions are cleaned up. With compaction enabled, the topic log is divided into a head and a tail; only the tail is compacted, and actual deletion also depends on other settings, as follows.

 

  The parameter min.compaction.lag.ms controls how long a message must age before it can be compacted, delete.retention.ms controls how long delete markers are retained, and log.cleanup.policy=compact enables compaction. As long as consumers read within these windows, they are guaranteed to see at least the latest record for each key (the producer may have written again in the meantime, so they may see more).
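For reference, a compacted topic with these per-topic settings can also be created programmatically; the sketch below assumes the kafka-clients 0.11+ AdminClient, and the topic name, partition count, and values are illustrative.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CompactedTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: adjust to your cluster

        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "compact");            // per-topic equivalent of log.cleanup.policy
        configs.put("min.compaction.lag.ms", "60000");       // a record is safe from compaction for 1 min
        configs.put("delete.retention.ms", "86400000");      // keep delete markers (tombstones) for 1 day

        NewTopic topic = new NewTopic("user-profile", 3, (short) 1).configs(configs);

        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}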

Message Retention

  Each topic can declare how long messages are retained, based on time or on the size of the topic log, governed by the following parameters:

 

Property                          Meaning                                                                              Default
log.cleanup.policy                Log cleanup strategy; only delete and compact are supported                         delete
log.retention.hours               How long to keep logs (hours; minutes and ms variants also exist)                   168 (7 days)
log.retention.bytes               Maximum log size before deletion (deletion triggers when either limit is reached)   -1
log.segment.delete.delay.ms       How long a log file is kept before it is physically deleted                         60000
log.cleanup.interval.mins         Interval between cleanup runs (older versions)                                      10
log.retention.check.interval.ms   Interval for checking whether any log meets the deletion criteria (newer versions)  300000

ACK Consistency Levels

  The producer (a common interview question these days: how do you guarantee that a sent message is not lost?) can set its data consistency requirement through acks, similar in spirit to MySQL's replication acknowledgment settings: acks=0 (no acknowledgment, at most once), acks=all (the leader and all in-sync followers must write successfully), and acks=1 (success on the leader is enough; the default in the producer versions covered here).

  It is set in the producer properties, as follows:

  • props.put("acks", "0");
  • props.put("acks", "1");
  • props.put("acks", "all");

  Early producer versions had no notion of "exactly once". Exactly-once delivery is supported from Kafka 0.11.0 onward, implemented by combining producer message idempotence with atomic transactions; see https://dzone.com/articles/exactly-once-semantics-with-apache-kafka-1. A transactional-producer sketch follows.
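A minimal sketch of the 0.11+ exactly-once producer path (the broker address, the topic "payments", and the transactional.id are illustrative; the transactional.id must be unique and stable per producer instance):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");     // assumption: adjust to your cluster
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("enable.idempotence", "true");               // de-duplicates broker-side retries
        props.put("transactional.id", "payment-producer-1");   // enables atomic multi-record writes

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("payments", "order-1", "debit"));
                producer.send(new ProducerRecord<>("payments", "order-1", "credit"));
                producer.commitTransaction();    // both records become visible atomically
            } catch (Exception e) {
                producer.abortTransaction();     // neither record is exposed to read_committed consumers
                throw e;
            }
        }
    }
}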

  On the consumer side, Kafka supports two modes out of the box: at most once and at least once.

To implement “at-most-once”, the consumer reads a message, then saves its offset in the partition by sending it to the broker, and finally processes the message. The issue with “at-most-once” is that a consumer could die after saving its position but before processing the message. Then the consumer that takes over or gets restarted would leave off at the last position and the message in question is never processed.

To implement “at-least-once”, the consumer reads a message, processes it, and finally saves the offset to the broker. The issue with “at-least-once” is that a consumer could crash after processing a message but before saving the last offset position. Then if the consumer is restarted or another consumer takes over, it could receive a message that was already processed. “At-least-once” is the most common setup for messaging, and it is your responsibility to make the processing idempotent, which means getting the same message twice will not cause a problem (e.g. two debits).

To implement “exactly once” on the consumer side, the consumer would need a two-phase commit between storage for the consumer position, and storage of the consumer’s message process output. Or, the consumer could store the message process output in the same location as the last offset.

  Kafka itself only provides the first two consumer acknowledgment modes; the third has to be implemented by the user. In practice most people use the second mode plus idempotent processing, i.e. consumer-side consistency is guaranteed by idempotence + acknowledgment, so it is not elaborated further here.

  Manual offset commits can be managed as follows:

// "consumer" is assumed to be an already constructed and subscribed
// KafkaConsumer<String, String>; auto-commit is disabled so offsets
// are committed explicitly after the records have been processed.
props.put("enable.auto.commit", "false");
try {
  while (running) {
    // Fetch the next batch of records (1000 ms poll timeout)
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    try {
      // Synchronously commit the offsets returned by the last poll()
      consumer.commitSync();
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}

  In auto-commit mode, the commit interval is determined by auto.commit.interval.ms. For how to use the various commit modes, see https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/.
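For comparison with the manual-commit loop above, a minimal auto-commit configuration sketch (the broker address, group id, and interval are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");      // assumption: adjust to your cluster
        props.put("group.id", "my-group");                      // illustrative group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");                // commit offsets automatically...
        props.put("auto.commit.interval.ms", "5000");           // ...at most every 5 s, during poll()

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe() and poll() as usual; offsets are committed in the background.
        consumer.close();
    }
}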

The Kafka Ecosystem

  MirrorMaker is the component that replicates data between Kafka clusters; it is essentially a consumer plus a producer, as shown below:

  As shown above, it has reached V2 (requiring Kafka 2.4). Compared with V1, the most important improvement is automatic translation of consumer offsets between the two clusters, without having to derive them from Kafka timestamps (this step is critical; otherwise a disaster-recovery switchover is expensive and error-prone). The internal architecture is as follows:

The picture above is not quite how real deployments look: consumers (applications) are of course deployed in both data centers, running active-active. That raises the question of which arrives first at the standby cluster, __consumer_offsets or the business topic itself. If the business topic arrives first and __consumer_offsets later, the application in the DR site reads a stale offset and consumes duplicates. If __consumer_offsets arrives first and points beyond the actual log end, it is reset to the real offset, and the business records that arrive later are again consumed twice. So idempotence in the consuming application is essential, and as the topic designer, making the message key unique matters a great deal. If the replication delay is large, the consequence may be a large number of duplicate messages to discard, which in turn hurts the RTO.

Beyond that, Kafka, like every MQ, has another serious problem in disaster recovery: it can break the eventual consistency built on top of it. Relying on an MQ for eventual consistency normally improves reliability and performance greatly and decouples microservices, but if messages are lost during a switchover, eventual consistency is broken and you must fall back on a reconciliation mechanism between the microservices.

Kafka REST Proxy and Confluent Schema Registry

 

Kafka's Data in ZooKeeper

  For display purposes, I set the chroot to localKafka, as shown below:

  

  The meaning of each ZooKeeper node is shown in the diagram below (in earlier Kafka versions, consumer offsets were also maintained in ZooKeeper). Here kafka01 is the chroot, set in Kafka's server.properties by appending it to zookeeper.connect, e.g. zookeeper.connect=localhost:2181/localKafka.

 

Kafka Monitoring and Management

  The mainstream Kafka monitoring tools are:

  • 1. Kafka Web Console: download the source from https://github.com/claudemamo/kafka-web-console and build it, or get a prebuilt package from https://pan.baidu.com/s/1cJ2OefPqlxfWzTHElG6AjA (extraction code: f8xz). For testing and development it is recommended because it makes troubleshooting convenient; for its shortcomings see https://blog.csdn.net/qq_33314107/article/details/81099091.

[root@node1 bin]# sh kafka-web-console

Play server process ID is 4154

[info] play - database [default] connected at jdbc:h2:file:play

[warn] play - Your production database [default] needs evolutions! 

 

INSERT INTO settings (key_, value) VALUES ('PURGE_SCHEDULE', '0 0 0 ? * SUN *');

INSERT INTO settings (key_, value) VALUES ('OFFSET_FETCH_INTERVAL', '30');

 

[warn] play - Run with -DapplyEvolutions.default=true if you want to run them automatically (be careful)

Oops, cannot start the server.

@74e0p173o: Database 'default' needs evolution!

        at play.api.db.evolutions.EvolutionsPlugin$$anonfun$onStart$1$$anonfun$apply$1.apply$mcV$sp(Evolutions.scala:484)

        at play.api.db.evolutions.EvolutionsPlugin.withLock(Evolutions.scala:507)

        at play.api.db.evolutions.EvolutionsPlugin$$anonfun$onStart$1.apply(Evolutions.scala:461)

        at play.api.db.evolutions.EvolutionsPlugin$$anonfun$onStart$1.apply(Evolutions.scala:459)

        at scala.collection.immutable.List.foreach(List.scala:318)

        at play.api.db.evolutions.EvolutionsPlugin.onStart(Evolutions.scala:459)

        at play.api.Play$$anonfun$start$1$$anonfun$apply$mcV$sp$1.apply(Play.scala:88)

        at play.api.Play$$anonfun$start$1$$anonfun$apply$mcV$sp$1.apply(Play.scala:88)

        at scala.collection.immutable.List.foreach(List.scala:318)

        at play.api.Play$$anonfun$start$1.apply$mcV$sp(Play.scala:88)

        at play.api.Play$$anonfun$start$1.apply(Play.scala:88)

        at play.api.Play$$anonfun$start$1.apply(Play.scala:88)

        at play.utils.Threads$.withContextClassLoader(Threads.scala:18)

        at play.api.Play$.start(Play.scala:87)

        at play.core.StaticApplication.<init>(ApplicationProvider.scala:52)

        at play.core.server.NettyServer$.createServer(NettyServer.scala:243)

        at play.core.server.NettyServer$$anonfun$main$3.apply(NettyServer.scala:279)

        at play.core.server.NettyServer$$anonfun$main$3.apply(NettyServer.scala:274)

        at scala.Option.map(Option.scala:145)

        at play.core.server.NettyServer$.main(NettyServer.scala:274)

        at play.core.server.NettyServer.main(NettyServer.scala)

 

On first startup an extra flag is required:

 ./kafka-web-console -DapplyEvolutions.default=true

  • 2. Kafka Manager: also quite good, developed by Yahoo; it can be used to monitor traffic.

Link: https://pan.baidu.com/s/1YMj-9HzoJLKDEY5C47aOlQ
Extraction code: hhhr

  • 3. KafkaOffsetMonitor
  • 4. Kafdrop: a newer tool, see https://github.com/HomeAdvisor/Kafdrop.

  Take KafkaOffsetMonitor, which we typically use in production, as an example. KafkaOffsetMonitor is a client-side consumption monitoring tool for Kafka, used to monitor in real time the consumers of a Kafka service and the offsets in the partitions they own; you can browse the current consumer groups and see the consumption state of every partition of every topic at a glance. KafkaOffsetMonitor is hosted on GitHub and can be downloaded from https://github.com/quantifind/KafkaOffsetMonitor/releases, or from a Baidu network drive (on an intranet use the latter, otherwise JavaScript loaded from a CDN will be missing).

  It can be started with java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.20.30.10:2181 --port 8088 --refresh 10.seconds --retain 2.days; see the GitHub page for the meaning of each option.

 

 

 

  

Kafka Rebalance Mechanism (Important)

Frequently Asked Questions

How do I list all topics through the Java API?
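One way (a minimal sketch, assuming the kafka-clients 0.11+ AdminClient; the broker address is illustrative, and consumer.listTopics() is an alternative on older clients):

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;

public class ListTopicsDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: adjust to your cluster

        try (AdminClient admin = AdminClient.create(props)) {
            // names() excludes internal topics such as __consumer_offsets by default
            Set<String> topics = admin.listTopics().names().get();
            topics.forEach(System.out::println);
        }
    }
}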

 

How does a consumer subscribe to multiple topics at once?

This is appropriate when all the topics share the same semantic scope; you can subscribe to several topics with this.consumer.subscribe(Arrays.asList(topic...)). For example, cache invalidation usually requires every application instance to consume the message, so ip + port + app.group + app.version is a suitable consumer-group-id; otherwise app.group + app.version alone is the better consumer-group-id. A sketch of both conventions follows.
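A sketch of both group-id conventions (the topic names, port, and app.group / app.version values are illustrative assumptions):

import java.net.InetAddress;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MultiTopicSubscribeDemo {
    public static void main(String[] args) throws Exception {
        boolean broadcast = true;   // true: every instance consumes (e.g. cache invalidation)

        // Load-balanced group id: app.group + app.version
        String groupId = "order-service" + "-" + "1.0";
        if (broadcast) {
            // Broadcast-style group id: ip + port + app.group + app.version,
            // so each instance forms its own group and receives every message.
            groupId = InetAddress.getLocalHost().getHostAddress() + "-8080-" + groupId;
        }

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: adjust to your cluster
        props.put("group.id", groupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to several topics with the same semantic scope in one call
        consumer.subscribe(Arrays.asList("cache-invalidate-user", "cache-invalidate-product"));
    }
}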

 

How do I list all topics from the command line?

 
         

[root@hs-test-10-20-30-16 bin]# ./kafka-topics.sh --list --zookeeper 10.20.30.17:2191
__consumer_offsets
hs_ta_channel_Metadata
hsta_channelprocess20100513_aop-YOGA - marked for deletion
hsta_channelprocess20100514_aop-TA4 - marked for deletion
hsta_channelprocess20100530_aop-TA4
hsta_channelprocess20200118_aop-YOGA
hsta_channelprocessnull - marked for deletion

 

  The result differs from what you get when connecting with --bootstrap-server, as follows:

[root@ta5host bin]# ./kafka-topics.sh  --list --bootstrap-server 10.20.30.17:9092
__consumer_offsets
hs_ta_channel_Metadata
uft_individual_1
uft_individual_1_oracle
uft_inst_1
uft_inst_1_oracle
uft_spliter_1_oracle
uft_spliter_2_oracle
uft_splitter_1
uft_trade_1
uft_trade_1_bar
uft_trade_1_oracle
uft_trade_1_oracle_bar
uft_trade_2_oracle
uft_trade_2_oracle_bar

  Does something look wrong here?

  The former matches what kafka-manager shows, which makes sense since they connect to the same ZooKeeper.

 

To find out which Kafka clusters connect to this ZooKeeper, use netstat -ano | grep 2191 to locate the clients.

How do I view a specific topic's configuration?

[root@hs-test-10-20-30-11 kafka]# bin/kafka-topics.sh --zookeeper 10.20.30.10:2181 --topic global --describe
Topic:global PartitionCount:1 ReplicationFactor:1 Configs:
Topic: global Partition: 0 Leader: 0 Replicas: 0 Isr: 0

[root@hs-test-10-20-30-10 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092 --topic hs_ta_channel_Metadata --describe
Topic:hs_ta_channel_Metadata    PartitionCount:1    ReplicationFactor:1    Configs:min.insync.replicas=1,segment.bytes=1073741824,max.message.bytes=10485760
    Topic: hs_ta_channel_Metadata    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

 

How do I delete a topic?

  Deleting a Kafka topic and its data is, strictly speaking, not a difficult operation, but it frequently causes trouble for Kafka users. Our team has worked with several developers who occasionally found they could not delete a topic completely. This section summarizes the common scenarios and lays out a standard procedure for deleting a Kafka topic.

step1:

  If the topic to be deleted is currently being produced to or consumed from, those producer and consumer programs must be stopped first.

  If a program is still producing to or consuming from the topic, its offset information keeps being updated on the broker, and the Kafka delete command will not be able to remove the topic.

  At the same time, set auto.create.topics.enable = false (the default is true). If it is left as true, producing to or fetching from a nonexistent topic automatically recreates it, which causes all sorts of unexpected problems when deleting topics.

  So this step matters: set auto.create.topics.enable = false and make sure all producer and consumer programs are completely stopped.

step2:

  In server.properties, set delete.topic.enable=true.

  Without delete.topic.enable=true, the Kafka delete command does not actually remove the topic; it merely shows it as (marked for deletion).

step3:

  Run the delete command:

  ./bin/kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】

step4:

  Delete the topic's data directories under the Kafka storage directory (the log.dirs setting in server.properties, default "/data/kafka-logs").

  Note: if the cluster has multiple brokers, each broker is configured with multiple data disks (e.g. /data/kafka-logs, /data1/kafka-logs, ...), and the topic has multiple partitions and replicas, you must scan every data disk on every broker and delete all of the topic's partition data.

   Normally the four steps above are enough to delete the topic and its data. If the topic still cannot be deleted after them, you also need to remove Kafka's metadata for it in ZooKeeper, as follows:

  (Note: the steps below assume Kafka's default node paths in ZooKeeper; if your system has changed them, locate the correct paths for your setup first.)

step5:

  On a server where ZooKeeper is deployed, run:

  bin/zkCli.sh -server 【zookeeper server:port】

  Once in the zk shell, list the topic directory with ls /brokers/topics, find the topic to delete, and run:

  rmr /brokers/topics/【topic name】

  At this point the topic is completely removed.

  If the topic is marked for deletion, list /admin/delete_topics with ls /admin/delete_topics, find the topic, and run:

  rmr /admin/delete_topics/【topic name】

Remarks:

  Many other articles online also claim you need to delete the topic's consumer and configuration nodes in ZooKeeper, for example:

  rmr /consumers/【consumer-group】

  rmr /config/topics/【topic name】

  Normally these two operations are unnecessary; if they turn out to be needed, it is because of an earlier mistake, such as skipping step 1 (stopping producers and consumers) or misconfiguring step 2. In other words, following steps 1 through 5 strictly will always delete the topic cleanly.

step6:

  When finished, run:

  ./bin/kafka-topics.sh --list --zookeeper 【zookeeper server:port】

  to inspect the current topics. Normally the deleted topic no longer appears.

  If the deleted topic still shows up, restart ZooKeeper and Kafka. For newer clusters, a programmatic alternative is sketched below.
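For completeness, on 0.11+ clusters the delete can also be issued through the AdminClient; a minimal sketch (it still requires delete.topic.enable=true on the brokers, and the broker address and topic name are illustrative):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public class DeleteTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: adjust to your cluster

        try (AdminClient admin = AdminClient.create(props)) {
            // Marks the topic for deletion; the brokers remove the log directories asynchronously
            admin.deleteTopics(Collections.singletonList("userService")).all().get();
        }
    }
}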

How do I list all consumer groups?

  The new way, i.e. not using the ZooKeeper-based client (kafka.consumer.Consumer.createJavaConsumerConnector) but the bootstrap-server-based one:

[root@hs-test-10-20-30-11 kafka]# bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.20.30.11:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

  The old way: the ZooKeeper-based client (kafka.javaapi.consumer.ZookeeperConsumerConnector, now deprecated).

[root@hs-test-10-20-30-11 kafka]# bin/kafka-consumer-groups.sh --zookeeper 10.20.30.10:2181 --list
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).

AAA
TA50-Aggr-Logger-ConsumerGroup
console-consumer-23104
console-consumer-37858

How do I view a consumer group's offsets?

[root@hs-test-10-20-30-10 bin]# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-topics --group tabase-service-10.20.30.10:8383

TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
hs_ta_channel_Metadata 0          4               4               0               consumer-1-b8c4c0c5-dbf3-4a55-8a9d-fda9d7b00c17 /10.20.30.10    consumer-1
[root@hs-test-10-20-30-10 bin]# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-topics --group tabase-10.20.30.10:8080

TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
hs_ta_channel_Metadata 0          4               4               0               consumer-1-1681831e-8c97-48bc-9c8d-21f2f7667aa9 /10.20.30.10    consumer-1
[root@hs-test-10-20-30-10 bin]# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-topics --group tabase-10.20.30.10:8080 --verbose --state

COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
10.20.30.10:9092 (0)      range                     Stable               1

 

The producer reports the following error when connecting

  WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

  There are two possible causes: 1. Kafka is not running; 2. the connection string does not match the listeners value configured in conf/server.properties.

log4j-Kafka Configuration

Add the jar dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.3</version>
        </dependency>

Configure log4j2.xml as follows:

Add the Kafka appender reference to the logger:

        <Root level="INFO" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="KAFKA"/>
            <AppenderRef ref="app_error" />
        </Root>

Define the Kafka appender:

    <Appenders>
        <!-- Send error logs to Kafka -->
        <Kafka name="KAFKA" topic="bomp">
            <ThresholdFilter level="error" onMatch="DENY" onMismatch="NEUTRAL"/>
            <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}:%4p %t (%F:%L) - %m%n" />
            <Property name="bootstrap.servers">10.20.30.11:9092</Property>
        </Kafka>
    </Appenders>

  That completes the log4j-to-Kafka setup. For C++, the librdkafka library can be used (https://docs.confluent.io/2.0.0/clients/librdkafka/index.html); a dedicated article will follow.

Related Exceptions

The consumer reports:

2018-09-17 14:10:07.768 WARN 130400 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,10.20.30.11,9092)] failed

java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.producer.SyncProducer.doSend(SyncProducer.scala:79) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.12-0.11.0.3.jar:na]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:61) [kafka_2.12-0.11.0.3.jar:na]
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96) [kafka_2.12-0.11.0.3.jar:na]
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:72) [kafka_2.12-0.11.0.3.jar:na]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) [kafka_2.12-0.11.0.3.jar:na]

Solution: set advertised.host.name in server.properties and restart. See https://stackoverflow.com/questions/30606447/kafka-consumer-fetching-metadata-for-topics-failed

The ZooKeeper log reports:

2018-10-08 14:13:28,297 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100147743c10000 type:setData cxid:0xc8 zxid:0x53 txntype:-1 reqpath:n/a Error Path:/config/topics/uft_trade Error:KeeperErrorCode = NoNode for /config/topics/uft_trade
2018-10-08 14:13:28,302 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100147743c10000 type:create cxid:0xc9 zxid:0x54 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics

Solution: still under investigation.

After running for a while on a certain virtual machine (this never happened on physical hosts), the Spring Boot Kafka client suddenly drove system CPU to 80-90% with a flood of the following logs:

2018-10-09 13:54:57,713  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2682ms for sessionid 0x100175687960002
2018-10-09 13:54:57,904  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2672ms for sessionid 0x100175687960004
2018-10-09 13:54:58,621  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960003
2018-10-09 13:54:57,232  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2700ms for sessionid 0x100175687960007
2018-10-09 13:55:09,812  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2672ms for sessionid 0x100175687960004, closing socket connection and attempting reconn
ect
2018-10-09 13:55:02,942  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2702ms for sessionid 0x100175687960008
2018-10-09 13:55:09,755  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960003, closing socket connection and attempting reconn
ect
2018-10-09 13:55:09,789  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2682ms for sessionid 0x100175687960002, closing socket connection and attempting reconn
ect
2018-10-09 13:55:18,677  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960005
2018-10-09 13:55:11,752  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 20016ms for sessionid 0x100175687960001
2018-10-09 13:55:17,709  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 2678ms for sessionid 0x100175687960006
2018-10-09 13:55:12,779  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2700ms for sessionid 0x100175687960007, closing socket connection and attempting reconn
ect
2018-10-09 13:55:20,634  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2702ms for sessionid 0x100175687960008, closing socket connection and attempting reconn
ect
2018-10-09 13:55:22,178  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 20016ms for sessionid 0x100175687960001, closing socket connection and attempting recon
nect
2018-10-09 13:58:10,244  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:10,240  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:10,241  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:10,240  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2675ms for sessionid 0x100175687960005, closing socket connection and attempting reconn
ect
2018-10-09 13:58:10,243  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 2678ms for sessionid 0x100175687960006, closing socket connection and attempting reconn
ect
2018-10-09 13:58:11,107  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:40,384  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:58:40,383  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:58:40,379  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:58:40,378  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:40,378  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:58:40,377  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:22,082  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:22,084  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:22,099  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:22,108  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:22,130  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:59:23,382  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:23,412  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 13:59:23,412  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 13:59:23,443  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@8646db9
2018-10-09 13:59:23,411  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960001 has expired
2018-10-09 13:59:32,474  INFO ZkClient:713 - zookeeper state changed (Disconnected)
2018-10-09 13:59:23,404  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960007 has expired
2018-10-09 13:59:23,390  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 13:59:32,477  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@4671e53b
2018-10-09 13:59:23,390  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960008 has expired
2018-10-09 13:59:23,390  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 13:59:32,477  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@6a1aab78
2018-10-09 13:59:23,389  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960004 has expired
2018-10-09 13:59:32,417  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:23,380  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:23,446  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@dc24521
2018-10-09 13:59:41,829  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960004 has expired, closing socket connection
2018-10-09 13:59:41,832  INFO ZkClient:936 - Waiting for keeper state SyncConnected
2018-10-09 13:59:41,829  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960008 has expired, closing socket connection
2018-10-09 13:59:41,831  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:41,830  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960007 has expired, closing socket connection
2018-10-09 13:59:41,830  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960001 has expired, closing socket connection
2018-10-09 13:59:41,860  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:42,585  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 13:59:42,810  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 13:59:42,835  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:31,813  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 48978ms for sessionid 0x100175687960002
2018-10-09 14:00:31,825  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 49644ms for sessionid 0x100175687960005
2018-10-09 14:00:31,825  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 49644ms for sessionid 0x100175687960005, closing socket connection and attempting recon
nect
2018-10-09 14:00:31,827  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 49968ms for sessionid 0x100175687960006
2018-10-09 14:00:31,827  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 49968ms for sessionid 0x100175687960006, closing socket connection and attempting recon
nect
2018-10-09 14:00:31,842  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 50011ms for sessionid 0x100175687960003
2018-10-09 14:00:31,868  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 50011ms for sessionid 0x100175687960003, closing socket connection and attempting recon
nect
2018-10-09 14:00:31,853  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 48978ms for sessionid 0x100175687960002, closing socket connection and attempting recon
nect
2018-10-09 14:00:31,885  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:31,886  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:31,887  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:31,887  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:31,907  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960001
2018-10-09 14:00:31,907  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960008
2018-10-09 14:00:31,908  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960004
2018-10-09 14:00:31,944  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960007
2018-10-09 14:00:33,391  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:33,396  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:33,424  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 1336ms for sessionid 0x0
2018-10-09 14:00:33,430  INFO ClientCnxn:1299 - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10017568796000b, negotiated timeout = 30000
2018-10-09 14:00:33,517  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:33,516  INFO ZkClient:713 - zookeeper state changed (SyncConnected)
2018-10-09 14:00:34,399  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:34,354  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 1336ms for sessionid 0x0, closing socket connection and attempting reconnect
2018-10-09 14:00:34,433  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:34,475  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:34,476  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:34,485  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 968ms for sessionid 0x0
2018-10-09 14:00:34,488  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 968ms for sessionid 0x0, closing socket connection and attempting reconnect
2018-10-09 14:00:37,472  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:37,484  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:37,487  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:37,488  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:37,489  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:37,479  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960006 has expired
2018-10-09 14:00:37,495  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960006 has expired, closing socket connection
2018-10-09 14:00:37,447  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 14:00:37,479  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 14:00:37,519  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@69b0fd6f
2018-10-09 14:00:37,519  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@4a87761d
2018-10-09 14:00:37,446  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960005 has expired
2018-10-09 14:00:37,519  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960005 has expired, closing socket connection
2018-10-09 14:00:37,765  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 14:00:37,780  INFO ZkClient:713 - zookeeper state changed (Expired)
2018-10-09 14:00:37,780  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960003 has expired
2018-10-09 14:00:37,791  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960003 has expired, closing socket connection
2018-10-09 14:00:38,194  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@3aeaafa6
2018-10-09 14:00:37,995  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 507ms for sessionid 0x0
2018-10-09 14:00:52,148  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 507ms for sessionid 0x0, closing socket connection and attempting reconnect
2018-10-09 14:00:38,198  INFO ZooKeeper:438 - Initiating client connection, connectString= sessionTimeout=500 watcher=org.I0Itec.zkclient.ZkClient@491cc5c9
2018-10-09 14:00:52,141  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960006
2018-10-09 14:00:52,128  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:52,154  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:52,126  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:00:52,179  INFO ClientCnxn:876 - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-10-09 14:00:38,010  WARN ClientCnxn:1285 - Unable to reconnect to ZooKeeper service, session 0x100175687960002 has expired
2018-10-09 14:00:52,231  INFO ClientCnxn:1154 - Unable to reconnect to ZooKeeper service, session 0x100175687960002 has expired, closing socket connection
2018-10-09 14:00:52,683  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 504ms for sessionid 0x0
2018-10-09 14:05:12,238  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:05:12,176  INFO ClientCnxn:1032 - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-10-09 14:08:21,078  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960002
2018-10-09 14:05:12,113  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 259911ms for sessionid 0x10017568796000b
2018-10-09 14:08:21,107  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 259911ms for sessionid 0x10017568796000b, closing socket connection and attempting reco
nnect
2018-10-09 14:05:12,098  INFO ClientCnxn:519 - EventThread shut down for session: 0x100175687960003
2018-10-09 14:00:52,677  WARN ClientCnxn:1108 - Client session timed out, have not heard from server in 501ms for sessionid 0x0
2018-10-09 14:08:21,107  INFO ClientCnxn:1156 - Client session timed out, have not heard from server in 501ms for sessionid 0x0, closing socket connection and attempting reconnect

 After skimming the post at https://blog.csdn.net/xjping0794/article/details/77784171 and checking system I/O during that window, it was indeed very high, reaching 50% and above, as shown below:

14時00分28秒       sda   3062.38 922268.58    670.77    301.38      5.17      1.71      0.16     49.44
14時00分28秒   ol-root   3111.77 922266.41    495.79    296.54      5.29      1.70      0.16     49.43
14時00分28秒   ol-swap     22.04      2.09    174.24      8.00      0.13      5.80      0.15      0.33
14時11分16秒       sda   5432.75 1537105.34    768.61    283.07     19.06      3.53      0.17     91.53
14時11分16秒   ol-root   5513.26 1537106.56    731.82    278.93     19.55      3.54      0.17     91.52
14時11分16秒   ol-swap      5.07      4.68     35.87      8.00      0.01      2.27      0.19      0.10

14時11分16秒       DEV       tps  rd_sec/s  wr_sec/s  avgrq-sz  avgqu-sz     await     svctm     %util
14時20分01秒       sda   2784.00 795332.59    462.60    285.85     10.89      3.93      0.18     50.09
14時20分01秒   ol-root   2827.44 795311.85    414.30    281.43     11.18      3.95      0.18     50.07
14時20分01秒   ol-swap      6.96     12.98     42.72      8.00      0.05      7.80      0.18      0.12
14時30分01秒       sda      3.13     12.42     59.59     23.04      0.00      0.57      0.44      0.14

But nothing in particular was running during that window, which is odd; unfortunately I forgot to check with iotop at the time to see which process was responsible. The points made in the post above are:

Regarding where to place ZooKeeper logs, the official site gives the following advice:

Having a dedicated log device has a large impact on throughput and stable latencies. It is highly recommended to dedicate a log device and set dataLogDir to point to a directory on that device, and then make sure to point dataDir to a directory not residing on that device.

Add to zoo.cfg:

      forceSync=no

  forceSync is enabled by default: to avoid losing state, ZooKeeper syncs its current state to the on-disk log as soon as it receives data and only replies after the sync completes. With it turned off, client requests get fast responses (this matters less on servers with a battery-backed write cache).

   Looking at the ZooKeeper server log, large numbers of CancelledKeyException start to appear around the same time:

2018-10-09 13:56:36,712 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:14926 which had sessionid 0x100175687960008
2018-10-09 13:56:43,857 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:14924 which had sessionid 0x100175687960006
2018-10-09 13:56:49,783 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:14919 which had sessionid 0x100175687960001
2018-10-09 13:56:49,816 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception
java.nio.channels.CancelledKeyException
	at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
	at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
	at java.lang.Thread.run(Thread.java:748)
2018-10-09 13:58:54,331 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23459
2018-10-09 13:58:54,377 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23459, probably expired
2018-10-09 13:58:54,401 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23485
2018-10-09 13:58:54,441 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23494
2018-10-09 13:58:56,314 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23459 which had sessionid 0x10017
5687960000
2018-10-09 13:58:56,336 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23485
2018-10-09 13:58:56,392 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23485, probably expired
2018-10-09 13:58:57,890 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23497
2018-10-09 13:58:59,480 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23485 which had sessionid 0x10017
5687960000
2018-10-09 13:59:00,383 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23494
2018-10-09 13:59:00,910 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23494, probably expired
2018-10-09 13:59:02,140 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23507
2018-10-09 13:59:03,286 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23497
2018-10-09 13:59:03,671 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23494 which had sessionid 0x10017
5687960000
2018-10-09 13:59:03,905 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23497, probably expired
2018-10-09 13:59:05,341 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception
java.nio.channels.CancelledKeyException
	at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
	at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
	at java.lang.Thread.run(Thread.java:748)
2018-10-09 13:59:06,862 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23511
2018-10-09 13:59:10,044 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23507
2018-10-09 13:59:10,267 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23497 which had sessionid 0x10017
5687960000
2018-10-09 13:59:10,285 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23507, probably expired
2018-10-09 13:59:10,286 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception
java.nio.channels.CancelledKeyException
	at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
	at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
	at java.lang.Thread.run(Thread.java:748)
2018-10-09 13:59:10,287 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23507 which had sessionid 0x10017
5687960000
2018-10-09 13:59:10,287 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23511
2018-10-09 13:59:10,287 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23511, probably expired
2018-10-09 13:59:10,313 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23519
2018-10-09 13:59:10,313 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23511 which had sessionid 0x10017
5687960000
2018-10-09 13:59:10,314 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /192.168.223.137:23524
2018-10-09 13:59:10,314 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23519
2018-10-09 13:59:10,314 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23519, probably expired
2018-10-09 13:59:10,315 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@941] - Client attempting to renew session 0x100175687960000 at /192.168.223.137:23524
2018-10-09 13:59:10,315 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /192.168.223.137:23519 which had sessionid 0x10017
5687960000
2018-10-09 13:59:10,316 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@686] - Invalid session 0x100175687960000 for client /192.168.223.137:23524, probably expired
2018-10-09 13:59:10,321 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@236] - Ignoring unexpected runtime exception
java.nio.channels.CancelledKeyException
	at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
	at sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
	at java.lang.Thread.run(Thread.java:748)

  The post above says this was fixed in 3.4.8, and we run 3.4.12. Digging further, some reports mention very high write-log latency, e.g. "fsync-ing the write ahead log in SyncThread:0 took 8001ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide", but that warning does not appear in our log. We decided to try adding forceSync=no; see https://www.jianshu.com/p/73eec030db86.

  As for why the timeouts in the log vary in length, that is related to tickTime; it can be explained and is not detailed here.

The ZooKeeper log contains many of the following messages:

id:0x9d zxid:0x42 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
2018-10-09 12:01:07,918 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx
id:0xa5 zxid:0x45 txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_individual/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/uft_individual/partitions/0
2018-10-09 12:01:07,921 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx
id:0xa6 zxid:0x46 txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_individual/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/uft_individual/partitions
2018-10-09 12:01:17,740 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:setData c
xid:0xaf zxid:0x4a txntype:-1 reqpath:n/a Error Path:/config/topics/uft_splitter Error:KeeperErrorCode = NoNode for /config/topics/uft_splitter
2018-10-09 12:01:17,741 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx
id:0xb0 zxid:0x4b txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
2018-10-09 12:01:17,753 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx
id:0xb8 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_splitter/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/uft_splitter/partitions/0
2018-10-09 12:01:17,754 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:create cx
id:0xb9 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/topics/uft_splitter/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/uft_splitter/partitions
2018-10-09 12:01:35,671 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x100175687960000 type:setData c
xid:0xc2 zxid:0x53 txntype:-1 reqpath:n/a Error Path:/config/topics/cres_global Error:KeeperErrorCode = NoNode for /config/topics/cres_global

  See https://github.com/mesos/kafka/issues/136, yet the Kafka service had started and stayed up normally (the startup log confirms this). https://stackoverflow.com/questions/34393837/zookeeper-kafka-error-keepererrorcode-nodeexists suggests another cause, leftover ZooKeeper data that was never deleted, but ours was a fresh install and the problem still appeared after a while. Finally, https://stackoverflow.com/questions/43559328/got-user-level-keeperexception-when-processing explains it:

The message you see is not an error yet. It is a potential exception raised by Zookeeper that original object making a request has to handle.

When you start a fresh Kafka, it gets a bunch of NoNode messages. It's normal because some paths don't exist yet. At the same time, you get also NodeExists messages as the path exists already.

Example: Error:KeeperErrorCode = NoNode for /config/topics/test It's because Kafka sends a request to Zookeeper for this path. But it doesn't exist. That's OK, because you are trying to create it. So, you see "INFO" from Zookeeper but no error from Kafka. Once Kafka gets this message, it tries to create your topic. To do so, it needs to access a path in Zookeeper for topics. So, it sends a request and gets an error NodeExists for /config/topics. Again, it's normal and Kafka ignores the message.

Long story short, these are all non-issue messages and you should skip them. If it bothers you, change logging configuration of Zookeeper (it's not recommended though).

In short these are informational messages; just ignore them, as Kafka itself does.

Compilation succeeds but startup fails with the following error:

java.lang.NoSuchMethodError:
 org.apache.kafka.clients.consumer.KafkaConsumer.subscribe

  Cause: the Kafka client version used at compile time differs from the one on the runtime classpath (e.g. 0.9.1 vs 0.11.0), typically via a transitive dependency. Compare the compile-time version with the version printed at runtime.

Client warning:

  [WARN] consumer-0-C-1 org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:846) Error while fetching metadata with correlation id 105 : {hs:ta:channel:Metadata=INVALID_TOPIC_EXCEPTION} 

Solution: topic names must not contain the ':' character.

 

Client error:

Exception in thread "CDC-Kafka-[hsta.cdcRecord_F6]-Consumer-1" Exception in thread "CDC-Kafka-[hsta.cdcRecord_F7]-Consumer-1" org.apache.kafka.common.errors.WakeupException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:487)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)

Solution:

 

Client error:

org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'correlation_id': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:53)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:435)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
First check that the configuration is correct, for example that you are connecting to the right Kafka version; then check whether the client version is newer than the server.
 

Client error:

[] 2020-01-09 13:35:19 [32026] [ERROR] consumer-0-C-1 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:692) Container exception
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [hs:ta:channel:Metadata]

Solution: topic names must not contain the ':' character.

Starting Kafka in the Background

  By default, running
./kafka-server-start.sh ../config/server.properties
keeps the process in the foreground, which means Kafka stops as soon as the console is closed. Add the -daemon option to start it in background mode:
./kafka-server-start.sh -daemon ../config/server.properties

Commit Failures

// Auto-commit failure; manual commits can fail in the same way.
Auto-commit of offsets {wmBiz_topic_india_people_articletype_total_dev2-2=OffsetAndMetadata{offset
=191084, metadata=''}, wmBiz_topic_india_people_articletype_total_dev2-3=OffsetAndMetadata{offset=196003, metadata=''}} failed for group group_db_2_es
_people_articletype_total_matrix_dev: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the pol
l loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
  For a Kafka application, when the consumer processes too slowly it cannot commit within the allotted time. If that triggers a rebalance as described above, the records are redelivered to another consumer and you get duplicate consumption; if consumption keeps failing to finish within the interval, you end up in a dead loop of repeated consumption with the offset never advancing.
Therefore we need to:
  • 1. Keep consumer processing fast enough.
  • 2. Make the consumer logic idempotent wherever possible.
  • 3. Reduce duplicate consumption through configuration tuning.
  • 4. Monitor not only the message backlog but also the consumption rate (compare two successive offsets).
  Concretely, increase max.poll.interval.ms (its upstream default is 300000 ms, i.e. 5 minutes; check what your configuration actually sets, because in our case the effective value was far too small) or reduce max.poll.records.

  The Kafka API we use calls KafkaConsumer.poll(), which calls pollOnce(), which in turn calls ConsumerCoordinator.poll(); at the end of that method the automatic offset commit is performed. That is the key point: the commit only happens during poll(), so if processing the fetched data takes longer than the maximum poll interval, you get the error above and the offset cannot be committed.

  After raising max.poll.interval.ms to 300 seconds, the error no longer occurred.
  Note: Kafka 0.9 has no max.poll.records parameter; it was only introduced in 0.10 (its default is now 500), so configuring it on 0.9 has no effect.
  When the data volume is uncertain, it is recommended to have a single thread poll Kafka and hand the records to other threads for processing.
  This involves the interplay between fetch time, maximum fetch size, processing time, and the various timeouts, which still needs to be sorted out.
  For the underlying mechanics see: https://juejin.im/post/5b4454a4e51d451908693763. A configuration sketch follows.
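A sketch of that single-poller / worker-pool pattern with the relevant settings (values, broker address, group id, and the topic "orders" are illustrative; max.poll.interval.ms and max.poll.records require client 0.10.1+):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollAndHandOffDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");         // assumption: adjust to your cluster
        props.put("group.id", "slow-processing-group");            // illustrative
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.records", "100");                      // fewer records per poll
        props.put("max.poll.interval.ms", "600000");               // allow up to 10 min between polls

        ExecutorService workers = Executors.newFixedThreadPool(4); // processing happens off the poll thread

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    workers.submit(() -> process(record));  // keep the poll loop fast
                }
                // Note: with auto-commit this trades delivery guarantees for liveness;
                // commit manually after the workers finish if at-least-once is required.
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.offset() + ": " + record.value());
    }
}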

Assorted Slow-Consumption Issues

https://blog.csdn.net/qq_16681169/article/details/101081656

https://blog.csdn.net/qq_40384985/article/details/90675986

https://blog.csdn.net/qq_34894188/article/details/80554570

Issue

[2018-10-14 21:20:28,860] INFO [Consumer clientId=consumer-1, groupId=console-consumer-28334] Discovered group coordinator wxoddc2nn1:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 

Solution: https://www.jianshu.com/p/0e7e38c98d13

Why people say Kafka's use of disk is faster than memory (strictly speaking, this conclusion is incorrect. Why? Because Kafka's access pattern is mostly sequential, so it simply delegates the work to the OS page cache, as MongoDB does. You can argue that this choice is no worse than managing memory yourself, but saying disk is faster than memory goes too far.)

  Kafka's core idea is to use the disk rather than memory. Probably everyone assumes memory must be faster than disk; I was no exception.

  After studying Kafka's design, reading the relevant material, and running my own tests, it turns out that sequential disk read/write throughput (Cassandra, LevelDB, and RocksDB follow the same strategy) is on par with memory.

   Moreover, Linux applies plenty of disk I/O optimizations, including read-ahead, write-behind, and the page cache.

  Doing all this in memory instead would run into two problems: Java objects have a large memory overhead, and as heap data grows, Java GC pauses get long. Using the OS page cache saves much of the GC cost (could JNI achieve a similar effect? At least a large part of Netty's ByteBuffer/Unsafe usage works that way).

  Operating on disk has the following advantages:

  • The disk cache is maintained by Linux, saving Java programmers a lot of work.
  • Sequential disk reads and writes are faster than random memory access.
  • JVM GC is inefficient for large heaps and the memory overhead is high.
  • After a cold restart, the data on disk (and hence the rebuilt cache) is still usable.

Optimization

  • For message deletion, use tombstone markers instead of deleting immediately.
  • For sending, batch records instead of sending them one at a time.
  • On the JVM side, Kafka defaults to the CMS collector; consider switching to G1: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true

Other Key Practices

❖ Kafka uses tombstones instead of deleting records right away.
❖ Kafka producers support record batching, by record size, auto-flushed based on time.
❖ Batching is good for network IO throughput.
❖ Batching speeds up throughput drastically.

❖ With Kafka, consumers pull data from brokers. When a follower's lag exceeds replica.lag.time.max.ms, the leader kicks it out of the ISR.

❖ If all replicas are down for a partition, Kafka chooses the first replica (not necessarily in the ISR set) that comes alive as the leader.
❖ That behavior corresponds to unclean.leader.election.enable=true (the default in older releases; since 0.11.0 the default is false).
❖ If unclean.leader.election.enable=false and all replicas are down for a partition, Kafka waits for an ISR member to come back alive as the new leader.

Outside of using a single ensemble (i.e. one coordinating ZooKeeper cluster) for multiple Kafka clusters, it is not recommended
to share the ensemble with other applications, if it can be avoided. Kafka is sensitive
to Zookeeper latency and timeouts, and an interruption in communications with the
ensemble will cause the brokers to behave unpredictably. This can easily cause multiple
brokers to go offline at the same time, should they lose Zookeeper connections,
which will result in offline partitions. It also puts stress on the cluster controller,
which can show up as subtle errors long after the interruption has passed, such as
when trying to perform a controlled shutdown of a broker. Other applications that
can put stress on the Zookeeper ensemble, either through heavy usage or improper
operations, should be segregated to their own ensemble.

Kafka and MQTT

  Unlike RabbitMQ and ActiveMQ, Kafka does not support the MQTT protocol out of the box. If you want applications that currently integrate with RabbitMQ over MQTT to switch over seamlessly, either write your own gateway or use a third-party plugin; the most mainstream options are https://www.confluent.io/connector/kafka-connect-mqtt/ and https://www.infoq.cn/article/fdbcrh6I*9ajCWLvippC

References

  • https://my.oschina.net/u/3853586/blog/3138825 (a decent summary of the related concepts)
  • https://kafka.apache.org/documentation (single-page mode)
  • http://cloudurable.com/blog/kafka-architecture/index.html
  • https://cwiki.apache.org/confluence/display/KAFKA/
  • http://cloudurable.com/ppt/4-kafka-detailed-architecture.pdf
  • Learning Apache Kafka, Second Edition (covers 0.8.x)
  • Kafka: The Definitive Guide (covers 0.9.x)
  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka+papers+and+presentations
  • https://www.iteblog.com/archives/2605.html (32 common Kafka interview questions)
  • https://www.confluent.io/resources/?_ga=2.259543491.831483530.1575092374-1512510254.1571457212 (if Percona Server is the main commercial company built around MySQL, Confluent is the commercial company built around Kafka)
  • https://www.jianshu.com/p/d2cbaae38014 (Kafka ops pitfalls; somewhat dated, 0.9.1 era, but the server-side issues are still worth a look)

Related best-practice references:

  • https://dzone.com/articles/20-best-practices-for-working-with-apache-kafka-at

