Apache Kafka 源碼剖析


Getting Start

下載

優點和應用場景

  • Kafka消息驅動,符合發布-訂閱模式,優點和應用范圍都共通
  • 發布-訂閱模式優點
    1. 解耦合 : 兩個應用不需要相互調用
    2. 可擴展性 : 消費者的個數可實時擴展
    3. 實時性 : 消費者能實時的獲取生產者發布的事件
    4. 高效 :減少由於多個消費者請求數據造成的數據計算帶來的資源消耗
    5. 異步通訊 :發布-訂閱模式是天生的異步通訊
  • Kafka其他優點
    1. 持久化 : 消息丟失的可控性極高
    2. 高性能 : 磁盤順序讀寫性能比內存隨機讀寫還高,每秒10萬條消息
    3. 高吞吐量 :每秒上百MB的吞吐量
    4. 順序性
  • 發布-訂閱模式應用范圍
    1. 適合數據一被生產,就需要被處理的情況
    2. 適合數據具有潛在消費者的情況
    3. 適合無論有沒有消費者,數據都在生產的情況
    4. 不適合對數據的處理時間有特殊限定的情況
  • 應用場景
    1. 最為消息中間件,實現消息隊列和消息的發布-訂閱,消息驅動的服務
    2. 數據總線,一對多的模式
    3. 日志收集,消息中間件的一種應用
    4. 數據庫主從同步

核心概念

  • Broker
    1. 一個Kafka server就是一個Broker
    2. 一般情況下,一個Broker獨占一台服務器,發揮微服務的優勢
    3. 服務器資源有限的情況下,需要設計出Broker/Topic/Partition/Replica的最優分配策略,從而充分利用服務器資源
    4. 一個broker可以有多個topic
  • Topic
    1. 存儲消息的邏輯上的消息集合
    2. 每個Topic有多個分區
  • 分區 Partition
    1. 同一個Topic的不同分區分配在不同Broker上,也就是一個分區一個服務器
    2. 不同Topic的分區可以共享一個服務器
    3. 同一個分區的消息是有序的,通過維護offset實現
    4. 相同key的消息會被發布到同一個分區
    5. 同一個分區的消息會被一個消費組里固定的一個消費者獨占消費
    6. 通過增加分區來增加並行處理能力
    7. 每個分區可以有多個副本
  • 消費組 Consumer Group
    1. 實現一個消息只被同組的一個消費者獨占消費
    2. 消費組里的消費者有變化的時候會觸發Rebalance操作重新分配分區與消費者的對應關系
    3. Rebalance操作實現了分區消費的故障轉移
    4. 通過增加分區和消費組里的消費者數量來水平擴展,理想情況一對一,也可以一對多,最好不要多對一,造成浪費
  • 副本 Replica
    1. 同一個分區的不同副本分配在不同Broker上,但是這些Broker可以是在同一台服務器上,也可以不是
    2. 副本是一個熱備份設計,會選舉一個作為Leader,提供對外服務
    3. Fllower副本批量的從Leader副本同步消息
  • HW & LEO
    1. HW是所有ISR副本都有的最新offset,HW之前的消息在所有副本中都存在,HW由Leader副本維護
    2. 所有消費者只能獲取HW之前的消息,這樣保證了Leader副本不可用的情況下,所有消費者的狀態是一致的
    3. LEO是每個副本各自的最新offset
  • ISR集合
    1. 滿足兩個條件的副本會被選入ISR可用副本集合
      1. 副本與Zookeeper連接
      2. 副本的LEO與Leader副本的LEO差值不超過閾值
    2. ISR集合保證了Kafka不會被故障副本拖累,也保證了Leader的HW與LEO的差值在閾值內
  • 生產者
    1. 異步提交
      1. acks=0 : 生產者只管提交,不會等待Leader副本返回,不保證數據不丟失
    2. 同步提交
      1. acks=1 : 默認設置,生產者等待Leader副本返回成功,保證數據在Leader中部丟失,但是不保證重新選舉后數據不丟失
    3. 同步復制
      1. acks=all : 生產者等待所有副本同步消息后才算提交成功,保證數據不丟失,性能低
  • Log
    1. 一個副本對應一個Log,用於持久化數據,Kafka采用順序讀寫的方式,性能高
    2. 一個Log里有多個Segment,每個Segment有一個日志文件和一個索引文件
    3. 日志文件的大小有限制,超出后會生成新的Segment
    4. 日志消息保留策略有兩種
      1. 消息的保留時間超過指定時間,可以被刪除
      2. Topic的存儲滿,可以開始刪除最舊的消息
      3. 保留策略可以全局配置,也可以按Topic配置
    5. 日志壓縮
      1. 開啟日志壓縮后,相同的key會被定期合並,只保留最新的value

Kafka/zookeeper 命令

  1. 啟動Zookeeper
    1. ./zookeeper-server-start.sh ../config/zookeeper.properties
  2. 啟動Kafka
    1. ./kafka-server-start.sh ../config/server.properties
  3. 查看Topic
    1. ./kafka-topics.sh --list --zookeeper localhost:9860
  4. 刪除Topic
    1. ./kafka-topics.sh --delete --zookeeper localhost:9860 /kafka --topic test
    2. 不會立馬刪除topic
  5. 查看Topic的詳細信息
    1. ./kafka-topics.sh --zookeeper localhost:9860 --topic test--describe
  6. 查看zk信息
    1. ./zookeeper-shell.sh 127.0.0.1:9860
  7. 生產數據
    1. ./kafka-console-producer.sh --broker-list cvatap3d.nam.nsroot.net:9801 --topic midcurve-ds
  8. 消費數據
    1. ./kafka-console-consumer.sh --zookeeper localhost:9860 --topic midcurve-ds-subscribe --from-beginning

Kafka集群

zookeeper集群配置 : zookeeper.properties

  • clientPort=2180
    1. 端口號
  • dataDir=/tmp/zookeeper
    1. 集群信息記錄目錄,清空目錄可以重置zookeeper
    2. 如果需要在同一台server上啟動多個node,這個路徑必須不同
  • tickTime=2000
    1. zookeeper副本與leader之間維護心跳的頻率
  • initLimit=5
    1. zookeeper的leader初始化連接follower時等待多少個tickTime時間的心跳,超時副本連接失敗
  • syncLimit=2
    1. leader與follower之間發送消息,請求和應答超時是多少個tickTime
  • server.0=cvatap3d.nam.nsroot.net:2888:3888
  • server.1=cvatap3d.nam.nsroot.net:2889:3889
  • server.2=cvatap3d.nam.nsroot.net:2890:3890
    1. 第一個啟動的為leader
    2. zookeeper集群數量必須是基數3,5,7...
    3. 0,1,2是服務id,需要在對應的dataDir=/tmp/zookeeper下面創建myid文件,內容就是服務id,比如0
    4. ip或者host都可以
    5. 后面兩個端口是zookeeper內部通訊使用
      1. 第一個端口是用於副本與Leader建立TCP連接
      2. 第二個端口是用於Leader選舉的TCP端口

Kafka配置 : server.properties

  • broker.id=0
    1. 同一個zookeeper集群下的broker的id必須唯一
  • log.dirs=/tmp/kafka-logs
    1. 啟動kafka會從zookeeper下載配置到log目錄
    2. 如果修改了server.properties可能因為配置與存儲的配置不匹配導致啟動失敗,這時候可以刪除這個目錄
    3. 如果需要在同一台server上啟動多個broker,這個路徑必須不同
  • zookeeper.connect=localhost:2181
    1. zookeeper集群,以逗號隔開
  • listeners=PLAINTEXT://cvatap3d.nam.nsroot.net:9093
    1. broker的host:port

zookeeper與kafka

  • Kafka將Broker信息注冊到zookeeper
    1. zookeeper會維護topic與broker的關系,選舉Leader
    2. 監控partition leader存活性,發現Leader異常會重新選舉Leader
    3. 當異常Broker恢復后,會在一段時間后重新分配Leader
    4. Broker從zookeeper獲取集群中其它Broker信息
  • Consumer端將自己注冊到zookeeper
    1. 用來獲取broker列表
    2. 並和partition leader建立socket連接
    3. 在Consumer Group發生變化時進行rebalance
    4. Zookeeper管理consumer的offset跟蹤當前消費的offset。
  • Producer端將自己注冊到zookeeper
    1. 用來獲取broker列表和分區狀態,從而將消息發布到正確的Broker
    2. Zookeeper不管理producer

Kafka使用經驗總結

Consumer

  • Consumer默認自動提交Offset,並且是一獲取便提交,默認間隔5秒
    1. 當發生錯誤重啟,如果你當消費能力強,可能造成重復消費5秒內當Offset
    2. 如果消費能力比較弱,也可能提交的Offset沒有消費完,造成Offset丟失
  • 消費者使用pull的方式去拿消息
    1. 簡化kafka實現,消費者自己控制消費進度,不會有消息積壓的壓力
    2. kafka通過長輪詢/長連接來提高pull的實時性
    3. 可以設置消費端緩存消息大小:queue.buffering.max.meesages ,在自動提交模式下緩存大小需要適當控制

Kafka強一致性保證

  • Producer同步復制,性能下降
  • Kafka冪等設置
    1. Kafka使用Produce Id和sequence number實現冪等,判斷一次提交的所有消息的seq num一樣,Produce Id由zookeeper隨機生成,每次不一樣
    2. 單分區冪等,不支持分區冪等,也就是當重新分配key與分區關系當時候不支持冪等
    3. 單會話冪等,不提供重啟Prodicer后單冪等
    4. Kafka當冪等大部分情況下有效,單不能完全信任
    5. enabled.idempotence=true, 此時就會默認把acks設置為all,所以不需要再設置acks屬性了。也就是說冪等自動開啟同步復制?
  • 消費者手動提交offset
  • 分區事物管理
  • 安全關閉
    1. producer.close():優先把消息處理完畢,優雅退出。
    2. producer.close(timeout): 超時時,強制關閉。
  • 可重試/不可重試異常區別對待

Spring Cloud Stream 使用Kafka

Producer 生產數據

  • 隨機發送
    1. spring.cloud.stream.bindings.[channelName].producer.partitionKeyExpression不設置
    2. spring.cloud.stream.bindings.[channelName].producer.partitionCount=10
      1. 如果partitionCount > Partition數量, 會報錯,但如果autoAddPartitions=true,則不報錯而自動添加Partition,與instanceCount無關
      2. 如果如果partitionCount < Partition數量,則會被Partition數量覆蓋
  • 定向發送
    1. spring.cloud.stream.bindings.[channelName].producer.partitionKeyExpression=payload.currency
    2. 當Partition變多后,重啟會重新分配,但是不重啟的情況下還是保持不變,也就是說消費者自動添加Partiton后,還需要重啟生產者

Consumer Offset管理

  • Offset的起駛位置
    1. spring.cloud.stream.kafka.bindings.channelName.consumer.resetOffsets
      1. true : 每次從startOffset開始
      2. false:從Consumer當前位置開始
    2. spring.cloud.stream.kafka.bindings.channelName.consumer.startOffset
      1. 新消費組都起駛位置,latest=0, earliest=lastOffset+1
  • 不設置組名:groupId=anonymous
    1. resetOffsets 默認true, startOffset 默認latest, 所以默認每次都從最新都消息消費
    2. 無法設置resetOffsets=false,因為無法知道它當前Offset,但是可以改變startOffset
  • 設置組名:
    1. spring.cloud.stream.bindings. .group
    2. resetOffsets 默認false, startOffset 默認earliest, 所以新消費組默認從0開始消費,並且擁有記錄Offset能力
  • Offset 提交
    1. spring.cloud.stream.kafka.bindings.channelName.consumer.autoCommitOffset
      1. 默認true, 自動提交Offset
      2. false會在Message對象都header字段里添加一個kafka_acknowledgment對象,可以用來手動提交offset
      3. 但是它的AckMode.MANUAL並不是立刻提交的,而是所有pull到的Offset都處理后批量自動提交,所以只能控制哪些Offset需要提交,不能控制什么時候提交
    2. spring.cloud.stream.kafka.bindings.channelName.consumer.ackEachRecord
      1. 默認false:當所有一次pull到的Offset都消費完里之后(@StreamListener都方法執行完),才會自動提交Offset
      2. true: 每一個消息消費完都提交Offset
    3. Offset管理的是最后提交的Offset,而不是處理好的Offset的list

Consumer Partition分配

  • 自動分配
    1. spring.cloud.stream.kafka.bindings.channelName.consumer.autoRebalanceEnabled=true
    2. 自動分配Partition給消費組成員,並且會在當前pull的消息被處理完后才分配,有效避免消息被重復消費,但是也不能完全信任,比如消息處理緩慢造成心跳失敗或者pull輪詢使得它直接認為成員丟失而進行rebalance,但是數據其實還在處理
    3. 如果Comsumer數量 > Partition數量, Consumer會閑置, 但如果autoAddPartitions=true,會根據 max(instanceCount*concurrency,minPartitionCount)自動添加,並在一段時間后自動分配
    4. 自動分配與instanceCount/instanceIndex。對concurrency的處理和手動分配一樣
  • 手動分配
    1. autoRebalanceEnabled=false
    2. spring.cloud.stream.instanceCount : 根據Partition數量/(instanceCountconcurrency)數量來決定分配到這個instance的Partition。如果instanceCountconcurrency > Partition數量, 會報錯, 但如果autoAddPartitions=true,則不報錯而自動添加Partition
    3. spring.cloud.stream.instanceIndex : iinstanceIndex必須在indexCount范圍內。如果index一樣,將消費相同Partition的消息,這樣就違反了一個Partition只能被一個Comsumer Member消費的原則,造成消息的重復消費
    4. spring.cloud.stream.bindings.channelName.consumer.partitioned????????
  • Instance並發消費
    1. spring.cloud.stream.bindings.channelName.consumer.concurrency=10
    2. 同一個instance的同一個channel的多個消費者Listener也會消費相同Partition的消息,並且是同步處理,造成低性能的消息的重復消費,與concurrency無關,也就是說spring的一個instance的一個channel就只能有一個consumer的@StreamListener。是否可以定義多個channel監聽同一個topic來實現instance級別的concurrency?
    3. concurrency=10代表你想在一個instance中啟動10consumer線程去處理10個partition的message

數據強一致性

  • 保證消息至少被發送一次
  • 保證消息只被發送一次
  • 保證消息至少被消費一次
    1. ackEachRecord=false保證消息至少被消費一次,但是可以有一整批消息會被消費多次
    2. ackEachRecord=true保證消息至少被消費一次,而且保證Consumer故障減少后?消息不會被重復消費,但是不保證增加Consumer
  • 保證消息只被消費一次

性能優化

在線水平伸縮


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM