Kafka的工作原理及過程


一、Broker啟動

  當每個 broker 啟動時,會在 ZooKeeper 中的 /brokers/ids 路徑下創建⼀個節點來注冊⾃⼰,節點 ID 為配置⽂件中的 broker.id 參數,后注冊的 broker 會報 NodeExists 的錯。如果不指定 broker.id 或者指定成 -1,節點 ID 會從 reserved.broker.max.id 這個參數加 1的值開始,這個參數默認值是 1000,所以經常可以看⻅ 1001、1002 的 broker ID。

  kafka的ZooKeeper存儲結構如下圖所示:

       

   每個 broker 除了注冊⾃身之外,還會監聽 /brokers/ids 這個節點,當這個節點下增加或刪除⼦節點時,ZooKeeper 會通知監聽了的 broker。每個 broker 創建的節點都是臨時節點,如果 broker 下線, /brokers/ids 下對應的節點就會被刪除。

  注意:broker 下線,只會刪除 /brokers/ids 下的節點,其它的節點中可能還包含這個broker 的 ID,⽐如 /brokers/toics 下的節點會記錄每個分區的副本存儲在哪些 broker 上,這些節點不會刪除這個 broker 的 ID,因為這個 broker 還有可能恢復,如果恢復不了,可以⽤相同的 ID 啟動⼀個新的 broker,新的 broker 會代替原來 broker 的位置,開始同步數據。

二、刪除Topic

  建議 設置 auto.create.topics.enable = false

  建議設置 delete.topic.enable = true

  刪除流程如下圖所示:

      

三、消息的路由策略:

    在通過API方式發布消息時,生產者是以Record為消息進行發布的,Record中包含Key和Value,其中value就是我們真正要使用的消息,而key是用於路由消息要存放的位置的。

    消息要放入到哪個partition並不是隨機的,而是按照以下路由策略進行處理的:

      如果制定了partition,則直接寫入指定的partition

      如果沒有指定partition但是指定了key,則通過key的hash值與partition數量進行取模,取模結果就是partition的索引

      如果partition和key都未指定,則使用輪詢算法選出一個partition

四、消息分區算法

  Kafka中提供了多重分區分配算法(PartitionAssignor)的實現:RangeAssignor、RoundRobinAssignor、StickyAssignor。PartitionAssignor接⼝⽤於⽤戶定義實現分區分配算法,以實現Consumer之間的分區分配。Kafka默認采⽤RangeAssignor的分配算法。

  1、RangeAssignor

    RangeAssignor策略的原理是按照消費者總數和分區總數進⾏整除運算來獲得⼀個跨度,然后將分區按照跨度進⾏平均分配,以保證分區盡可能均勻地分配給所有的消費者。對於每⼀個Topic,RangeAssignor策略會將消費組內所有訂閱這個Topic的消費者按照名稱的字典序排序,然后為每個消費者划分固定的分區范圍,如果不夠平均分配,那么字典序靠前的消費者會被多分配⼀個分區。

    簡單的說,就是平均分配,如果不能平均分配,靠前的分配的多,但是多的和少的相差不會超過一,如果消費者大於分區,那么后面的消費者則分不到partition。

    用一張圖簡單明了的展示一下這種分配策略。

      

   2、RoundRobinAssignor

    RoundRobinAssignor的分配策略是將消費組內訂閱的所有Topic的分區及所有消費者進⾏排序后盡量均衡的分配(RangeAssignor是針對單個Topic的分區進⾏排序分配的)。如果消費組內,消費者訂閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic),那么分配結果是盡量均衡的(消費者之間分配到的分區數的差值不會超過1)。

    如下面的兩張圖片所示,前面的是RangeAssignor,多topic的情況,其仍然不能保證均衡的消費,其只是保證同一個topic盡可能的均衡;后面的是RoundRobinAssignor,其不但保證了單topic的均衡消費,也保證了多topic的均衡消費。

      

  3、StickyAssignor

    盡管RoundRobinAssignor已經在RangeAssignor上做了⼀些優化來更均衡的分配分區,但是在⼀些情況下依舊會產⽣嚴重的分配偏差,從字⾯意義上看,Sticky是“粘性的”,可以理解為分配結果是帶“粘性的”——每⼀次分配變更相對上⼀次分配做最少的變動(上⼀次的結果是有粘性的)。

    其⽬標有兩點:1. 分區的分配盡量的均衡;2. 每⼀次重分配的結果盡量與上⼀次分配結果保持⼀致。

    StickyAssignor的分配結果如下圖所示(增加RoundRobinAssignor分配作為對⽐):

       

     上⾯的例⼦中,Sticky模式原來分配給C0、C2的分區都沒有發⽣變動,且最終C0、C2達到的均衡的⽬的。

   4、⾃定義分區策略

    只需要繼承AbstractPartitionAssignor並復寫其中⽅法即可(當然也可以直接實現PartitionAssignor接⼝),其中有兩個⽅法需要復寫:
//分區分配⽅案的實現
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription>
subscriptions);
//表示了這個分配策略的唯⼀名稱
String name();

    ⽽name()⽅法則表示了這個分配策略的唯⼀名稱,⽐如之前提到的range,roundrobin和sticky,這個名字會在和GroupCoordinator的通信中返回,通過它consumer leader來確定整個group的分區⽅案(分區策略是由group中的consumer共同投票決定的,誰使⽤的多,就是⽤哪個策略)。

// 指定分區分配策略
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RangeAssignor");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

 

五、消息寫入算法

    

     1、producer向broker集群提交連接請求,其所連接上的任意一個broker都會向其發送broker controller的通信URL,即broker controller主機配置文件中的listeners地址

    2、當producer指定了要生產消息的topic后,其會向broker contriller發送請求,請求當前topic的所有partition leader

    3、broker controller在接收到請求后,會從zk服務器中查找指定topic的所有partition leader返回給producer

    4、producer在接收到partition leader列表后,會根據路由策略找到對應的partition leader,將消息發送該partition leader

     5、leader將消息寫入log,並通知ISR中的followers

    6、ISR中的follower從leader中同步消息后向leader發送ACK消息

    7、leader收到了所有ISR中的follower的ACK后,增加HW,表示消費者可以消費到該位置;如果leader在等待的follower的ACK超時了,發現還有follower沒有發送ACK,則會將這些沒有發送ACK的follower從ISR中剔除,然后再增加HW

六、HW(高水位)

    上面說到leader收到了所有ISR中的follower的ACK后,就會增加HW,這里的HW是高水位的意思,表示consumer可以消費到的最高partition偏移量。

    HW保證了Kafka集群中消息的一致性,確切地說,是在broker集群正常運轉的情況下,保證了partition的follower和leader之間數據的一致性。

    LEO,Log End Offset,日志最后消息的偏移量,消息是被寫入到Kafka的日志文件中的,這是當前最后一個寫入的消息在partition中的偏移量。

    對於新寫入的消息,consumer是不能立刻消費的,leader會等待該消息被所有的ISR中的partition follower同步后才會更新HW,此時消息才能被consumer消費。

    

  1、HW的更新機制

      

    每個broker都存儲着當前partition的HW和LEO,但Partition leader中同時還存儲着所有Partition Follower的副本數據。

    Leader 副本的HW更新原則:取當前leader副本的LEO和所有remote副本的LEO的最⼩值

    Follower副本的HW更新原則:取leader副本發送的HW和⾃身的LEO中的最⼩值

      

     上圖演示了⽣產者寫⼊數據到Kafka的leader副本,然后同步到follower副本的流程。副本同步機制流程:

       (1)⽣產者寫⼊消息到leader副本

      (2)leader副本LEO值更新

      (3)follower副本嘗試拉取消息,發現有消息可以拉取,更新⾃身LEO

      (4)follower副本繼續嘗試拉取消息,這時會更新remote副本LEO,同時會更新leader副本的HW

      (5)完成4步驟后,leader副本會將已更新過的HW發送給所有follower副本

      (6)follower副本接收leader副本HW,更新⾃身的HW

七、HW截斷機制

  1、HW機制的缺陷

    如果partition leader接收到了新的消息,ISR中其他follower正在同步過程中,還未同步完畢leader就掛了,此時就需要選舉新的leader,若沒有HW截斷機制,將會導致partition中Leader與Follower數據不一致。

    當原leader宕機恢復后,將其LEO回退到宕機時的HW,然后再與新的Leader進行數據同步,這種機制稱為HW截斷機制。

    HW機制會引起數據丟失。

  2、HW截斷機制

    (1)follower 故障

      follower 發⽣故障后會被臨時踢出 ISR,待該 follower 恢復后,follower 會讀取本地磁盤 記錄的上次的 HW,並將 log ⽂件⾼於 HW 的部分截取掉,從 HW 開始向 leader 進⾏同步。 等該 follower 的LEO ⼤於等於該 Partition 的 HW,即 follower 追上 leader 之后,就可以重 新加⼊ ISR 了。

    (2)leader 故障

      該機制在leader出現宕機情況然后⼜恢復時,可以防 ⽌ partition leader 與 follower 間出現數據不⼀致。當原 Leader 宕機后⼜恢復時,將其 LEO 回退到其宕機時的 HW,然后再與新的 Leader 進⾏數據同步,這種機制稱為 HW 截斷機制。

    注意:這只能保證副本之間的數據⼀致性,並不能保證數據不丟失或者不重復。

八、消息發送的可靠性機制

    生產者向Kafka發送消息時,可以選擇可靠性級別,通過acks參數的值進行設置

    0:異步發送,生產者向kafka發送消息而不需要kafka反饋成功ack,該方式效率最高,但是可靠性最低,可能會存在數據丟失的問題。

    1:同步發送,默認值。生產者發送消息給kafka,broker的partition leader在收到消息后,會反饋ack,生產者收到后才會在發送消息,如果一直未收到kafka的ack,則生產者會認為消息發送失敗,會重發消息。該種情況,仍然可能存在數據丟失的問題,因為如果partition leader收到消息並返回了ack,但是在同步partition follower時leader發生宕機,此時需要選舉新的leader,即HW截斷機制的發生。

    -1:同步發送,其值等同於all,生產者發送消息給kafka,kafka收到消息后,要等到ISR列表中的所有副本都同步消息完成后,才向生產者發送ack。該模型的可靠性最高,很少出現數據丟失的情況,但是可能出現部分follower重復接收消息的情況(不是重復消息)。

九、消費者消費過程解析

    

    1、consumer向broker集群提交連接請求,其所連接上的任意broker都會向其發送broker controller的通信URL,即broker controller主機配置文件中的listeners。

    2、當consumer指定了要消費的topic后,其會向broker controller發送poll請求

    3、broker controller會為consumer分配一個或這幾個partition leader,並將該partition的當前offset發送給consumer

    4、consumer會按照broker controller分配的partition對其中的消息進行消費

    5、當consumer消費完該條數據后,消費者會向broker發送一個消息已被消費的反饋,即該消息的offset。

    6、當broker接收到consumer的offset后,會將其更新到__consumer_offset中

    7、以上過程一直重復,直到消費者停止請求消息;消費者可以重置offset,從而可以靈活的消費存儲在broker上的消息

十、Partititon Leader選舉范圍

    當leader掛了后,broker controller會從ISR中選一個follower成為新的leader,但是如果所有的follower都掛了怎么辦?可以通過unclean.leader.election.enable的取值來設置leader的選舉范圍。

    false:必須等待ISR列表中由副本活過來才進行新的選舉,該策略可靠性有保證,但是可用性低

    true:在ISR中沒有副本存活的情況下,可以選擇任何一個該topic的partition作為新的leader,該策略可用性高,但是可靠性沒有保證,可能會引發大量的消息丟失。

十一、重復消費問題即解決方案

    重復消費最常見的有兩種情況:同一個consumer的重復消費和不同consumer的重復消費

    同一個consumer的重復消費:

      當consumer由於消費能力較低而引發消費超時時,則可能會引發重復消費。

      解決方案:可以減少讀取的消息個數,也可以演唱自動提交的時間,還可以將自動提交轉變為手動提交。

    不同consumer的重復消費:

      當consumer消費了消息但還未提交offset時宕機,則這些已被消費過的消息會被重復消費。

十二、位移重放

  消費者在消費消息時,僅僅是從磁盤⽂件上讀取數據⽽已,是只讀的操作,因此消費者不會刪除消息數據。同時,由於位移數據是由消費者控制的,因此它能夠很容易地修改位移的值,實現重復消費歷史數據的功能。

  1、auto.offset.reset

    kafka中沒有offset時,不論是什么原因,offset沒了,這時auto.offset.reset配置就會起作⽤,auto.offset.reset值含義解釋:

      earliest:當各分區下有已提交的offset時,從提交的offset開始消費;⽆提交的offset時,從頭開始消費

      latest:當各分區下有已提交的offset時,從提交的offset開始消費;⽆提交的offset時,消費新產⽣的該分區下的數據

      none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有⼀個分區不存在已提交的offset,則拋出異常

    默認建議⽤earliest。設置該參數后 kafka出錯后重啟,找到未消費的offset可以繼續消費。

  2、更新OffsetTopic的作⽤域

    --all-topics :為consumer group下所有topic的所有分區調整位移

    --topic t1 --topic t2 :為指定的若⼲個topic的所有分區調整位移

    --topic t1:0,1,2 :為指定的topic分區調整位移

  3、重置策略

    --to-earliest :Earliest 策略表示將位移調整到主題當前最早位移處。這個最早位移不⼀定就是 0,因為在⽣產環境中,很久遠的消息會被 Kafka ⾃動刪除,所以當前最早位移很可能是⼀個⼤於 0 的值。如果你想要重新消費主題的所有消息,那么可以使⽤ Earliest 策略。

    --to-latest :Latest 策略表示把位移重設成最新末端位移。如果你總共向某個主題發送了 15 條消息,那么最新末端位移就是 15。如果你想跳過所有歷史消息,打算從最新的消息處開始消費的話,可以使⽤ Latest 策略。

    --to-current :Current 策略表示將位移調整成消費者當前提交的最新位移。有時候你可能會碰到這樣的場景:你修改了消費者程序代碼,並重啟了消費者,結果發現代碼有問題,你需要回滾之前的代碼變更,同時也要把位移重設到消費者重啟時的位置,那么,Current 策略就可以幫你實現這個功能。

    --to-offset <offset> :Specified-Offset 策略則是⽐較通⽤的策略,表示消費者把位移值調整到你指定的位移處。這個策略的典型使⽤場景是,消費者程序在處理某條錯誤消息時,你可以⼿動地“跳過”此消息的處理。在實際使⽤過程中,可能會出現 corrupted 消息⽆法被消費的情形,此時消費者程序會拋出異常,⽆法繼續⼯作。⼀旦碰到這個問題,你就可以嘗試使⽤ Specified-Offset 策略來規避。

    --shift-by N : 把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動;如果說 Specified-Offset 策略要求你指定位移的絕對數值的話,那么 Shift-By-N 策略指定的就是位移的相對數值,即你給出要跳過的⼀段消息的距離即可。這⾥的“跳”是雙向的,你既可以向前“跳”,也可以向后“跳”。⽐如,你想把位移重設成當前位移的前 100 條位移處,此時你需要指定N 為 -100。

    --to-datetime <datetime> :把位移調整到⼤於給定時間的最早位移處,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,⽐如2017-08-04T00:00:00.000;DateTime 允許你指定⼀個時間,然后將位移重置到該時間之后的最早位移處。常⻅的使⽤場景是,你想重新消費昨天的數據,那么你可以使⽤該策略重設位移到昨天 0點。

    --by-duration <duration> :把位移調整到距離當前時間指定間隔的位移處,duration格式是PnDTnHnMnS,⽐如PT0H5M0S。Duration 策略則是指給定相對的時間間隔,然后將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS。如果你熟悉 Java 8 引⼊的 Duration 類的話,你應該不會對這個格式感到陌⽣。它就是⼀個符合 ISO-8601 規范的 Duration 格式,以字⺟ P 開頭,后⾯由 4 部分組成,即 D、H、M 和 S,分別表示天、⼩時、分鍾和秒。舉個例⼦,如果你想將位移調回到 15分鍾前,那么你就可以指定 PT0H15M0S

    --from-file <file> :從CSV⽂件中讀取調整策略

  4、確定執⾏⽅案

    什么參數都不加:只是打印出位移調整⽅案,不具體執⾏

    --execute :執⾏真正的位移調整

    --export :把位移調整⽅案按照CSV格式打印,⽅便⽤戶成csv⽂件,供后續直接使⽤

  5、注意事項

    (1)如果kafka 開啟了認證、授權的操作,需要配置賦予了相應權限的⽤戶。

    (2)需要制定對應的Consumer Group 的id,重置的是Consumer Group 的位移

    (3)consumer group狀態必須是inactive的,即不能是處於正在⼯作中的狀態

    (4)要關閉kakfa的⾃動位移提交功能

  6、API

    實現消息重放需要用到KafkaConsumer的seek方法或者seekToEnd方法。

      

    (1)earliest實現

        首先創建的消費者程序,要禁⽌⾃動提交位移

        然后組 ID 要設置成要重設的消費者組的組 ID

        其次調⽤ seekToBeginning ⽅法時,需要⼀次性構造主題的所有分區對象。

        最重要的是,⼀定要調⽤帶⻓整型的 poll ⽅法,⽽不要調⽤consumer.poll(Duration.ofSecond(0))。

      代碼樣例如下所示:

consumer.subscribe(Collections.singleton(topic));
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(topic).stream().map(partitionInfo ->
  new TopicPartition(topic, partitionInfo.partition()))
  .collect(Collectors.toList())
);

    (2)latest實現

consumer.seekToEnd(
consumer.partitionsFor(topic).stream().map(partitionInfo ->
  new TopicPartition(topic, partitionInfo.partition()))
  .collect(Collectors.toList())
);

    (3)Current實現

      實現 Current 策略的⽅法很簡單,需要借助 KafkaConsumer 的 committed ⽅法來獲取當前提交的最新位移

consumer.partitionsFor(topic).stream().map(info ->
  new TopicPartition(topic, info.partition()))
    .forEach(tp -> {
      long committedOffset = consumer.committed(tp).offset();
      consumer.seek(tp, committedOffset);
});

     (4)Specified-Offset實現

long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
  TopicPartition tp = new TopicPartition(topic, info.partition());
  consumer.seek(tp, targetOffset);
}

     (5)Shift-By-N 實現

for (PartitionInfo info : consumer.partitionsFor(topic)) {
  TopicPartition tp = new TopicPartition(topic, info.partition());
  // 假設向前跳 123 條消息
  long targetOffset = consumer.committed(tp).offset() + 123L;
  consumer.seek(tp, targetOffset);
}

    (6)datatime 實現

      如果要實現 DateTime 策略,需要借助另⼀個⽅法:KafkaConsumer. offsetsForTimes ⽅法。假設要重設位移到 2020 年 7 ⽉ 20 ⽇晚上 8 點,那么具體代碼如下:

long ts = LocalDateTime.of(2020, 7, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeToSearch =consumer.partitionsFor(topic).stream().map(info ->
  new TopicPartition(topic, info.partition()))
  .collect(Collectors.toMap(Function.identity(), tp -> ts)
);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {   consumer.seek(entry.getKey(), entry.getValue().offset()); }

    (7)Duration 實現

      位移回調30分鍾前:

Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
   .map(info -> new TopicPartition(topic, info.partition()))
   .collect(Collectors.toMap(Function.identity(), tp ->
    System.currentTimeMillis() - 30 * 1000 * 60)
);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {   consumer.seek(entry.getKey(), entry.getValue().offset()); }

    總之,使⽤ Java API 的⽅式來實現重設策略的主要⼊⼝⽅法,就是 seek ⽅法。

  7、kafka-consumer-groups

    位移重設還有另⼀個重要的途徑:通過 kafka-consumer-groups 腳本。需要注意的是,這個功能是在Kafka 0.11 版本中新引⼊的。這就是說,如果你使⽤的 Kafka 是 0.11 版本之前的,那么你只能使⽤API 的⽅式來重設位移。

    (1)Earliest 

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --all-topics --to-earliest –execute

     (2)Latest

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --all-topics --to-latest --execute

     (3)Current

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --all-topics --to-current --execute

     (4)Specified-Offset

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --all-topics --to-offset <offset> --execute

     (5)Shift-By-N

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --shift-by <offset_N> --execute

     (6)DateTime

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

    (7)Duration

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --by-duration PT0H30M0S --execute

 十三、消息壓縮

  壓縮就是⽤時間去換空間的經典 trade-off 思想,具體來說就是⽤ CPU 時間去換磁盤空間或⽹絡I/O 傳輸量,希望以較⼩的 CPU 開銷帶來更少的磁盤占⽤或更少的⽹絡 I/O 傳輸。

  消息格式有兩⼤類消息格式,分別稱之為V1版本和V2版本。V2版本是Kafka 0.11.0.0中正式引⼊的,V2版本都⽐V1版本節省磁盤空間,當啟⽤壓縮時,這種節省空間的效果更加明顯。

  在kafka中Producer 端壓縮、Broker 端保持、Consumer 端解壓縮。

  ⽣產者程序中配置compression.type參數即表示啟⽤指定類型的壓縮算法。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// 開啟GZIP壓縮
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);

  這⾥⽐較關鍵的代碼⾏是props.put(“compression.type”, “gzip”),它表明該Producer的壓縮算法使⽤的是GZIP。這樣Producer啟動后⽣產的每個消息集合都是經GZIP壓縮過的,故⽽能很好地節省⽹絡傳輸帶寬以及Kafka Broker端的磁盤占⽤。在⽣產者端啟⽤壓縮是很⾃然的想法,那為什么說在Broker端也可能進⾏壓縮呢?兩種例外情況就可能讓Broker重新壓縮消息:

    (1)Broker端指定了和Producer端不同的壓縮算法

    (2)Broker端發⽣了消息格式轉換

  各種壓縮算法對⽐

    吞吐量對比:LZ4 > Snappy > zstd / GZIP
    壓縮⽐對比:zstd > LZ4 > GZIP > Snappy

十四、⽂件存儲機制

  消息在磁盤上都是以⽇志的形式保存的,我們這⾥說的⽇志是存放在(config/server.properties:log.dirs=/Users/hadoop/kafka-logs)⽬錄中的消息⽇志,即partition與segment。

  1、存儲信息查看  

    首先創建一個3個分區、3個備份的topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replicationfactor 3 --partitions 3 --topic two

    可以查看一下其存儲內容

    (1)/brokers/ids⽬錄

    (2)每個id的數據內容為當前主機的信息

    (3)/brokers/topics

      

  2、存儲結構

    

  3、segment

    log⽂件⼤⼩可以通過log.segment.bytes參數設定,默認值是1073741824。

    index⽂件的的格式如下圖所示,每個索引項占⽤8個字節,分為兩個部分:

      1) relativeOffset:相對偏移量,表示消息相對於baseOffset的偏移量,占⽤4個字節,當前索引⽂件的⽂件名即為baseOffset的值。

      2) position:物理地址,也就是消息在⽇志分段⽂件中的物理位置,占⽤4個字節

    示例: 查找偏移量為23的信息

      

    以上是最簡單的⼀種情況。如果要查找偏移量為268的消息,那么應該怎么辦呢?⾸先是定位到baseOffset為251的⽇志分段,然后計算相對偏移量relavtiveOffset=268-251=17,之后再在對應的索引⽂件中知道不⼤於17的索引項,最后根據索引項中的postition定位到具體的⽇志分段⽂件位置開始查找消息。

     index⽂件中並沒有為數據⽂件中的每條消息都建⽴索引,⽽是采⽤了稀疏存儲的⽅式(跳躍表),每隔⼀定字節的數據建⽴⼀條索引。 這樣避免了索引⽂件占⽤過多的空間,從⽽可以將索引⽂件保留在內存中。 

  4、mesage

    

    解釋說明:

      8 byte offset:在parition(分區)內的每條消息都有⼀個有序的id號,這個id號被稱為偏移(offset),它可以唯⼀確定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message。

      4 byte message size:message⼤⼩

      4 byte CRC32:⽤crc32校驗message

      1 byte “magic":表示本次發布Kafka服務程序協議版本號

      1 byte “attributes":表示為獨⽴版本、或標識壓縮類型、或編碼類型。

      4 byte key length:表示key的⻓度,當key為-1時,K byte key字段不填

      K byte key:可選value

      4 bytes payload:表示實際消息數據。

  5、⾼效⽂件存儲設計特點

    Kafka把topic中⼀個parition⼤⽂件分成多個⼩⽂件段,通過多個⼩⽂件段,就容易定期清除或刪除已經消費完⽂件,減少磁盤占⽤。

    通過索引信息可以快速定位message

    通過index元數據全部映射到memory,可以避免segment file的IO磁盤操作。

    通過索引⽂件稀疏存儲,可以⼤幅降低index⽂件元數據占⽤空間⼤⼩。

    順序寫: 操作系統每次從磁盤讀寫數據的時候,需要先尋址,也就是先要找到數據在磁盤上的物理位置,然后再進⾏數據讀寫,如果是機械硬盤,尋址就需要較⻓的時間。kafka的設計中,數據其實是存儲在磁盤上⾯,⼀般來說,會把數據存儲在內存上⾯性能才會好。但是kafka⽤的是順序寫,追加數據是追加到末尾,磁盤順序寫的性能極⾼,在磁盤個數⼀定,轉數達到⼀定的情況下,基本和內存速度⼀致隨機寫的話是在⽂件的某個位置修改數據,性能會較低。

    零拷⻉

十五、日志清理策略

  kafka log的清理策略有兩種:delete,compact,默認是delete,這個對應了kafka中每個topic對於record的管理模式:

    delete:⼀般是使⽤按照時間保留的策略,當不活躍的segment的時間戳是⼤於設置的時間的時候,當前segment就會被刪除

    compact: ⽇志不會被刪除,會被去重清理,這種模式要求每個record都必須有key,然后kafka會按照⼀定的時機清理segment中的key,對於同⼀個key只保留最新的那個key.同樣的,compact也只針對不活躍的segment

cleanup.policy: delete
cleanup.policy: compact

   1、delete 相關配置

    假如對某個topic(假設為user_topic)設置了 cleanup.policy: delete,那么當前topic使⽤的log刪除策略就是 delete,這個策略會周期性的檢查partion中的不活躍的segment,根據配置采⽤兩種⽅式刪除⼀些舊的segment。

      retention.bytes:總的segment的⼤⼩限制,達到這個限制后會刪除舊的segment,默認值為-1,就是不會刪除

      retention.ms:當前時間 - segment的最后寫⼊record的時間 > retention.ms 的segment會被刪除,默認是168h, 7天

    ⼀些其他的輔助性配置:

      log.retention.check.interval.ms: 每隔多久檢查⼀次是否有可以刪除的log,默認是300s,5分鍾這個是broker級別的設置

      file.delete.delay.ms: 在徹底刪除⽂件前保留的時間,默認為1分鍾 這個是broker級別的設置

    在delete的⽇志策略下,⽇志保留3天。

retention.ms: 259200000

  2、⽇志清理compact策略

    使⽤場景:⽇志清理的compact策略,對於那種需要留存⼀份全量數據的需求⽐較有⽤,什么意思呢,⽐如,我⽤flink計算了所有⽤戶的粉絲數,⽽且每5分鍾更新⼀次,結果都存儲到kafka當中。這個時候kafka相當於是⼀個數據總線,任何需要⽤戶粉絲數的業務部⻔都可以從kafka中拿到這個數據。這個時候如果數據的保存使⽤delete策略,為了保存所有⽤戶的粉絲數,只能設置不刪除,也就是這樣的話,數據會⽆限膨脹,⽽且,很多數據是⽆意義的,因為業務⽅從kafka中消費數據的時候,實際上只是想知道⽤戶的當前粉絲數是多少,不關注⼀個⽉前這個⽤戶有多少粉絲數,但是這些數據都在kafka中存儲,會造成⽆意義的消費。kafka提供了⼀種叫做compact的清理策略,這個策略可以很好的幫助我們應對這種情況。kafka的compact 策略要求每個record都要有key,kafka是根據key來進⾏去重合並的。每個key⾄少保留⼀個最新的值。

    compact的⼯作模式:對於每⼀個kafka partition的⽇志,以segment為單位,都會被分為兩部分,已清理和未清理的部分。同時,未清理的那部分⼜分為可以清理的和不可清理的。同時對於清理過后的segment如果太⼩,kafka也會有⼀定的策略去合並這些segemnt,防⽌

segment碎⽚化。

    通過配置cleanup.policy: compact來開啟compact的⽇志清理策略。配套的配置還有:

      min.cleanable.dirty.ratio: 可以進⾏compact的臟數據的⽐例,dirtyRatio = dirtyBytes /(cleanBytes + dirtyBytes) 其中dirtyBytes表示可清理部分的⽇志⼤⼩,cleanBytes表示已清理部分的⽇志⼤⼩。這個配置也是為了提升清理的性價⽐設置的,因為清理數據需要對磁盤進⾏讀寫,開銷並不⼩,如果你的數據只有很⼩的重復⽐例,實際上是沒有清理的必要的。這個值默認是0.5 也就是臟了的數據達到了總數據的50%才會清理,⼀般情況下我如果開啟了compact策略,都會將這個值設置為0.1,感覺這樣對於想要消費當前topic的業務⽅更加友好。

      min.compaction.lag.ms: 這個設置了在⼀條消息在被produer發送到kafka當中之后,多久時間以內不會被compact,為了滿⾜有些想要獲取⼀定時間內的歷史快照的業務,默認是0,就是不會根據消息投遞的時間來決定消息是否應該被compacted

    tombstone 消息:

      在compact下,還有⼀類⽐較特殊的消息,只有key,value值為null的消息,這⼀類消息如果合並了實際上也是沒有意義的,因為沒有值,所以kafka在compact的時候會刪除value為null的消息,但是並不是在第⼀次去重的時候⽴刻刪除,⽽是允許存儲的更久⼀些。有⼀個特殊的配置來處理。

      delete.retention.ms: 這個配置就是專⻔針對tombstone類型的消息進⾏設置的。默認為24⼩時,也就是這個tombstone在當次compact完成后並不會被清理,在下次compact的時候,他的最后修改時間+delete.retention.ms<當前時間,才會被刪掉。

    簡單總結compact的配置(kafka啟⽤delete的清理策略的時候需要注意配置)

      cleanup.policy: compact

      segment.bytes: 每個segment的⼤⼩,達到這個⼤⼩會產⽣新的segment, 默認是1G

      segment.ms: 配置每隔 ms產⽣⼀個新的segment,默認是168h,也就是7天

      retention.bytes: 總的segment的⼤⼩限制,達到這個限制后會刪除舊的segment,默認值為-1,就是不會刪除

      retention.ms: segment的最后寫⼊record的時間-當前時間 > retention.ms 的segment會被刪除,默認是168h, 7天

      min.cleanable.dirty.ratio: 臟數據可以容忍的⽐例,如果你的機器性能可以,⽽且數據量較⼤的話,建議這個值設置更⼩⼀些,對consumer更友好

      min.compaction.lag.ms: 看業務有需要的話可以設置

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM