Kafka介紹
Kafka是Apache軟件基金會開發的一個開源流處理平台,由Java和Scala編寫;Kafka是一種高吞吐、分布式、基於訂閱發布的消息系統。
Kafka名稱解釋
- Producer:生產者
- Consumer:消費者
- Topic:消息主題,每一類的消息稱之為一個主題
- Broker:Kafka以集群的方式運行,可以由一個或多個服務器組成,每個服務器叫做一個broker
- Partition:物理概念上的分區,為了提供系統吞吐量,在物理上每個Topic會分為一個或多個Partition
Kafka架構圖
一個典型的Kafka集群中包含若干Producer,若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。
Kafka通過Zookeeper管理集群配置及服務協同,Producer使用push模式將消息發布到broker,Consumer通過監聽使用pull模式從broker訂閱並消費消息。
圖上有個細節需要注意,producer給broker的過程是push,也就是有數據就推送給broker,而consumer給broker的過程是pull,是通過consumer主動去拉數據的,而不是broker把數據主動發送給consumer端的。
Kafka與RabbitMQ比較
- Kafka比RabbitMQ性能要高
- RabbitMQ比Kafka可靠性要高
- 因此在金融支付領域使用RabbitMQ居多,而在日志處理、大數據等方面Kafka使用居多。
Kafka安裝
第一步 下載Kafka:
地址 http://kafka.apache.org/downloads
第二步 解壓Kafka:
tar -zxvf kafka.tgz -C /usr/local/kafka
第三步 運行Zookeeper:
以后台方式運行 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & zookeeper端口 2181
第四步 運行Kafka:
以后台方式運行 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties kafka端口 9092
Kafka圖形管理工具
http://www.kafkatool.com/download.html
Go語言中使用Kafka
Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).
安裝sarama
go get github.com/Shopify/sarama
Producer
package main import ( "fmt" "github.com/Shopify/sarama" ) func main() { // 新建一個arama配置實例 config := sarama.NewConfig() // WaitForAll waits for all in-sync replicas to commit before responding. config.Producer.RequiredAcks = sarama.WaitForAll // NewRandomPartitioner returns a Partitioner which chooses a random partition each time. config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true // 新建一個同步生產者 client, err := sarama.NewSyncProducer([]string{"172.16.65.210:9092"}, config) if err != nil { fmt.Println("producer close, err:", err) return } defer client.Close() // 定義一個生產消息,包括Topic、消息內容、 msg := &sarama.ProducerMessage{} msg.Topic = "revolution" msg.Key = sarama.StringEncoder("miles") msg.Value = sarama.StringEncoder("hello world...") // 發送消息 pid, offset, err := client.SendMessage(msg) msg2 := &sarama.ProducerMessage{} msg2.Topic = "revolution" msg2.Key = sarama.StringEncoder("monroe") msg2.Value = sarama.StringEncoder("hello world2...") pid2, offset2, err := client.SendMessage(msg2) if err != nil { fmt.Println("send message failed,", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) fmt.Printf("pid2:%v offset2:%v\n", pid2, offset2) }
Consumer
package main import ( "sync" "github.com/Shopify/sarama" "fmt" ) var wg sync.WaitGroup func main() { consumer, err := sarama.NewConsumer([]string{"172.16.65.210:9092"}, nil) if err != nil { fmt.Println("consumer connect error:", err) return } fmt.Println("connnect success...") defer consumer.Close() partitions, err := consumer.Partitions("revolution") if err != nil { fmt.Println("geet partitions failed, err:", err) return } for _, p := range partitions { partitionConsumer, err := consumer.ConsumePartition("revolution", p, sarama.OffsetOldest) if err != nil { fmt.Println("partitionConsumer err:", err) continue } wg.Add(1) go func(){ for m := range partitionConsumer.Messages() { fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset) } wg.Done() }() } wg.Wait() }