Kafka學習之內核原理剖析


一、Producer原理分析

  1、Producer運行流程

  整個生產者客戶端由兩個線程協調運行,這兩個線程分別為主線程和 Sender 線程 (發送線程)。在主線程中由 KafkaProducer 創建消息,然后通過可能的攔截器、序列化器和分區器的作用之后緩存到消息累加器( RecordAccumulator,也稱為消息收集器〉中。 Sender 線程負責從RecordAccumulator 中獲取消息並將其發送到 Kafka 中 。

    

  • ProducerRecord: 消息對象。
  • Interceptor: 攔截器,Kafka一共有兩種攔截器:生產者攔截器和消費者攔截器。生產者攔截器既可以用來在消息發送前做一些准備工作,比如按照某個規則過濾不符合要求的消息、修改消息的內容等,也可以用來在發送回調邏輯前做一些定制化的需求,比如統計類工作。
  • Serializer: 序列化器,把對象轉換成字節數組才能通過網絡發送給Kafka。
  • Partitioner: 分區器,消息經過序列化之后就需要確定它發往的分區,如果消息ProducerRecord中指定了partition字段,那么就不需要分區器的作用,因為partition代表的就是所要發往的分區號。如果消息ProducerRecord中沒有指定partition字段,那么就需要依賴分區器,根據key這個字段來計算partition的值。分區器的作用就是為消息分配分區。默認分區器:DefaultPartitioner,在partition()方法中定義了主要的分區分配邏輯。如果key不為null,那么默認的分區器會對key進行哈希(采用MurmurHash2算法,具備高運算性能及低碰撞率),最終根據得到的哈希值取模(所有分區)來計算分區號,擁有相同key的消息會被寫入同一個分區。如果key為null,那么消息將會以輪詢的方式發往主題內的各個可用分區。
  • RecordAccumulator: 主要用來緩存消息以便Sender線程可以批量發送,進而減少網絡傳輸的資源消耗以提升性能。RecordAccumulator緩存的大小可以通過生產者客戶端參數buffer.memory配置,默認值為33554432B,即32M。如果生產者發送消息的速度超過發送到服務器的速度,則會導致生產者空間不足,這個時候KafkaProducer的send()方法調用要么被阻塞,要么拋出異常,這個取決於參數max.block.ms的配置,此參數的默認值為60000,即60秒。主線程中發送過來的消息都會被迫加到RecordAccumulator的某個雙端隊列(Deque)中,在RecordAccumulator的內部為每個分區都維護了一個雙端隊列,隊列中的內容就是ProducerBatch,即Deque<ProducerBatch>。消息寫入緩存時,追加到雙端隊列的尾部:Sender讀取消息時,從雙端隊列的頭部讀取。
  • InFlightRequests: 請求在從Sender線程發往Kafka之前還會保存到InFlightRequests中,InFlightRequests保存對象的具體形式為Map<Nodeld,Deque>,它的主要作用是緩存了已經發出去但還沒有收到響應的請求(Nodeld表示broker節點的id編號)。InFlightRequests還提供了許多管理類的方法,並且通過配置參數還可以限制每個連接(也就是客戶端與Node之間的連接)最多緩存的請求數。這個配置參數為max.in.flight.requests.per.connection,默認值為5,即每個連接最多只能緩存5個未響應的請求,超過該數值之后就不能再向這個連接發送更多的請求了,除非有緩存的請求收到了響應。InFlightRequests還可以獲得leastLoadedNode,即所有Node中負載最小的那一個。選擇leastLoadedNode發送請求可以使它能夠盡快發出,避免因網絡擁塞等異常而影響整體的進度。比如元數據請求,當需要更新元數據時,會先挑選出leastLoadedNode,然后向這個Node發送MetadataRequest請求來獲取具體的元數據信息。

          

  2、ACK機制

  為保證生產者發送的數據,能可靠的發送到指定的topic,topic的每個partition收到生產者發送的數據后,都需要向生產者發送ack(確認收到),如果生產者收到ack,就會進行下一輪的發送,否則重新發送數據。ACK參數可設置的值為01all

  • 0:代表producer往集群發送數據不需要等到集群的返回,不確保消息發送成功。安全性最低但是效率最高。
  • 1:代表producer往集群發送數據只要leader應答就可以發送下一條,只確保leader發送成功。
  • all:代表producer往集群發送數據需要所有的follower都完成從leader的同步才會發送下一條,確保leader發送成功和所有的副本都完成備份。安全性最高,但是效率最低。

  當ACK=all時,Leader和follower(ISR)落盤才會返回ack,會有數據重復現象,如果在leader已經寫完成,且follower同步完成,但是在返回ack的出現故障,則會出現數據重復現象;極限情況下,這個也會有數據丟失的情況,比如follower和leader通信都很慢,所以ISR中只有一個leader節點,這個時候,leader完成落盤,就會返回ack,如果此時leader故障后,就會導致丟失數據。

二、Consumer原理分析

  消費者( Consumer )負責訂閱 Kafka 中的主題( Topic ),並且從訂閱的主題上拉取消息。 在 Kafka 的消費理念中還有一層消費組( Consumer Group)的概念,每個消費者都有一個對應的消費組。當消息發布到主題后,只會被投遞給訂閱它的每個消費組中的一個消費者 。 每個消費者只能消費所分配到的分區中的消息。換言之,每一個分區只能被一個消費組中的一個消費者所消費 。

    

  1、分區分配策略

  Kafka提供了消費者客戶端參數partition.assignment.strategy來設置消費者與訂閱主題之間的分區分配策略。默認情況下,采用 Range Assignor 分配策略。 Kafka 還提供了另外兩種分配策略: RoundRobinAssignor 和 StickyAssignor 。

  • RangeAssignor: RangeAssignor 分配策略的原理是按照消費者總數和分區總數進行整除運算來獲得一個跨度,然后將分區按照跨度進行平均分配,以保證分區盡可能均勻地分配給所有的消費者 。
  • RoundRobinAssignor: RoundRobinAssignor 分配策略的原理是將消費組內所有消費者及消費者訂閱的所有主題的分區按照字典序排序,然后通過輪詢方式逐個將分區依次分配給每個消費者。
  • StickyAssignor: Kafka從 0.11.x 版本開始引入這種分配策略,它主要有兩個目的 : 分區的分配要盡可能均勻;分區的分配盡可能與上次分配的保持相同。

  2、消息消費

  Kafka中的消費是基於拉模式的,拉模式是消費者主動向服務端發起請求來拉取消息。 Kafka 中的消息消費是一個不斷輪詢的過程,消費者所要做的就是重復地調用 poll()方法 ,而 poll()方法返回的是所訂閱的主題(分區)上的一組消息。

/**
* @see KafkaConsumer#poll(long)
*/
public ConsumerRecords<K, V> poll(long timeout);

  timeout 的設置取決於應用程序對響應速度的要求,比如需要在多長時間內將控制權移交給執行輪詢的應用線程。可以直接將 timeout 設置為 0 , 這樣 poll()方法會立刻返回,而不管是否己經拉取到了消息。如果應用線程唯一的工作就是從 Kafka 中拉取並消費消息,則可以將這個參數設置為最大值 Long.MAX_VALUE 。

  3、位移提交

  對於 Kafka 中的分區而言,它的每條消息都有唯一的 offset,用來表示消息在分區中對應的位置 。 對於消費者而言 ,它也有一個 offset 的概念,消費者使用 offset 來表示消費到分區中某個消息所在的位 置。

  在每次調用 poll()方法時,它返回的是還沒有被消費過的消息集, 要做到這一點,就需要記錄上一次消費時的消費位移 。 消費位移存儲在 Kafka 內 部的主題 consumer offsets 中 。 這里把將消費位移存儲起來(持久化)的動作稱為提交 ,消費者在消費完消息之后需要執行消費位移的提交。

     

  在Kafka中默認的消費位移的提交方式是自動提交,這個由消費者客戶端參數enable.auto.commit配置,默認值為true。當然這個默認的自動提交不是每消費一條消息就提交一次,而是定期提交,這個定期的周期時間由戶端參數auto.commit.interval.ms配置,默認值為5秒,此參數生效的前提是enable.auto.commit參數為true。

  每個consumer會定期將自己消費分區的offset提交給kafka內部topic: consumer_offsets,提交過去的時候,key是consumerGroupId+topic+分區號,value就是當前offset的值,kafka會定期清理topic里的消息,最后就保留最新的那條數據。因為consumer_offsets可能會接收高並發的請求,kafka默認給其分配50個分區(可以通過offsets.topic.num.partitions設置),這樣可以通過加機器的方式擴大並發。

  4、Rebalance機制

  rebalance是指分區的所屬權從一個消費者轉移到另一消費者的行為,它為消費組具備高可用性和伸縮性提供保障。rebalance發生期間,消費組內的消費者是無法讀取消息的。當一個分區被重新分配給另一個消費者時,消費者當前的狀態也會丟失。

  如下情況可能會觸發消費者rebalance

  • consumer所在服務器重啟或宕機了;
  • 動態給topic增加了分區;
  • 消費組訂閱了更多的topic。
public interface ConsumerRebalanceListener{
  //rebalance開始之前和消費者停止讀取消息之后被調用
  // 可以通過這個回調方法
處理消費位移的提交,以此來避免一些不必要的重復消費現象的發生。   // 參數 partitions表示rebalance前所分配到的分區。   void onPartitionsRevoked(Collection<TopicPartition> partitions);   
  //
重新分配分區之后和消費者開始讀取消費之前被調用,參數partitions表示再均衡后所分配到的分區。   void onPartitionsAssigned(Collection<TopicPartition> partitions); }

  5、消費者協調器和組協調器

  每個消費組的子集在服務端對應一個GroupCoordinator對其進行管理,GroupCoordinator是Kafka服務端中用於管理消費組的組件。而消費者客戶端中的ConsumerCoordinator組件負責與GroupCoordinator進行交互。ConsumerCoordinator與GroupCoordinator之間最重要的職責就是負責執行消費者再均衡的操作,包括前面提及的分區分配的工作也是在rebalance期間完成的。一共有如下幾種情形會觸發再均衡的操作:

  • 有新的消費者加入消費組。
  • 有消費者宕機下線。消費者並不一定需要真正下線,例如遇到長時間的GC、網絡延遲導致消費者長時間未向GroupCoordinator發送心跳等情況時,GroupCoordinator會認為消費者己經下線。
  • 有消費者主動退出消費組(發送LeaveGroupRequest請求)。比如客戶端調用了unsubscrible()方法取消對某些主題的訂閱。
  • 消費組所對應的GroupCoorinator節點發生了變更。
  • 消費組內所訂閱的任一主題或者主題的分區數量發生變化。

  當有消費者加入消費組時,消費者、消費組及組協調器之間會經歷一下幾個階段。整個rebalance過程如下:

  第一階段:選擇組協調器(FINDCOORDINATOR)

  組協調器GroupCoordinator: 每個consumergroup都會選擇一個broker作為自己的組協調器coordinator,負責監控這個消費組里的所有消費者的心跳,以及判斷是否宕機,然后開啟消費者rebalance。consumergroup中的每個consumer啟動時會向kafka集群中的某個節點(負載最小的節點:InFlightRequests有記錄)發送FindCoordinatorRequest請求來查找對應的組協調器GroupCoordinator,並跟其建立網絡連接。

  組協調器選擇方式: 通過如下公式可以選出consumer消費的offset要提交到consumer_offsets的哪個分區,這個分區leader對應的broker就是這個consumergroup的coordinator公式: hash(consumergroupid)%consumer_offsets 主題的分區數

  第二階段:加入消費組(JOINGROUP)

  在成功找到消費組所對應的GroupCoordinator之后就進入加入消費組的階段,在此階段的消費者會向GroupCoordinator發送JoinGroupRequest請求,並處理響應。然后GroupCoordinator從一個consumergroup中選擇第一個加入group的consumer作為leader(消費組協調器),把consumergroup情況發送給這個leader,接着這個leader會負責制定分區方案。

  選舉分區分配策略,根據各個消費者呈報的分配策略選舉,過程如下:

  • 收集各個消費者支持的所有分配策略,組成候選集candidates。
  • 每個消費者從候選集candidates中找出第一個自身支持的策略,為這個策略投上一票。
  • 計算候選集中各個策略的選票數,選票數最多的策略即為當前消費組的分配策略。

  每個消費者都向GroupCoordinator發送JoinGroupRequest請求,其中攜帶了各自提案的分配策略和訂閱信息。JoinGroupResonse回執中包含GroupCoordinator中投票選舉出的分配策略的信息,並且只有leader消費者的回執中包含每個消費者的訂閱信息。

  第三階段:同步消費組(SYNCGROUP)

  consumerleader通過給GroupCoordinator發送SyncGroupRequest,接着GroupCoordinator就把分區方案下發給各個consumer,他們會根據指定分區的leaderbroker進行網絡連接以及消息消費。

  6、多線程

  KafkaConsumer是非線程安全的,KafkaConsumer中定義了一個acquire()方法,用來檢測當前是否只有一個線程在操作,若有其他線程正在操作則會拋出ConcurrentModifcationException異常。

三、Broker原理分析

  1、分區管理

  優先副本的選舉:Kafka集群的一個broker中最多只能有它的一個副本。分區使用多副本機制來提升可靠性,但只有leader副本對外提供讀寫服務,而follower副本只負責在內部進行消息的同步。如果一個分區的leader副本不可用,那么就意味着整個分區變得不可用,此時就需要Kafka從剩余的follower副本中挑選一個新的leader副本來繼續對外提供服務。為了能夠有效地治理負載失衡的情況,Kafka引入了優先副本(preferredreplica)的概念。所謂的優先副本是指在AR集合列表中的第一個副本。優先副本的選舉是指通過一定的方式促使優先副本選舉為leader副本,以此來促進集群的負載均衡,這一行為也可以稱為“分區平衡”。

      

bin/kafka-topics.sh --describe --zookeeper localhost:2181 m --topic my-replicated-topic2  //查看分區信息

  結果如下所示(包含了部分Broker下線以及恢復的過程):

HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic //查看分區信息
Topic: my-replicated-topic    PartitionCount: 3    ReplicationFactor: 3    Configs: 
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-replicated-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-replicated-topic    Partition: 2    Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic //kill Broker1后分區信息
Topic: my-replicated-topic    PartitionCount: 3    ReplicationFactor: 3    Configs: 
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0
    Topic: my-replicated-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,2
    Topic: my-replicated-topic    Partition: 2    Leader: 2 Replicas: 1,2,0 Isr: 2,0
HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-server-start.sh -daemon config/server2.properties   //重啟broker1
HoudeMacBook-Pro:kafka_2.12-2.6.0 houjing$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic //查看分區信息
Topic: my-replicated-topic    PartitionCount: 3    ReplicationFactor: 3    Configs: 
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-replicated-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,2,1
    Topic: my-replicated-topic    Partition: 2    Leader: 2 Replicas: 1,2,0 Isr: 2,0,1

  在Kafka中可以提供分區自動平衡的功能,與此對應的broker端參數是auto.leader.rebalance.enable,此參數的默認值為true,即默認情況下此功能是開啟的。如果開啟分區自動平衡的功能,則Kafka的控制器會啟動一個定時任務,這個定時任務會輪詢所有的broker節點,計算每個broker節點的分區不平衡率 (broker中的不平衡率=非優先副本的leader個數/分區總數)是否超過leader.imbalance.per.broker.percentage參數配置的比值,默認值為10%,如果超過設定的比值則會自動執行優先副本的選舉動作以求分區平衡。執行周期由參數leader.imbalance.check.interval.seconds控制,默認值為300秒。Kafka中kafka-perferred-replica-election.sh腳本提供了對分區leader副本進行重新平衡的功能。

  • 分區重分配:Kafka提供了kafka-reassign-partitions.sh腳本來執行分區重分配的工作,它可以在集群擴容、broker節點失效的場景下對分區進行遷移。
  • 失效副本:正常情況下,分區的所有副本都處於ISR集合,但是難免會有異常發生的情況,從而某些副本被剝離出ISR集合。當follower副本將leader副本LEO(Log End Offset)之前的日志全部同步時,則認為該fillower副本已經追趕上leader副本,此時更新該副本的lastAoughtUpTimeMs標識,Kafka的副本管理器會啟動一個副本過期檢測的定時任務,而這個定時任務會定時檢查當前時間與副本的lastAoughtUpTimeMs差值是否大於參數replica.time.max.ms指定的值,默認10000。

  2、控制器Controller

  在Kafka集群中會有一個或者多個broker,其中有一個broker會被選舉為控制器(KafkaController),它負責管理整個集群中所有分區和副本的狀態。

  • 當某個分區的leader副本出現故障時,由控制器負責為該分區選舉新的leader副本。
  • 當檢測到某個分區的ISR集合發生變化時,由控制器負責通知所有broker更新其元數據信息。
  • 當使用kafka-topics.sh腳本為某個topic增加分區數量時,同樣還是由控制器負責分區的重新分配。

  Kafka中的控制器選舉的工作依賴於Zookeeper,成功競選為控制器的broker會在Zookeeper中創建/controller這個臨時(EPHEMERAL)節點,此臨時節點的內容參考如下:

# brokerid表示稱為控制器的broker的id編號,timestamp表示競選為控制器時的時間戳 
{"version":1,"brokerid":0,"timestamp":"1574831950372"}

  在任意時刻,集群中有且僅有一個控制器。每個broker啟動的時候會去嘗試去讀取/controller節點的brokerid值,如果讀取到brokerid的值不為-1,則表示已經有其它broker節點成功競選為控制器,所以當前broker就會放棄競選:如果Zookeeper中不存在/controller這個節點,或者這個節點中的數據異常,那么就會嘗試去建/controller這個節點,當前broker去創建節點的時候,也有可能其他broker同時去嘗試創建這個節點,只有創建成功的broker才會成為控制器,而創建失敗的broker則表示競選失敗。每個broker都會在內存中保存當前控制器的brokerid值,這個值可以標識為activeControllerId。

  • Partition副本leader的選舉

  分區leader副本的選舉由控制器負責具體實施。當創建分區(創建主題或增加分區都有創建分區的動作)或分區上線(比如分區中原先的leader副本下線,此時分區需要選舉一個新的leader上線來對外提供服務)的時候都需要執行leader的選舉動作。這種策略的基本思路是按照AR集合中副本的順序查找第一個存活的副本,並且這個副本在JSR集合中。一個分區的AR集合在分配的時候就被指定,並且只要不發生重分配的情況,集合內部副本的順序是保持不變的,而分區的ISR集合中副本的順序可能會改變。

##在zookeeper中查看各broker節點信息
[zk: localhost:2181(CONNECTED) 7] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.1.140.146:9092"]
  ,"jmx_port":-1,"host":"10.1.140.146","timestamp":"1603270074840","port":9092,"version":4} [zk: localhost:2181(CONNECTED) 8] get /brokers/ids/1  ##broker1 被kill后 org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids/1 [zk: localhost:2181(CONNECTED) 9] get /brokers/ids/2 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.1.140.146:9094"]
  ,"jmx_port":-1,"host":"10.1.140.146","timestamp":"1603270074178","port":9094,"version":4}
## 在zookeeper中查看分區信息
[zk: localhost:2181(CONNECTED) 15] get /brokers/topics/my-replicated-topic/partitions/2/state
{"controller_epoch":6,"leader":2,"version":1,"leader_epoch":1,"isr":[2,0,1]}
[zk: localhost:2181(CONNECTED) 16] get /brokers/topics/my-replicated-topic/partitions/1/state
{"controller_epoch":6,"leader":0,"version":1,"leader_epoch":1,"isr":[0,2,1]}
[zk: localhost:2181(CONNECTED) 17] get /brokers/topics/my-replicated-topic/partitions/0/state
{"controller_epoch":6,"leader":2,"version":1,"leader_epoch":1,"isr":[2,0,1]}

  3、LEO與HW高水位

  整個消息追加的過程可以概括如下:

  • 生產者客戶端發送消息至leader副本中。
  • 消息被迫加到leader副本的本地日志,並且會更新日志的偏移量。
  • follower副本向leader副本請求同步數據。
  • leader副本所在的服務器讀取本地日志,並更新對應拉取的follower副本的信息。
  • leader副本所在的服務器將拉取結果返回給follower副本。
  • follower副本收到leader副本返回的拉取結果,將消息追加到本地日志中,並更新日志的偏移量信息。

  在Kafka中,高水位(HighWatermark)的作用主要有2個:

  • 定義消息可見性,即用來標識分區下的哪些消息是可以被消費者消費的。
  • 幫助Kafka完成副本同步。

      

  在分區高水位以下的消息被認為是已提交消息,反之就是未提交消息。消費者只能消費已提交消息,即圖中位移小於15的所有消息(不考慮事務)。位移值等於高水位的消息也屬於未提交消息。

  日志末端位移(LEO:Log End Offset)表示副本寫入下一條消息的位移值。同一個副本對象,其高水位值不會大於LEO值。高水位和LEO是副本對象的兩個重要屬性。Kafka所有副本都有對應的高水位和LEO值,而不僅僅是Leader本。只不過Leader副本比較特殊,Kafka使用Leader副本的高水位來定義所在分區的高水位。換句話說,分區的高水位就是其Leader副本的高水位。

  高水位更新機制

  在Leader副本所在的Broker上,還保存了其他Follower副本的LEO值。

      

  Broker0上保存了某分區的Leader副本和所有Follower副本的LEO值,而Broker1上僅僅保存了該分區的某個Follower副本。Kafka把Broker0上保存的這些Follower副本又稱為遠程副本(RemoteReplica)。Kafka副本機制在運行過程中,會更新Broker1上Follower副本的高水位和LEO值,同時也會更新Broker0上Leader副本的高水位和LEO以及所有遠程副本的LEO,但它不會更新遠程副本的高水位值,也就是圖中標記為灰色的部分。

  Broker0上保存這些遠程副本主要作用是: 幫助Leader副本確定其高水位,也就是分區高水位。

  與Leader副本保持同步的判斷的條件有兩個:

  • 該遠程Follower副本在ISR中。
  • 該遠程Follower副本LEO值落后於Leader副本LEO值的時間,不超過Broker端參數replica.lag.time.max.ms的值。如果使用默認值的話,就是不超過10秒。

  取一個partition對應的ISR中最小的LEO(log-end-offset)作為HW,consumer最多只能消費到HW所在的位置。對於leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步后更HW,此時消息才能被consumer消費。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取。對於來自內部broker的讀取請求,沒有HW的限制。

  副本同步機制

  當producer生產消息至broker后,ISR以及HW和LEO的流轉過程:

      

  Kafka的復制機制既不是完全的同步復制,也不是單純的異步復制。事實上,同步復制要求所有能工作的follower都復制完,這條消息才會被commit,這種復制方式極大的影響了吞吐率。而異步復制方式下,follower異步的從leader復制數據,數據只要被leader寫入log就被認為已經commit,這種情況下如果follower都還沒有復制完,落后於leader時,突然leader宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。

  結合HW和LEO看下acks=1的情況:

       

四、日志存儲

  Kafka一個分區的消息數據對應存儲在一個文件夾下,以topic名稱+分區號命名,kafka規定了一個分區內的.log文件最大為1G。

      

# 部分消息的offset索引文件,kafka每次往分區發4K(可配置)消息就會記錄一條當前消息的offset到 index文件, # 如果要定位消息的offset會先在這個文件里快速定位,再去log文件里找具體消息 
00000000000000000000.index
# 消息存儲文件,主要存offset和消息體 # LogSegment 的基准偏移量為 0,對應的日志文件為 00000000000000000000.log 
00000000000000000000.log # 消息的發送時間索引文件,kafka每次往分區發4K(可配置)消息就會記錄一條當前消息的發送時間戳與對 應的offset到timeindex文件, # 如果需要按照時間來定位消息的offset,會先在這個文件里查找 00000000000000000000.timeindex 000000000000009900000.index 000000000000009900000.log 000000000000009900000.timeindex

  KafkaBroker有一個參數,log.segment.bytes,限定了每個日志段文件的大小,最大就是1GB。一個日志段文件滿了,就自動開一個新的日志段文件來寫入,避免單個文件過大,影響文件的讀寫性能,這個過程叫做logrolling,正在被寫入的那個日志段文件,叫做activelogsegment。

  1、日志索引

  每個日志分段文件對應了兩個索引文件,主要用來提高查找消息的效率。偏移量索引文件用來建立消息偏移量offset到物理地址之間的映射關系,方便快速定位消息所在的物理文件位置;時間戳索引文件則根據指定的時間戳timestamp來查找對應的偏移量信息。Kafka中的索引文件以稀疏索引(sparseindex)的方式構造消息的索引,它並不保證每個消息在索引文件中都有對應的索引頁。每當寫入一定量(由broker端參數log.index.interval.bytes指定,默認值為4096,即4KB的消息時,偏移量索引文件和時間戳索引文件分別增加一個偏移量索引項和時間戳索引項,增大或減小log.index.interval.bytes的值,對應地可以增加或縮小索引項的密度。

  稀疏索引通過MappedByteBuffer將索引文件映射到內存中,以加快索引的查詢速度。偏移量索引文件中的偏移量是單調遞增的,查詢指定偏移量時,使用二分查找法來快速定位偏移量的位置,如果指定的偏移量不在索引文件中,則會返回小於指定偏移量的最大偏移量。時間戳索引文件中的時間戳也保持嚴格的單調遞增,查詢指定時間戳時,也根據二分查找法來查找不大於該時間戳的最大偏移量.

  • 偏移量索引:每個索引項占用8個字節,分為兩個部分。
    • relativeOffset: 相對偏移量,表示消息相對於baseOffset的偏移量,占用4個字節,當前索引文件的文件名即為baseOffset的值。
    • position: 物理地址,也就是消息在日志分段文件中對應的物理位置,占用4個字節。
  • 時間戳索引:每個索引項占用12個字節,分為兩個部分。
    • timestamp:當前日志分段最大的時間戳,占用8個字節。
    • relativeOffset:時間戳所對應的消息的相對偏移量,占用4個字節。

  如果broker端參數log.message.timestamp.type設置為LogAppendTime,那么消息的時間戳必定能夠保持單調遞增;相反,如果是CreateTime類型則無法保證。

  2、零拷貝

  Kafka使用零拷貝技術來提升性能。所謂的零拷貝是指將數據直接從磁盤文件復制到網卡設備中,而不需要經由應用程序之手。零拷貝大大提高了應用程序的性能,減少了內核和用戶模式之間的上下文切換。對Linux操作系統而言,零拷貝技術依賴於底層的sendfile()方法實現。對應於Java語言,FileChannal.transferTo()方法的底層實現就是sendfile()方法。

      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM