0.11 版本之前保證的語義是:至少一次
至少一次的解釋
可以做到消息不丟失--> 可以做到發送成功的消息一定可以被消費到。
不能做到消息不重復。
## 發送成功的消息,表示業務邏輯認為此消息已發送成功,即send方法已執行完成。
丟消息場景
異步發送端:
a:send之后,等待發送的時候down(消息在緩沖區中),導致消息丟失。
b:send時,緩沖區已滿,導致消息丟棄。
同步(異步)發送端:
a:有限的重試
服務端Leader down時,因為有與zk的超時timeout,導致在timeout之后才會進行切換, 如果重試次數 * 重試間隔 < zk session timeout + 切換耗時,則消息會丟失。
b:ack != -1
ack = 0 的場景,不需服務端確認,發送后,Leader down,導致消息丟失。
ack = 1 的場景,只需要Leader確認,Leader收到消息后,未同步到Replica之前,Leader down,導致消息丟失。
服務端:
a:min.insync.replicas < 2 且 unclean.leader.election.enable = true
min.insync.replicas < 2 的場景下,如果副本均落后Leader,在Leader down時,根據臟選開關,會選擇落后的副本作為新的Leader,則落后的數據會丟失。
消費端:
a:自動offset提交
消息處理失敗,但是offset也提交了,對業務來說消息丟失。
b:先手動提交offset,后處理消息
先提交offset,后處理消息,但是處理邏輯失敗,對業務來說消息丟失。
不丟失數據的方法
發送端:
a:同步發送
b:retries = Long.MAX_VALUE
c:acks = -1
服務端:
a:replication factor = 3
b:min.insync.replicas = 2
c:unclean.leader.election.enable = false
消費端:
a:auto.commit.enable = false
b:僅當消息處理成功之后再提交offset
消息重復的場景
發送端:
a:發送端重試
發送端發送消息后,服務端實際已接收,但是客戶端因為網絡或其他原因未收到確認響應, 再進行消息重試發送,導致消息重復。
消費端:
a:auto.commit.enable = true
消息處理后,為進行自動offset提交之前,consumer down,恢復后從上個提交點開始消費導致消息處理重復。
消息不重復的方法
發送端 --> 做不到。
消費端 --> auto.commit.enable = false,並且等消息處理完成再提交offset。
0.11之后版本保證的語義是:恰好一次
恰好一次的解釋
可以做到消息不丟失 --> 可以做到發送成功的消息一定可以被消費到。
可以做到消息不重復。
發送端做不到消息不重復的解決辦法
方法:
給Producer編號,且給每條消息編號,服務端保存Producer編號與此Producer最后一條消息的映射,服務端校驗當前收到的消息是否與保存的最后一條消息編號相同,如果相同則拒絕。
配置:
enable.idempotence = true -->
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
acks = -1
過程:
a:Producer發送InitProducerIdRequest獲取ProducerIdAndEpoch producerId 是Broker從zk分段獲取后遞增分配的,保證唯一,epoch為事務所用,暫時不提。
b:Producer發送消息(消息中加入ProducerIdAndEpoch,消息序列號(0))。
c:服務端收到消息,內存中維護 ProducerIdAndEpoch與此消息的映射,數據落盤, 同步到副本。
d:Producer發送消息(消息中加入ProducerIdAndEpoch,消息序列號(1))。
e:服務端收到消息,判斷內存中的映射是否存在,如不存在,更新映射,數據落盤, 同步到副本,如存在,返回消息重復異常。
判斷是否重復的條件:
isFromClient == true &&
batch.producerIdAndEpoch == producerIdAndEpoch &&
batch.baseSequence == firstSeq &&
batch.lastSequence == lastSeq
幾個問題:
a:重試的場景中,ProducerIdAndEpoch映射的會不會不是上次重試的消息。
max.in.flight.requests.per.connection代表了同時只能有一條(批)消
息在發送,retries 保證了必須發送成功才會進行下一條的發送,所以不會有映射成其他消息 的情況。
b:有了映射緩存,其他副本沒成功復制怎么辦 acks = -1 保證了其他副本必須復制成功。
c:服務端啟動或新選舉為Leader的時候,緩存內容為空怎么判斷重復 會先構造緩存內容再提供服務 初始化時加載.snapshot文件,append消息時更新緩存,退出時,寫入.snapshot文件。
事務消息
事務消息語義
保證發送消息不重復。
保證多條消息(發往不同服務端)發送的原子性,即同時提交或同時回滾。
保證 “消費-處理-發送”邏輯的原子性,及要么全部成功,要么整體回滾。
整體原理
Producer新增 transactionalId、producerId、epoch標識,可標記事務。
新增transaction coordinator,協調處理事務的開啟、提交、終止,記錄事務日志 ;
transaction coordinator依附在Broker上,基於Broker提供服務;
記錄事務日志是一個內部可根據Key壓縮的Topic,可復制的,當 transaction coordinator down,有其他的接管繼續事務處理。
服務端只要接收到消息,就寫入文件,不管后續是否是提交或者終止。
新增control類型消息,標記事務是提交還是終止(全部消息以transactionalId為標識)。
消費端使用緩存堆積消息,直到看到control類型消息,返回給應用或丟棄。
多條消息發送的原子性
發送消息過程
a:初始化事務
發送FindCoordinatorRequest,查找transaction coordinator。
發送InitProducerIdRequest到transaction coordinator,獲取ProducerIdAndEpoch。
transaction coordinator 新增消息 transactionalId --> producerId 到事務日志。
transaction coordinator 恢復或終止此 transactionalId 之前的事務。
b:開啟事務
c:生產消息(可多次)
發送AddPartitionsToTxnRequest到transaction coordinator,transaction coordinator增加 BEGIN的事務日志,日志包含此partition 發送消息到實際的Broker,Broker接收消息寫入文件並復制到副本
d:提交或終止事務
Producer 發送 EndTxnRequest 到 transaction coordinator transaction coordinator 寫入PREPARE_COMMIT或 PREPARE_ABORT到事務日志 transaction coordinator 依次向全部partition發送 COMMIT/ ABORT的control消息 transaction coordinator 寫入 COMMIT/ ABORT到事務日志 Broker接收消息寫入文件並復制到副本
消費過程(READ_COMMITED)
a:初始化、Rebalance
b:拉消息,如果不包含transactionalId,則返回給應用,如果包含,則放入緩存,直到拉到該transactionalId的control消息,如果該control消息是COMMIT,則相關消息返回應用;如果是ABORT,則刪除這些消息
“消費-處理-發送”邏輯的原子性
offset提交:
offset提交除了寫zk外,還提供一種寫Broker的方式,即新增 consumercoordinator 接收offset 的提交請求,consumercoordinator 將 groupId-offset 寫入內部Topic,此Topic可壓縮,即舊的 groupId-offset 會被清除,只保留最新的。
處理過程
a:初始化事務
b:開啟事務
c:生產消息(可多次)
發送AddPartitionsToTxnRequest到 transaction coordinator,transaction coordinator增加 BEGIN的事務日志,日志包含此partition 發送消息到實際的Broker,Broker接收消息寫入文件並復制到副本。
發送 AddOffsetsToTxnRequest 到 transaction coordinator,寫入事務日志。
發送 TxnOffsetCommitRequest 到 consumercoordinator,寫入文件並復制到副本。
d:提交或終止事務
Producer 發送 EndTxnRequest 到 transaction。
coordinator transaction coordinator 寫入PREPARE_COMMIT或 PREPARE_ABORT到事務日志。
transaction coordinator 依次向全部partition發送 COMMIT/ ABORT的control消息。
transaction coordinator 向 consumercoordinator 發送 offset提交的control消息。
transaction coordinator 寫入 COMMIT/ ABORT到事務日志。
Broker接收消息寫入文件並復制到副本。
消息可見性
offset可見性:
offset提交后,事務提交前,此offset雖然寫入文件,但是 consumercoordinator 緩存不更新,直到收到事務提交的control消息才更新緩存,即事務提交前,外部不能查詢到此 offset,只能查詢到舊的offset。
消息可見性:
a:read_uncommited
所有消息都可被拉取,也可返回給應用處理。
b:read_commited
所有消息都可被拉取,但是只有判斷已提交的可返回給應用處理。
失敗場景分析
生產端事務開啟后,發消息前down:
事務無任何動作,后續會被超時處理掉,不影響事務語義。
生產端事務開啟后,發消息后,保存offset前down:
事務會被超時關閉,該消息沒有對應的control消息,對外不可見,不影響事務語義。
生產端事務開啟后,發消息后,保存offset后,提交事務前down:
事務會被超時關閉,該消息及offset沒有對應的control消息,對外不可見,不影響事務語義。
生產端事務提交事務后down:
后續事務動作與客戶端無關,由服務端處理,會全部執行完成。
transaction coordinator down:
因為 transaction coordinator 實際為 Broker,Kafka既有機制保證選舉新的 coordinator。
因為事務日志是同步的,即最少有三個節點共享事務狀態,單個down不影響事務繼續。
consumercoordinator down:
事務提交前,offset信息是同步的,但是不生效。
事務提交后,offset信息是同步的,並可對外公開。
broker leader down:
Kafka既有機制保證重新選舉Leader,且消息同步。
消費端 down:
read_uncommited的場景,出現消費到未提交事務的消息,且有可能重復。
read_commited的場景,不會消費到未提交事務的消息,其他消費者接管后,從之前位置繼續。
最新穩定的消息位置(LSO)
問題:穿插在一批事務消息中間的非事務消息,被消費端消費后,如何提交消費位置
解答:通過LSO機制,即只能消費到最早的全部事務完成的消息位置
事務日志狀態
BEGIN
PREPARE_COMMIT
PREPARE_ABORT
COMMIT
ABORT