1)包的選擇
confluent-kafka-go使用了rdkafka的c庫,破壞了go代碼的收斂,不使用;
sarama不支持groud id 的功能,寫consumer需要自己管理消費的partition,offset;很難用;
sarama-cluster是對sarama的一層封裝,實現了group id 功能
2)關於offset問題
sarama-cluster有auto commit的功能,默認是一秒;但最好自己管理,如每100條數據MarkOffset,並CommitOffsets
3)實現consumer的Priority MQ功能
如1-5優先級的5個Topic,傳入
map[string]int32 {
topic1: 1,
topic2: 2,
....
}
按Priority生成排序的consumerList,for循環遍歷consume,<-consumer.Messages(),select之並設置default分支
4)producer
producer使用的AsyncProducer的對象池;測試:本機1K以上message大小,producer池可提升效率,原因是I/O時間長,單一Producer發送效率受限;小message(10byte),單個producer發送效率要高,瓶頸在producer池的頻繁Get與Put
5)網絡問題時,consumer會自動重連;
https://github.com/Shopify/sarama/issues/72
6)接收producer的Errors() chan一定要用for _, err := range producer.Errors();勿用for{}否則producer意外關閉,這里會死循環;
for {
err := <-producer.Errors() // 錯誤示例;若producer意外關掉,此外err一直返回nil,跑滿CPU
if err != nil {
// print log
}
}