Go 關於 kafka 的生產者、消費者實例


zookeeper + kafka

首先要在 apche 官網下載 kafka 的程序包(linux版本),然后放到服務器上解壓,得到以下目錄

bin 目錄下包含了服務的啟動腳本

 

啟動 zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties

  

啟動 kafka server

./bin/kafka-server-start.sh config/server.properties

 

創建一個主題

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cctv1(主題名)

  

啟動生產者

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cctv1

 

啟動消費者

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1

如果要消費前面的數據,在啟動時添加 --from-beginning 參數

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1 --from-beginning

 

查看 kafka 進程命令:

jps

 

producer

安裝 Sarama 包

go get github.com/Shopify/sarama

 

demo:創建生產者,往單節點的 kafka 上發送數據

package main
import (
   "fmt"
   "github.com/Shopify/sarama"
   "time"
)

//消息寫入kafka
func main() {
   //初始化配置
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll
   config.Producer.Partitioner = sarama.NewRandomPartitioner
   config.Producer.Return.Successes = true
   //生產者
   client, err := sarama.NewSyncProducer([]string{"10.10.4.35:9092"}, config)
   if err != nil {
      fmt.Println("producer close,err:", err)
      return
   }
   defer client.Close()

   for i:=0; i<5; i++ {
      //創建消息
      msg := &sarama.ProducerMessage{}
      msg.Topic = "cctv1"
      msg.Value = sarama.StringEncoder("this is a good test,hello kai")
      //發送消息
      pid, offset, err := client.SendMessage(msg)
      if err != nil {
         fmt.Println("send message failed,", err)
         return
      }
      fmt.Printf("pid:%v offset:%v\n", pid, offset)
      time.Sleep(time.Second)
   }
}

 

運行結果:(上述代碼執行一下,就會往 kafka 中 cctv1 的主題發布 5 條消息)

 

然后通過 kafka 自帶的消費者終端查看發送的數據

./bin/kafka-console-consumer.sh --bootstrap-server 10.10.4.35:9092 --topic cctv1 --from-beginning

 

consumer

demo:創建消費者,從單節點的 kafka 中消費數據

package main
import (
	"fmt"
	"github.com/Shopify/sarama"
	"sync"
)

var wg sync.WaitGroup

func main() {
    consumer, err := sarama.NewConsumer([]string{"10.10.4.35:9092"}, nil)
    if err != nil {
        fmt.Println("consumer connect err:", err)
        return
    }
    defer consumer.Close()

    //獲取 kafka 主題
    partitions, err := consumer.Partitions("cctv1")
    if err != nil {
        fmt.Println("get partitions failed, err:", err)
        return
    }

    for _, p := range partitions {
    	//sarama.OffsetNewest:從當前的偏移量開始消費,sarama.OffsetOldest:從最老的偏移量開始消費
        partitionConsumer, err := consumer.ConsumePartition("cctv1", p, sarama.OffsetNewest)
        if err != nil {
            fmt.Println("partitionConsumer err:", err)
            continue
        }
        wg.Add(1)
        go func(){
            for m := range partitionConsumer.Messages() {
                fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

 

consumer 版本2

package kafkaPlugin
import (
    "fmt"
    "github.com/Shopify/sarama"
)

type KafkaConsumer struct {
    Node string
    Topic string
    //Message string
    MessageQueue chan string
}

func (this KafkaConsumer) Consume(){
    consumer, err := sarama.NewConsumer([]string{this.Node}, nil)
    if err != nil {
        fmt.Printf("kafka connnet failed, error[%v]", err.Error())
        return
    }
    defer consumer.Close()

    partitions, err := consumer.Partitions(this.Topic)
    if err != nil {
        fmt.Printf("get topic failed, error[%v]", err.Error())
        return
    }
    for _, p := range partitions {
        partitionConsumer, err := consumer.ConsumePartition(this.Topic, p, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("get partition consumer failed, error[%v]", err.Error())
            continue
        }

        for message := range partitionConsumer.Messages() {
            fmt.Printf("message:[%v], key:[%v], offset:[%v]\n", string(message.Value), string(message.Key), string(message.Offset))
            this.MessageQueue <- string(message.Value)
        }
    }
}

func main(){
    var kafkaConsumer = KafkaConsumer{
        Node: "10.10.4.35:9092",
        Topic: "cctv1",
    }
    kafkaConsumer.Consume()
}
View Code

 

上述代碼執行起來,就會開啟 kafka 消費者持續監聽,然后通過 kafka 自帶的生產者終端發送 2 條測試數據,消費結果如下:

 

windows 環境,運行編譯錯誤:exec: "gcc": executable file not found in %PATH%

問題處理請參考下面的鏈接:

https://blog.csdn.net/myBarbara/article/details/95358694

https://www.cnblogs.com/ggg-327931457/p/9694516.html

 

 

ending ~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM