golang--kafka


一、官网示例

针对golang的 kafka client有很多开源package,例如sarama, confluent等等。

confluent-kafka-go,是kafka官网推荐的golang package。

网址:https://github.com/confluentinc/confluent-kafka-go/tree/v1.1.0

消费者示例:

import (
    "fmt"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })

    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            // The client will automatically try to recover from all errors.
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }

    c.Close()
}

生成者示例:

import (
    "fmt"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }

    defer p.Close()

    // Delivery report handler for produced messages
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Produce messages to topic (asynchronously)
    topic := "myTopic"
    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    // Wait for message deliveries before shutting down
    p.Flush(15 * 1000)
}

 

二、项目示例:

1.参数group.instance.id说明:

配置了此参数,说明该consumer是静态成员。

静态成员配以较大的session超时设置能够避免因成员临时不可用(比如重启)而引发的Rebalance。

2.参数session.timeout.ms说明:

配合group.instance.id使用,如果该consumer超过了该时间,还没有上线,那么将触发rebalance。

如果不配置此参数,默认时间是6000ms。

实际生成中,程序或容器的重启可能需要几分钟。因此可以设置大一点。

3.参数auto.offset.reset

 如果kafka服务器记录有消费者消费到的offset,那么消费者都会从该offset开始消费。

kafka中没有offset时,不论是什么原因,offset没了,这是auto.offset.reset配置就会起作用,

  1. earliest:从最早的offset开始消费,就是partition的起始位置开始消费
  2. latest:从最近的offset开始消费,就是新加入partition的消息才会被消费
  3. none:报错

消费者:

package kafka

import (
    "fmt"
    "strings"
    "time"
    "math/rand"
    
    "github.com/google/uuid"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func InitKafka(address []string, groupID, topic string) {
    // 简单的随机数生成,结合时间模块初始化种子
    rand.Seed(time.Now().Unix())  // (time.Now().UnixNano()
    //fmt.Println(rand.Int()) // 19位随机数

    brokers := strings.Join(address, ",")
    fmt.Println("kafka 服务器地址", broker)

    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": broker,
            "broker.address.family": "v4",
            "group.id":              groupID,
            "group.instance.id":     uuid.NewString(),
            "session.timeout.ms":    6000,
            "auto.offset.reset":     "earliest",
            })
    if err != nil {
        panic(fmt.Errorf("cannot create kafka consumer %s", err.Error()))
    }
    err = consumer.SubscribeTopics([]string{topic}, nil)
    if err != nil {
        panic(fmt.Errorf("consumer cannot subscribe %s", err.Error()))
    }
    return consumer
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM