Kafka原理詳細介紹


1 Kafka

1.1 定義

Kafka 是一個分布式流媒體平台,kafka官網:http://kafka.apache.org/

Kafka 是一種高吞吐量、分布式、基於發布/訂閱的消息系統,最初由 LinkedIn 公司開發,使用Scala 語言編寫,目前是Apache 的開源項目。

流媒體平台有三個關鍵功能:

  • 發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。
  • 以容錯的持久方式存儲記錄流。
  • 記錄發生時處理流。

Kafka通常用於兩大類應用:

  • 構建可在系統或應用程序之間可靠獲取數據的實時流數據管道
  • 構建轉換或響應數據流的實時流應用程序

1.1.1 Kafka名詞

下面是Kafka中涉及到的相關概念:

  1. brokerKafka 服務器,負責消息存儲和轉發
  2. topic:消息類別,Kafka 按照topic 來分類消息(即使如此,kafka仍然有點對點和廣播發布類型)
  3. partitiontopic 的分區,一個 topic 可以包含多個 partitiontopic 消息保存在各個partition
  4. offset:消息在日志中的位置,可以理解是消息在 partition 上的偏移量,也是代表該消息的唯一序號
  5. Producer:消息生產者
  6. Consumer:消息消費者
  7. Consumer Group:消費者分組,每個Consumer 必須屬於一個 group
  8. Zookeeper:保存着集群 broker、topic、partitionmeta 數據;另外,還負責 broker 故障發現,partition leader 選舉,負載均衡等功能

1.1.2 Kafka核心API

Kafka有四個核心API:

  • Producer API(生產者API)允許應用程序發布記錄流至一個或多個kafkatopics(主題)

  • Consumer API(消費者API)允許應用程序訂閱一個或多個topics(主題),並處理所產生的對他們記錄的數據流。

  • Streams API(流API)允許應用程序充當流處理器,從一個或多個topics(主題)消耗的輸入流,並產生一個輸出流至一個或多個輸出的topics(主題),有效地變換所述輸入流,以輸出流。

  • Connector API(連接器API)允許構建和運行kafka topics(主題)連接到現有的應用程序或數據系統中重用生產者或消費者。例如,關系數據庫的連接器可能捕獲對表的每個更改。

Kafka中,客戶端和服務器之間的通信是通過簡單,高性能,語言無關的TCP協議完成的。此協議已版本化並保持與舊版本的向后兼容性。Kafka提供Java客戶端,但客戶端有多種語言版本。

1.2 相關組件介紹

1.2.1 Topic

Topic 是生產者發送消息的目標地址,是消費者的監聽目標
在這里插入圖片描述
一個服務可以監聽、發送多個 Topics
在這里插入圖片描述
Kafka 中有一個consumer-group(消費者組)的概念。
這是一組服務,扮演一個消費者
在這里插入圖片描述
如果是消費者組接收消息,Kafka 會把一條消息路由到組中的某一個服務
在這里插入圖片描述
這樣有助於消息的負載均衡,也方便擴展消費者。
Topic 扮演一個消息的隊列。
首先,一條消息發送了
在這里插入圖片描述
然后,這條消息被記錄和存儲在這個隊列中,不允許被修改
在這里插入圖片描述
接下來,消息會被發送給此 Topic 的消費者。
但是,這條消息並不會被刪除,會繼續保留在隊列中
在這里插入圖片描述
像之前一樣,這條消息會發送給消費者、不允許被改動、一直呆在隊列中。
(消息在隊列中能呆多久,可以修改 Kafka 的配置)
在這里插入圖片描述
在這里插入圖片描述

1.2.2 Partitions分區

上面 Topic 的描述中,把 Topic 看做了一個隊列,實際上,一個 Topic 是由多個隊列組成的,被稱為Partition(分區)
這樣可以便於 Topic 的擴展
在這里插入圖片描述
生產者發送消息的時候,這條消息會被路由到此 Topic 中的某一個 Partition
在這里插入圖片描述
消費者監聽的是所有分區
在這里插入圖片描述
生產者發送消息時,默認是面向 Topic 的,由 Topic 決定放在哪個 Partition,默認使用輪詢策略
在這里插入圖片描述
也可以配置 Topic,讓同類型的消息都在同一個 Partition
例如,處理用戶消息,可以讓某一個用戶所有消息都在一個 Partition
例如,用戶1發送了3條消息:A、B、C,默認情況下,這3條消息是在不同的 Partition 中(如 P1、P2、P3)。
在配置之后,可以確保用戶1的所有消息都發到同一個分區中(如 P1)
在這里插入圖片描述
這個功能有什么用呢?
這是為了提供消息的【有序性】。
消息在不同的 Partition 是不能保證有序的,只有一個 Partition 內的消息是有序的
在這里插入圖片描述
在這里插入圖片描述

1.2.3 Topics主題 和 partitions分區

一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件
主題是發布記錄的類別或訂閱源名稱。Kafka的主題總是多用戶; 也就是說,一個主題可以有零個,一個或多個消費者訂閱寫入它的數據。
對於每個主題,Kafka集群都維護一個如下所示的分區日志:
在這里插入圖片描述

每個分區都是一個有序的,不可變的記錄序列,不斷附加到結構化的提交日志中。分區中的記錄每個都分配了一個稱為偏移的順序ID號,它唯一地標識分區中的每個記錄。

Kafka集群持久保存所有已發布的記錄 - 無論是否已使用 - 使用可配置的保留期。例如,如果保留策略設置為兩天,則在發布記錄后的兩天內,它可供使用,之后將被丟棄以釋放空間。Kafka的性能在數據大小方面實際上是恆定的,因此長時間存儲數據不是問題。
在這里插入圖片描述

實際上,基於每個消費者保留的唯一元數據是該消費者在日志中的偏移或位置。這種偏移由消費者控制:通常消費者在讀取記錄時會線性地提高其偏移量,但事實上,由於該位置由消費者控制,因此它可以按照自己喜歡的任何順序消費記錄。例如,消費者可以重置為較舊的偏移量來重新處理過去的數據,或者跳到最近的記錄並從“現在”開始消費。

這些功能組合意味着Kafka 消費者consumers 非常cheap - 他們可以來來往往對集群或其他消費者沒有太大影響。例如,可以使用我們的命令行工具“tail”任何主題的內容,而無需更改任何現有使用者所消耗的內容。

日志中的分區有多種用途。首先,它們允許日志擴展到超出適合單個服務器的大小。每個單獨的分區必須適合托管它的服務器,但主題可能有許多分區,因此它可以處理任意數量的數據。其次,它們充當了並行性的單位 - 更多的是它

1.2.4 Distribution分配

一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實例)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多台機器上,以提高可用性.

基於replicated方案,那么就意味着需要對多個備份進行調度;每個partition都有一個serverleader;leader負責所有的讀寫操作,如果leader失效,那么將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步消息即可..由此可見作為leaderserver承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味着有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩定

1.2.5 Producers生產者 和 Consumers消費者

1.2.5.1 Producers生產者

Producers將數據發布到指定的topics 主題。同時Producer 也能決定將此消息歸屬於哪個partition;比如基於round-robin方式或者通過其他的一些算法等。

1.2.5.2 Consumers

本質上kafka只支持Topic.每個consumer屬於一個consumer group;反過來說,每個group中可以有多個consumer.發送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費。

如果所有使用者實例具有相同的使用者組,則記錄將有效地在使用者實例上進行負載平衡。

如果所有消費者實例具有不同的消費者組,則每個記錄將廣播到所有消費者進程。

圖片

分析:兩個服務器Kafka群集,托管四個分區(P0-P3),包含兩個使用者組。消費者組A有兩個消費者實例,B組有四個消費者實例。

Kafka中實現消費consumption 的方式是通過在消費者實例上划分日志中的分區,以便每個實例在任何時間點都是分配的“公平份額”的獨占消費者。維護組中成員資格的過程由Kafka協議動態處理。如果新實例加入該組,他們將從該組的其他成員接管一些分區; 如果實例死亡,其分區將分發給其余實例。

Kafka僅提供分區內記錄的總訂單,而不是主題中不同分區之間的記錄。對於大多數應用程序而言,按分區排序與按鍵分區數據的能力相結合就足夠了。但是,如果您需要對記錄進行總訂單,則可以使用僅包含一個分區的主題來實現,但這將意味着每個使用者組只有一個使用者進程。

1.2.5.3 Consumers kafka確保

發送到partitions中的消息將會按照它接收的順序追加到日志中。也就是說,如果記錄M1由與記錄M2相同的生成者發送,並且首先發送M1,則M1將具有比M2更低的偏移並且在日志中更早出現。

消費者實例按照它們存儲在日志中的順序查看記錄。對於消費者而言,它們消費消息的順序和日志中消息順序一致。

如果Topicreplicationfactor為N,那么允許N-1個kafka實例失效,我們將容忍最多N-1個服務器故障,而不會丟失任何提交到日志的記錄。

1.2.6 架構和zookeeper關系

Kafka 是集群架構的,ZooKeeper是重要組件。
在這里插入圖片描述
ZooKeeper 管理者所有的 TopicPartition
TopicPartition 存儲在 Node 物理節點中,ZooKeeper負責維護這些 Node
在這里插入圖片描述
有2個 Topic,各自有2個 Partition
在這里插入圖片描述
這是邏輯上的形式,但在 Kafka 集群中的實際存儲可能是這樣的
在這里插入圖片描述
Topic APartition #1 有3份,分布在各個 Node 上。
這樣可以增加 Kafka 的可靠性和系統彈性。
3個 Partition #1 中,ZooKeeper 會指定一個 Leader,負責接收生產者發來的消息
在這里插入圖片描述
其他2個 Partition #1 會作為 Follower,Leader 接收到的消息會復制給 Follower
在這里插入圖片描述
這樣,每個 Partition 都含有了全量消息數據。
在這里插入圖片描述
即使某個 Node 節點出現了故障,也不用擔心消息的損壞。
Topic A 和 Topic B 的所有 Partition 分布可能就是這樣的
在這里插入圖片描述
轉載於:https://mp.weixin.qq.com/s/k7DJJGmImcpnaSy9AhAmmQ

1.3 kafka是如何保證消息的有序性

kafka這樣保證消息有序性的:
一個 topic,一個 partition,一個 consumer,內部單線程消費,單線程吞吐量太低,一般不會用這個。(全局有序性)
N 個內存 queue,具有相同 key 的數據都到同一個內存 queue;然后對於 N 個線程,每個線程分別消費一個內存 queue 即可,這樣就能保證順序性。
大家可以看下消息隊列的有序性是怎么推導的:

消息的有序性,就是指可以按照消息的發送順序來消費。有些業務對消息的順序是有要求的,比如先下單再付款,最后再完成訂單,這樣等。假設生產者先后產生了兩條消息,分別是下單消息(M1),付款消息(M2),M1比M2先產生,如何保證M1比M2先被消費呢。

在這里插入圖片描述

為了保證消息的順序性,可以將將M1、M2發送到同一個Server上,當M1發送完收到ack后,M2再發送。如圖:
在這里插入圖片描述

這樣還是可能會有問題,因為從MQ服務器到服務端,可能存在網絡延遲,雖然M1先發送,但是它比M2晚到。
在這里插入圖片描述

那還能怎么辦才能保證消息的順序性呢?將M1和M2發往同一個消費者,且發送M1后,等到消費端ACK成功后,才發送M2就得了。
在這里插入圖片描述

消息隊列保證順序性整體思路就是這樣啦。比如Kafka的全局有序消息,就是這種思想的體現: 就是生產者發消息時,1個Topic只能對應1個Partition,一個 Consumer,內部單線程消費。
但是這樣吞吐量太低,一般保證消息局部有序即可。在發消息的時候指定Partition KeyKafka對其進行Hash計算,根據計算結果決定放入哪個Partition。這樣Partition Key相同的消息會放在同一個Partition。然后多消費者單線程消費指定的Partition
參考鏈接:https://mp.weixin.qq.com/s/gHjuYH6R6Fgfn3WZ8W79Zg

1.4 消息重復

1.4.1 引言

數據重復這個問題其實也是挺正常,全鏈路都有可能會導致數據重復
在這里插入圖片描述

通常,消息消費時候都會設置一定重試次數來避免網絡波動造成的影響,同時帶來副作用是可能出現消息重復。

整理下消息重復的幾個場景:

  • 生產端: 遇到異常,基本解決措施都是重試
    • 場景一:leader分區不可用了,拋 LeaderNotAvailableException 異常,等待選出新 leader 分區。
    • 場景二:Controller 所在 Broker 掛了,拋 NotControllerException 異常,等待 Controller 重新選舉。
    • 場景三:網絡異常、斷網、網絡分區、丟包等,拋 NetworkException 異常,等待網絡恢復。
  • 消費端poll 一批數據,處理完畢還沒提交 offset ,機子宕機重啟了,又會 poll 上批數據,再度消費就造成了消息重復。

1.4.2 解決重復消息

了解下消息的三種投遞語義:

  • 最多一次(at most once): 消息只發一次,消息可能會丟失,但絕不會被重復發送。例如:mqtt 中 QoS = 0。
  • 至少一次(at least once): 消息至少發一次,消息不會丟失,但有可能被重復發送。例如:mqtt 中 QoS = 1
  • 精確一次(exactly once): 消息精確發一次,消息不會丟失,也不會被重復發送。例如:mqtt 中 QoS = 2。

了解了這三種語義,再來看如何解決消息重復,即如何實現精准一次,可分為三種方法:

  • Kafka 冪等性 Producer: 保證生產端發送消息冪等。局限性,是只能保證單分區且單會話(重啟后就算新會話)
  • Kafka 事務: 保證生產端發送消息冪等。解決冪等 Producer 的局限性。
  • 消費端冪等: 保證消費端接收消息冪等

1.4.2.1 Kafka 冪等性 Producer

冪等性指:無論執行多少次同樣的運算,結果都是相同的。即一條命令,任意多次執行所產生的影響均與一次執行的影響相同

冪等性使用示例:在生產端添加對應配置即可

  1. 設置冪等,啟動冪等。
  2. 配置 acks注意:一定要設置 acks=all,否則會拋異常。
  3. 配置 max.in.flight.requests.per.connection 需要 <= 5,否則會拋異常 OutOfOrderSequenceException
    • 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
    • Kafka >= 1.1, max.in.flight.request.per.connection <= 5
Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 設置冪等
props.put("acks", "all"); // 2. 當 enable.idempotence 為 true,這里默認為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 注意

為了更好理解,需要了解下 Kafka 冪等機制:
在這里插入圖片描述

  1. Producer 每次啟動后,會向 Broker 申請一個全局唯一的 pid。(重啟后 pid 會變化,這也是弊端之一)
  2. Sequence Numbe:針對每個 <Topic, Partition> 都對應一個從0開始單調遞增的 Sequence,同時 Broker端會緩存這個 seq num
  3. 判斷是否重復: 拿 <pid, seq num>Broker 里對應的隊列 ProducerStateEntry.Queue(默認隊列長度為 5)查詢是否存在
    如果 nextSeq == lastSeq + 1,即 服務端seq + 1 == 生產傳入seq,則接收。
    如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。
    反之,要么重復,要么丟消息,均拒絕。
    在這里插入圖片描述

這種設計針對解決了兩個問題:

  • 消息重復: 場景 Broker 保存消息后還沒發送 ack 就宕機了,這時候 Producer 就會重試,這就造成消息重復。
  • 消息亂序: 避免場景,前一條消息發送失敗而其后一條發送成功,前一條消息重試后成功,造成的消息亂序。

那什么時候該使用冪等:

  • 如果已經使用 acks=all,使用冪等也可以。
  • 如果已經使用 acks=0 或者 acks=1,說明系統追求高性能,對數據一致性要求不高。不要使用冪等

使用例子
啟動消息者:可以用 Kafka 提供的腳本

# 舉個栗子:topic 需要自己去修改
$ cd ./kafka-2.7.1-src/bin
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

創建 topic : 1副本,2 分區

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2

# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe

生產者代碼

public class KafkaProducerApplication {

    private final Producer<String, String> producer;
    final String outTopic;

    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic) {
        this.producer = producer;
        outTopic = topic;
    }

    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord
            = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if(e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                    }
                }
        );
    }

    public void shutdown() {
        producer.close();
    }

    public static void main(String[] args) {

        final Properties props = new Properties();

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final String topic = "myTopic";
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream().filter(l -> !l.trim().isEmpty())
                    .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}

啟動消費者:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic

1.4.2.2 Kafka 事務

使用 Kafka 事務解決冪等的弊端:單會話且單分區冪等。

事務使用示例:分為生產端 和 消費端

Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 設置冪等
props.put("acks", "all"); // 2. 當 enable.idempotence 為 true,這里默認為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待數
props.put("transactional.id", "my-transactional-id"); // 4. 設定事務 id

Producer<String, String> producer = new KafkaProducer<String, String>(props);

// 初始化事務
producer.initTransactions();

try{
    // 開始事務
    producer.beginTransaction();

    // 發送數據
    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
 
    // 數據發送及 Offset 發送均成功的情況下,提交事務
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 數據發送或者 Offset 發送出現異常時,終止事務
    producer.abortTransaction();
} finally {
    // 關閉 Producer 和 Consumer
    producer.close();
    consumer.close();
}

這里消費端 Consumer 需要設置下配置:isolation.level 參數

  • read_uncommitted: 這是默認值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息,不論事務型 Producer 提交事務還是終止事務,其寫入的消息都可以讀取。如果用了事務型 Producer,那么對應的 Consumer 就不要使用這個值。
  • read_committed: 表明 Consumer 只會讀取事務型 Producer 成功提交事務寫入的消息。當然了,它也能看到非事務型 Producer 寫入的所有消息。

1.4.2.3 消費端冪等

只要消費端具備了冪等性,那么重復消費消息的問題也就解決了。
典型的方案是使用:消息表,來去重:
在這里插入圖片描述
上述demo中,消費端拉取到一條消息后,開啟事務,將消息Id 新增到本地消息表中,同時更新訂單信息。
如果消息重復,則新增操作 insert 會異常,同時觸發事務回滾。

參考鏈接:https://juejin.cn/post/7172897190627508237


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM