kafka實現無消息丟失與精確一次語義(exactly once)處理


在很多的流處理框架的介紹中,都會說kafka是一個可靠的數據源,並且推薦使用Kafka當作數據源來進行使用。這是因為與其他消息引擎系統相比,kafka提供了可靠的數據保存及備份機制。並且通過消費者位移這一概念,可以讓消費者在因某些原因宕機而重啟后,可以輕易得回到宕機前的位置。

但其實kafka的可靠性也只能說是相對的,在整條數據鏈條中,總有可以讓數據出現丟失的情況,今天就來討論如何避免kafka數據丟失,以及實現精確一致處理的語義。

kafka無消息丟失處理

在討論如何實現kafka無消息丟失的時候,首先要先清楚大部分情況下消息丟失是在什么情況下發生的。為什么是大部分,因為總有一些非常特殊的情況會被人忽略,而我們只需要關注普遍的情況就足夠了。接下來我們來討論如何較為普遍的數據丟失情況。

1.1 生產者丟失

前面介紹Kafka分區和副本的時候,有提到過一個producer客戶端有一個acks的配置,這個配置為0的時候,producer是發送之后不管的,這個時候就很有可能因為網絡等原因造成數據丟失,所以應該盡量避免。但是將ack設置為1就沒問題了嗎,那也不一定,因為有可能在leader副本接收到數據,但還沒同步給其他副本的時候就掛掉了,這時候數據也是丟失了。並且這種時候是客戶端以為消息發送成功,但kafka丟失了數據。

要達到最嚴格的無消息丟失配置,應該是要將acks的參數設置為-1(也就是all),並且將min.insync.replicas配置項調高到大於1,這部分內容在上一篇副本機制有介紹詳細解析kafka之kafka分區和副本

同時還需要使用帶有回調的producer api,來發送數據。注意這里討論的都是異步發送消息,同步發送不在討論范圍。

public class send{
    ......
    public static void main(){
        ...
        /*
        *  第一個參數是 ProducerRecord 類型的對象,封裝了目標 Topic,消息的 kv
        *  第二個參數是一個 CallBack 對象,當生產者接收到 Kafka 發來的 ACK 確認消息的時候,
        *  會調用此 CallBack 對象的 onCompletion() 方法,實現回調功能
        */
         producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                        new DemoCallBack(startTime, messageNo, messageStr));
        ...
    }
    ......
}

class DemoCallBack implements Callback {
    /* 開始發送消息的時間戳 */
    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * 生產者成功發送消息,收到 Kafka 服務端發來的 ACK 確認消息后,會調用此回調函數
     * @param metadata 生產者發送的消息的元數據,如果發送過程中出現異常,此參數為 null
     * @param exception 發送過程中出現的異常,如果發送成功為 null
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n",
                    key, message, metadata.partition(), metadata.offset(), elapsedTime);
        } else {
            exception.printStackTrace();
        }
    }
}

更詳細的代碼可以參考這里:Kafka生產者分析——KafkaProducer

我們之前提到過,producer發送到kafka broker的時候,是有多種可能會失敗的,而回調函數能准確告訴你是否確認發送成功,當然這依托於acks和min.insync.replicas的配置。而當數據發送丟失的時候,就可以進行手動重發或其他操作,從而確保生產者發送成功。

1.2 kafka內部丟失

有些時候,kafka內部因為一些不大好的配置,可能會出現一些極為隱蔽的數據丟失情況,那么我們分別討論下大致都有哪幾種情況。

首先是replication.factor配置參數,這個配置決定了副本的數量,默認是1。注意這個參數不能超過broker的數量。說這個參數其實是因為如果使用默認的1,或者不在創建topic的時候指定副本數量(也就是副本數為1),那么當一台機器出現磁盤損壞等情況,那么數據也就從kafka里面丟失了。所以replication.factor這個參數最好是配置大於1,比如說3

接下來要說的還是和副本相關的,也是上一篇副本中提到的unclean.leader.election.enable 參數,這個參數是在主副本掛掉,然后在ISR集合中沒有副本可以成為leader的時候,要不要讓進度比較慢的副本成為leader的。不用多說,讓進度比較慢的副本成為leader,肯定是要丟數據的。雖然可能會提高一些可用性,但如果你的業務場景丟失數據更加不能忍受,那還是將unclean.leader.election.enable設置為false吧

1.3 消費者丟失

消費者丟失的情況,其實跟消費者位移處理不當有關。消費者位移提交有一個參數,enable.auto.commit,默認是true,決定是否要讓消費者自動提交位移。如果開啟,那么consumer每次都是先提交位移,再進行消費,比如先跟broker說這5個數據我消費好了,然后才開始慢慢消費這5個數據。

這樣處理的話,好處是簡單,壞處就是漏消費數據,比如你說要消費5個數據,消費了2個自己就掛了。那下次該consumer重啟后,在broker的記錄中這個consumer是已經消費了5個的。

所以最好的做法就是將enable.auto.commit設置為false,改為手動提交位移,在每次消費完之后再手動提交位移信息。當然這樣又有可能會重復消費數據,畢竟exactly once處理一直是一個問題呀(/攤手)。遺憾的是kafka目前沒有保證consumer冪等消費的措施,如果確實需要保證consumer的冪等,可以對每條消息維持一個全局的id,每次消費進行去重,當然耗費這么多的資源來實現exactly once的消費到底值不值,那就得看具體業務了。

1.4 無消息丟失小結

那么到這里先來總結下無消息丟失的主要配置吧:

  • producer的acks設置位-1,同時min.insync.replicas設置大於1。並且使用帶有回調的producer api發生消息。
  • 默認副本數replication.factor設置為大於1,或者創建topic的時候指定大於1的副本數。
  • unclean.leader.election.enable 設置為false,防止定期副本leader重選舉
  • 消費者端,自動提交位移enable.auto.commit設置為false。在消費完后手動提交位移。

那么接下來就來說說kafka實現精確一次(exactly once)處理的方法吧。

實現精確一次(exactly once)處理

在分布式環境下,要實現消息一致與精確一次(exactly once)語義處理是很難的。精確一次處理意味着一個消息只處理一次,造成一次的效果,不能多也不能少。

那么kafka如何能夠實現這樣的效果呢?在介紹之前,我們先來介紹其他兩個語義,至多一次(at most once)和至少一次(at least once)。

最多一次和至少一次

最多一次就是保證一條消息只發送一次,這個其實最簡單,異步發送一次然后不管就可以,缺點是容易丟數據,所以一般不采用。

至少一次語義是kafka默認提供的語義,它保證每條消息都能至少接收並處理一次,缺點是可能有重復數據。

前面有介紹過acks機制,當設置producer客戶端的acks是1的時候,broker接收到消息就會跟producer確認。但producer發送一條消息后,可能因為網絡原因消息超時未達,這時候producer客戶端會選擇重發,broker回應接收到消息,但很可能最開始發送的消息延遲到達,就會造成消息重復接收。

那么針對這些情況,要如何實現精確一次處理的語義呢?

冪等的producer

要介紹冪等的producer之前,得先了解一下冪等這個詞是什么意思。冪等這個詞最早起源於函數式編程,意思是一個函數無論執行多少次都會返回一樣的結果。比如說讓一個數加1就不是冪等的,而讓一個數取整就是冪等的。因為這個特性所以冪等的函數適用於並發的場景下。

但冪等在分布式系統中含義又做了進一步的延申,比如在kafka中,冪等性意味着一個消息無論重復多少次,都會被當作一個消息來持久化處理。

kafka的producer默認是支持最少一次語義,也就是說不是冪等的,這樣在一些比如支付等要求精確數據的場景會出現問題,在0.11.0后,kafka提供了讓producer支持冪等的配置操作。即:

props.put("enable.idempotence", ture)

在創建producer客戶端的時候,添加這一行配置,producer就變成冪等的了。注意開啟冪等性的時候,acks就自動是“all”了,如果這時候手動將ackss設置為0,那么會報錯。

而底層實現其實也很簡單,就是對每條消息生成一個id值,broker會根據這個id值進行去重,從而實現冪等,這樣一來就能夠實現精確一次的語義了。

但是!冪等的producery也並非萬能。有兩個主要是缺陷:

  • 冪等性的producer僅做到單分區上的冪等性,即單分區消息不重復,多分區無法保證冪等性。
  • 只能保持單會話的冪等性,無法實現跨會話的冪等性,也就是說如果producer掛掉再重啟,無法保證兩個會話間的冪等(新會話可能會重發)。因為broker端無法獲取之前的狀態信息,所以無法實現跨會話的冪等。

事務的producer

當遇到上述冪等性的缺陷無法解決的時候,可以考慮使用事務了。事務可以支持多分區的數據完整性,原子性。並且支持跨會話的exactly once處理語義,也就是說如果producer宕機重啟,依舊能保證數據只處理一次。

開啟事務也很簡單,首先需要開啟冪等性,即設置enable.idempotence為true。然后對producer發送代碼做一些小小的修改。

//初始化事務
producer.initTransactions();
try {
	//開啟一個事務
	producer.beginTransaction();
	producer.send(record1);
	producer.send(record2);
	//提交
	producer.commitTransaction();
} catch (KafkaException e) {
	//出現異常的時候,終止事務
	producer.abortTransaction();
}

但無論開啟冪等還是事務的特性,都會對性能有一定影響,這是必然的。所以kafka默認也並沒有開啟這兩個特性,而是交由開發者根據自身業務特點進行處理。

以上~


推薦閱讀:
分布式系統一致性問題與Raft算法(上)
Scala函數式編程(五) 函數式的錯誤處理
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM