zookeeper + kafka
首先要在 apche 官網下載 kafka 的程序包(linux版本),然后放到服務器上解壓,得到以下目錄
bin 目錄下包含了服務的啟動腳本
啟動 zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties
啟動 kafka server
./bin/kafka-server-start.sh config/server.properties
創建一個主題
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cctv1(主題名)
啟動生產者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cctv1
啟動消費者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1
如果要消費前面的數據,在啟動時添加 --from-beginning 參數
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1 --from-beginning
查看 kafka 進程命令:
jps
producer
安裝 Sarama 包
go get github.com/Shopify/sarama
demo:創建生產者,往單節點的 kafka 上發送數據
package main import ( "fmt" "github.com/Shopify/sarama" "time" ) //消息寫入kafka func main() { //初始化配置 config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true //生產者 client, err := sarama.NewSyncProducer([]string{"10.10.4.35:9092"}, config) if err != nil { fmt.Println("producer close,err:", err) return } defer client.Close() for i:=0; i<5; i++ { //創建消息 msg := &sarama.ProducerMessage{} msg.Topic = "cctv1" msg.Value = sarama.StringEncoder("this is a good test,hello kai") //發送消息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send message failed,", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) time.Sleep(time.Second) } }
運行結果:(上述代碼執行一下,就會往 kafka 中 cctv1 的主題發布 5 條消息)
然后通過 kafka 自帶的消費者終端查看發送的數據
./bin/kafka-console-consumer.sh --bootstrap-server 10.10.4.35:9092 --topic cctv1 --from-beginning
consumer
demo:創建消費者,從單節點的 kafka 中消費數據
package main import ( "fmt" "github.com/Shopify/sarama" "sync" ) var wg sync.WaitGroup func main() { consumer, err := sarama.NewConsumer([]string{"10.10.4.35:9092"}, nil) if err != nil { fmt.Println("consumer connect err:", err) return } defer consumer.Close() //獲取 kafka 主題 partitions, err := consumer.Partitions("cctv1") if err != nil { fmt.Println("get partitions failed, err:", err) return } for _, p := range partitions { //sarama.OffsetNewest:從當前的偏移量開始消費,sarama.OffsetOldest:從最老的偏移量開始消費 partitionConsumer, err := consumer.ConsumePartition("cctv1", p, sarama.OffsetNewest) if err != nil { fmt.Println("partitionConsumer err:", err) continue } wg.Add(1) go func(){ for m := range partitionConsumer.Messages() { fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset) } wg.Done() }() } wg.Wait() }
consumer 版本2

package kafkaPlugin import ( "fmt" "github.com/Shopify/sarama" ) type KafkaConsumer struct { Node string Topic string //Message string MessageQueue chan string } func (this KafkaConsumer) Consume(){ consumer, err := sarama.NewConsumer([]string{this.Node}, nil) if err != nil { fmt.Printf("kafka connnet failed, error[%v]", err.Error()) return } defer consumer.Close() partitions, err := consumer.Partitions(this.Topic) if err != nil { fmt.Printf("get topic failed, error[%v]", err.Error()) return } for _, p := range partitions { partitionConsumer, err := consumer.ConsumePartition(this.Topic, p, sarama.OffsetNewest) if err != nil { fmt.Printf("get partition consumer failed, error[%v]", err.Error()) continue } for message := range partitionConsumer.Messages() { fmt.Printf("message:[%v], key:[%v], offset:[%v]\n", string(message.Value), string(message.Key), string(message.Offset)) this.MessageQueue <- string(message.Value) } } } func main(){ var kafkaConsumer = KafkaConsumer{ Node: "10.10.4.35:9092", Topic: "cctv1", } kafkaConsumer.Consume() }
上述代碼執行起來,就會開啟 kafka 消費者持續監聽,然后通過 kafka 自帶的生產者終端發送 2 條測試數據,消費結果如下:
windows 環境,運行編譯錯誤:exec: "gcc": executable file not found in %PATH%
問題處理請參考下面的鏈接:
https://blog.csdn.net/myBarbara/article/details/95358694
https://www.cnblogs.com/ggg-327931457/p/9694516.html
ending ~