前言
消息隊列的主要有3大作用
進程通信(IPC):Interprocess Communication
程序解耦:程序由異步變為了異步,提升程序並發(規避IO等待時間)能力。
數據流量削峰:把消息暫時緩沖在消息隊列里面。
NSQ傳遞的消息通常是無序的,當然你也可以保留下信息去check時間戳,因此NSQ更適合處理數據量大但是彼此間沒有順序關系的消息。
消息隊列的2種消息傳遞的模式:
1.點對點模式(queue)
消息生產者生成消息發送到queue中,然后消費者從queue中主動獲取數據進行消費。
當1條消息被消費之后,這條消息就會從queue中消失,不存在重復消費。
golang中的channel就是這種模型。
2.發布和訂閱者(topic)
生產者生成消息時把消息分類成不同Topic,消費者通過訂閱(subcrible)這些Topic,把不同的消息獲出來消費。
發布和訂閱模式和點對點模式最大的區別:
引入 Publish---> Topic<--->Subcrible概念,Topic把消息分類了意味着生產端可以有多個不同的生成者生成不同的消息,消費端也可有多個不同的消費者訂閱不同的Topic。
這種設計模式為生產和消費2端都提供了擴展/收縮的彈性空間 。
在發布和訂閱這模式中,消費者端從消息隊列獲取數據的方式也有2種:
2.1.隊列主動推送消息到消費端(供大於求)
2.2.消費者主動去隊列拉取消息 (求大於供)
kafka概念
kafka是由Linkedln公司開發的用於處理公司內部海量日志傳輸問題的,由Scala和Java編寫。2011年該公司把它貢獻給Apche軟件基金會,現已發展成Apche中的頂級開源項目,kafaka是1個分布式(一提到分布式就會想到集群)的基於發布/訂閱模式的消息隊列(message queue),主要應用於大數據實時處理場景。
kafka具有高吞吐、低延遲、容錯率高的特點。
kafka架構
從宏觀角度來看kafka就是1個非常粗大管子。在這個大管子的兩端有2中角色生產者(producer)和消費者(consumer)。
broker:kafka的分布式就體現在kafka集群中可以靈活擴展broker(kafka集群中的1個節點,服務器)方便我們對kafka集群進行彈性擴展。broker-id不在集群里不能重復。
topic:對消息的分類。
partion:每個Topic可以有N個partion,同1個Topic的數據分布在不同Partion且數據是不重復的,所以partion實現了topic數據的負載均衡,partion的表現形式就是1個個的文件夾。
每個partion類似於Python里面的list,來1條消息apend進去,保證了消息的順序。
這個list中的每條消息都會分配1個的index(offset偏移量),offset保證消息快速讀取,而不是遍歷隨機讀,這也是kafka讀取消息快的原因。
follower:每個leader partion(主分區)都有多個 follower pation(副本分區)也就是是備胎(follower),follower實現partion的備份。
當leader partion故障時kafak會選擇1個follower partion成為leader partion,kafaka中主分區可以設置的最大的副本數量為10,leader和folloer partion不能在同一個服務器上,followers的數量也不能大於brokers(1個kafka集群中服務器)的數量。
consumer group: 多個消費者組成1個組對同1個topc的消息進行消費(增加了kafka的消費能力)
在同1個consumer group中1個consumer可消費多個分區,但是1個分區只能被consumer group里其中1個consumer消費。
也就是說只要N個consumer在同1 consumer group中,它們消費的數據永遠是不一致的。這也是kafka的partion和NSQ的channel之間的本質區別!
producer寫數據到kafka的工作流程
1.producer先從kafka集群中獲取分區的leader信息
2.producer將消息發送給leader
3.leader將消息寫入本地磁盤
4.follower從leader分區拉取消息數據
5.follower將消息寫入本地磁盤,向leader partion發送ACK確定.
6.leader收到所有follower的ACK之后,向Producer發生ACK確定。
ps:ack應答機制保證了Producer數據寫入的可靠性。
關於以上步驟細節:
producer選擇Leader partion的原則?
在kafka中1個topic對應多個leader分區,producer在獲取leader信息的時候是如何選擇其中1個leader的呢?
1.根據producer指定leader分區來幫它存儲數據。(Producer指定了Partion)
2.如果producer沒有指定特定的partion那么會根據producer設置的key,hash出1個值自動判斷選擇哪個leader partion.(Producer指定了Key)
3.如果既沒有指定partion也沒有指定key就采取輪詢的方式寫入到不同的partion(輪着來雨露均沾)
producer往kafka發送數據成功之后,ACK應答機制都也哪些?
producer在向kafka發布消息時,可以設置消息數據寫入kafka成功之后是否需要ACK應答機制?有以下3個參數!
0:producer往集群中發布數據不需要等待kafka的ACK。(可靠性低,效率高!)
1:代表producer要求只需要leader進行ACK,follower從leader拉取數據的時候就別ACK了。(折中方案)
all:producer--->leader----->leader下面所有的follower數據接收成功后都需要 ACK應答。(可靠性高,效率低)
ps:如果producer往不存在的Topic中發送數據時,kafka會自動創建該Topic,partion和replication的數量默認配置都為1.
安裝kafka
1.下載二進制包
二進制包下載之后,根據操作系統 執行kafka和zookeeper的啟動腳本即可
[root@zhanggen bin]# ls connect-distributed.sh kafka-producer-perf-test.sh connect-mirror-maker.sh kafka-reassign-partitions.sh connect-standalone.sh kafka-replica-verification.sh kafka-acls.sh kafka-run-class.sh kafka-broker-api-versions.sh kafka-server-start.sh kafka-configs.sh kafka-server-stop.sh kafka-console-consumer.sh kafka-streams-application-reset.sh kafka-console-producer.sh kafka-topics.sh kafka-consumer-groups.sh kafka-verifiable-consumer.sh kafka-consumer-perf-test.sh kafka-verifiable-producer.sh kafka-delegation-tokens.sh trogdor.sh kafka-delete-records.sh windows kafka-dump-log.sh zookeeper-security-migration.sh kafka-leader-election.sh zookeeper-server-start.sh kafka-log-dirs.sh zookeeper-server-stop.sh kafka-mirror-maker.sh zookeeper-shell.sh kafka-preferred-replica-election.sh [root@zhanggen bin]#
2.啟動zookeeper
zookeeprt是kafka集群中注冊、自動發現服務,類似於NSQ中的Lookupd。kafka的二進制包包含了zookeeper無需單獨下載
zookeeper配置kafka集群信息

tickTime=2000 dataDir=/home/myname/zookeeper clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.229.160:2888:3888 server.2=192.168.229.161:2888:3888 server.3=192.168.229.162:2888:3888
啟動zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
3.啟動1個kafka broker
broker配置文件

listeners=PLAINTEXT://192.168.11.103:9092
啟動
./bin/kafka-server-start.sh ./config/server.properties
4.生產topic
package main import ( "fmt" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll //賦值為-1:這意味着producer在follower副本確認接收到數據后才算一次發送完成。 config.Producer.Partitioner = sarama.NewRandomPartitioner //寫到隨機分區中,默認設置8個分區 config.Producer.Return.Successes = true msg := &sarama.ProducerMessage{} msg.Topic = `CMDB_log` msg.Value = sarama.StringEncoder("this is a good test") client, err := sarama.NewSyncProducer([]string{"192.168.56.133:9092"}, config) if err != nil { fmt.Println("producer close err, ", err) return } fmt.Println("Kafka連接成功!") defer client.Close() pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send message failed, ", err) return } fmt.Printf("分區ID:%v, offset:%v \n", pid, offset) }
5.驗證topic對應partion
生產者生產的每1個Topic都會有對應的partion進行數據存儲。
[root@zhanggen kafka-logs]# pwd
/tmp/kafka-logs
[root@zhanggen kafka-logs]# ls
cleaner-offset-checkpoint nginx_log-0 zhanggen_log-0
log-start-offset-checkpoint recovery-point-offset-checkpoint
meta.properties replication-offset-checkpoint
[root@zhanggen kafka-logs]# cd zhanggen_log-0/
[root@zhanggen zhanggen_log-0]# ls
00000000000000000000.index 00000000000000000000.timeindex
00000000000000000000.log leader-epoch-checkpoint
[root@zhanggen zhanggen_log-0]#
6.消費數據
[root@zhanggen kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.133:9092 --topic=zhanggen_log --from-beginning this is a good test this is a good test this is a good test How's it going?
7.使用go消費數據
根據topic查詢到所有分區,然后使用gorutine對每個分區進行消費。
package main import ( "fmt" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() consumer,err:=sarama.NewConsumer([]string{"192.168.56.133:9092"},config) if err!=nil{ fmt.Println("開啟consumer 失敗") return } //獲取某個topic對應的存儲分區列表 partionList,err:=consumer.Partitions("ngix-log") if err!=nil{ fmt.Println("獲取topic:tailconfig下對應分區失敗!") return } fmt.Println(partionList) for _,partion := range partionList{ //從分區列表中獲取每個分區,針對每個分區開啟單獨的go程進行消費, pc,err:=consumer.ConsumePartition("ngix-log",int32(partion),sarama.OffsetNewest) if err!=nil{ fmt.Println("從分區%s獲取數據失敗") return } defer pc.AsyncClose() //開啟go程:異步從每個分區獲取數據 go func(partitionConsumer sarama.PartitionConsumer){ for msg := range pc.Messages(){ fmt.Printf("分區:%d offset:%d key:%v value:%v\n",msg.Partition,msg.Offset,msg.Key,string(msg.Value)) } }(pc) } select { } }
驗證kafka是實時的數據流處理平台,這個很關鍵。
我的日志內容增加了kafaka的topic需要處理,如果減少了也需要處理。所以kafka很適合傳輸日志內容。
D:\goproject\src\go相關模塊\kafka\consumer>go run main.go [0] 分區:0 offset:93 key:[] value:nice to meet you Tom.
------------------------------------------------ 分區:0 offset:94 key:[] value:nice to meet you Jack 分區:0 offset:95 key:[] value:how are you? 分區:0 offset:96 key:[] value:it's ok,I lost my girl firend. 分區:0 offset:97 key:[] value:.............
-----------------------------------在文件里刪除以上內容之后 分區:0 offset:98 key:[] value: nice to meet you Marry.
參考