- kafka消費者的Consume()方法會阻塞;
- 當Consume()方法返回err時,不確定繼續消費有沒有問題;保險起見,退出進程,然后重新初始化。
- 當Consume()方法返回nil是,是可以繼續消費的,親測有效。
需要注意的點寫在了注釋里:
//StartKafkaConsumer 啟動kafka消費者
func StartKafkaConsumer(ctx context.Context) {
//defer utils.ForPanic() //當消費者出現問題的時候,通過panic退出進程。然后重新啟動初始化
//因此代碼里不要加panic處理的機制
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0 // specify appropriate version
config.Consumer.Return.Errors = true
cfg := ReaderConfig.Config()
group, err := sarama.NewConsumerGroup(
[]string{cfg.GetString("kafka.addr")},
cfg.GetString("kafka.group_id"), config)
if err != nil {
my_logger.Errorf("sarama.NewConsumerGroup error, err=%s", err.Error())
panic(err)
//return
}
defer func() { _ = group.Close() }()
// Track errors
go func() {
defer utils.ForPanic()
for err := range group.Errors() {
if err != nil {
my_logger.Errorf("consumer error:%s", err.Error())
}
}
}()
topics := []string{cfg.GetString("kafka.topics")}
queueSize := cfg.GetInt64("kafka.queue_size")
if queueSize <= 0 {
my_logger.Errorf("queueSize <= 0")
panic("queue_size error")
}
log.Printf("queue size:%d\n", queueSize)
handler := ConsumerGroupHandler{
Pipe: make(chan []byte, queueSize),
}
coroutineCount := cfg.GetInt64("kafka.co_count")
if coroutineCount <= 0 {
my_logger.Errorf("coroutineCount <= 0")
panic("coroutineCount error")
}
for i := 0; i < int(coroutineCount); i++ {
go handler.Do()
}
log.Println("start success!")
for {
//關鍵代碼
//正常情況下:Consume()方法會一直阻塞
//我測試發現,約30分鍾左右,Consume()會返回,但沒有error
//無error的情況下,可以重復調用Consume()方法
//當有error產生的時候,不確定Consume()是否能夠繼續完善的執行。
//因此保險的辦法是拋出panic,讓進程重啟。
err = group.Consume(context.Background(), topics, &handler)
if err != nil {
my_logger.Errorf("group.Consume error: err=%s", err.Error())
panic(err)
} else {
my_logger.Info("group.Consume exit")
}
}
}