一些Go操作Kafka的問題


1)包的選擇

confluent-kafka-go使用了rdkafka的c庫,破壞了go代碼的收斂,不使用;

sarama不支持groud id 的功能,寫consumer需要自己管理消費的partition,offset;很難用;

sarama-cluster是對sarama的一層封裝,實現了group id 功能

 

2)關於offset問題

sarama-cluster有auto commit的功能,默認是一秒;但最好自己管理,如每100條數據MarkOffset,並CommitOffsets

 

3)實現consumer的Priority MQ功能

如1-5優先級的5個Topic,傳入

map[string]int32 {

  topic1: 1,

  topic2: 2,

  ....

}

按Priority生成排序的consumerList,for循環遍歷consume,<-consumer.Messages(),select之並設置default分支

 

4)producer

producer使用的AsyncProducer的對象池;測試:本機1K以上message大小,producer池可提升效率,原因是I/O時間長,單一Producer發送效率受限;小message(10byte),單個producer發送效率要高,瓶頸在producer池的頻繁Get與Put

 

5)網絡問題時,consumer會自動重連;

https://github.com/Shopify/sarama/issues/72

 

6)接收producer的Errors() chan一定要用for _, err := range producer.Errors();勿用for{}否則producer意外關閉,這里會死循環;

for {

  err := <-producer.Errors()  // 錯誤示例;若producer意外關掉,此外err一直返回nil,跑滿CPU

  if err != nil {

    // print log

  }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM