Kafka


kafka

啟動kafka

在命令行中分別啟動 zookeeper 和 kafka

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties

終端獲取kafka數據

bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 --topic=web_log --from-beginning

生產者 producer

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

// main runs the producer demo once and exits.
//
// The producer is called synchronously: launching it with `go` let main
// return (and the process exit) before the goroutine had a chance to send
// the message.
func main() {
	kafka_producer_test()
}

// kafka_producer_test sends one test message to the "web_log" topic through
// the broker at 127.0.0.1:9092 and prints the partition and offset returned
// by SendMessage. Failures are reported on stdout and end the function early.
func kafka_producer_test() {
	config := sarama.NewConfig()
	// After sending, wait for both the leader and the followers to acknowledge.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Selects the partition a message is written to.
	// NOTE(review): NewCustomPartitioner is called with no options; confirm
	// against the sarama docs that this gives the intended partition choice
	// for messages without a key.
	config.Producer.Partitioner = sarama.NewCustomPartitioner()
	// Successfully delivered messages are returned on the success channel.
	config.Producer.Return.Successes = true

	// Build the message to send.
	msg := &sarama.ProducerMessage{
		Topic: "web_log",
		Value: sarama.StringEncoder("this is a test log"),
	}

	// Connect to kafka.
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		// The producer was never created here, so report a connection
		// failure rather than "producer closed".
		fmt.Println("connect to kafka failed, err:", err)
		return
	}
	defer func() {
		if err := client.Close(); err != nil {
			fmt.Println("close producer failed, err:", err)
		}
	}()

	// Send the message.
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

消費者 consumer

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"sync"
)

// main runs the consumer demo and blocks for as long as it is consuming.
//
// The consumer is called synchronously: launching it with `go` let main
// return (and the process exit) before any message could be received.
func main() {
	kafka_consumer_test()
}

// kafka_consumer_test connects to the broker at 127.0.0.1:9092, opens a
// partition consumer for every partition of the "web_log" topic starting at
// sarama.OffsetNewest, and prints each received message. It blocks until
// every partition's message channel has been closed. Failures are reported
// on stdout and end the function early.
func kafka_consumer_test() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("Failed to start consumer: %s\n", err)
		return
	}
	// Runs on every exit path, after the partition consumers below have been
	// shut down.
	defer func() {
		if err := consumer.Close(); err != nil {
			fmt.Println("Failed to close consumer:", err)
		}
	}()

	// Fetch all partitions of the topic.
	partitionList, err := consumer.Partitions("web_log")
	if err != nil {
		fmt.Println("Failed to get the list of partition:, ", err)
		return
	}
	fmt.Println(partitionList)

	// Open every partition consumer before starting any reader goroutine, so
	// a failure part-way through can still release the ones already opened.
	partitionConsumers := make([]sarama.PartitionConsumer, 0, len(partitionList))
	// Range over the values: the slice holds partition IDs, which need not
	// equal the slice indices.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition("web_log", partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
			for _, opened := range partitionConsumers {
				if cerr := opened.Close(); cerr != nil {
					fmt.Println("Failed to close partition consumer:", cerr)
				}
			}
			return
		}
		partitionConsumers = append(partitionConsumers, pc)
	}

	var wg sync.WaitGroup
	for _, pc := range partitionConsumers {
		wg.Add(1)
		// One goroutine per partition; pc is passed as an argument so each
		// goroutine works on its own partition consumer.
		go func(pc sarama.PartitionConsumer) {
			// Deferred calls run last-in-first-out: AsyncClose, then Done.
			defer wg.Done()
			defer pc.AsyncClose()
			// Blocks until a message arrives, then waits for the next one;
			// the loop ends only when the messages channel is closed.
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	wg.Wait()
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM