sarama連接kafka


package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

//基於sarama第三方庫開發的kafka client
func main() {
    config := sarama.NewConfig()

    //tailf 包使用
    config.Producer.RequiredAcks = sarama.WaitForAll          // 發送完數據需要 leader和follow都確認
    config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個 partition;s隨機;3種  哈希
    //config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // 新選出一個 partition;輪訓
    config.Producer.Return.Successes = true // 成功交付的消息將在success_channel返回
    //構造一個消息
    msg := &sarama.ProducerMessage{
        Topic: "x",
        Value: sarama.StringEncoder("this is a test log"),
    }
    //連接kafka
    client, err := sarama.NewSyncProducer([]string{"193.112.27.150:9092"}, config)
    if err != nil {
        fmt.Println("producer closed err:", err)
        return
    }
    defer client.Close()
    //發送消息
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed , err:", err)
        return
    }
    fmt.Printf("pid :%v offset :%v\n", pid, offset)
    //pid :0 offset :1
}

//http_proxy=https://127.0.0.1:1087 go get -v github.com/Shopify/sarama
//
//
//http_proxy=socks5://127.0.0.1:1086 go get -v github.com/Shopify/sarama
//https_proxy=socks5://127.0.0.1:1086 go get -u github.com/jinzhu/gorm
//
//http_proxy=https://goproxy.io/ go get -u -v gopkg.in/jcmturner/gokrb5.v7/types

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM