轉 https://www.jianshu.com/p/2c4caed49343
消息隊列之 Kafka
Kafka 特點
Kafka 最早是由 LinkedIn 公司開發一種分布式的基於發布/訂閱的消息系統,之后成為 Apache 的頂級項目。主要特點如下:
-
同時為發布和訂閱提供高吞吐量
Kafka 的設計目標是以時間復雜度為 O(1) 的方式提供消息持久化能力,即使對TB 級以上數據也能保證常數時間的訪問性能。即使在非常廉價的商用機器上也能做到單機支持每秒 100K 條消息的傳輸。 -
消息持久化
將消息持久化到磁盤,因此可用於批量消費,例如 ETL 以及實時應用程序。通過將數據持久化到硬盤以及 replication 防止數據丟失。 -
分布式
支持 Server 間的消息分區及分布式消費,同時保證每個 partition 內的消息順序傳輸。這樣易於向外擴展,所有的producer、broker 和 consumer 都會有多個,均為分布式的。無需停機即可擴展機器。 -
消費消息采用 pull 模式
消息被處理的狀態是在 consumer 端維護,而不是由 server 端維護,broker 無狀態,consumer 自己保存 offset。 -
支持 online 和 offline 的場景。
同時支持離線數據處理和實時數據處理。
Kafka 中的基本概念
-
Broker
Kafka 集群中的一台或多台服務器統稱為 Broker -
Topic
每條發布到 Kafka 的消息都有一個類別,這個類別被稱為 Topic 。(物理上不同
Topic 的消息分開存儲。邏輯上一個 Topic 的消息雖然保存於一個或多個broker上,但用戶只需指定消息的 Topic 即可生產或消費數據而不必關心數據存於何處) -
Partition
Topic 物理上的分組,一個 Topic 可以分為多個 Partition ,每個 Partition 是一個有序的隊列。Partition 中的每條消息都會被分配一個有序的 id(offset) -
Producer
消息和數據的生產者,可以理解為往 Kafka 發消息的客戶端 -
Consumer
消息和數據的消費者,可以理解為從 Kafka 取消息的客戶端 -
Consumer Group
每個 Consumer 屬於一個特定的 Consumer Group(可為每個 Consumer 指定Group Name,若不指定 Group Name 則屬於默認的 Group)。
這是 Kafka 用來實現一個 Topic 消息的廣播(發給所有的 Consumer )和單播(發給任意一個 Consumer )的手段。一個 Topic 可以有多個 Consumer Group。Topic 的消息會復制(不是真的復制,是概念上的)到所有的 Consumer Group,但每個 Consumer Group 只會把消息發給該 Consumer Group 中的一個 Consumer。如果要實現廣播,只要每個 Consumer 有一個獨立的 Consumer Group 就可以了。如果要實現單播只要所有的 Consumer 在同一個 Consumer Group 。用 Consumer Group 還可以將 Consumer 進行自由的分組而不需要多次發送消息到不同的 Topic 。
Kafka 安裝
Mac 用戶用 HomeBrew 來安裝,安裝前要先更新 brew
brew update
接着安裝 kafka
brew install kafka
安裝完成之后可以查看 kafka 的配置文件
cd /usr/local/etc/kafka
我的電腦通過 HomeBrew 安裝的 kafka 位置在 /usr/local/Cellar/kafka/0.11.0.1/bin ,可以看到 HomeBrew 安裝下來的 kafka 的版本已經是 0.11.0.1 的了。
kafka 需要用到 zookeeper,HomeBrew 安裝kafka 的時候會同時安裝 zookeeper。下面先啟動 zookeeper:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
接着啟動 kafka
cd /usr/local/Cellar/kafka/0.11.0.1 ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
創建 topic,設置 partition 數量為2,topic 的名字叫 test-topic,下面的例子都用這個 topic
cd /usr/local/Cellar/kafka/0.11.0.1 ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic
查看創建的 topic
cd /usr/local/Cellar/kafka/0.11.0.1 ./bin/kafka-topics --list --zookeeper localhost:2181
Kafka 命令行測試
發送消息
cd /usr/local/Cellar/kafka/0.11.0.1/bin kafka-console-producer --broker-list localhost:9092 --topic test-topic
消費消息
cd /usr/local/Cellar/kafka/0.11.0.1/bin kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
刪除 topic
cd /usr/local/Cellar/kafka/0.11.0.1/bin ./bin/kafka-topics --delete --zookeeper localhost:2181 --topic test-topic
如果 kafka 啟動時加載的配置文件中 server.properties 沒有配置delete.topic.enable=true,那么此時的刪除並不是真正的刪除,而是把 topic 標記為:marked for deletion
可以通過命令來查看所有 topic
cd /usr/local/Cellar/kafka/0.11.0.1/bin ./bin/kafka-topics --zookeeper localhost:2181 --list
如果想真正刪除它,可以如下操作
登錄zookeeper客戶端:/usr/local/Cellar/zookeeper/3.4.10/bin/zkCli 找到topic所在的目錄:ls /brokers/topics 找到要刪除的topic,執行命令:rmr /brokers/topics/test-topic 即可,此時topic被徹底刪除
Java 客戶端訪問
- maven工程的pom文件中添加依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </dependency>
- 消息生產者
package org.study.kafka;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap; import java.util.Map; public class ProducerSample { public static void main(String[] args) { Map<String, Object> props = new HashMap<String, Object>(); props.put("zk.connect", "127.0.0.1:2181");//zookeeper 的地址 props.put("bootstrap.servers", "localhost:9092");//用於建立與 kafka 集群連接的 host/port 組。 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); String topic = "test-topic"; Producer<String, String> producer = new KafkaProducer<String, String>(props); producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 1")); producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 2")); producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 3")); producer.close(); } }
- 消息消費者
package org.study.kafka;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class ConsumerSample { public static void main(String[] args) { String topic = "test-topic";// topic name Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092");//用於建立與 kafka 集群連接的 host/port 組。 props.put("group.id", "testGroup1");// Consumer Group Name props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自動提交 props.put("auto.commit.interval.ms", "1000");// 自動提交 offset 到 zookeeper 的時間間隔,時間是毫秒 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }
- 啟動 zookeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
- 啟動 kafka 服務器
kafka-server-start /usr/local/etc/kafka/server.properties
- 運行 Consumer
先運行 Consumer ,這樣當生產者發送消息的時候能在消費者后端看到消息記錄。 -
運行 Producer
運行 Producer,發布幾條消息,在 Consumer 的控制台能看到接收的消息
Consumer 控制台
Kafka 集群配置
kafka 的集群配置一般有三種,即: single node - single broker ,single node - multiple broker ,multiple node - multiple broker
前兩種實際上官網有介紹。
-
single node - single broker
單節點單 broker
- 啟動 zookeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
- 啟動 kafka broker
kafka-server-start /usr/local/etc/kafka/server.properties
- 創建一個 kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-singlenode-singlebroker
- 啟動 producer 發送信息
kafka-console-producer --broker-list localhost:9092 --topic topic-singlenode-singlebroker
broker-list 和 topic 這兩個參數是必須的,broker-list 指定要連接的 broker 的地址,格式為 node_address:port 。topic 是必須的,因為需要發送消息給訂閱了該
topic 的 consumer group 。
現在可以在命令行里輸入一些信息,每一行會被作為一個消息。
- 啟動 consumer 消費消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-singlenode-singlebroker
在不同的終端窗口里分別啟動 zookeeper、broker、producer、consumer 后,在
producer 終端里輸入消息,消息就會在 consumer 終端中顯示了。
-
single node - multiple broker
單節點多 broker
- 啟動 zookeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
- 啟動broker
如果需要在單個節點(即一台機子)上面啟動多個 broker(這里作為例子啟動三個 broker),需要准備多個server.properties文件即可,所以需要復制 /usr/local/etc/kafka/server.properties 文件。因為需要為每個 broker 指定單獨的屬性配置文件,其中 broker.id 、 port 、 log.dir 這三個屬性必須是不同的。
新建一個 kafka-example 目錄和三個存放日志的目錄
mkdir kafka-example
mkdir kafka-logs-1
mkdir kafka-logs-2
mkdir kafka-logs-3
復制 /usr/local/etc/kafka/server.properties 文件三份
cp server.properties /Users/niwei/Downloads/kafka-example/server-1.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-2.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-3.properties
在 broker1 的配置文件 server-1.properties 中,相關要修改的參數為:
broker.id=1 port=9093 log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1
broker2 的配置文件 server-2.properties 中,相關要修改的參數為:
broker.id=2 port=9094 log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-2
broker3 的配置文件 server-3.properties 中,相關要修改的參數為:
broker.id=3 port=9095 log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-3
啟動每個 broker
cd /Users/niwei/Downloads/kafka-example kafka-server-start server-1.properties kafka-server-start server-2.properties kafka-server-start server-3.properties
- 創建 topic
創建一個名為 topic-singlenode-multiplebroker 的topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-singlenode-multiplebroker
- 啟動 producer 發送信息
如果一個 producer 需要連接多個 broker 則需要傳遞參數 broker-list
kafka-console-producer --broker-list localhost:9093, localhost:9094, localhost:9095 --topic topic-singlenode-multiplebroker
- 啟動 consumer 消費消息
kafka-console-consumer --zookeeper localhost:2181 --topic topic-singlenode-multiplebroker
- multiple node - multiple broker
多節點多 broker
在多節點多 broker 集群中,每個節點都需要安裝 Kafka,且所有的 broker 都連接到同一個 zookeeper 。這里 zookeeper 當然也是可以配置成集群方式的,具體步驟參見我之前寫的搭建 zookeeper 集群。
- Kafka 的集群配置
broker.id=1 #當前機器在集群中的唯一標識 port=9093 #當前 kafka 對外提供服務的端口,默認是 9092 host.name=192.168.121.101 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。 log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1 #消息存放的目錄,這個目錄可以配置為逗號分割的表達式 zookeeper.connect=192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 #設置 zookeeper 集群的連接端口 num.network.threads=3 #這個是 borker 進行網絡處理的線程數 num.io.threads=5 #這個是 borker 進行 IO 處理的線程數 socket.send.buffer.bytes=102400 #發送緩沖區的大小,數據先回存儲到緩沖區了到達一定的大小后在發送能提高性能 socket.receive.buffer.bytes=102400 #接收緩沖區的大小,當數據到達一定大小后在序列化到磁盤 socket.request.max.bytes=104857600 #這個參數是向 kafka 請求消息或者向 kafka 發送消息的請求的最大數,這個值不能超過 jvm 的堆棧大小 num.partitions=1 #默認的分區數,一個 topic 默認1個分區數 log.retention.hours=24 #默認消息的最大持久化時間,24小時 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka 保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務 replica.fetch.max.bytes=5242880 #取消息的最大直接數 log.segment.bytes=1073741824 #這個參數是因為 kafka 的消息是以追加的形式落地到文件,當超過這個值的時候,kafka 會新建一個文件 log.retention.check.interval.ms=300000 #每隔 300000 毫秒去檢查上面配置的 log 失效時間(log.retention.hours=24 ),到目錄查看是否有過期的消息如果有則刪除 log.cleaner.enable=false #是否啟用 log 壓縮,一般不用啟用,啟用的話可以提高性能
由於是多節點多 broker 的,所以每個 broker 的配置文件 server.properties 都要按以上說明修改
- producer 的配置修改
kafka-console-producer --broker-list 192.168.21.1:9092,192.168.21.2:9092,192.168.21.3:9092 --topic topic-multiplenode-multiplebroker
- consumer 的配置修改
kafka-console-consumer --zookeeper 192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 --topic topic-multiplenode-multiplebroker
Kafka 高可靠性配置
Kafka 提供了很高的數據冗余彈性,對於需要數據高可靠性的場景可以增加數據冗余備份數(replication.factor),調高最小寫入副本數的個數(min.insync.replicas)等等,但是這樣會影響性能。反之,性能提高而可靠性則降低,用戶需要自身業務特性在彼此之間做一些權衡性選擇。
要保證數據寫入到 Kafka 是安全的、高可靠的,需要如下的配置:
- topic 的配置
replication.factor>=3,即副本數至少是3個2<=min.insync.replicas<=replication.factor - broker 的配置
leader 的選舉條件 unclean.leader.election.enable=false - producer 的配置
request.required.acks=-1,producer.type=sync
Kafka 高吞吐量的秘訣
消息中間件從功能上看就是寫入數據、讀取數據兩大類,優化也可以從這兩方面來看。
為了優化寫入速度 Kafak 采用以下技術:
-
順序寫入
磁盤大多數都還是機械結構(SSD不在討論的范圍內),如果將消息以隨機寫的方式存入磁盤,就需要按柱面、磁頭、扇區的方式尋址,緩慢的機械運動(相對內存)會消耗大量時間,導致磁盤的寫入速度與內存寫入速度差好幾個數量級。為了規避隨機寫帶來的時間消耗,Kafka 采取了順序寫的方式存儲數據,如下圖所示:
順序寫
每條消息都被append 到該 partition 中,屬於順序寫磁盤,因此效率非常高。
但這種方法有一個缺陷:沒有辦法刪除數據。所以Kafka是不會刪除數據的,它會把所有的數據都保留下來,每個消費者(Consumer)對每個 Topic 都有一個
offset 用來表示讀取到了第幾條數據。
消費消息
上圖中有兩個消費者,Consumer1 有兩個 offset 分別對應 Partition0、Partition1(假設每一個 Topic 一個 Partition )。Consumer2 有一個 offset 對應Partition2 。這個 offset 是由客戶端 SDK 保存的,Kafka 的 Broker 完全無視這個東西的存在,一般情況下 SDK 會把它保存到 zookeeper 里面。
如果不刪除消息,硬盤肯定會被撐滿,所以 Kakfa 提供了兩種策略來刪除數據。一是基於時間,二是基於 partition 文件大小,具體配置可以參看它的配置文檔。
即使是順序寫,過於頻繁的大量小 I/O 操作一樣會造成磁盤的瓶頸,所以 Kakfa 在此處的處理是把這些消息集合在一起批量發送,這樣減少對磁盤 I/O 的過度操作,而不是一次發送單個消息。 -
內存映射文件
即便是順序寫入硬盤,硬盤的訪問速度還是不可能追上內存。所以 Kafka 的數據並不是實時的寫入硬盤,它充分利用了現代操作系統分頁存儲來利用內存提高I/O效率。Memory Mapped Files (后面簡稱mmap)也被翻譯成內存映射文件,在64位操作系統中一般可以表示 20G 的數據文件,它的工作原理是直接利用操作系統的 Page 來實現文件到物理內存的直接映射。完成映射之后對物理內存的操作會被同步到硬盤上(由操作系統在適當的時候)。
通過 mmap 進程像讀寫硬盤一樣讀寫內存,也不必關心內存的大小,有虛擬內存為我們兜底。使用這種方式可以獲取很大的 I/O 提升,因為它省去了用戶空間到內核空間復制的開銷(調用文件的 read 函數會把數據先放到內核空間的內存中,然后再復制到用戶空間的內存中)
但這樣也有一個很明顯的缺陷——不可靠,寫到 mmap 中的數據並沒有被真正的寫到硬盤,操作系統會在程序主動調用 flush 的時候才把數據真正的寫到硬盤。所以 Kafka 提供了一個參數—— producer.type 來控制是不是主動 flush,如果Kafka 寫入到 mmap 之后就立即 flush 然后再返回 Producer 叫同步(sync);如果寫入 mmap 之后立即返回,Producer 不調用 flush ,就叫異步(async)。 -
標准化二進制消息格式
為了避免無效率的字節復制,尤其是在負載比較高的情況下影響是顯著的。為了避免這種情況,Kafka 采用由 Producer,Broker 和 Consumer 共享的標准化二進制消息格式,這樣數據塊就可以在它們之間自由傳輸,無需轉換,降低了字節復制的成本開銷。
而在讀取速度的優化上 Kafak 采取的主要是零拷貝
-
零拷貝(Zero Copy)的技術:
傳統模式下我們從硬盤讀取一個文件是這樣的
文件傳輸到 Socket 的常規方式
(1)操作系統將數據從磁盤讀到內核空間的頁緩存區
(2)應用將數據從內核空間讀到用戶空間的緩存中
(3)應用將數據寫會內核空間的套接字緩存中
(4)操作系統將數據從套接字緩存寫到網卡緩存中,以便將數據經網絡發出
這樣做明顯是低效的,這里有四次拷貝,兩次系統調用。
針對這種情況 Unix 操作系統提供了一個優化的路徑,用於將數據從頁緩存區傳輸到 socket。在 Linux 中,是通過 sendfile 系統調用來完成的。Java提供了訪問這個系統調用的方法:FileChannel.transferTo API。這種方式只需要一次拷貝:操作系統將數據直接從頁緩存發送到網絡上,在這個優化的路徑中,只有最后一步將數據拷貝到網卡緩存中是需要的。
零拷貝方式傳輸到 Socket
這個技術其實非常普遍,The C10K problem 里面也有很詳細的介紹,Nginx 也是用的這種技術,稍微搜一下就能找到很多資料。
Kafka 速度的秘訣在於它把所有的消息都變成一個的文件。通過 mmap 提高 I/O
的速度,寫入數據的時候是末尾添加所以速度最優;讀取數據的時候配合sendfile 直接暴力輸出。所以單純的去測試 MQ 的速度沒有任何意義,Kafka 的這種暴力的做法已經脫了 MQ 的底褲,更像是一個暴力的數據傳送器。
