Ubuntu簡單安裝kafka及使用


參考地址:https://www.jianshu.com/p/d0e630c8f4ae

kafka是一個分布式的基於發布訂閱模式的消息隊列,主要應用於大數據實時處理領域。(kafka是消費者主動拉取生產者的信息)

作用(所有的消息隊列):(1)、解耦:不需要兩個應用都同時在線(強依賴);(2)、緩沖:解決生產消息和消費消息的處理速度不一致的情況;(3)、靈活性&削封:在訪問量劇增的情況下,可以使關鍵組件頂住突發的訪問壓力,而不會造成組件的崩潰;(4)、異步通信:當一些信息不需要立即處理時,可以充當隊列,在需要處理的時候從隊列中獲取。

(為防止log文件過大導致數據定位效率低下,kafka采取了分片和索引機制,將每個分區分為多個片段(segment),每個segment對應兩個文件--.index和.log文件)

分區原因:

  (1)、方便在集群中擴展。每個分區可以通過調整以適應它所在的機器,而一個topic又可以由多個分區組成,因而整個集群就可以適應任意大小的數據了;

  (2)、可以提高並發。因為可以以分區為單位進行讀寫了。

kafka數據可靠性保證(不丟數據):為保證producer發送的數據,能可靠的發送到指定的topic,topic的每個分區收到producer發送的數據之后,都需要向producer發送acks(acknowledgement確認收到),如果producer沒有收到acks,將會重新發送數據,否者進行下一輪發送。

kafka采用所有副本同步完成之后,才發送acks(這樣做延遲高,但是重新選舉leader時,容忍n台節點的故障,只需要n+1個副本就可以了)。但是可能出現leader收到數據,所有follower都開始同步數據,但有一個follower因為某種

故障,遲遲不能與leader通信,leader將會一直等下去,為了解決這個問題,leader維護了一個動態的ISR,表示和leader保持同步的follower集合。當ISR中的follower完成數據同步之后,leader就會給producer發送acks,當ISR中的

follower超過replica.lag.time.max.ms設置的時間,為沒有向leader同步數據,則將改follower剔除。leader發生故障后,就會從ISR中重新選舉leader。

對於某些不太重要的參數,可以容忍數據的少量丟失,所以沒必要等到ISR中的所有follower同步成功。可以通過ack參數進行配置:

0:producer不等待acks,延遲低,當broker出現故障可能丟失數據;

1:producer等待acks,分區的leader寫入磁盤成功后返回acks(只等leader寫完,不管follower),若在follower同步成功之前leader故障,將會丟數據;

-1:producer等待acks,分區的leader和ISR中的follower全部寫入成功才返回acks,若follower同步完成后,broker發送acks之前,leader故障,將會造成數據重復。

(消費者消費和kafka存儲)kafka數據一致性:(LEO:指的是每個副本最大的offset(偏移量),HW:指的是ISR隊列中最小的offset,也就是消費者能見到的最大的offset)

(1)、follower故障:follower發生故障后會被踢出ISR隊列,等follower恢復后,follower會讀取本地磁盤記錄的掛掉之前的HW,並將log文件高於HW的截掉,然后從HW的位置開始向leader同步,等該follower的LED大於等於該分區的

HW,即follower追上leader之后,就可以重新加入ISR隊列了。

(2)、leader故障:leader發生故障之后,會從ISR中選出新的leader,然后其余的follower會先將各自log文件高於HW的部分截掉,接着從新的leader同步數據。

注意:這只能保證副本之間的數據一致性,不能保證數據不丟失或者不重復。

 kafka的冪等性:(無論producer發送多少次,broker只會存儲一次)

kafka中設置enable.idempotence設置為true時,即可保證冪等性,此時acks默認是-1了。開啟冪等性之后,producer會在初始化的時候分配一個PID,發往同一個分區的信息會攜帶一個seqNumber,而broker端

會對<PID,Partiton,SeqNumber>做緩存,當具有相同的SeqNumber消息時,broker之后保存一條。但是PID每次重啟都會變化,不同的分區也具有 不同的主鍵,所有冪等性無法保證跨分區回話的冪等性。

一、下載 kafka 二進制安裝包

下載地址: http://kafka.apache.org/downloads

二、上傳 kafka 壓縮包到虛擬機中並解壓到指定文件夾

tar -xzf kafka_2.11-2.3.0.tgz 

三、啟動 kafka 服務

kafka需要使用Zookeeper,首先需要啟動Zookeeper服務,如果沒有的話,可以使用kafka自帶的腳本啟動一個簡單的單一節點Zookeeper實例:

bin/zookeeper-server-start.sh config/zookeeper.properties &
(使用 bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 以守護進程啟動)

如果已經啟動了 zookeeper,則直接運行下面的命令:

bin/kafka-server-start.sh config/server.properties &

停止 kafka 服務則運行下面命令:

bin/kafka-server-stop.sh config/server.properties

四、kafka 簡單使用

4.1、創建一個主題

首先創建一個名為test的topic,只使用單個分區和一個復本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

4.2、刪除主題

bin/kafka-topic.sh --delete --zookeeper localhost:2181 --topic test

 上面的提示表示該主題僅僅標記為“待刪除”,至於topic是否被真正刪除取決於broker端(server.properties)的參數設置delete.topic.enable,若設置為false,那么即使運行了上面的命令,主題也不會被刪除。需要說明的是主題的刪除時異步的,就算將delete.topic.enable

設置為true,當執行了上面的命令后,也需要根據主題的分片依次進行刪除。

4.3、查看主題

bin/kafka-topics.sh --list --zookeeper localhost:2181

4.4、發送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4.5、接收消息

啟動一個消費者,消費者會接收到消息

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

五、 搭建一個多個broker的集群

剛才只是啟動了單個broker,現在啟動有3個broker組成的集群,這些broker節點也都是在本機上的:

首先為每個節點編寫配置文件:

cp config/server.properties config/server_1.properties cp config/server.properties config/server_2.properties

在拷貝出的新文件中添加以下參數:server_1.properties

 broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1

broker.id在集群中唯一的標注一個節點,因為在同一個機器上,所以必須制定不同的端口和日志文件,避免數據被覆蓋。

剛才已經啟動可Zookeeper和一個節點,現在啟動另外兩個節點:

bin/kafka-server-start.sh config/server_1.properties & bin/kafka-server-start.sh config/server_2.properties &

創建一個擁有3個副本的topic:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic zx_test

現在我們搭建了一個集群,怎么知道每個節點的信息呢?運行“"describe topics”命令就可以了:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic zx_test

下面解釋一下這些輸出。第一行是對所有分區的一個描述,然后每個分區都會對應一行,因為我們只有一個分區所以下面就只有一行。

leader:負責處理消息的讀和寫,leader是從所有節點中隨機選擇的.
replicas:列出了所有的副本節點,不管節點是否在服務中.
isr:是正在服務中的節點.

在我們的例子中,節點1是作為leader運行。

 向topic發送消息:

 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zx_test

消費這些消息:(三個副本,任意一個端口即可)

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092/9093/9094 --topic zx_test--from-beginning

測試一下容錯能力:Broker 1作為leader運行,現在我們kill掉它:

ps -ef|grep server-1.properties kill -9 xxxx

另外一個節點被選做了leader,node 1 不再出現在 ISR副本列表中:

雖然最初負責續寫消息的leader down掉了,但之前的消息還是可以消費的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM