kafka消費者重試邏輯的實現


背景

在kafka的消費者中,如果消費某條消息出錯,會導致該條消息不會被ack,該消息會被不斷的重試,阻塞該分區的其他消息的消費,因此,為了保證消息隊列不被阻塞,在出現異常的情況下,我們一般還是會ack該條消息,再另外對失敗的情況進行重試

目標

實現一個完善的重試邏輯,一般需要考慮一下幾個因素:

  1. 重試的時間間隔
  2. 最大重試次數
  3. 是否會漏掉消息

實現

扔回隊尾

在消息出錯時,將消息扔回隊尾

優點:

  1. 實現簡單,沒有別的依賴項

缺點:

  1. 無法控制重試時間間隔

基於數據庫任務表的掃描方案

在數據庫中增加一個任務的狀態表,然后用一個定時任務去掃描任務表中,失敗的任務,然后進行重試,其中記錄下重試的次數即可

優點:

  1. 實現簡單,一般這種離線任務,根據統計的需求,都會有一個任務狀態表的,所以僅僅是增加一個定時任務去掃表

缺點:

  1. 性能較差,定時任務,一般都在無意義的掃描,浪費性能

新增重試隊列的方案

新增一個重試隊列,消費消息出錯時,將時間戳和消息發送到重試隊列,然后在重試隊列中,根據時間,來判斷阻塞時間,代碼如下:

func handleRetryEvent(ctx context.Context, conf *util.Conf, data []byte) (err error) {
	defer common.Recover(ctx, &err)
	log := common.Logger(ctx).WithField("Method", "consumer.handleRetryEvent")
	retryEvent := &MergeRetryEvent{}
	err = json.Unmarshal(data, retryEvent)
	if err != nil {
		log.WithError(err).Error("failed to unmarshal data")
		return nil
	}
	log.WithField("contact_id", retryEvent.ContactId).Info("receive message")
	delaySecond := (retryEvent.CreateTime + SLEEPSECOND) - time.Now().Unix()
	if delaySecond <= 0 {
		log.Info("send message to account merge event")
		err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId)
		return err
	} else {
		log.Infof("sleep %d seconds", delaySecond)
		time.Sleep(time.Duration(delaySecond) * time.Second)
		err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId)
		return err
	}
}

優點:

相對於掃表的方案,改方案沒有無意義的掃表操作,性能更好

注意:之前在網上看到一個重試隊列的實現,因為害怕開過多的線程(協程),作者用了一個channel來緩存重試消息,然后在一個協程池中去消費消息,消費的邏輯和上面的實例代碼差不多,這樣做是有風險的,因為channel是在本機的內存中,沒有本地存儲的,是存在丟消息的風險的(服務重啟等情況)

參考鏈接:

https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM