1.官網上下載kafka安裝包:http://kafka.apache.org/downloads.html
2.執行命令運行zookeeper 實例(單點):
bin/zookeeper-server-start.sh config/zookeeper.properties
3. 啟動kafka broker 服務:
bin/kafka-server-start.sh config/server.properties
其中的 server.properties 有些配置需要修改:
listeners=PLAINTEXT://hostName:9092
如果是遠程producer,hostname設置為ip,這樣遠程機器無需設置host.
log.dir 是broker的日志地址。
4.在使用go的客戶端 Shopify/sarama 包的操作過程:
(1) go get "github.com/Shopify/sarama"
(2) 修改config 中的配置:
c.Version = V0_10_0_0 //使用的是kafka 0.10.0.0的版本
(3) producer測試代碼如下:
package main
import (
"github.com/Shopify/sarama"
"log"
"os"
"strings"
)
var logger = log.New(os.Stderr, "[TEST]", log.LstdFlags)
func main(){
sarama.Logger = logger
config := sarama.NewConfig()
config.ClientID = "newsDataSource"
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
msg := &sarama.ProducerMessage{}
msg.Topic = "hello"
msg.Partition = int32(-1)
msg.Key = sarama.StringEncoder("key")
msg.Value = sarama.ByteEncoder("hello")
producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
if err != nil {
logger.Printf("Failed to produce message :%s", err )
os.Exit(500)
}
defer producer.Close()
partition, offset, err := producer.SendMessage(msg)
if err != nil {
logger.Printf("Failed to produce message :%s", err )
}
logger.Printf("partition:%d, offset: %d\n", partition, offset )
}
