一、kafka集群搭建
至於kafka是什麼我就不多做介紹了,網上寫的已經非常詳盡了。
(沒安裝java環境的需要先安裝 yum -y install java-1.8.0-openjdk*)
1. 下載zookeeper https://zookeeper.apache.org/releases.html
2. 下載kafka http://kafka.apache.org/downloads
3. 啟動zookeeper集群(我的示例是3台機器,后面的kafka也一樣,這里就以1台代指3台,當然你也可以只開1台)
1) 配置zookeeper。複製一份 zookeeper-3.4.13/conf/zoo_sample.cfg 並改名成 zoo.cfg,然後修改以下幾個參數,改成適合自己機器的。
dataDir=/home/test/zookeeper/data dataLogDir=/home/test/zookeeper/log server.1=10.22.1.1:2888:3888 server.2=10.22.1.2:2888:3888 server.3=10.22.1.3:2888:3888
2) 創建myid文件,確定機器編號。分別在3台機器的/home/test/zookeeper/data目錄執行命令 echo 1 > myid(注意ip為10.22.1.2的機器要把1改成2,見上面的配置)
3) 啟動zookeeper集群。分別進入目錄zookeeper-3.4.13/bin 執行 sh zkServer.sh start
4. 啟動kafka集群
1) 配置kafka。進入kafka_2.11-2.2.0/config。復制3份,分別為server1.properties,server2.properties,server3.properties。修改以下幾項(注意對應的機器id)
3台機器的log.dirs和zookeeper.connect配置是一樣的;broker.id和listeners則分別填各自機器對應的id和ip。
broker.id=1 listeners=PLAINTEXT://10.22.1.1:9092 log.dirs=/home/test/kafka/log zookeeper.connect=10.22.1.1:2181,10.22.1.2:2181,10.22.1.3:2181
2) 啟動kafka集群。分別進入kafka_2.11-2.2.0/bin目錄,分別執行sh kafka-server-start.sh ../config/server1.properties (第2台用server2.properties配置文件)
二、Golang生產者和消費者
目前比較流行的golang版的kafka客戶端庫有兩個:
1. https://github.com/Shopify/sarama
2. https://github.com/confluentinc/confluent-kafka-go
至於誰好誰壞自己去分辨,我用的是第1個,star比較多的。
1. kafka生產者代碼
這里有2點要說明:
1) config.Producer.Partitioner = sarama.NewRandomPartitioner,我分partition用的是隨機,如果你想穩定分paritition的話可以自定義,還有輪詢和hash方式
2) 我的topic是走的外部配置,可以根據自己的需求修改
// Package kafka_producer kafka 生產者的包裝
package kafka_producer
import (
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	l4g "github.com/alecthomas/log4go"
)
// Config holds the producer settings loaded from XML.
type Config struct {
	Topic      string `xml:"topic"`       // Kafka topic all messages are sent to
	Broker     string `xml:"broker"`      // comma-separated broker list, e.g. "host1:9092,host2:9092"
	Frequency  int    `xml:"frequency"`   // batch flush interval in milliseconds
	MaxMessage int    `xml:"max_message"` // capacity of the internal message queue
}
// Producer wraps a sarama.AsyncProducer with a buffered message queue
// that is drained by a background goroutine (see Run).
type Producer struct {
	producer  sarama.AsyncProducer         // underlying async Kafka producer
	topic     string                       // destination topic used by Log
	msgQ      chan *sarama.ProducerMessage // buffered queue filled by Log, drained by Run
	wg        sync.WaitGroup               // tracks the Run goroutine so Close can wait for it
	closeChan chan struct{}                // closed by Close to stop the Run goroutine
}
// NewProducer builds a Producer from cfg and connects the underlying
// sarama async producer to the configured brokers.
//
// Notes:
//   - sarama.NoResponse is fire-and-forget: the producer does not wait for
//     any broker acknowledgement (highest throughput, weakest guarantee).
//   - Partitions are picked at random per message; use a hash or
//     round-robin partitioner instead if stable routing is required.
func NewProducer(cfg *Config) (*Producer, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.NoResponse       // fire-and-forget, no broker ack
	config.Producer.Compression = sarama.CompressionSnappy // compress message batches
	// Flush batches every cfg.Frequency milliseconds.
	config.Producer.Flush.Frequency = time.Duration(cfg.Frequency) * time.Millisecond
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	p, err := sarama.NewAsyncProducer(strings.Split(cfg.Broker, ","), config)
	if err != nil {
		return nil, err
	}

	// Guard against a non-positive queue size from the config: make(chan, n)
	// panics for n < 0, and an unbuffered queue would make Log drop almost
	// every message.
	queueSize := cfg.MaxMessage
	if queueSize <= 0 {
		queueSize = 1024
	}

	ret := &Producer{
		producer:  p,
		topic:     cfg.Topic,
		msgQ:      make(chan *sarama.ProducerMessage, queueSize),
		closeChan: make(chan struct{}),
	}
	return ret, nil
}
// Run starts the forwarding goroutine that moves queued messages into the
// sarama async producer and logs delivery errors. After spawning the
// goroutine it drains whatever Log already buffered in msgQ, then returns;
// the goroutine itself keeps running until Close is called.
func (p *Producer) Run() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for {
			select {
			case m := <-p.msgQ:
				p.producer.Input() <- m
			case err := <-p.producer.Errors():
				if err != nil && err.Msg != nil {
					l4g.Error("[producer] err=[%s] topic=[%s] key=[%s] val=[%s]", err.Error(), err.Msg.Topic, err.Msg.Key, err.Msg.Value)
				}
			case <-p.closeChan:
				return
			}
		}
	}()

	// Non-blocking drain: flush anything queued before Run was called.
	for {
		select {
		case m := <-p.msgQ:
			p.producer.Input() <- m
		default:
			return
		}
	}
}
// Close shuts the producer down: it signals the Run goroutine to stop,
// waits for it to exit, then closes the underlying sarama producer.
// Calling Close a second time would panic on the already-closed channel.
func (p *Producer) Close() error {
	// Signal the Run goroutine to stop forwarding messages.
	close(p.closeChan)
	l4g.Warn("[producer] is quiting")
	// Block until the forwarding goroutine has fully exited.
	p.wg.Wait()
	l4g.Warn("[producer] quit over")
	// Closing the async producer flushes or fails any in-flight messages.
	return p.producer.Close()
}
// Log enqueues one key/value record for asynchronous delivery to the
// producer's topic. The enqueue is non-blocking: when the internal queue
// is full the record is dropped and an error is logged.
func (p *Producer) Log(key string, val string) {
	m := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.StringEncoder(val),
	}
	select {
	case p.msgQ <- m:
	default:
		l4g.Error("[producer] err=[msgQ is full] key=[%s] val=[%s]", m.Key, m.Value)
	}
}
2. kafka消費者
幾點說明:
1) kafka一定要選用支持集群的版本
2) 里面帶了創建topic,刪除topic,打印topic的工具
3) replication是外面配置的
4) 開多個consumer需要在創建topic時設置多個partition。官方的示例當開多個consumer的時候會崩潰,我這個版本不會,我給官方提交了一個PR,還不知道有沒有采用
// Package main Kafka消費者
package main
import (
"context"
"encoding/xml"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"github.com/Shopify/sarama"
"github.com/alecthomas/log4go"
)
// ConsumerConfig holds the consumer settings loaded from XML.
type ConsumerConfig struct {
	Topic       []string `xml:"topic"`       // topics to subscribe to (also used by the topic tool)
	Broker      string   `xml:"broker"`      // comma-separated broker list
	Partition   int32    `xml:"partition"`   // desired partition count when creating/altering topics
	Replication int16    `xml:"replication"` // replication factor for newly created topics
	Group       string   `xml:"group"`       // consumer group id
	Version     string   `xml:"version"`     // Kafka version string, parsed via sarama.ParseKafkaVersion
}
var (
	configFile = ""        // config file path (set via -config)
	initTopic  = false     // -init: create/alter topics, print the list, then exit
	listTopic  = false     // -list: print the topic list, then exit
	delTopic   = ""        // -del: name of a topic to delete, then exit
	cfg        = &Config{} // global config populated from the XML file in main
)
// Config is the root of the XML configuration file.
type Config struct {
	Consumer ConsumerConfig `xml:"consumer"`
}
// init registers the command-line flags used by main and tool.
func init() {
	flag.StringVar(&configFile, "config", "../config/consumer.xml", "config file ")
	flag.StringVar(&delTopic, "del", "", "delete topic")
	flag.BoolVar(&initTopic, "init", false, "create topic")
	flag.BoolVar(&listTopic, "list", false, "list topic")
}
// main loads the XML config, optionally runs the topic admin tool, and
// otherwise joins the consumer group until SIGINT/SIGTERM arrives.
func main() {
	// Without this call none of the flags registered in init take effect.
	flag.Parse()

	runtime.GOMAXPROCS(runtime.NumCPU())
	defer func() {
		time.Sleep(time.Second)
		log4go.Warn("[main] consumer quit over!")
		log4go.Global.Close()
	}()

	// A missing or malformed config file is fatal; silently ignoring the
	// error would leave cfg as a zero value and fail confusingly later.
	contents, err := ioutil.ReadFile(configFile)
	if err != nil {
		panic(err)
	}
	if err := xml.Unmarshal(contents, cfg); err != nil {
		panic(err)
	}

	// Route sarama's internal logging to stdout.
	sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", "consumer"), log.LstdFlags)

	// The configured Kafka version must support consumer groups.
	version, err := sarama.ParseKafkaVersion(cfg.Consumer.Version)
	if err != nil {
		panic(err)
	}

	config := sarama.NewConfig()
	config.Version = version
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Admin tooling (create/delete/list topics) short-circuits consuming.
	if tool(cfg, config) {
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(strings.Split(cfg.Consumer.Broker, ","), cfg.Consumer.Group, config)
	if err != nil {
		panic(err)
	}

	consumer := Consumer{}
	go func() {
		for {
			// Consume blocks for the lifetime of a session and returns on
			// rebalance or error; loop to rejoin the group.
			if err := client.Consume(ctx, cfg.Consumer.Topic, &consumer); err != nil {
				log4go.Error("[main] client.Consume error=[%s]", err.Error())
				time.Sleep(time.Second * 5) // back off before retrying
			}
			// Stop retrying once the context has been cancelled, otherwise
			// this loop spins forever after shutdown.
			if ctx.Err() != nil {
				return
			}
		}
	}()

	// Block until the process receives SIGINT or SIGTERM.
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm

	cancel()
	if err := client.Close(); err != nil {
		panic(err)
	}
	log4go.Info("[main] consumer is quiting")
}
// tool runs the topic administration helpers (create/alter, delete, list)
// selected by command-line flags. It returns true when an admin action was
// requested, meaning the caller should exit instead of consuming.
func tool(cfg *Config, config *sarama.Config) bool {
	if !initTopic && !listTopic && len(delTopic) == 0 {
		return false
	}

	ca, err := sarama.NewClusterAdmin(strings.Split(cfg.Consumer.Broker, ","), config)
	if err != nil {
		panic(err)
	}

	if len(delTopic) > 0 { // delete one topic
		if err := ca.DeleteTopic(delTopic); err != nil {
			panic(err)
		}
		log4go.Info("delete ok topic=[%s]", delTopic)
	} else if initTopic { // create missing topics / grow partition counts
		detail, err := ca.ListTopics()
		if err != nil {
			panic(err)
		}
		for _, topic := range cfg.Consumer.Topic {
			if d, ok := detail[topic]; ok {
				// Topic exists: only add partitions, never shrink.
				if cfg.Consumer.Partition > d.NumPartitions {
					if err := ca.CreatePartitions(topic, cfg.Consumer.Partition, nil, false); err != nil {
						panic(err)
					}
					log4go.Info("alter topic ok topic=[%s] partition=[%d]", topic, cfg.Consumer.Partition)
				}
			} else {
				if err := ca.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: cfg.Consumer.Partition, ReplicationFactor: cfg.Consumer.Replication}, false); err != nil {
					panic(err)
				}
				log4go.Info("create topic ok topic=[%s]", topic)
			}
		}
	}

	// Always print the resulting topic list.
	if detail, err := ca.ListTopics(); err != nil {
		log4go.Error("ListTopics error=[%s]", err.Error())
	} else {
		for k := range detail {
			log4go.Info("[%s] %+v", k, detail[k])
		}
	}

	if err := ca.Close(); err != nil {
		panic(err)
	}
	return true
}
// Consumer implements sarama.ConsumerGroupHandler: it logs every received
// message and marks it as consumed. It is stateless.
type Consumer struct {
}
// Setup runs at the start of a new consumer-group session, before any
// ConsumeClaim loop. No per-session initialisation is needed.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup runs at the end of a session, after all ConsumeClaim loops have
// exited. Nothing to tear down.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim drains one partition claim: each message is logged as
// "key-value" and then marked consumed. The range loop exits automatically
// when the Messages channel is closed on rebalance or shutdown.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log4go.Info("%s-%s", string(msg.Key), string(msg.Value))
		session.MarkMessage(msg, "")
	}
	return nil
}

