Golang中如何正確的使用sarama包操作Kafka?


Golang中如何正確的使用sarama包操作Kafka?

一、背景

在一些業務系統中,模塊之間通過引入Kafka解藕,拿IM舉例( 圖來源):
用戶A給B發送消息,msg_gateway收到消息后,投遞消息到Kafka后就給A返回發送成功。這個時候,其實還沒有持久化到mysql中,雖然最終會保持一致性。所以,試想如果Kafka丟消息了,是不是就出大問題了?A認為給B發送消息成功了,但是在服務器內部消息丟失了B並沒有收到。
 
所以,在使用Kafka的時候,有一些業務對消息丟失問題非常的關注。
 
同樣,常見的問題還有:
  • 重復消費的問題。
  • 亂序的問題。
 
下面我們來一起看一下如何使用sarama包來解決這些問題。
 

二、Kafka消息丟失問題描述

以下內容來源  kafka什么時候會丟消息
 
上面我們擔心的點需要進一步明確一下丟消息的定義:kafka集群中的部分或全部broker掛了,導致consumer沒有及時收到消息,這不屬於丟消息。broker掛了,只要消息全部持久化到了硬盤上,重啟broker集群之后,使消費者繼續拉取消息,消息就沒有丟失,仍然全量消費了。所以我的理解,所謂丟消息,意味着: 開發人員未感知到哪些消息沒有被消費
 
作者把消息的丟失歸納了以下幾種情況:
1) producer把消息發送給broker,因為網絡抖動,消息沒有到達broker,且開發人員無感知。
解決方案: producer設置acks參數,消息同步到master之后返回ack信號,否則拋異常使應用程序感知到並在業務中進行重試發送。這種方式一定程度保證了消息的可靠性,producer等待broker確認信號的時延也不高。
 
2)producer把消息發送給broker-master,master接收到消息,在未將消息同步給follower之前,掛掉了,且開發人員無感知。
解決方案: producer設置acks參數,消息同步到master且同步到所有follower之后返回ack信號,否則拋異常使應用程序感知到並在業務中進行重試發送。這樣設置,在更大程度上保證了消息的可靠性,缺點是producer等待broker確認信號的時延比較高。
 
3)producer把消息發送給broker-master,master接收到消息,master未成功將消息同步給每個follower,有消息丟失風險。
解決方案:同上。
 
4)某個broker消息尚未從內存緩沖區持久化到磁盤,就掛掉了,這種情況無法通過ack機制感知。
解決方案:設置參數,加快消息持久化的頻率,能在一定程度上減少這種情況發生的概率。但提高頻率自然也會影響性能。
 
5)consumer成功拉取到了消息,consumer掛了。
解決方案: 設置手動sync,消費成功才提交
 
綜上所述,集群/項目運轉正常的情況下,kafka不會丟消息。一旦集群出現問題,消息的可靠性無法完全保證。要想盡可能保證消息可靠,基本只能在發現消息有可能沒有被消費時,重發消息來解決。所以在業務邏輯中,要考慮消息的重復消費問題,對於關鍵環節,要有冪等機制。
 
作者的幾條建議:
1)如果一個業務很關鍵,使用kafka的時候要考慮丟消息的成本和解決方案。
2)producer端確認消息是否到達集群,若有異常,進行重發。
3) consumer端保障消費冪等性
4)運維保障集群運轉正常且高可用,保障網絡狀況良好。
 

三、生產端丟消息問題解決

上面說了,只需要把 producer設置acks參數,等待Kafka所有follower都成功后再返回。我們只需要進行如下設置:
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // -1

ack參數有如下取值:

const (
    // NoResponse doesn't send any response, the TCP ACK is all you get.
    NoResponse RequiredAcks = 0
    // WaitForLocal waits for only the local commit to succeed before responding.
    WaitForLocal RequiredAcks = 1
    // WaitForAll waits for all in-sync replicas to commit before responding.
    // The minimum number of in-sync replicas is configured on the broker via
    // the `min.insync.replicas` configuration key.
    WaitForAll RequiredAcks = -1
)

  

四、消費端丟消息問題

通常消費端丟消息都是因為Offset自動提交了,但是數據並沒有插入到mysql(比如出現BUG或者進程Crash),導致下一次消費者重啟后,消息漏掉了,自然數據庫中也查不到。這個時候,我們可以通過手動提交解決,甚至在一些復雜場景下,還要使用二階段提交。
 

自動提交模式下的丟消息問題

默認情況下,sarama是自動提交的方式,間隔為1秒鍾
// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
   // …
   c.Consumer.Offsets.AutoCommit.Enable = true. // 自動提交
   c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 間隔
   c.Consumer.Offsets.Initial = OffsetNewest
   c.Consumer.Offsets.Retry.Max = 3
   // ...
}

這里的自動提交,是基於被標記過的消息(sess.MarkMessage(msg, “"))

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
      // 標記消息已處理,sarama會自動提交
      sess.MarkMessage(msg, "")
   }
   return nil
}

如果不調用sess.MarkMessage(msg, “"),即使啟用了自動提交也沒有效果,下次啟動消費者會從上一次的Offset重新消費,我們不妨注釋掉sess.MarkMessage(msg, “"),然后打開Offset Explorer查看:

 
那么這樣,我們大概理解了sarama自動提交的原理:先標記再提交。我們只需要保持標記邏輯在插入mysql代碼之后即可確保不會出現丟消息的問題:
 
正確的調用順序:
func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      // 插入mysql
      insertToMysql(msg)
 
      // 正確:插入mysql成功后程序崩潰,下一次頂多重復消費一次,而不是因為Offset超前,導致應用層消息丟失了
      sess.MarkMessage(msg, “")
   }
   return nil
}

  

錯誤的順序:
func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      // 錯誤1:不能先標記,再插入mysql,可能標記的時候剛好自動提交Offset,但mysql插入失敗了,導致下一次這個消息不會被消費,造成丟失
      // 錯誤2:干脆忘記調用sess.MarkMessage(msg, “"),導致重復消費
      sess.MarkMessage(msg, “")
 
      // 插入mysql
      insertToMysql(msg)
   }
   return nil
}

sarama手動提交模式

當然,另外也可以通過手動提交來處理丟消息的問題,但是個人不推薦,因為自動提交模式下已經能解決丟消息問題。
consumerConfig := sarama.NewConfig()
consumerConfig.Version = sarama.V2_8_0_0
consumerConfig.Consumer.Return.Errors = false
consumerConfig.Consumer.Offsets.AutoCommit.Enable = false  // 禁用自動提交,改為手動
consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
 
func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
 
      // 插入mysql
      insertToMysql(msg)
 
      // 手動提交模式下,也需要先進行標記
      sess.MarkMessage(msg, "")
 
      consumerCount++
      if consumerCount%3 == 0 {
         // 手動提交,不能頻繁調用,耗時9ms左右,macOS i7 16GB
         t1 := time.Now().Nanosecond()
         sess.Commit()
         t2 := time.Now().Nanosecond()
         fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms")
      }
   }
   return nil
}

   

五、Kafka消息順序問題

投遞Kafka之前,我們通過一次gRPC調用解決了消息序號的生成問題,但是這里其實還涉及一個消息順序問題:訂閱Kafka的消費者如何按照消息順序寫入mysql,而不是隨機寫入呢?
我們知道,Kafka的消息在一個partition中是有序的,所以 只要確保發給某個人的消息都在同一個partition中即可
 
1.全局一個partition
這個最簡單,但是在kafka中一個partition對應一個線程,所以這種模型下Kafka的吞吐是個問題。
 
2.多個partition,手動指定
msg := &sarama.ProducerMessage{
   Topic: “msgc2s",
   Value: sarama.StringEncoder(“hello”),
   Partition: toUserId % 10,
}
partition, offset, err := producer.SendMessage(msg)

生產消息的時候,除了Topic和Value,我們可以通過手動指定partition,比如總共有10個分區,我們根據用戶ID取余,這樣發給同一個用戶的消息,每次都到1個partition里面去了,消費者寫入mysql中的時候,自然也是有序的。

但是,因為分區總數是寫死的,萬一Kafka的分區數要調整呢?那不得重新編譯代碼?所以這個方式不夠優美。
 
3.多個partition,自動計算
kafka客戶端為我們提供了這種支持。首先,在初始化的時候,設置選擇分區的策略為Hash
p.config.Producer.Partitioner = sarama.NewHashPartitioner

然后,在生成消息之前,設置消息的Key值:

msg := &sarama.ProducerMessage{
   Topic: "testAutoSyncOffset",
   Value: sarama.StringEncoder("hello"),
   Key: sarama.StringEncoder(strconv.Itoa(RecvID)),
}
 
Kafka客戶端會根據Key進行Hash,我們通過把接收用戶ID作為Key,這樣就能讓所有發給某個人的消息落到同一個分區了,也就有序了。
 

4.擴展知識:多線程情況下一個partition的亂序處理

我們上面說了,Kafka客戶端針對一個partition開一個線程進行消費,如果處理比較耗時的話,比如處理一條消息耗時幾十 ms,那么 1 秒鍾就只能處理幾十條消息,這吞吐量太低了。這個時候,我們可能就把邏輯移動到其他線程里面去處理,這樣的話,順序就可能會亂。
我們可以通過寫 N 個內存 queue,具有相同 key 的數據都到同一個內存 queue;然后對於 N 個線程,每個線程分別消費一個內存 queue 即可,這樣就能保證順序性。PS:就像4 % 10 = 4,14 % 10 = 4,他們取余都是等於4,所以落到了一個partition,但是key值不一樣啊,我們可以自己再取余,放到不同的queue里面。
 

六、重復消費和消息冪等

這篇文章中: kafka什么時候會丟消息 詳細了描述了各種丟消息的情況,我們通過設置 RequiredAcks = sarama.WaitForAll(-1),可以解決生產端丟消息的問題。第六節中也對消費端丟消息進行了說明,只需要確保在插入數據庫之后,調用sess.MarkMessage(msg, "”)即可。
 
如果出現了插入Mysql成功,但是因為自動提交有1秒的間隔,如果此時崩潰,下次啟動消費者勢必會對者1秒的數據進行重復消費,我們在應用層需要處理這個問題。
 
常見的有2種思路:
  1. 如果是存在redis中不需要持久化的數據,比如string類型,set具有天然的冪等性,無需處理。
  2. 插入mysql之前,進行一次query操作,針對每個客戶端發的消息,我們為它生成一個唯一的ID(比如GUID),或者直接把消息的ID設置為唯一索引。
 
第2個方案的難點在於,全局唯一ID的生成,理論上GUID也是存在重復的可能性的,如果是客戶端生成,那么插入失敗,怎么讓客戶端感知呢?所以,這里我認為還是需要自定義ID生產,比如通過組合法:用戶ID + 當前時間 + 32位GUID,是不是幾乎不會重復了呢(試想,1個人發1億條文本需要多少年。。。)?
 

七、完整代碼實例

consumer.go
type msgConsumerGroup struct{}
 
func (msgConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (msgConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
 
      // 查mysql去重
      if check(msg) {
          // 插入mysql
          insertToMysql()
      }
 
      // 標記,sarama會自動進行提交,默認間隔1秒
      sess.MarkMessage(msg, "")
   }
   return nil
}
 
func main(){
    consumerConfig := sarama.NewConfig()
    consumerConfig.Version = sarama.V2_8_0_0 // specify appropriate version
    consumerConfig.Consumer.Return.Errors = false
    //consumerConfig.Consumer.Offsets.AutoCommit.Enable = true      // 禁用自動提交,改為手動
    //consumerConfig.Consumer.Offsets.AutoCommit.Interval = time.Second * 1 // 測試3秒自動提交
    consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
 
    cGroup, err := sarama.NewConsumerGroup([]string{"10.0.56.153:9092", "10.0.56.153:9093", "10.0.56.153:9094"},"testgroup", consumerConfig)
    if err != nil {
       panic(err)
    }
 
   for {
      err := cGroup.Consume(context.Background(), []string{"testAutoSyncOffset"}, consumerGroup)
      if err != nil {
         fmt.Println(err.Error())
         break
      }
   }
 
   _ = cGroup.Close()
}

  

producer.go
func main(){
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有follower都回復ack,確保Kafka不會丟消息
    config.Producer.Return.Successes = true
    config.Producer.Partitioner = sarama.NewHashPartitioner // 對Key進行Hash,同樣的Key每次都落到一個分區,這樣消息是有序的
 
    // 使用同步producer,異步模式下有更高的性能,但是處理更復雜,這里建議先從簡單的入手
    producer, err := sarama.NewSyncProducer([]string{"10.0.56.153:9092"}, config)
    defer func() {
       _ = producer.Close()
    }()
    if err != nil {
       panic(err.Error())
    }
 
    msgCount := 4
    // 模擬4個消息
    for i := 0; i < msgCount; i++ {
        rand.Seed(int64(time.Now().Nanosecond()))
        msg := &sarama.ProducerMessage{
          Topic: "testAutoSyncOffset",
          Value: sarama.StringEncoder("hello+" + strconv.Itoa(rand.Int())),
          Key:   sarama.StringEncoder("BBB”),
        }
 
        t1 := time.Now().Nanosecond()
        partition, offset, err := producer.SendMessage(msg)
        t2 := time.Now().Nanosecond()
 
        if err == nil {
            fmt.Println("produce success, partition:", partition, ",offset:", offset, ",cost:", (t2-t1)/(1000*1000), " ms")
        } else {
            fmt.Println(err.Error())
        }
    }
}

  

八、參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM