Kafka的安裝與啟動
kafka中涉及的名詞
- 消息記錄:由一個key,一個value和一個時間戳構成,消息最終存儲在主題下的分區中,記錄在生產中稱為生產者記錄,在消費者中稱為消費記錄。Kafka集群保持了所有發布的消息,直到它們過期,無論消息是否被消費了,在一個可配置的時間段內,Kafka集群保留了所有發布的消息。比如消息的保存策略被設置為2天,那么在一個消息被發布的兩天時間內,它都是可以被消費的。Kafka的性能是和數據量無關的常量級的,所以保留太多數據並不是問題
- 生成者:生產者用於發布消息
- 消費者:消費者用於訂閱消息
- 消費者組:相同的groupID的消費者將視為同一個消費者組,每個消費者都需要設置一個組id,每條消息只能被consumer group中的一個Consumer消費,但是可以被多個consumer group消費
- 主題(topic):消息的一種邏輯分組,用於對消息分門別類,每一類消息稱之為一個主題,相同主題的消息放在一個隊列中
- 分區(partition):消息的一種物理分組,一個主題被拆成多個分區,每一個分區就是一個順序的,不可變的消息隊列,並且可以持續添加,分區中的每個消息都被分配了一個唯一的id,稱之為偏移量(offset),在每個分區中偏移量都是唯一的。每個分區對應一個邏輯log,有多個segment組成
- 偏移量:分區中每個消息都有一個唯一的Id,稱之為偏移量,代表已經消費的位置
- 代理(broker):一台kafka服務器稱之為一個broker
- 副本(replica):副本只是一個分區(partition)的備份。副本不讀取或寫入數據。它們用於防止數據丟失
- 領導者:leader是負責給定分區的所有讀取和寫入的節點
- 追隨者:跟隨領導者指令的節點被稱為Follower。
- zookeeper:Kafka代理是無狀態的,所以它們使用Zookeeper來維護它們的集群狀態。Zookeeper用於管理和協調Kafka代理
kafka功能
- 發布訂閱:生產者生產消息(數據流),將消息發送給kafka指定的主題隊列中,也可以發送到topic中的指定分區中,消費者從kafka的指定隊列中獲取消息,然后來處理消息
一. Mac版安裝
brew install kafka
安裝kafka需要依賴zookeeper的,所以安裝kafka的時候也會包含zooker
- kafka的安裝目錄:/usr/local/Cellar/kafka
- kafka的配置文件目錄:/usr/local/etc/kafka
- kafka服務的配置文件:/usr/local/etc/kafka/server.properties
- zookeeper配置文件:/usr/local/etc/kafka/zookeeper.properties
server.properties中重要配置
- broker.id=0
- listeners=PLAINTEXT://:9092
- advertised.listeners=PLAINTEXT://127.0.0.1:9092
- log.dirs=/usr/local/var/lib/kafka-logs
zookeeper.properties重要配置
- dataDir=/usr/local/var/lib/zookeeper
- clientPort=2181
- maxClientCnxns=0
二. 啟動zookeeper
新創建終端啟動zookeeper
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
- 打印台顯示:INFO Reading configuration from: /usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
- ...即是啟動成功
三.啟動kafka
新創建終端啟動kafka(啟動kafka之前必須先啟動zookeeper)
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
- 打印台顯示:INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
- ...即啟動成功
- 啟動了kafka之后,zookeeper端會報一些Error:KeeperErrorCode = NoNode for /config/topics/test之類的錯誤,這個是沒有問題的,這是因為kafka向zookeeper發送了關於該路徑的一些請求信息,但是不存在,所以這是沒有問題的
四.創建topic
新創建終端
- cd /usr/local/Cellar/kafka/2.1.0
- 創建一個名為“test”的主題:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 查看所有的topic:./bin/kafka-topics --list --zookeeper localhost:2181
- 查看某個topic的信息,比如test:./bin/kafka-topics --describe --zookeeper localhost:2181 --topic test
五.發送消息
新創建一個終端,作為生產者,用於發送消息,每一行就是一條信息,將消息發送到kafka服務器
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
- send one message
- send two message
六.消費消息(接受消息)
新創建一個終端作為消費者,接受消息
- cd /usr/local/Cellar/kafka/2.1.0
- ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
- send one message
- send two message(這些便是從生產者獲得的消息)
注意:發送消息與接受消息必須啟動kafka與zookeeper
GoLang實現kafka的信息發布與訂閱
生產者
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
// 等待服務器所有副本都保存成功后的響應
config.Producer.RequiredAcks = sarama.WaitForAll
// 隨機的分區類型:返回一個分區器,該分區器每次選擇一個隨機分區
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失敗后的響應
config.Producer.Return.Successes = true
// 使用給定代理地址和配置創建一個同步生產者
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
//構建發送的消息,
msg := &sarama.ProducerMessage {
//Topic: "test",//包含了消息的主題
Partition: int32(10),//
Key: sarama.StringEncoder("key"),//
}
var value string
var msgType string
for {
_, err := fmt.Scanf("%s", &value)
if err != nil {
break
}
fmt.Scanf("%s",&msgType)
fmt.Println("msgType = ",msgType,",value = ",value)
msg.Topic = msgType
//將字符串轉換為字節數組
msg.Value = sarama.ByteEncoder(value)
//fmt.Println(value)
//SendMessage:該方法是生產者生產給定的消息
//生產成功的時候返回該消息的分區和所在的偏移量
//生產失敗的時候返回error
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Send message Fail")
}
fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
}
}
消費者
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
var (
wg sync.WaitGroup
)
func main() {
// 根據給定的代理地址和配置創建一個消費者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
//Partitions(topic):該方法返回了該topic的所有分區id
partitionList, err := consumer.Partitions("test")
if err != nil {
panic(err)
}
for partition := range partitionList {
//ConsumePartition方法根據主題,分區和給定的偏移量創建創建了相應的分區消費者
//如果該分區消費者已經消費了該信息將會返回error
//sarama.OffsetNewest:表明了為最新消息
pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer pc.AsyncClose()
wg.Add(1)
go func(sarama.PartitionConsumer) {
defer wg.Done()
//Messages()該方法返回一個消費消息類型的只讀通道,由代理產生
for msg := range pc.Messages() {
fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
}
wg.Wait()
consumer.Close()
}
kafka使用場景
-
kafka的應用很廣泛,在這里簡單介紹幾種
-
服務解耦
比如我們發了一個帖子,除了寫入數據庫之外還有很多聯動操作,比如給關注這個用戶的人發送通知,推送到首頁的時間線列表,如果用代碼實現的話,發帖服務就要調用通知服務,時間線服務,這樣的耦合很大,並且如果增加一個功能依賴發帖,除了要增加新功能外還要修改發帖代碼。
解決方法:引入kafka,將發完貼的消息放入kafka消息隊列中,對這個主題感興趣的功能就自己去消費這個消息,那么發帖功能就能夠完全獨立。同時即使發帖進程掛了,其他功能還能夠使用,這樣可以將bug隔離在最小范圍內
-
流量削峰
流量削峰在消息隊列中也是常用場景,一般在秒殺或團購活動中使用比較廣泛。當流量太大的時候達到服務器瓶頸的時候可以將事件放在kafka中,下游服務器當接收到消息的時候自己去消費,有效防止服務器被擠垮
- 消息通訊
消息隊列一般都內置了高效的通信機制,因此也可以用在純的消息通訊中,比如客戶端A跟客戶端B都使用同一隊列進行消息通訊,客戶端A,客戶端B,客戶端N都訂閱了同一個主題進行消息發布和接受不了實現類似聊天室效果
-