Kafka 分布式發布-訂閱消息系統


1. Kafka 概述

1.1什么是 Kafka

Apache Kafka 是分布式發布-訂閱消息系統(消息中間件)。它最初由 LinkedIn 公司開發,之后成為 Apache 項目的一部分。Kafka 是一種快速、可擴展的、設計內在就是分布式的,分區的和可復制的提交日志服務。

 

簡單說明什么是Kafka:

舉個例子,生產者消費者,生產者生產雞蛋,消費者消費雞蛋,生產者生產一個雞蛋, 消費者就消費一個雞蛋,假設消費者消費雞蛋的時候噎住了(系統宕機了),生產者還在生 產雞蛋,那新生產的雞蛋就丟失了。再比如生產者很強勁(大交易量的情況),生產者1秒鍾生產100 個雞蛋,消費者1 秒鍾只能吃50 個雞蛋,那要不了一會,消費者就吃不消了

(消息堵塞,最終導致系統超時),消費者拒絕再吃了,”雞蛋“又丟失了,這個時候我們

放個籃子在它們中間,生產出來的雞蛋都放到籃子里,消費者去籃子里拿雞蛋,這樣雞蛋就 不會丟失了,都在籃子里,而這個籃子就是”Kafka“。 雞蛋其實就是“數據流”,系統之間的交互都是通過“數據流”來傳輸的(就是tcp、http 什么的),也稱為報文,也叫“消息”。 消息隊列滿了,其實就是籃子滿了,”雞蛋“ 放不下了,那趕緊多放幾個籃子,其實就是Kafka 的擴容。Kafka 就是例子中的"籃子"。

 

傳統消息中間件服務 RabbitMQ、Apache ActiveMQ 等。

Apache Kafka 與傳統消息系統相比,有以下不同:

  1.它是分布式系統,易於向外擴展;

  2.它同時為發布和訂閱提供高吞吐量;

  3.它支持多訂閱者,當失敗時能自動平衡消費者;

  4.它將消息持久化到磁盤,因此可用於批量消費,例如 ETL,以及實時應用程序。

1.2Kafka 術語

術語

解釋

Broker

Kafka 集群包含一個或多個服務器,這種服務器被稱為 broker

Topic

每條發布到 Kafka 集群的消息都有一個類別,這個類別被稱為 Topic。(物

理上不同 Topic 的消息分開存儲,邏輯上一個 Topic 的消息雖然保存於一個或多個 broker 上但用戶只需指定消息的 Topic 即可生產或消費數據而不必關心數據存於何處)

Partition

Partition 是物理上的概念,每個 Topic 包含一個或多個 Partition.

Producer

負責發布消息到 Kafka broker

Consumer

消息消費者,向 Kafka broker 讀取消息的客戶端

Consumer Group

每個 Consumer 屬於一個特定的 Consumer Group(可為每個 Consumer

指定 group name,若不指定 group name 則屬於默認的 group)

replica

partition  的副本,保障 partition  的高可用

leader

replica  中的一個角色, producer  和 consumer  只跟 leader  交互

follower

replica  中的一個角色,從 leader  中復制數據

controller

Kafka  集群中的其中一個服務器,用來進行 leader election  以及各種

failover

小白理解:

  producer:生產者,就是它來生產“雞蛋”的。

  consumer:消費者,生出的“雞蛋”它來消費。

  topic把它理解為標簽,生產者每生產出來一個雞蛋就貼上一個標簽(topic),消費者可不是誰生產的“雞蛋”都吃的,這樣不同的生產者生產出來的“雞蛋”,消費者就可以選擇性的“吃”了。

  broker:就是籃子了。

如果從技術角度,topic標簽實際就是隊列,生產者把所有“雞蛋(消息)”都放到對應的隊列里了,消費者到指定的隊列里取。

 

2. Kafka 安裝

2.1下載

Apache kafka 官方: http://kafka.apache.org/downloads.html

Scala 2.11  - kafka_2.11-0.10.2.0.tgz (asc, md5)

2.1. Kafka 集群安裝

  2.1.1. 安裝 JDK &配置 JAVA_HOME

  2.1.2. 安裝 Zookeeper

  參照 Zookeeper 官網搭建一個 ZK 集群, 並啟動 ZK 集群。

  2.1.3. 解壓 Kafka 安裝包

    2.1.3.1. 修改配置文件 config/server.properties

vi  server.properties
broker.id=0    //為依次增長的:0、1、2、3、4,集群中唯一id
log.dirs=/kafkaData/logs // Kafka 的消息數據存儲路徑zookeeper.connect=master:2181,slave1:2181,slave2:2181   //zookeeperServers   列表,各節點以逗號分開

Vi  zookeeper.properties
dataDir=/root/zkdata #指向你安裝的zk 的數據存儲目錄

#  將 Kafka server.properties    zookeeper.properties    文件拷貝到其他節點機器
KAFKA_HOME/config>scp server.properties    zookeeper.properties xx:$PWD

    2.1.3.2. 啟動 Kafka

    在每台節點上啟動:

    bin/kafka-server-start.sh -daemon config/server.properties &

    2.1.3.3. 測試集群

      1-進入 kafka 根目錄,創建 Topic 名稱為: test 的主題

    bin/kafka-topics.sh --create --zookeeper hadoop:2181,hadoop001:2181,hadoop002:2181 --replication-factor 3 --partitions 3 --topic testTopic

      2-列出已創建的 topic 列表

    bin/kafka-topics.sh --list --zookeeper localhost:2181

      3-查看 Topic 的詳細信息

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:3 Configs:

    Topic: test Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

    第一行是對所有分區的一個描述,然后每個分區對應一行,因為只有一個分區所以下面一行。

      leader:負責處理消息的讀和寫,leader 是從所有節點中隨機選擇的.

      replicas:列出了所有的副本節點,不管節點是否在服務中.

      isr:是正在服務中的節點.

    在例子中,節點 1 是作為 leader 運行。

      4-模擬客戶端去發送消息

    bin/kafka-console-producer.sh --broker-list hadoop:9092,hadoop001:9092 --topic test

      5-模擬客戶端去接受消息

    bin/kafka-console-consumer.sh --bootstrap-server hadoop:9092 --from-beginning --topic hellotopic

      6-測試一下容錯能力.

    Kill -9 pid[leader 節點]

    另外一個節點被選做了 leader,node 1 不再出現在 in-sync 副本列表中: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:3 Configs:

    Topic: test Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

    雖然最初負責續寫消息的 leader down 掉了,但之前的消息還是可以消費的:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test

3. Kafka 客戶端開發

  3.1. Java Client

  3.1.1. 添加 pom.xml 依賴

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency>

  3.1.2. Producer  生產者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
/**
 * kafka 生產端Api開發
 */
public class ProducerApi {
    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        props.setProperty("bootstrap.servers","hadoop:9092,hadoop001:9092,hadoop002:9092");
        props.setProperty("key.serializer",StringSerializer.class.getName());
        props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        /**
         * 發送數據的時候是否需要應答
         * 取值范圍:
         * [all,-1,0,1]
         * 0:leader不做任何應答
         * 1:leader會給producer做出應答
         * all、-1:fllower->leader->producer
         * 默認值: 1
         */
        //props.setProperty("acks","1")
        /**
         * 自定義分區
         * 默認值:org.apache.kafaka.clients.producer.internals.DefaultPartitoner
         */
        //props.setProperty("partitioner.class","org.apache.kafaka.clients.producer.internals.DefaultPartitoner");

        //創建一個生產者的客戶端實例
        KafkaProducer<Object, Object> kafkaproducer = new KafkaProducer<>(props);
        int count=0;
        while (count<1000){
            int partitionNum=count%3;
            //封裝一條消息
            ProducerRecord record = new ProducerRecord("testTopic", partitionNum, "", count+"");
            //發送一條消息
            kafkaproducer.send(record);
            count++;
            Thread.sleep(1*1000);
        }
        //釋放
        kafkaproducer.close();
    }
}
View Code

  3.1.3. Consumer  消費者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
/**
 * 消費端Api開發
 */
public class ConsumerApi {
    public static void main(String[] args) {
        Properties config = new Properties();
        HashMap<String, Object> props = new HashMap<>();
        config.put("bootstrap.servers","hadoop:9092,hadoop001:9092,hadoop002:9092");
        config.put("key.deserializer",StringDeserializer.class.getName());
        config.put("value.deserializer",StringDeserializer.class.getName());
        config.put("group.id","day12_001");
        /**
         * 從哪個位置開始獲取數據
         * 取值范圍:
         * [latest,earliest,none]
         * 默認值:
         * latest
         */
        config.put("auto.offset.reset","earliest");
        /**
         * 是否要自動遞交偏移量(offset)這條數據在某個分區所在位置的編號
         */
        config.put("enable.auto.commit",true);
        /**
         * 設置500毫秒遞交一次offset值
         */
        config.put("auto.commit.interval.ms",500);
        //創建一個客戶端實例
        KafkaConsumer<Object, Object> kafkaConsumer = new KafkaConsumer<>(config);
        //訂閱主題
        kafkaConsumer.subscribe(Arrays.asList("testTopic"));

        while (true){
            //拉取數據,會從kafka所以分區下拉取數據
            ConsumerRecords<Object, Object> records = kafkaConsumer.poll(2000);
            Iterator<ConsumerRecord<Object, Object>> iterator = records.iterator();
            while (iterator.hasNext()){
                ConsumerRecord<Object, Object> record = iterator.next();
                System.out.println("record"+record);
            }
        }
    }
}
View Code

4. Kafka 原理

4.1. Kafka 的拓撲結構

如上圖所示,一個典型的 Kafka 集群中包含若干 Producer,若干 broker(Kafka 支持水平擴展, 一般 broker 數量越多,集群吞吐率越高),若干 Consumer Group,以及一個 Zookeeper 集群。Kafka 通過 Zookeeper 管理集群配置,選舉 leader。Producer 使用 push 模式將消息發布 broker,Consumer 使用 pull 模式從 broker 訂閱並消費消息。 

4.2. Zookeeper 節點

4.3. Producer 發布消息

  • producer  采用 push  模式將消息發布到 broker,每條消息都被 append   partition

 中,屬於順序寫磁盤。

  主題是發布記錄的類別或訂閱源名稱。Kafka的主題總是多用戶; 也就是說,一個主題可以有零個,一個或多個消費者訂閱寫入它的數據。

對於每個主題,Kafka群集都維護一個如下所示的分區日志:


  • producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪一個 partition。

  1.指定了 partition,則直接使用;

  2.未指定 partition  但指定 key,通過對 key   value  進行 hash  選出一個 partition

  3.partition   key  都未指定,使用輪詢選出一個 partition  。

4.3.1. 寫數據流程

  1. producer 先從 zookeeper  "/brokers/.../state" 節點找到該 partition  leader
  2. producer 將消息發送給該 leader
  3. leader  將消息寫入本地 log
  4. followers   leader pull  消息,寫入本地 log   leader  發送 ACK
  5. leader  收到所有 ISR(in-sync replicas) 中的 replica   ACK  后向 producer  發送 ACK

 

4.4. Broker 存儲消息

4.4.1. 消息存儲方式

物理上把 topic  分成一個或多個 partition(對應 server.properties  中的 num.partitions=3  配置),每個 partition 物理上對應一個文件夾(該文件夾存儲該 partition 的所有消息和索引文件),如下:

4.4.2. 消息存儲策略

無論消息是否被消費,kafka 都會保留所有消息。有兩種策略可以刪除舊數據:

log.retention.hours=168 #基於時間

log.retention.bytes=1073741824 #基於大小 

4.5. Kafka log 的存儲解析

Partition 中的每條 Message offset 來表示它在這個 partition 中的偏移量,這個 offset 不是 Message partition 數據文件中的實際存儲位置,而是邏輯上一個值,它唯一確定了

partition 中的一條 Message。因此,可以認為 offset partition Message id。partition

中的每條 Message 包含了以下三個屬性:

  offset

  MessageSize

  data

其中 offset long 型,MessageSize int32,表示 data 有多大,data message 的具體內容。

 

我們來思考一下,如果一個partition 只有一個數據文件會怎么樣?

1) 新數據是添加在文件末尾,不論文件數據文件有多大,這個操作永遠都是高效的。

2) 查找某個offset  的Message是順序查找的。因此,如果數據文件很大的話,查找的效率就低。

 

Kafka 是如何解決查找效率的的問題呢?有兩大法寶:1)  分段 2)  索引。

Ø 數據文件的分段

Kafka 解決查詢效率的手段之一是將數據文件分段,比如有 100 Message,它們的 offset 是從 0 99。假設將數據文件分成 5 段,第一段為 0-19,第二段為 20-39,以此類推,每段放在一個單獨的數據文件里面,數據文件以該段中最小的 offset 命名。這樣在查找指定 offset Message 的時候,用二分查找就可以定位到該 Message 在哪個段中。

Ø 為數據文件建索引

數據文件分段使得可以在一個較小的數據文件中查找對應 offset Message 了,但是這依然需要順序掃描才能找到對應 offset Message。為了進一步提高查找的效率,Kafka 為每個分段后的數據文件建立了索引文件,文件名與數據文件的名字是一樣的,只是文件擴展名為.index。

索引文件中包含若干個索引條目,每個條目表示數據文件中一條 Message 的索引。索引包含兩個部分,分別為相對 offset position。

  1.相對 offset:因為數據文件分段以后,每個數據文件的起始 offset 不為 0,相對 offset 表示這條 Message 相對於其所屬數據文件中最小的 offset 的大小。舉例,分段后的一個數據文件的 offset 是從 20 開始,那么 offset  25  Message  index 文件中的相對 offset 就是 25-20 = 5。存儲相對 offset 可以減小索引文件占用的空間。

  2.position,表示該條 Message 在數據文件中的絕對位置。只要打開文件並移動文件指針到這個 position 就可以讀取對應的 Message 了。

index 文件中並沒有為數據文件中的每條 Message 建立索引,而是采用了稀疏存儲的方式, 每隔一定字節的數據建立一條索引。這樣避免了索引文件占用過多的空間,從而可以將索引   文件保留在內存中。但缺點是沒有建立索引的 Message 也不能一次定位到其在數據文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。

 

  我們以幾張圖來總結一下 Message 是如何在 Kafka 中存儲的,以及如何查找指定 offset 的Message 的。

  Message 是按照 topic 來組織,每個 topic 可以分成多個的 partition,比如:有 5 partition的名為為 page_visits topic 的目錄結構為:

partition 是分段的,每個段叫 Segment,包括了一個數據文件和一個索引文件,下圖是某個partition 目錄下的文件:

可以看到,這個 partition 4 Segment。圖示 Kafka 是如何查找 Message 的。

 

比如:要查找絕對 offset 7 Message:

首先是用二分查找確定它是在哪個 LogSegment 中,自然是在第一個 Segment 中。

打開這個 Segment index 文件,也是用二分查找找到 offset 小於或者等於指定 offset 的索引條目中最大的那個 offset。自然 offset 6 的那個索引是我們要找的,通過索引文件我們知道 offset 6 Message 在數據文件中的位置為 9807。

打開數據文件,從位置為9807 的那個地方開始順序掃描直到找到offset 為7 的那條Message。這套機制是建立在 offset 是有序的。索引文件被映射到內存中,所以查找的速度還是很快的。

 

一句話,Kafka Message 存儲采用了分區(partition),分段(LogSegment)和稀疏索引這幾個手段來達到了高效性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM