直接上代碼
package main import ( "context" "fmt" "github.com/Shopify/sarama" ) var Consumer sarama.Consumer func main() { var err error Consumer, err = sarama.NewConsumer([]string{".....:9092"}, nil) if err != nil { fmt.Printf("fail to start consumer,err:%v\n", err) return } topics := []string{"redis", "mysql", "nsq"} //topics := []string{"nsq"} //topic := "nsq" for _, topic := range topics { ctx, _ := context.WithCancel(context.Background()) go testTopic(topic, ctx) } select {} } func testTopic(topic string, ctx context.Context) { partitionList, err := Consumer.Partitions(topic) fmt.Println(partitionList) if err != nil { fmt.Printf("fail to start consumer partition,err:%v\n", err) return } for partition := range partitionList { // 遍歷所有的分區,並且針對每一個分區建立對應的消費者 pc, err := Consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("fail to start consumer for partition %d,err:%v\n", partition, err) return } defer pc.AsyncClose() go testGetMsg(pc, ctx) } for { select { case <-ctx.Done(): return default: continue } } } func testGetMsg(partitionConsumer sarama.PartitionConsumer, ctx context.Context) { for msg := range partitionConsumer.Messages() { fmt.Printf("Partition:%d Offset:%v Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value)) select { case <-ctx.Done(): return default: continue } } }
注意,testTopic這個函數一定不能夠提前退出,如果退出了就不能消費到數據了。testTopic這個函數中有一個遍歷partitionlist的循環,這個循環和testGetMsg這個函數中的遍歷消息的循環是不一樣的,如果是下面的遍歷消息的循環,他不會主動退出循環,相當於是一個死循環,如果是上面的循環,那么就會退出循環,不是一個死循環,所以我在后面加了一個for循環,這個循環保證了testTopic這個函數不會主動退出。