GO kafka sarama 生產者 消費者 簡單 實現


前提:開啟 zookeeper 、 kafka 

生產者代碼:

步驟:1. 生成配置文件(生產者基礎配置文件、指定生產者回復消息等級 0 1 all、指定生產者消息發送成功或者失敗后的返回通道是什么、

            指定發送到哪一個分區(本文為 隨機分區 正常有三種: 通過partiton、通過key 去 Hash出一個分區、輪詢))

   2. 構建消息(msg := &sarama.Message{} 這里為指針 1.消息可更改  2. 下面的 發送消息SendMessage() 需要指針類型的參數)

    3. 連接kafka 

    4. 發送消息

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"log"
)

func main()  {
	// 構建 生產者
	// 生成 生產者配置文件
	config := sarama.NewConfig()
	// 設置生產者 消息 回復等級 0 1 all
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 設置生產者 成功 發送消息 將在什么 通道返回
	config.Producer.Return.Successes = true
	// 設置生產者 發送的分區
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 構建 消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "aaa"
	msg.Value = sarama.StringEncoder("123")

	// 連接 kafka
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Print(err)
		return
	}
	defer producer.Close()
	// 發送消息
	message, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Println(err)
		return
	}
	fmt.Println(message, " ", offset)

}

消費者 代碼: 

步驟: 1. 生成消費者 對象 連接對應的 地址 config可以為nil

     2. 拿到所有對應主題下的所有分區

    3. 遍歷每一個分區 調用 消費者對象 傳入 對應的 主題 哪一個具體的分區 從什么位置開始讀取文件 Return:消息對象

    4. 通過 消息對象.Message() 可以取到對應的消息

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"sync"
)

// 消費者練習

func main()  {
	// 生成消費者 實例
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Print(err)
		return
	}
	// 拿到 對應主題下所有分區
	partitionList, err := consumer.Partitions("aaa")
	if err != nil {
		log.Println(err)
		return
	}

	var wg sync.WaitGroup
	wg.Add(1)
	// 遍歷所有分區
	for partition := range partitionList{
		//消費者 消費 對應主題的 具體 分區 指定 主題 分區 offset  return 對應分區的對象
		pc, err := consumer.ConsumePartition("aaa", int32(partition), sarama.OffsetNewest)
		if err != nil {
			log.Println(err)
			return
		}

		// 運行完畢記得關閉
		defer pc.AsyncClose()

		// 去出對應的 消息
		// 通過異步 拿到 消息
		go func(sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages(){
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
	wg.Wait()
}

 

參考文檔:https://www.liwenzhou.com/posts/Go/go_kafka/ 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM