Golang操作Kafka


一.使用庫說明

Golang中連接kafka可以使用第三方庫:github.com/Shopify/sarama

二.Kafka Producer發送消息

package main 

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // 發送完數據需要leader和follower都確認
	config.Producer.Partitioner = sarama.NewRandomPartitioner  //寫到隨機分區中,我們默認設置32個分區
	config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回

	// 構造一個消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "task"
	msg.Value = sarama.StringEncoder("producer kafka messages...")

	// 連接kafka
	client, err := sarama.NewSyncProducer([]string{"192.20.216.8:9092"}, config)
	if err != nil {
		fmt.Println("Producer closed, err:", err)
		return
	}
	defer client.Close()

	// 發送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

三.Kafka Consumer消費消息

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	consumer, err := sarama.NewConsumer([]string{"192.20.216.8:9092"}, nil)
	if err != nil {
		fmt.Println("Failed to start consumer: %s", err)
		return
	}
	partitionList, err := consumer.Partitions("task-status-data") // 通過topic獲取到所有的分區
	if err != nil {
		fmt.Println("Failed to get the list of partition: ", err)
		return
	}
	fmt.Println(partitionList)

	for partition := range partitionList{ // 遍歷所有的分區
		pc, err := consumer.ConsumePartition("task", int32(partition), sarama.OffsetNewest) // 針對每個分區創建一個分區消費者
		if err != nil {
			fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
		}
		wg.Add(1)
		go func(sarama.PartitionConsumer) { // 為每個分區開一個go協程取值
			for msg := range pc.Messages() { // 阻塞直到有值發送過來,然后再繼續等待
				fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
			defer pc.AsyncClose()
			wg.Done()
		}(pc)
	}
	wg.Wait()
	consumer.Close()
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM