直接上代码
package main import ( "context" "fmt" "github.com/Shopify/sarama" ) var Consumer sarama.Consumer func main() { var err error Consumer, err = sarama.NewConsumer([]string{".....:9092"}, nil) if err != nil { fmt.Printf("fail to start consumer,err:%v\n", err) return } topics := []string{"redis", "mysql", "nsq"} //topics := []string{"nsq"} //topic := "nsq" for _, topic := range topics { ctx, _ := context.WithCancel(context.Background()) go testTopic(topic, ctx) } select {} } func testTopic(topic string, ctx context.Context) { partitionList, err := Consumer.Partitions(topic) fmt.Println(partitionList) if err != nil { fmt.Printf("fail to start consumer partition,err:%v\n", err) return } for partition := range partitionList { // 遍历所有的分区,并且针对每一个分区建立对应的消费者 pc, err := Consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("fail to start consumer for partition %d,err:%v\n", partition, err) return } defer pc.AsyncClose() go testGetMsg(pc, ctx) } for { select { case <-ctx.Done(): return default: continue } } } func testGetMsg(partitionConsumer sarama.PartitionConsumer, ctx context.Context) { for msg := range partitionConsumer.Messages() { fmt.Printf("Partition:%d Offset:%v Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value)) select { case <-ctx.Done(): return default: continue } } }
注意,testTopic这个函数一定不能够提前退出,如果退出了就不能消费到数据了。testTopic这个函数中有一个遍历partitionlist的循环,这个循环和testGetMsg这个函数中的遍历消息的循环是不一样的,如果是下面的遍历消息的循环,他不会主动退出循环,相当于是一个死循环,如果是上面的循环,那么就会退出循环,不是一个死循环,所以我在后面加了一个for循环,这个循环保证了testTopic这个函数不会主动退出。