記一個golang中用庫sarama消費kafka的坑


直接上代碼

package main

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"
)

var Consumer sarama.Consumer

func main() {
	var err error
	Consumer, err = sarama.NewConsumer([]string{".....:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer,err:%v\n", err)
		return
	}
	topics := []string{"redis", "mysql", "nsq"}
	//topics := []string{"nsq"}
	//topic := "nsq"
	for _, topic := range topics {
		ctx, _ := context.WithCancel(context.Background())
		go testTopic(topic, ctx)
	}
	select {}
}
func testTopic(topic string, ctx context.Context) {
	partitionList, err := Consumer.Partitions(topic)
	fmt.Println(partitionList)
	if err != nil {
		fmt.Printf("fail to start consumer partition,err:%v\n", err)
		return
	}
	for partition := range partitionList {
		//	遍歷所有的分區,並且針對每一個分區建立對應的消費者
		pc, err := Consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("fail to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		go testGetMsg(pc, ctx)
	}
	for {
		select {
		case <-ctx.Done():
			return
		default:
			continue
		}
	}
}
func testGetMsg(partitionConsumer sarama.PartitionConsumer, ctx context.Context) {
	for msg := range partitionConsumer.Messages() {
		fmt.Printf("Partition:%d Offset:%v Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
		select {
		case <-ctx.Done():
			return
		default:
			continue
		}
	}
}

  注意,testTopic這個函數一定不能夠提前退出,如果退出了就不能消費到數據了。testTopic這個函數中有一個遍歷partitionlist的循環,這個循環和testGetMsg這個函數中的遍歷消息的循環是不一樣的,如果是下面的遍歷消息的循環,他不會主動退出循環,相當於是一個死循環,如果是上面的循環,那么就會退出循環,不是一個死循環,所以我在后面加了一個for循環,這個循環保證了testTopic這個函數不會主動退出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM