golang中使用kafka客戶端sarama消費時需要注意的一個點


  1. kafka消費者的Consume()方法會阻塞;
  2. 當Consume()方法返回err時,不確定繼續消費有沒有問題;保險起見,退出進程,然后重新初始化。
  3. 當Consume()方法返回nil是,是可以繼續消費的,親測有效。

需要注意的點寫在了注釋里:

//StartKafkaConsumer 啟動kafka消費者
func StartKafkaConsumer(ctx context.Context) {
    //defer utils.ForPanic()   //當消費者出現問題的時候,通過panic退出進程。然后重新啟動初始化
          //因此代碼里不要加panic處理的機制
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0 // specify appropriate version
    config.Consumer.Return.Errors = true
    cfg := ReaderConfig.Config()
    group, err := sarama.NewConsumerGroup(
        []string{cfg.GetString("kafka.addr")},
        cfg.GetString("kafka.group_id"), config)
    if err != nil {
        my_logger.Errorf("sarama.NewConsumerGroup error, err=%s", err.Error())
        panic(err)
        //return
    }
    defer func() { _ = group.Close() }()

    // Track errors
    go func() {
        defer utils.ForPanic()
        for err := range group.Errors() {
            if err != nil {
                my_logger.Errorf("consumer error:%s", err.Error())
            }
        }
    }()
    topics := []string{cfg.GetString("kafka.topics")}
    queueSize := cfg.GetInt64("kafka.queue_size")
    if queueSize <= 0 {
        my_logger.Errorf("queueSize <= 0")
        panic("queue_size error")
    }
    log.Printf("queue size:%d\n", queueSize)
    handler := ConsumerGroupHandler{
        Pipe: make(chan []byte, queueSize),
    }
    coroutineCount := cfg.GetInt64("kafka.co_count")
    if coroutineCount <= 0 {
        my_logger.Errorf("coroutineCount <= 0")
        panic("coroutineCount error")
    }
    for i := 0; i < int(coroutineCount); i++ {
        go handler.Do()
    }
    log.Println("start success!")
    for {
        //關鍵代碼
        //正常情況下:Consume()方法會一直阻塞
        //我測試發現,約30分鍾左右,Consume()會返回,但沒有error
        //無error的情況下,可以重復調用Consume()方法
        //當有error產生的時候,不確定Consume()是否能夠繼續完善的執行。
        //因此保險的辦法是拋出panic,讓進程重啟。
        err = group.Consume(context.Background(), topics, &handler)
        if err != nil {
            my_logger.Errorf("group.Consume error: err=%s", err.Error())
            panic(err)
        } else {
            my_logger.Info("group.Consume exit")
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM