1. Kafka概述
1.1. 消息隊列
1)點對點模式(一對一,消費者主動拉取數據,消息收到后消息清除)
點對點模型通常是一個基於拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特點是發送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監聽者也是如此。
(2)發布/訂閱模式(一對多,數據生產后,推送給所有訂閱者)
發布訂閱模型則是一個基於推送的消息傳送模型。發布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的所有消息,即使當前訂閱者不可用,處於離線狀態。
1.1. 為什么需要消息隊列
1)解耦:
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2)冗余:
消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3)擴展性:
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4)靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
5)可恢復性:
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
6)順序保證:
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)
7)緩沖:
有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
8)異步通信:
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
1.2. 什么是Kafka
在流式計算中,Kafka一般用來緩存數據,Storm通過消費Kafka的數據進行計算。
1)Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟件基金會開發的一個開源消息系統項目。
2)Kafka最初是由LinkedIn公司開發,並於2011年初開源。2012年10月從Apache Incubator畢業。該項目的目標是為處理實時數據提供一個統一、高通量、低等待的平台。
3)Kafka是一個分布式消息隊列。Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)稱為broker。
4)無論是kafka集群,還是consumer都依賴於zookeeper集群保存一些meta信息,來保證系統可用性。
1)Producer :消息生產者,就是向kafka broker發消息的客戶端;
2)Consumer :消息消費者,向kafka broker取消息的客戶端;
3)Topic :可以理解為一個隊列;
4) Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復制(不是真的復制,是概念上的)到所有的CG,但每個partion只會把消息發給該CG中的一個consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic;
5)Broker :一台kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic;
6)Partition:為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序;
7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka。
1. Kafka單節點運行方式
Setp 1:下載代碼
下載 kafka_2.12-2.1.0 版本並且解壓。
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.12-2.1.0.tgz
> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0
Setp 2:啟動服務
Kafka 使用 ZooKeeper 如果你還沒有ZooKeeper服務器,你需要先啟動一個ZooKeeper服務器。 您可以通過與kafka打包在一起的便捷腳本來快速簡單地創建一個單節點ZooKeeper實例。如果你有使用docker的經驗,你可以使用docker-compose快速搭建一個zk集群。
> bin/zookeeper-server-start.sh config/zookeeper.properties
現在啟動Kafka服務器:
> bin/kafka-server-start.sh config/server.properties
后台啟動:
> bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
其中1>/dev/null 2>&1 是將命令產生的輸入和錯誤都輸入到空設備,也就是不輸出的意思。
/dev/null代表空設備。
Setp 3:創建一個topic
創建一個名為“test”的topic,它有一個分區和一個副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
運行list(列表)命令來查看這個topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 test
除了手工創建topic外,你也可以配置你的broker,當發布一個不存在的topic時自動創建topic。
Setp 4:發送消息
Kafka自帶一個命令行客戶端,它從文件或標准輸入中獲取輸入,並將其作為message(消息)發送到Kafka集群。默認情況下,每行將作為單獨的message發送。
運行 producer,然后在控制台輸入一些消息以發送到服務器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
hello world
Hello study.163.com
Setp 5:啟動消費者
Kafka還有一個命令行使用者,它會將消息轉儲到標准輸出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello world
hello study.163.com
如果在不同的終端中運行上述命令,能夠在生產者終端中鍵入消息並看到它們出現在消費者終端中。
所有命令行工具都有選項; 運行不帶參數的命令將顯示使用信息。
=========================================================================================
Kafka集群部署方式
Setp 6:設置多 broker 集群
到目前,我們只是單一的運行一個broker,對於Kafka,一個broker僅僅只是一個集群的大小,接下來我們來設多個broker。
首先為每個broker創建一個配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
現在編輯這些新建的文件,設置以下屬性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id屬性是集群中每個節點的名稱,這一名稱是唯一且永久的。
我們已經建立Zookeeper和一個單節點了,現在我們只需要啟動兩個新的節點:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
現在創建一個副本為3的新topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
運行命令“describe topics” 查看集群中的topic信息
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
以下是對輸出信息的解釋:第一行給出了所有分區的摘要,下面的每行都給出了一個分區的信息。因為我們只有一個分區,所以只有一行。
l “leader”是負責給定分區所有讀寫操作的節點。每個節點都是隨機選擇的部分分區的領導者。
l “replicas”是復制分區日志的節點列表,不管這些節點是leader還是僅僅活着。
l “isr”是一組“同步”replicas,是replicas列表的子集,它活着並被指到leader。
請注意,在示例中,節點1是該主題中唯一分區的領導者。
我們運行這個命令,看看一開始我們創建的那個test節點:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
這並不奇怪,剛才創建的主題沒有Replicas,並且在服務器“0”上,我們創建它的時候,集群中只有一個服務器,所以是“0”。
發布一些信息在新的topic上:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
消費這些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
測試集群的容錯,kill掉leader,Broker1作為當前的leader,也就是kill掉Broker1。
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
備份節點之一成為新的leader,而broker1已經不在同步備份集合里了。
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
即使最初接受寫入的領導者已經失敗,這些消息仍可供消費:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
Setp 7:使用 Kafka Connect 導入/導出數據
Kafka Connect是Kafka的一個工具,它可以將數據導入和導出到Kafka。它是一種可擴展工具,通過運行connectors(連接器), 使用自定義邏輯來實現與外部系統的交互。接下來我們將學習如何使用簡單的connectors來運行Kafka Connect,這些connectors 將文件中的數據導入到Kafka topic中,並從中導出數據到一個文件。
首先,我們將創建一些種子數據來進行測試:
> echo -e "allen" > test.txt
> echo -e "tony" >> test.txt
接下來,我們將啟動兩個standalone(獨立)運行的連接器,第一個是源連接器,它從輸入文件讀取行並生成Kafka主題,第二個是宿連接器從Kafka主題讀取消息並將每個消息生成為輸出文件中的一行。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
一旦Kafka Connect進程啟動,源連接器應該開始從test.txt主題讀取行並生成它們connect-test,並且接收器連接器應該開始從主題讀取消息connect-test 並將它們寫入文件test.sink.txt。我們可以通過檢查輸出文件的內容來驗證數據是否已通過整個管道傳遞:
> more test.sink.txt
allen
tony
數據存儲在Kafka主題中connect-test,因此我們還可以運行控制台使用者來查看主題中的數據:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"allen"}
{"schema":{"type":"string","optional":false},"payload":"tony"}
...
連接器一直在處理數據,所以我們可以將數據添加到文件中,並看到它在pipeline 中移動:
> echo mike >> test.txt