I. Official examples
There are many open-source Kafka client packages for Go, such as sarama and confluent-kafka-go.
confluent-kafka-go is the Go package recommended by the official Kafka documentation.
URL: https://github.com/confluentinc/confluent-kafka-go/tree/v1.1.0
Consumer example:
```go
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}
```
Producer example:
```go
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}
```
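The example above reports deliveries asynchronously through the shared `p.Events()` channel. When a caller needs per-message confirmation, `Produce` also accepts a private delivery channel; a minimal sketch (the function name `produceSync` is ours, and running it requires a reachable broker):

```go
package main

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// produceSync sends one message and blocks until the broker confirms
// delivery, using a per-call delivery channel instead of p.Events().
func produceSync(p *kafka.Producer, topic string, value []byte) error {
	deliveryChan := make(chan kafka.Event, 1)
	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          value,
	}, deliveryChan)
	if err != nil {
		return err // enqueue failed (e.g. local queue full)
	}
	e := <-deliveryChan // blocks until the delivery report arrives
	m := e.(*kafka.Message)
	return m.TopicPartition.Error // nil on successful delivery
}
```

The trade-off is throughput: blocking per message serializes sends, so this pattern suits low-volume, must-not-lose messages rather than bulk production.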
II. Project example:
1. The group.instance.id parameter:
Setting this parameter makes the consumer a static member of the group.
A static member combined with a relatively large session timeout avoids rebalances caused by a member being temporarily unavailable (for example, during a restart).
2. The session.timeout.ms parameter:
Used together with group.instance.id: if the consumer has not come back online within this time, a rebalance is triggered.
If the parameter is not set, librdkafka's own default applies (10000 ms in the v1.x line; newer versions raised it to 45000 ms).
In real production, restarting a program or container can take several minutes, so it is reasonable to set this value higher.
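Putting the two parameters together, a consumer config for a containerized service might look like the fragment below. The instance ID and the 300000 ms timeout are illustrative choices, not values from the project code; note that the broker caps the session timeout via its group.max.session.timeout.ms setting.

```go
cfg := &kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	"group.id":          "myGroup",
	// Static membership: reuse a stable ID (e.g. the pod/host name)
	// across restarts so the broker recognizes the returning member
	// and does not trigger a rebalance.
	"group.instance.id": "order-service-0",
	// Allow several minutes for a container restart before the broker
	// evicts this member and rebalances.
	"session.timeout.ms": 300000,
}
```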
3. The auto.offset.reset parameter:
If the Kafka broker has a committed offset for this consumer group, consumption always resumes from that offset and this setting is ignored.
auto.offset.reset only takes effect when no valid committed offset exists, whatever the reason:
- earliest: consume from the earliest offset, i.e. from the start of the partition
- latest: consume from the latest offset, i.e. only messages produced after the consumer joins
- none: report an error
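The three policies can be compared side by side in a config fragment (broker address and group name are placeholders):

```go
// Behavior when the group has no valid committed offset:
cfg := &kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	"group.id":          "myGroup",
	// "earliest": start from the beginning of each partition
	// "latest":   only consume messages produced after joining
	// "none":     surface an error to the application instead of guessing
	"auto.offset.reset": "earliest",
}
```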
Consumer:
```go
package kafka

import (
	"fmt"
	"strings"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/google/uuid"
)

// InitKafka creates a consumer, registers it as a static group member,
// and subscribes it to the given topic.
func InitKafka(address []string, groupID, topic string) *kafka.Consumer {
	brokers := strings.Join(address, ",")
	fmt.Println("kafka broker addresses:", brokers)

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     brokers,
		"broker.address.family": "v4",
		"group.id":              groupID,
		// Note: a fresh UUID on every start means the broker sees a new
		// static member after each restart; a stable ID that survives
		// restarts is what actually avoids rebalances.
		"group.instance.id":  uuid.NewString(),
		"session.timeout.ms": 6000,
		"auto.offset.reset":  "earliest",
	})
	if err != nil {
		panic(fmt.Errorf("cannot create kafka consumer: %s", err.Error()))
	}

	err = consumer.SubscribeTopics([]string{topic}, nil)
	if err != nil {
		panic(fmt.Errorf("consumer cannot subscribe: %s", err.Error()))
	}
	return consumer
}
```
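A caller's read loop for the consumer returned by InitKafka might look like the sketch below. The import path for the project's own kafka package is hypothetical, and the broker address, group, and topic are placeholders; running it requires a live broker.

```go
package main

import (
	"fmt"

	// Hypothetical import path for the project package defining InitKafka.
	projkafka "example.com/project/kafka"
)

func main() {
	consumer := projkafka.InitKafka([]string{"localhost:9092"}, "myGroup", "myTopic")
	defer consumer.Close()

	for {
		msg, err := consumer.ReadMessage(-1) // block until a message arrives
		if err != nil {
			// Errors here are usually transient; the client recovers on its own.
			fmt.Printf("consumer error: %v\n", err)
			continue
		}
		fmt.Printf("%s: %s\n", msg.TopicPartition, string(msg.Value))
	}
}
```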