背景
在kafka的消費者中,如果消費某條消息出錯,會導致該條消息不會被ack,該消息會被不斷的重試,阻塞該分區的其他消息的消費,因此,為了保證消息隊列不被阻塞,在出現異常的情況下,我們一般還是會ack該條消息,再另外對失敗的情況進行重試
目標
實現一個完善的重試邏輯,一般需要考慮一下幾個因素:
- 重試的時間間隔
- 最大重試次數
- 是否會漏掉消息
實現
扔回隊尾
在消息出錯時,將消息扔回隊尾
優點:
- 實現簡單,沒有別的依賴項
缺點:
- 無法控制重試時間間隔
基於數據庫任務表的掃描方案
在數據庫中增加一個任務的狀態表,然后用一個定時任務去掃描任務表中,失敗的任務,然后進行重試,其中記錄下重試的次數即可
優點:
- 實現簡單,一般這種離線任務,根據統計的需求,都會有一個任務狀態表的,所以僅僅是增加一個定時任務去掃表
缺點:
- 性能較差,定時任務,一般都在無意義的掃描,浪費性能
新增重試隊列的方案
新增一個重試隊列,消費消息出錯時,將時間戳和消息發送到重試隊列,然后在重試隊列中,根據時間,來判斷阻塞時間,代碼如下:
func handleRetryEvent(ctx context.Context, conf *util.Conf, data []byte) (err error) {
defer common.Recover(ctx, &err)
log := common.Logger(ctx).WithField("Method", "consumer.handleRetryEvent")
retryEvent := &MergeRetryEvent{}
err = json.Unmarshal(data, retryEvent)
if err != nil {
log.WithError(err).Error("failed to unmarshal data")
return nil
}
log.WithField("contact_id", retryEvent.ContactId).Info("receive message")
delaySecond := (retryEvent.CreateTime + SLEEPSECOND) - time.Now().Unix()
if delaySecond <= 0 {
log.Info("send message to account merge event")
err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId)
return err
} else {
log.Infof("sleep %d seconds", delaySecond)
time.Sleep(time.Duration(delaySecond) * time.Second)
err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId)
return err
}
}
優點:
相對於掃表的方案,改方案沒有無意義的掃表操作,性能更好
注意:之前在網上看到一個重試隊列的實現,因為害怕開過多的線程(協程),作者用了一個channel來緩存重試消息,然后在一個協程池中去消費消息,消費的邏輯和上面的實例代碼差不多,這樣做是有風險的,因為channel是在本機的內存中,沒有本地存儲的,是存在丟消息的風險的(服務重啟等情況)
參考鏈接:
https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a
