直接上代碼
package main
import (
"context"
"fmt"
"github.com/Shopify/sarama"
)
var Consumer sarama.Consumer
func main() {
var err error
Consumer, err = sarama.NewConsumer([]string{".....:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer,err:%v\n", err)
return
}
topics := []string{"redis", "mysql", "nsq"}
//topics := []string{"nsq"}
//topic := "nsq"
for _, topic := range topics {
ctx, _ := context.WithCancel(context.Background())
go testTopic(topic, ctx)
}
select {}
}
func testTopic(topic string, ctx context.Context) {
partitionList, err := Consumer.Partitions(topic)
fmt.Println(partitionList)
if err != nil {
fmt.Printf("fail to start consumer partition,err:%v\n", err)
return
}
for partition := range partitionList {
// 遍歷所有的分區,並且針對每一個分區建立對應的消費者
pc, err := Consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("fail to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
go testGetMsg(pc, ctx)
}
for {
select {
case <-ctx.Done():
return
default:
continue
}
}
}
func testGetMsg(partitionConsumer sarama.PartitionConsumer, ctx context.Context) {
for msg := range partitionConsumer.Messages() {
fmt.Printf("Partition:%d Offset:%v Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
select {
case <-ctx.Done():
return
default:
continue
}
}
}
注意,testTopic這個函數一定不能夠提前退出,如果退出了就不能消費到數據了。testTopic這個函數中有一個遍歷partitionlist的循環,這個循環和testGetMsg這個函數中的遍歷消息的循環是不一樣的,如果是下面的遍歷消息的循環,他不會主動退出循環,相當於是一個死循環,如果是上面的循環,那么就會退出循環,不是一個死循環,所以我在后面加了一個for循環,這個循環保證了testTopic這個函數不會主動退出。
