kafka消息的分發與消費


關於 Topic 和 Partition:

  Topic:

在 kafka 中,topic 是一個存儲消息的邏輯概念,可以認為是一個消息集合。每條消息發送到 kafka 集群的消息都有一個類別。物理上來說,不同的 topic 的消息是分開存儲的,每個 topic 可以有多個生產者向它發送消息,也可以有多個消費者去消費其中的消息。

  Partition:

  每個 topic 可以划分多個分區(每個 Topic 至少有一個分區),同一 topic 下的不同分區包含的消息是不同的。每個消息在被添加到分區時,都會被分配一個 offset(稱之為偏移量),它是消息在此分區中的唯一編號,kafka 通過 offset保證消息在分區內的順序,offset 的順序不跨分區,即 kafka只保證在同一個分區內的消息是有序的。下圖中,對於名字為 test 的 topic,做了 3 個分區,分別是p0、p1、p2.

➢ 每一條消息發送到 broker 時,會根據 partition 的規則選擇存儲到哪一個 partition。如果 partition 規則設置合理,那么所有的消息會均勻的分布在不同的partition中,這樣就有點類似數據庫的分庫分表的概念,把數據做了分片處理。

  Topic&Partition 的存儲:

  Partition 是以文件的形式存儲在文件系統中,比如創建一個名為 firstTopic 的 topic,其中有 3 個 partition,那么在kafka 的數據目錄(/tmp/kafka-log)中就有 3 個目錄,firstTopic-0~3,命名規則是<topic_name>-<partition_id>,創建3個分區的topic:

sh kafka-topics.sh --create --zookeeper 192.168.254.135:2181 --replication-factor 1 --partitions 3 --topic firstTopic

kafka 消息分發策略:

  消息是 kafka 中最基本的數據單元,在 kafka 中,一條消息由 key、value 兩部分構成,在發送一條消息時,我們可以指定這個 key,那么 producer 會根據 key 和 partition 機制來判斷當前這條消息應該發送並存儲到哪個 partition 中。我們可以根據需要進行擴展 producer 的 partition 機制。

   我們可以通過如下代碼來實現自己的分片策略:

public class MyPartition implements Partitioner {//實現Partitioner接口

    private Random random=new Random();
@Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //獲得分區列表 List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(topic); int partitionNum=0; if(key==null){ partitionNum=random.nextInt(partitionInfos.size()); //隨機分區 }else{ partitionNum=Math.abs((key.hashCode())%partitionInfos.size()); } System.out.println("key ->"+key+"->value->"+value+"->"+partitionNum); return partitionNum; //指定發送的分區值 } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }

  然后基於之前的代碼在producer上需要在消息發送端增加配置:指定自己的partiton策略

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gupaoedu.kafka.MyPartition");

消息默認的分發機制:

  默認情況下,kafka 采用的是 hash 取模的分區算法。如果Key 為 null,則會隨機分配一個分區。這個隨機是在這個參數”metadata.max.age.ms”的時間范圍內隨機選擇一個。對於這個時間段內,如果 key 為 null,則只會發送到唯一的分區。這個值在默認情況下是 10 分鍾更新一次。關 於 Metadata ,簡單理解就是Topic/Partition 和 broker 的映射關系,每一個 topic 的每一個 partition,需要知道對應的 broker 列表是什么,leader是誰、follower 是誰。這些信息都是存儲在 Metadata 這個類里面。

消費端如何消費指定的分區:

  通過下面的代碼,就可以消費指定該 topic 下的 0 號分區。其他分區的數據就無法接收。

//消費指定分區的時候,不需要再訂閱
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消費指定的分區
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));

消息的消費原理:

   在實際生產過程中,每個 topic 都會有多個 partitions,多個 partitions 的好處在於,一方面能夠對 broker 上的數據進行分片有效減少了消息的容量從而提升 io 性能。另外一方面,為了提高消費端的消費能力,一般會通過多個consumer 去消費同一個 topic ,也就是消費端的負載均衡機制,也就是我們接下來要了解的,在多個 partition 以及多個 consumer 的情況下,消費者是如何消費消息的?kafka 存在 consumer group的 概 念 , 也 就是 group.id 一 樣 的 consumer ,這些consumer 屬於一個 consumer group,組內的所有消費者協調在一起來消費訂閱主題的所有分區。當然每一個分區只能由同一個消費組內的 consumer 來消費,那么同一個consumer group 里面的 consumer 是怎么去分配該消費哪個分區里的數據的呢?舉個簡單的例子就是如果存在的分區輸,即partiton的數量於comsumer數量一致的時候,每個comsumer對應一個分區,如果comsumer數量多於分區,那么多出來的數量的comsumer將不工作,相反則是其中將會有comsumer消費多個分區。

  分區分配策略:

  在 kafka 中,存在兩種分區分配策略,一種是 Range(默認)、另 一 種 另 一 種 還 是 RoundRobin ( 輪 詢 )。 通 過comsumer的配置partition.assignment.strategy 這個參數來設置。

  Range strategy(范圍分區): 

  Range 策略是對每個主題而言的,首先對同一個主題里面的分區按照序號進行排序,並對消費者按照字母順序進行排序。假設我們有 10 個分區,3 個消費者,排完序的分區將會是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序將會是C1-0, C2-0, C3-0。然后將 partitions 的個數除於消費者線程的總數來決定每個消費者線程消費幾個分區。如果除不盡,那么前面幾個消費者線程將會多消費一個分區。比如我們有 10 個分區,3 個消費者線程, 10 / 3 = 3,而且除不盡,那么消費者線程 C1-0 將會多消費一個分區,所以最后分區分配的結果看起來是這樣的:

  C1-0 將消費 0, 1, 2, 3 分區

  C2-0 將消費 4, 5, 6 分區

  C3-0 將消費 7, 8, 9 分區

假如我們有 11 個分區,那么最后分區分配的結果看起來是這樣的:

  C1-0 將消費 0, 1, 2, 3 分區

  C2-0 將消費 4, 5, 6, 7 分區

  C3-0 將消費 8, 9, 10 分區

假如我們有 2 個主題(T1 和 T2),分別有 10 個分區,那么最后分區分配的結果看起來是這樣的:

  C1-0 將消費 T1 主題的 0, 1, 2, 3 分區以及 T2 主題的 0, 1, 2, 3 分區

  C2-0 將消費 T1 主題的 4, 5, 6 分區以及 T2 主題的 4, 5, 6 分區

  C3-0 將消費 T1 主題的 7, 8, 9 分區以及 T2 主題的 7, 8, 9 分區

可以看出,C1-0 消費者線程比其他消費者線程多消費了 2 個分區,這就是 Range strategy 的一個很明顯的弊端.

  RoundRobin strategy(輪詢分區):

  輪詢分區策略是把所有 partition 和所有 consumer 線程都列出來,然后按照 hashcode 進行排序。最后通過輪詢算法分配 partition 給消費線程。如果所有 consumer 實例的訂閱是相同的,那么 partition 會均勻分布。假如按照 hashCode 排序完的 topic&partitions 組依次為 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我們的消費者線程排序為 C1-0, C1-1, C2-0, C2-1,最后分區分配的結果為:

  C1-0 將消費 T1-5, T1-2, T1-6 分區;

  C1-1 將消費 T1-3, T1-1, T1-9 分區;

  C2-0 將消費 T1-0, T1-4 分區;

  C2-1 將消費 T1-8, T1-7 分區;

  使用輪詢分區策略必須滿足兩個條件

    1. 每個主題的消費者實例具有相同數量的流

    2. 每個消費者訂閱的主題必須是相同的

  什么時候會觸發這個策略呢?當出現以下幾種情況時,kafka 會進行一次分區分配操作,也就是 kafka consumer 的 rebalance。

  1. 同一個 consumer group 內新增了消費者。

  2. 消費者離開當前所屬的 consumer group,比如主動停機或者宕機。

  3. topic 新增了分區(也就是分區數量發生了變化)。

  4.消費者主動取消訂閱topic

  kafka consuemr 的 rebalance 機制規定了一個 consumer group 下的所有 consumer 如何達成一致來分配訂閱 topic的每個分區。而具體如何執行分區策略,就是前面提到過的兩種內置的分區策略。而 kafka 對於分配策略這塊,提供了可插拔的實現方式, 也就是說,除了這兩種之外,我們還可以創建自己的分配機制。可以通過繼承 AbstractPartitionAssignor 抽象類實現 assign 來做到。

  誰來執行 Rebalance 以及管理 consumer 的 group 呢?

  Kafka 提供了一個角色:coordinator(協調員) 來執行對於 consumer group 的管理,當 consumer group 的第一個 consumer 啟動的時候,它會去和 kafka server(broker) 確定誰是它們組的 coordinator。之后該 group 內的所有成員都會和該 coordinator 進行協調通信。consumer group 如何確定自己的 coordinator 是誰呢? 消費 者 向 kafka 集 群 中 的 任 意 一 個 broker 發 送 一 個GroupCoordinatorRequest 請求,服務端會返回一個負載最 小 的 broker 節 點 的 id , 並 將 該 broker 設 置 為coordinator。在 rebalance 之前,需要保證 coordinator 是已經確定好了的,整個 rebalance 的過程分為兩個步驟 ,一個是JoinGroup 的過程,在這個過程之后會進入一個Synchronizing Group State 階段。那么這兩個階段都做了什么呢?

  JoinGroup 的過程:

  表示加入到 consumer group 中,在這一步中,所有的成員都會向 coordinator 發送 joinGroup 的請求。一旦所有成員都發送了 joinGroup 請求,那么 coordinator 會選擇一個 consumer 擔任 leader 角色,並把組成員信息和訂閱信息發送消費者。下圖就是描述了這么一個過程,並且請求與響應中攜帶的一些重要的信息。

  protocol_metadata: 序列化后的消費者的訂閱信息

  leader_id: 消費組中的消費者,coordinator 會選擇一個座位 leader,對應的就是 member_id

  member_metadata 對應消費者的訂閱信息

  members:consumer group 中全部的消費者的訂閱信息

  generation_id:年代信息,類似於 zookeeper 的時候的 epoch 是一樣的,對於每一輪 rebalance ,generation_id 都會遞增。主要用來保護 consumer group。隔離無效的 offset 提交。也就是上一輪的      consumer 成員無法提交 offset 到新的 consumer group 中。 

   Synchronizing Group State 階段:

  進入了 Synchronizing Group State階段,主要邏輯是向 GroupCoordinator 發 送SyncGroupRequest 請求,並且處理 SyncGroupResponse響應,簡單來說,就是 leader 將消費者對應的 partition 分配方案同步給 consumer group 中的所有 consumer,每個消費者都會向 coordinator 發送 syncgroup 請求,不過只有 leader 節點會發送分配方案,其他消費者只是打打醬油而已。當 leader 把方案發給 coordinator 以后,coordinator 會把結果設置到 SyncGroupResponse 中。這樣所有成員都知道自己應該消費哪個分區。

   member_assignment :在syncGroup發送請求的時候,只有leader角色的comsumer才會去發送這個信息,而其他消費端是空的。然后會通過coordinator去分發給各個comsumer。

  ➢ consumer group 的分區分配方案是在客戶端執行的!Kafka 將這個權利下放給客戶端主要是因為這樣做可以有更好的靈活性

offset :

  每個 topic可以划分多個分區(每個 Topic 至少有一個分區),同一topic 下的不同分區包含的消息是不同的。每個消息在被添加到分區時,都會被分配一個 offset(稱之為偏移量),它是消息在此分區中的唯一編號,kafka 通過 offset 保證消息在分區內的順序,offset 的順序不跨分區,即 kafka 只保證在同一個分區內的消息是有序的; 對於應用層的消費來說,每次消費一個消息並且提交以后,會保存當前消費到的最近的一個 offset。那么 offset 保存在哪里?

  這個重要的topic我們是不允許其出現單點故障的,所以我們需要在其生成都時候就創建副本,可是默認副本數是1 ,我們可以通過調整參數去修改:

offsets.topic.replication.factor=3

  offset 在哪里維護?

  在 kafka 中,提供了一個__consumer_offsets_* 的一個topic , 把 offset 信 息 寫 入 到 這 個 topic 中 。__consumer_offsets——保存了每個 consumer group某一時刻提交的 offset 信息。__consumer_offsets 默認有50 個分區。可以在 /tmp/kafka-logs/ 下查看。那么如何查看對應的 consumer_group 保存在哪個分區中呢?

  通過Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由 於 默 認 情 況 下groupMetadataTopicPartitionCount 有 50 個分區,計算得到的結果為:4, 意味着當前的 consumer_group 的位移信息保存在__consumer_offsets 的第 4個分區,執行如下命令,可以查看當前 consumer_goup 中的offset 位移信息,消費端需保持連接狀態。

sh kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 4 --broker-list 192.168.254.135:9092,192.168.254.136:9092,192.168.254.137:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

  或者 2.11-2.3.0版本。需要確保listeners=PLAINTEXT://192.168.1.101:9092 。外部代理地址 advertised.listeners=PLAINTEXT://192.168.1.101:9092都已經修改,且消費者已經有所消費,否者會卡着。

sh /mysoft/kafka/bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 35 --bootstrap-server 192.168.254.135:9092,192.168.254.136:9092,192.168.254.137:9092 --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

執行語句可以看到如下結果:

  這個意思就是 KafkaConsumerDemo1 消費組在 testTopic 中現在的 offsets 現在是 111.

動態增加Topic的副本(Replication):

  對於 __consumer_offsets 這個topic,在我們沒有修改配置的情況下其默認的副本數量是 1 ,這種情況會出現的問題是消費組所對應的機器掛了會導致某一些消費者無法繼續消費,當服務重啟后,我們可以進行動態擴容副本數量。

  首先我們需要運行以下命令查看指定 Topic的情況:

sh kafka-topics.sh --topic __consumer_offsets  --describe --zookeeper 192.168.1.101:2181

  執行后會出現以下信息:

  緊接着,我們需要准備一個擴容的Json 文件(replication.json):

{
    "version": 1, 
    "partitions": [
        {
            "topic": "__consumer_offsets", //哪個topic
            "partition": 35, //指定哪個分區
            "replicas": [//這里是機器的Id
                1, 
                2, 
                3
            ]
        },
        {
            "topic": "__consumer_offsets", //哪個topic
            "partition": 36, //指定哪個分區
            "replicas": [//這里是機器的Id
                1, 
                2, 
                3
            ]
        }//........可以多個
    ]
}

  接下去需要執行以下命令:

sh kafka-reassign-partitions.sh --zookeeper 192.168.1.101:2181 --reassignment-json-file replication.json --execute   

  執行完會出現:

  可以執行以下命令驗證執行結果:sh kafka-reassign-partitions.sh --zookeeper 192.168.1.101:2181 --reassignment-json-file replication.json --verify

  接着可以去zookeeper上查看該分區的副本情況:

  或者直接到kafka Topic數據目錄下查看即可。

消息的存儲:

   首先我們需要了解的是,kafka 是使用日志文件的方式來保存生產者和發送者的消息,每條消息都有一個 offset 值來表示它在分區中的偏移量。Kafka 中存儲的一般都是海量的消息數據,為了避免日志文件過大,Log 並不是直接對應在一個磁盤上的日志文件,而是對應磁盤上的一個目錄,這個目錄的命名規則是<topic_name>_<partition_id>比如創建一個名為 firstTopic 的 topic,其中有 3 個 partition,那么在 kafka 的數據目錄(/tmp/kafka-log,這里可以通過server.properties中的log.dirs=/tmp/kafka-logs去修改)中就有 3 個目錄,firstTopic-0~3多個分區在集群中的分配 如果我們對於一個 topic,在集群中創建多個 partition,那么 partition 是如何分布的呢?

1.將所有 N Broker 和待分配的 i 個 Partition 排序
2.將第 i 個 Partition 分配到第(i mod n)個 Broker 上

  結合前面講的消息分發策略,就應該能明白消息發送到 broker 上,消息會保存到哪個分區中,並且消費端應該消費哪些分區的數據了。

冪等性:

  所謂的冪等,簡單說就是對接口的多次調用所產生的結果和調用一次是一致的。在0.11.0.0版本引入了創建冪等性Producer的功能。僅需要設置props.put(“enable.idempotence”,true),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。enable.idempotence設置成true后,Producer自動升級成冪等性Producer。Kafka會自動去重。Broker會多保存一些字段。當Producer發送了相同字段值的消息后,Broker能夠自動知曉這些消息已經重復了。作用范圍:

  1. 只能保證單分區上的冪等性,即一個冪等性Producer能夠保證某個主題的一個分區上不出現重復消息。
  2. 只能實現單回話上的冪等性,這里的會話指的是Producer進程的一次運行。當重啟了Producer進程之后,冪等性不保證。

事務型消息:

  Kafka在0.11版本開始提供對事務的支持,提供是read committed隔離級別的事務。保證多條消息原子性地寫入到目標分區,同時也能保證Consumer只能看到事務成功提交的消息。

事務性Producer:

  保證多條消息原子性地寫入到多個分區中。這批消息要么全部成功,要不全部失敗。事務性Producer也不懼進程重啟。設置:

properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//開啟enable.idempotence = true
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id");//設置Producer端參數 transactional.id

  除此之外,還要加上調用事務API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,分別應對事務的初始化、事務開始、事務提交以及事務終止。如下:

// kafka 事務型消息
producer.initTransactions(); try {   producer.beginTransaction();   producer.send(record1);   producer.send(record2);   producer.commitTransaction(); } catch (KafkaException e) {   producer.abortTransaction(); }

  這段代碼能保證record1和record2被當做一個事務同一提交到Kafka,要么全部成功,要么全部寫入失敗。

Consumer端的設置:

  設置 isolation.level參數,目前有兩個取值:

  1. read_uncommitted:默認值表明Consumer端無論事務型Producer提交事務還是終止事務,其寫入的消息都可以讀取。
  2. read_committed:表明Consumer只會讀取事務型Producer成功提交事務寫入的消息。注意,非事務型Producer寫入的所有消息都能看到。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM