sarama连接kafka


package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

//基于sarama第三方库开发的kafka client
func main() {
    config := sarama.NewConfig()

    //tailf 包使用
    config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要 leader和follow都确认
    config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个 partition;s随机;3种  哈希
    //config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // 新选出一个 partition;轮训
    config.Producer.Return.Successes = true // 成功交付的消息将在success_channel返回
    //构造一个消息
    msg := &sarama.ProducerMessage{
        Topic: "x",
        Value: sarama.StringEncoder("this is a test log"),
    }
    //连接kafka
    client, err := sarama.NewSyncProducer([]string{"193.112.27.150:9092"}, config)
    if err != nil {
        fmt.Println("producer closed err:", err)
        return
    }
    defer client.Close()
    //发送消息
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed , err:", err)
        return
    }
    fmt.Printf("pid :%v offset :%v\n", pid, offset)
    //pid :0 offset :1
}

//http_proxy=https://127.0.0.1:1087 go get -v github.com/Shopify/sarama
//
//
//http_proxy=socks5://127.0.0.1:1086 go get -v github.com/Shopify/sarama
//https_proxy=socks5://127.0.0.1:1086 go get -u github.com/jinzhu/gorm
//
//http_proxy=https://goproxy.io/ go get -u -v gopkg.in/jcmturner/gokrb5.v7/types

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM