I. Official Examples
There are many open-source Kafka client packages for Go, such as sarama and confluent-kafka-go.
confluent-kafka-go is the Go package recommended on the official Kafka website.
URL: https://github.com/confluentinc/confluent-kafka-go/tree/v1.1.0
Consumer example:
```go
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}
```
Producer example:
```go
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}
```
II. Project Example
1. The group.instance.id parameter:
Setting this parameter makes the consumer a static member of the group.
Static membership, combined with a larger session timeout, avoids rebalances triggered by members that are only temporarily unavailable (for example, during a restart).
2. The session.timeout.ms parameter:
Used together with group.instance.id: if the consumer has not come back online within this time, a rebalance is triggered.
If this parameter is not set, librdkafka's default applies (10000 ms in the 1.x series).
In production, restarting a program or container can take several minutes, so it is reasonable to set this value higher.
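Putting the two parameters together, a static-membership consumer might be configured as below. This is a sketch, not the project's actual config: the broker address, group name, instance id, and the 5-minute timeout are placeholder values chosen for illustration.

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder broker address
		"group.id":          "order-service",  // placeholder group name
		// A stable per-instance id (e.g. derived from the pod or host name)
		// marks this consumer as a static member of the group.
		"group.instance.id": "order-service-0",
		// A generous session timeout so that a restart taking a few minutes
		// does not trigger a rebalance.
		"session.timeout.ms": 300000,
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()
	fmt.Println("static member configured")
}
```

Note that with a fixed group.instance.id, a restarted instance reclaims its old partitions without a rebalance, as long as it returns within session.timeout.ms.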
3. The auto.offset.reset parameter:
If the Kafka broker has a committed offset for the consumer group, the consumer always resumes from that offset.
auto.offset.reset only takes effect when Kafka has no offset for the group, whatever the reason the offset is missing:
- earliest: consume from the earliest offset, i.e. from the beginning of the partition
- latest: consume from the latest offset, i.e. only messages that arrive in the partition after joining
- none: return an error
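The difference between the three values only shows up when the group has no committed offset. A sketch, assuming a fresh group id with no committed offsets and a placeholder broker address:

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// "fresh-group" is a placeholder group id with no committed offsets, so
	// auto.offset.reset decides where consumption starts:
	//   "earliest" -> start at the beginning of each partition
	//   "latest"   -> only messages produced after this consumer joins
	//   "none"     -> ReadMessage returns an error instead of picking a position
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "fresh-group",
		"auto.offset.reset": "none",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	c.SubscribeTopics([]string{"myTopic"}, nil)
	if _, err := c.ReadMessage(-1); err != nil {
		// With "none" and no committed offset, an error is surfaced here.
		fmt.Println("consumer error:", err)
	}
}
```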
Consumer:
```go
package kafka

import (
	"fmt"
	"strings"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/google/uuid"
)

// InitKafka creates a consumer that joins the group as a static member
// (via a random group.instance.id) and subscribes to the given topic.
func InitKafka(address []string, groupID, topic string) *kafka.Consumer {
	brokers := strings.Join(address, ",")
	fmt.Println("kafka broker addresses:", brokers)

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     brokers,
		"broker.address.family": "v4",
		"group.id":              groupID,
		"group.instance.id":     uuid.NewString(),
		"session.timeout.ms":    6000,
		"auto.offset.reset":     "earliest",
	})
	if err != nil {
		panic(fmt.Errorf("cannot create kafka consumer %s", err.Error()))
	}

	err = consumer.SubscribeTopics([]string{topic}, nil)
	if err != nil {
		panic(fmt.Errorf("consumer cannot subscribe %s", err.Error()))
	}
	return consumer
}
```
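A caller might use this helper as below. This is a hedged sketch: the import path `example.com/project/kafka` is a hypothetical module path for the package above, the broker list and topic are placeholders, and InitKafka is assumed to return the `*kafka.Consumer` it creates.

```go
package main

import (
	"fmt"

	// Hypothetical import path for the package defined above.
	mykafka "example.com/project/kafka"
)

func main() {
	// Placeholder brokers, group, and topic.
	consumer := mykafka.InitKafka(
		[]string{"broker1:9092", "broker2:9092"}, "myGroup", "myTopic")
	defer consumer.Close()

	for {
		// A timeout of -1 blocks until a message or an error arrives.
		msg, err := consumer.ReadMessage(-1)
		if err != nil {
			fmt.Println("consumer error:", err)
			continue
		}
		fmt.Printf("got %q from %s\n", string(msg.Value), msg.TopicPartition)
	}
}
```

One design caveat worth noting: because InitKafka uses `uuid.NewString()`, the consumer gets a new group.instance.id on every restart, so it is treated as a new static member rather than a returning one. To actually benefit from static membership across restarts, a stable id (such as the host or pod name) would be preferable.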