Getting Start
下載
優點和應用場景
- Kafka消息驅動,符合發布-訂閱模式,優點和應用范圍都共通
- 發布-訂閱模式優點
- 解耦合 : 兩個應用不需要相互調用
- 可擴展性 : 消費者的個數可實時擴展
- 實時性 : 消費者能實時的獲取生產者發布的事件
- 高效 :減少由於多個消費者請求數據造成的數據計算帶來的資源消耗
- 異步通訊 :發布-訂閱模式是天生的異步通訊
- Kafka其他優點
- 持久化 : 消息丟失的可控性極高
- 高性能 : 磁盤順序讀寫性能比內存隨機讀寫還高,每秒10萬條消息
- 高吞吐量 :每秒上百MB的吞吐量
- 順序性
- 發布-訂閱模式應用范圍
- 適合數據一被生產,就需要被處理的情況
- 適合數據具有潛在消費者的情況
- 適合無論有沒有消費者,數據都在生產的情況
- 不適合對數據的處理時間有特殊限定的情況
- 應用場景
- 最為消息中間件,實現消息隊列和消息的發布-訂閱,消息驅動的服務
- 數據總線,一對多的模式
- 日志收集,消息中間件的一種應用
- 數據庫主從同步
核心概念
- Broker
- 一個Kafka server就是一個Broker
- 一般情況下,一個Broker獨占一台服務器,發揮微服務的優勢
- 服務器資源有限的情況下,需要設計出Broker/Topic/Partition/Replica的最優分配策略,從而充分利用服務器資源
- 一個broker可以有多個topic
- Topic
- 存儲消息的邏輯上的消息集合
- 每個Topic有多個分區
- 分區 Partition
- 同一個Topic的不同分區分配在不同Broker上,也就是一個分區一個服務器
- 不同Topic的分區可以共享一個服務器
- 同一個分區的消息是有序的,通過維護offset實現
- 相同key的消息會被發布到同一個分區
- 同一個分區的消息會被一個消費組里固定的一個消費者獨占消費
- 通過增加分區來增加並行處理能力
- 每個分區可以有多個副本
- 消費組 Consumer Group
- 實現一個消息只被同組的一個消費者獨占消費
- 消費組里的消費者有變化的時候會觸發Rebalance操作重新分配分區與消費者的對應關系
- Rebalance操作實現了分區消費的故障轉移
- 通過增加分區和消費組里的消費者數量來水平擴展,理想情況一對一,也可以一對多,最好不要多對一,造成浪費
- 副本 Replica
- 同一個分區的不同副本分配在不同Broker上,但是這些Broker可以是在同一台服務器上,也可以不是
- 副本是一個熱備份設計,會選舉一個作為Leader,提供對外服務
- Fllower副本批量的從Leader副本同步消息
- HW & LEO
- HW是所有ISR副本都有的最新offset,HW之前的消息在所有副本中都存在,HW由Leader副本維護
- 所有消費者只能獲取HW之前的消息,這樣保證了Leader副本不可用的情況下,所有消費者的狀態是一致的
- LEO是每個副本各自的最新offset
- ISR集合
- 滿足兩個條件的副本會被選入ISR可用副本集合
- 副本與Zookeeper連接
- 副本的LEO與Leader副本的LEO差值不超過閾值
- ISR集合保證了Kafka不會被故障副本拖累,也保證了Leader的HW與LEO的差值在閾值內
- 滿足兩個條件的副本會被選入ISR可用副本集合
- 生產者
- 異步提交
- acks=0 : 生產者只管提交,不會等待Leader副本返回,不保證數據不丟失
- 同步提交
- acks=1 : 默認設置,生產者等待Leader副本返回成功,保證數據在Leader中部丟失,但是不保證重新選舉后數據不丟失
- 同步復制
- acks=all : 生產者等待所有副本同步消息后才算提交成功,保證數據不丟失,性能低
- 異步提交
- Log
- 一個副本對應一個Log,用於持久化數據,Kafka采用順序讀寫的方式,性能高
- 一個Log里有多個Segment,每個Segment有一個日志文件和一個索引文件
- 日志文件的大小有限制,超出后會生成新的Segment
- 日志消息保留策略有兩種
- 消息的保留時間超過指定時間,可以被刪除
- Topic的存儲滿,可以開始刪除最舊的消息
- 保留策略可以全局配置,也可以按Topic配置
- 日志壓縮
- 開啟日志壓縮后,相同的key會被定期合並,只保留最新的value
Kafka/zookeeper 命令
- 啟動Zookeeper
- ./zookeeper-server-start.sh ../config/zookeeper.properties
- 啟動Kafka
- ./kafka-server-start.sh ../config/server.properties
- 查看Topic
- ./kafka-topics.sh --list --zookeeper localhost:9860
- 刪除Topic
- ./kafka-topics.sh --delete --zookeeper localhost:9860 /kafka --topic test
- 不會立馬刪除topic
- 查看Topic的詳細信息
- ./kafka-topics.sh --zookeeper localhost:9860 --topic test--describe
- 查看zk信息
- ./zookeeper-shell.sh 127.0.0.1:9860
- 生產數據
- ./kafka-console-producer.sh --broker-list cvatap3d.nam.nsroot.net:9801 --topic midcurve-ds
- 消費數據
- ./kafka-console-consumer.sh --zookeeper localhost:9860 --topic midcurve-ds-subscribe --from-beginning
Kafka集群
zookeeper集群配置 : zookeeper.properties
- clientPort=2180
- 端口號
- dataDir=/tmp/zookeeper
- 集群信息記錄目錄,清空目錄可以重置zookeeper
- 如果需要在同一台server上啟動多個node,這個路徑必須不同
- tickTime=2000
- zookeeper副本與leader之間維護心跳的頻率
- initLimit=5
- zookeeper的leader初始化連接follower時等待多少個tickTime時間的心跳,超時副本連接失敗
- syncLimit=2
- leader與follower之間發送消息,請求和應答超時是多少個tickTime
- server.0=cvatap3d.nam.nsroot.net:2888:3888
- server.1=cvatap3d.nam.nsroot.net:2889:3889
- server.2=cvatap3d.nam.nsroot.net:2890:3890
- 第一個啟動的為leader
- zookeeper集群數量必須是基數3,5,7...
- 0,1,2是服務id,需要在對應的dataDir=/tmp/zookeeper下面創建myid文件,內容就是服務id,比如0
- ip或者host都可以
- 后面兩個端口是zookeeper內部通訊使用
- 第一個端口是用於副本與Leader建立TCP連接
- 第二個端口是用於Leader選舉的TCP端口
Kafka配置 : server.properties
- broker.id=0
- 同一個zookeeper集群下的broker的id必須唯一
- log.dirs=/tmp/kafka-logs
- 啟動kafka會從zookeeper下載配置到log目錄
- 如果修改了server.properties可能因為配置與存儲的配置不匹配導致啟動失敗,這時候可以刪除這個目錄
- 如果需要在同一台server上啟動多個broker,這個路徑必須不同
- zookeeper.connect=localhost:2181
- zookeeper集群,以逗號隔開
- listeners=PLAINTEXT://cvatap3d.nam.nsroot.net:9093
- broker的host:port
zookeeper與kafka
- Kafka將Broker信息注冊到zookeeper
- zookeeper會維護topic與broker的關系,選舉Leader
- 監控partition leader存活性,發現Leader異常會重新選舉Leader
- 當異常Broker恢復后,會在一段時間后重新分配Leader
- Broker從zookeeper獲取集群中其它Broker信息
- Consumer端將自己注冊到zookeeper
- 用來獲取broker列表
- 並和partition leader建立socket連接
- 在Consumer Group發生變化時進行rebalance
- Zookeeper管理consumer的offset跟蹤當前消費的offset。
- Producer端將自己注冊到zookeeper
- 用來獲取broker列表和分區狀態,從而將消息發布到正確的Broker
- Zookeeper不管理producer
Kafka使用經驗總結
Consumer
- Consumer默認自動提交Offset,並且是一獲取便提交,默認間隔5秒
- 當發生錯誤重啟,如果你當消費能力強,可能造成重復消費5秒內當Offset
- 如果消費能力比較弱,也可能提交的Offset沒有消費完,造成Offset丟失
- 消費者使用pull的方式去拿消息
- 簡化kafka實現,消費者自己控制消費進度,不會有消息積壓的壓力
- kafka通過長輪詢/長連接來提高pull的實時性
- 可以設置消費端緩存消息大小:queue.buffering.max.meesages ,在自動提交模式下緩存大小需要適當控制
Kafka強一致性保證
- Producer同步復制,性能下降
- Kafka冪等設置
- Kafka使用Produce Id和sequence number實現冪等,判斷一次提交的所有消息的seq num一樣,Produce Id由zookeeper隨機生成,每次不一樣
- 單分區冪等,不支持分區冪等,也就是當重新分配key與分區關系當時候不支持冪等
- 單會話冪等,不提供重啟Prodicer后單冪等
- Kafka當冪等大部分情況下有效,單不能完全信任
- enabled.idempotence=true, 此時就會默認把acks設置為all,所以不需要再設置acks屬性了。也就是說冪等自動開啟同步復制?
- 消費者手動提交offset
- 分區事物管理
- 安全關閉
- producer.close():優先把消息處理完畢,優雅退出。
- producer.close(timeout): 超時時,強制關閉。
- 可重試/不可重試異常區別對待
Spring Cloud Stream 使用Kafka
Producer 生產數據
- 隨機發送
- spring.cloud.stream.bindings.[channelName].producer.partitionKeyExpression不設置
- spring.cloud.stream.bindings.[channelName].producer.partitionCount=10
- 如果partitionCount > Partition數量, 會報錯,但如果autoAddPartitions=true,則不報錯而自動添加Partition,與instanceCount無關
- 如果如果partitionCount < Partition數量,則會被Partition數量覆蓋
- 定向發送
- spring.cloud.stream.bindings.[channelName].producer.partitionKeyExpression=payload.currency
- 當Partition變多后,重啟會重新分配,但是不重啟的情況下還是保持不變,也就是說消費者自動添加Partiton后,還需要重啟生產者
Consumer Offset管理
- Offset的起駛位置
- spring.cloud.stream.kafka.bindings.channelName.consumer.resetOffsets
- true : 每次從startOffset開始
- false:從Consumer當前位置開始
- spring.cloud.stream.kafka.bindings.channelName.consumer.startOffset
- 新消費組都起駛位置,latest=0, earliest=lastOffset+1
- spring.cloud.stream.kafka.bindings.channelName.consumer.resetOffsets
- 不設置組名:groupId=anonymous
- resetOffsets 默認true, startOffset 默認latest, 所以默認每次都從最新都消息消費
- 無法設置resetOffsets=false,因為無法知道它當前Offset,但是可以改變startOffset
- 設置組名:
- spring.cloud.stream.bindings.
.group - resetOffsets 默認false, startOffset 默認earliest, 所以新消費組默認從0開始消費,並且擁有記錄Offset能力
- spring.cloud.stream.bindings.
- Offset 提交
- spring.cloud.stream.kafka.bindings.channelName.consumer.autoCommitOffset
- 默認true, 自動提交Offset
- false會在Message對象都header字段里添加一個kafka_acknowledgment對象,可以用來手動提交offset
- 但是它的AckMode.MANUAL並不是立刻提交的,而是所有pull到的Offset都處理后批量自動提交,所以只能控制哪些Offset需要提交,不能控制什么時候提交
- spring.cloud.stream.kafka.bindings.channelName.consumer.ackEachRecord
- 默認false:當所有一次pull到的Offset都消費完里之后(@StreamListener都方法執行完),才會自動提交Offset
- true: 每一個消息消費完都提交Offset
- Offset管理的是最后提交的Offset,而不是處理好的Offset的list
- spring.cloud.stream.kafka.bindings.channelName.consumer.autoCommitOffset
Consumer Partition分配
- 自動分配
- spring.cloud.stream.kafka.bindings.channelName.consumer.autoRebalanceEnabled=true
- 自動分配Partition給消費組成員,並且會在當前pull的消息被處理完后才分配,有效避免消息被重復消費,但是也不能完全信任,比如消息處理緩慢造成心跳失敗或者pull輪詢使得它直接認為成員丟失而進行rebalance,但是數據其實還在處理
- 如果Comsumer數量 > Partition數量, Consumer會閑置, 但如果autoAddPartitions=true,會根據 max(instanceCount*concurrency,minPartitionCount)自動添加,並在一段時間后自動分配
- 自動分配與instanceCount/instanceIndex。對concurrency的處理和手動分配一樣
- 手動分配
- autoRebalanceEnabled=false
- spring.cloud.stream.instanceCount : 根據Partition數量/(instanceCountconcurrency)數量來決定分配到這個instance的Partition。如果instanceCountconcurrency > Partition數量, 會報錯, 但如果autoAddPartitions=true,則不報錯而自動添加Partition
- spring.cloud.stream.instanceIndex : iinstanceIndex必須在indexCount范圍內。如果index一樣,將消費相同Partition的消息,這樣就違反了一個Partition只能被一個Comsumer Member消費的原則,造成消息的重復消費
- spring.cloud.stream.bindings.channelName.consumer.partitioned????????
- Instance並發消費
- spring.cloud.stream.bindings.channelName.consumer.concurrency=10
- 同一個instance的同一個channel的多個消費者Listener也會消費相同Partition的消息,並且是同步處理,造成低性能的消息的重復消費,與concurrency無關,也就是說spring的一個instance的一個channel就只能有一個consumer的@StreamListener。是否可以定義多個channel監聽同一個topic來實現instance級別的concurrency?
- concurrency=10代表你想在一個instance中啟動10consumer線程去處理10個partition的message
數據強一致性
- 保證消息至少被發送一次
- 保證消息只被發送一次
- 保證消息至少被消費一次
- ackEachRecord=false保證消息至少被消費一次,但是可以有一整批消息會被消費多次
- ackEachRecord=true保證消息至少被消費一次,而且保證Consumer故障減少后?消息不會被重復消費,但是不保證增加Consumer
- 保證消息只被消費一次