在以前的文章kafka初探go和C#的實現里面我們用了sarama來消費kafka的消息,但是很遺憾它沒有group的概念。沒辦法 我們只能用sarama-cluster來實現, 注意sarama版本不要太新否則有錯誤panic: non-positive interval for NewTicker 問題處理,建議大家可以修改go.mod文件如下:
require ( github.com/Shopify/sarama v1.24.1 github.com/bsm/sarama-cluster v2.1.15+incompatible
package main import ( "context" "fmt" "log" "os" "os/signal" _ "regexp" "time" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" ) var Address = []string{"192.168.100.30:9092"} var Topic = "gavintest" //panic: non-positive interval for NewTicker // 修改go.mod //github.com/Shopify/sarama v1.24.1 // github.com/bsm/sarama-cluster v2.1.15+incompatible //修改 /** 消費者 */ func main() { go syncConsumer("demo1") go syncConsumer("demo2") go ConsumerDemo3() go syncProducer() select {} } func syncConsumer(groupName string) { config := cluster.NewConfig() config.Consumer.Return.Errors = true config.Group.Return.Notifications = true // init consumer //可以訂閱多個主題 topics := []string{Topic} consumer, err := cluster.NewConsumer(Address, groupName, topics, config) if err != nil { panic(err) } //這里需要注意的是defer 一定要在panic 之后才能關閉連接 defer consumer.Close() // trap SIGINT to trigger a shutdown. signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) // consume errors go func() { for err := range consumer.Errors() { log.Printf("Error: %s\n", err.Error()) } }() // consume notifications go func() { for ntf := range consumer.Notifications() { log.Printf("Rebalanced: %+v\n", ntf) } }() // 循環從通道中獲取message //msg.Topic 消息主題 //msg.Partition 消息分區 //msg.Offset //msg.Key //msg.Value 消息值 for { select { case msg, ok := <-consumer.Messages(): if ok { fmt.Printf("%s receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", groupName, msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) consumer.MarkOffset(msg, "") // 上報offset } case err := <-consumer.Errors(): { fmt.Println(fmt.Sprintf("consumer error:%v", err)) } case <-signals: return } } } //同步消息模式 func syncProducer() { //指定配置 config := sarama.NewConfig() // 等待服務器所有副本都保存成功后的響應 config.Producer.RequiredAcks = sarama.WaitForAll // 隨機的分區類型:返回一個分區器,該分區器每次選擇一個隨機分區 config.Producer.Partitioner = sarama.NewRandomPartitioner // 是否等待成功和失敗后的響應 config.Producer.Return.Successes = true config.Producer.Timeout = 5 * time.Second producer, err := sarama.NewSyncProducer(Address, config) if err != nil { log.Printf("sarama.NewSyncProducer err, message=%s \n", err) return } defer producer.Close() msg := &sarama.ProducerMessage{ Topic: Topic, } var i = 1100 for { i++ //將字符串轉換為字節數組 msg.Value = sarama.ByteEncoder(fmt.Sprintf("this is a message:%d", i)) //SendMessage:該方法是生產者生產給定的消息 //partition, offset, err := producer.SendMessage(msg) _, _, err := producer.SendMessage(msg) //生產失敗的時候返回error if err != nil { fmt.Println(fmt.Sprintf("Send message Fail %v", err)) } //生產成功的時候返回該消息的分區和所在的偏移量 //fmt.Printf("send message Partition = %d, offset=%d\n", partition, offset) time.Sleep(time.Second * 5) } } func ConsumerDemo3() { config := sarama.NewConfig() // Version 必須大於等於 V0_10_2_0 config.Version = sarama.V0_10_2_1 config.Consumer.Return.Errors = true fmt.Println("start connect kafka") // 開始連接kafka服務器 //group, err := sarama.NewConsumerGroup(Address, "demo3", config) client, err := sarama.NewClient(Address, config) if err != nil { fmt.Println("connect kafka failed; err", err) return } defer func() { _ = client.Close() }() //// group, err := sarama.NewConsumerGroupFromClient("demo3", client) if err != nil { fmt.Println("connect kafka failed; err", err) return } // 檢查錯誤 go func() { for err := range group.Errors() { fmt.Println("group errors : ", err) } }() ctx := context.Background() fmt.Println("start get msg") // for 是應對 consumer rebalance for { // 需要監聽的主題 topics := []string{Topic} handler := ConsumerGroupHandler{} // 啟動kafka消費組模式,消費的邏輯在上面的 ConsumeClaim 這個方法里 err := group.Consume(ctx, topics, handler) if err != nil { fmt.Println("consume failed; err : ", err) return } } } type ConsumerGroupHandler struct{} func (ConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error { return nil } func (ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error { return nil } // 這個方法用來消費消息的 func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // 獲取消息 for msg := range claim.Messages() { fmt.Printf("demo3 receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) // 將消息標記為已使用 sess.MarkMessage(msg, "") } return nil }
運行結果:
https://github.com/bsm/sarama-cluster