Kafka如何保證百萬級寫入速度以及保證不丟失不重復消費


 正文前先來一波福利推薦:

福利一:

百萬年薪架構師視頻,該視頻可以學到很多東西,是本人花錢買的VIP課程,學習消化了一年,為了支持一下女朋友公眾號也方便大家學習,共享給大家。

福利二:

畢業答辯以及工作上各種答辯,平時積累了不少精品PPT,現在共享給大家,大大小小加起來有幾千套,總有適合你的一款,很多是網上是下載不到。

獲取方式:

微信關注 精品3分鍾 ,id為 jingpin3mins,關注后回復   百萬年薪架構師 ,精品收藏PPT  獲取雲盤鏈接,謝謝大家支持!

------------------------正文開始---------------------------

 

一、如何保證百萬級寫入速度:

目錄

1、頁緩存技術 + 磁盤順序寫

2、零拷貝技術

3、最后的總結

“這篇文章來聊一下Kafka的一些架構設計原理,這也是互聯網公司面試時非常高頻的技術考點。

Kafka是高吞吐低延遲的高並發、高性能的消息中間件,在大數據領域有極為廣泛的運用。配置良好的Kafka集群甚至可以做到每秒幾十萬、上百萬的超高並發寫入。

那么Kafka到底是如何做到這么高的吞吐量和性能的呢?這篇文章我們來一點一點說一下。

1、頁緩存技術 + 磁盤順序寫

 

首先Kafka每次接收到數據都會往磁盤上去寫,如下圖所示。

那么在這里我們不禁有一個疑問了,如果把數據基於磁盤來存儲,頻繁的往磁盤文件里寫數據,這個性能會不會很差?大家肯定都覺得磁盤寫性能是極差的。

沒錯,要是真的跟上面那個圖那么簡單的話,那確實這個性能是比較差的。

但是實際上Kafka在這里有極為優秀和出色的設計,就是為了保證數據寫入性能,首先Kafka是基於操作系統的頁緩存來實現文件寫入的。

操作系統本身有一層緩存,叫做page cache,是在內存里的緩存,我們也可以稱之為os cache,意思就是操作系統自己管理的緩存。

你在寫入磁盤文件的時候,可以直接寫入這個os cache里,也就是僅僅寫入內存中,接下來由操作系統自己決定什么時候把os cache里的數據真的刷入磁盤文件中。

僅僅這一個步驟,就可以將磁盤文件寫性能提升很多了,因為其實這里相當於是在寫內存,不是在寫磁盤,大家看下圖。

接着另外一個就是kafka寫數據的時候,非常關鍵的一點,他是以磁盤順序寫的方式來寫的。也就是說,僅僅將數據追加到文件的末尾,不是在文件的隨機位置來修改數據。

普通的機械磁盤如果你要是隨機寫的話,確實性能極差,也就是隨便找到文件的某個位置來寫數據。

但是如果你是追加文件末尾按照順序的方式來寫數據的話,那么這種磁盤順序寫的性能基本上可以跟寫內存的性能本身也是差不多的。

所以大家就知道了,上面那個圖里,Kafka在寫數據的時候,一方面基於了os層面的page cache來寫數據,所以性能很高,本質就是在寫內存罷了。

另外一個,他是采用磁盤順序寫的方式,所以即使數據刷入磁盤的時候,性能也是極高的,也跟寫內存是差不多的。

基於上面兩點,kafka就實現了寫入數據的超高性能。

那么大家想想,假如說kafka寫入一條數據要耗費1毫秒的時間,那么是不是每秒就是可以寫入1000條數據?

但是假如kafka的性能極高,寫入一條數據僅僅耗費0.01毫秒呢?那么每秒是不是就可以寫入10萬條數?

所以要保證每秒寫入幾萬甚至幾十萬條數據的核心點,就是盡最大可能提升每條數據寫入的性能,這樣就可以在單位時間內寫入更多的數據量,提升吞吐量。

2、零拷貝技術

 

說完了寫入這塊,再來談談消費這塊。

大家應該都知道,從Kafka里我們經常要消費數據,那么消費的時候實際上就是要從kafka的磁盤文件里讀取某條數據然后發送給下游的消費者,如下圖所示。

那么這里如果頻繁的從磁盤讀數據然后發給消費者,性能瓶頸在哪里呢

 

假設要是kafka什么優化都不做,就是很簡單的從磁盤讀數據發送給下游的消費者,那么大概過程如下所示:

先看看要讀的數據在不在os cache里,如果不在的話就從磁盤文件里讀取數據后放入os cache。

接着從操作系統的os cache里拷貝數據到應用程序進程的緩存里,再從應用程序進程的緩存里拷貝數據到操作系統層面的Socket緩存里,最后從Socket緩存里提取數據后發送到網卡,最后發送出去給下游消費。

整個過程,如下圖所示:

大家看上圖,很明顯可以看到有兩次沒必要的拷貝吧!

一次是從操作系統的cache里拷貝到應用進程的緩存里,接着又從應用程序緩存里拷貝回操作系統的Socket緩存里。

而且為了進行這兩次拷貝,中間還發生了好幾次上下文切換,一會兒是應用程序在執行,一會兒上下文切換到操作系統來執行。

所以這種方式來讀取數據是比較消耗性能的。

Kafka為了解決這個問題,在讀數據的時候是引入零拷貝技術。

也就是說,直接讓操作系統的cache中的數據發送到網卡后傳輸給下游的消費者,中間跳過了兩次拷貝數據的步驟,Socket緩存中僅僅會拷貝一個描述符過去,不會拷貝數據到Socket緩存。

大家看下圖,體會一下這個精妙的過程:

通過零拷貝技術,就不需要把os cache里的數據拷貝到應用緩存,再從應用緩存拷貝到Socket緩存了,兩次拷貝都省略了,所以叫做零拷貝。

對Socket緩存僅僅就是拷貝數據的描述符過去,然后數據就直接從os cache中發送到網卡上去了,這個過程大大的提升了數據消費時讀取文件數據的性能。

而且大家會注意到,在從磁盤讀數據的時候,會先看看os cache內存中是否有,如果有的話,其實讀數據都是直接讀內存的。

如果kafka集群經過良好的調優,大家會發現大量的數據都是直接寫入os cache中,然后讀數據的時候也是從os cache中讀。

相當於是Kafka完全基於內存提供數據的寫和讀了,所以這個整體性能會極其的高。

說個題外話,下回有機會給大家說一下Elasticsearch的架構原理,其實ES底層也是大量基於os cache實現了海量數據的高性能檢索的,跟Kafka原理類似。

3、最后的總結

 

通過這篇文章對kafka底層的頁緩存技術的使用,磁盤順序寫的思路,以及零拷貝技術的運用,大家應該就明白Kafka每台機器在底層對數據進行寫和讀的時候采取的是什么樣的思路,為什么他的性能可以那么高,做到每秒幾十萬的吞吐量。

這種設計思想對我們平時自己設計中間件的架構。

 

二、Kafka如何做到不丟失不重復消費

有很多公司因為業務要求必須保證消息不丟失、不重復的到達,比如無人機實時監控系統,當無人機闖入機場區域,我們必須立刻報警,不允許消息丟失。

而無人機離開禁飛區域后我們需要將及時報警解除。如果消息重復了呢,我們是否需要復雜的邏輯來自己處理消息重復的情況呢,這種情況恐怕相當復雜而難以處理。但是如果我們能保證消息exactly once,那么一切都容易得多。

下面我們來簡單了解一下消息傳遞語義,以及kafka的消息傳遞機制。

首先我們要了解的是message delivery semantic 也就是消息傳遞語義。

這是一個通用的概念,也就是消息傳遞過程中消息傳遞的保證性。

分為三種:

最多一次(at most once): 消息可能丟失也可能被處理,但最多只會被處理一次。

可能丟失 不會重復

至少一次(at least once): 消息不會丟失,但可能被處理多次。

可能重復 不會丟失

精確傳遞一次(exactly once): 消息被處理且只會被處理一次。

不丟失 不重復 就一次

而kafka其實有兩次消息傳遞,一次生產者發送消息給kafka,一次消費者去kafka消費消息。

兩次傳遞都會影響最終結果,

兩次都是精確一次,最終結果才是精確一次。

兩次中有一次會丟失消息,或者有一次會重復,那么最終的結果就是可能丟失或者重復的。

一、Produce端消息傳遞

這是producer端的代碼:

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();
 

其中指定了一個參數acks 可以有三個值選擇:

 0: producer完全不管broker的處理結果 回調也就沒有用了 並不能保證消息成功發送 但是這種吞吐量最高

​-1或者all: leader broker會等消息寫入 並且ISR都寫入后 才會響應,這種只要ISR有副本存活就肯定不會丟失,但吞吐量最低。

​ 1: 默認的值 leader broker自己寫入后就響應,不會等待ISR其他的副本寫入,只要leader broker存活就不會丟失,即保證了不丟失,也保證了吞吐量。

所以設置為0時,實現了at most once,而且從這邊看只要保證集群穩定的情況下,不設置為0,消息不會丟失。

但是還有一種情況就是消息成功寫入,而這個時候由於網絡問題producer沒有收到寫入成功的響應,producer就會開啟重試的操作,直到網絡恢復,消息就發送了多次。這就是at least once了。

kafka producer 的參數acks 的默認值為1,所以默認的producer級別是at least once。並不能exactly once。

 

 

二、Consumer端消息傳遞

consumer是靠offset保證消息傳遞的。

consumer消費的代碼如下:

Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        props.put("auto.offset.reset","earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

其中有一個參數是 enable.auto.commit

若設置為true consumer在消費之前提交位移 就實現了at most once

若是消費后提交 就實現了 at least once 默認的配置就是這個。

kafka consumer的參數enable.auto.commit的默認值為true ,所以默認的consumer級別是at least once。也並不能exactly once。

三、精確一次

通過了解producer端與consumer端的設置,我們發現kafka在兩端的默認配置都是at least once,可能重復,通過配置也不能做到exactly once,好像kafka的消息一定會丟失或者重復的,

是不是沒有辦法做到exactly once了呢?

確實在kafka 0.11.0.0版本之前producer端確實是不可能的,

但是在kafka 0.11.0.0版本之后,kafka正式推出了idempotent producer。

也就是冪等的producer還有對事務的支持。

冪等的producer

kafka 0.11.0.0版本引入了idempotent producer機制,在這個機制中同一消息可能被producer發送多次,但是在broker端只會寫入一次,他為每一條消息編號去重,而且對kafka開銷影響不大。

如何設置開啟呢? 需要設置producer端的新參數 enable.idempotent 為true。

而多分區的情況,我們需要保證原子性的寫入多個分區,即寫入到多個分區的消息要么全部成功,要么全部回滾。

這時候就需要使用事務,在producer端設置 transcational.id為一個指定字符串。

這樣冪等producer只能保證單分區上無重復消息;事務可以保證多分區寫入消息的完整性。

這樣producer端實現了exactly once,那么consumer端呢?

consumer端由於可能無法消費事務中所有消息,並且消息可能被刪除,所以事務並不能解決consumer端exactly once的問題,我們可能還是需要自己處理這方面的邏輯。比如自己管理offset的提交,不要自動提交,也是可以實現exactly once的。

還有一個選擇就是使用kafka自己的流處理引擎,也就是Kafka Streams,

設置processing.guarantee=exactly_once,就可以輕松實現exactly once了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM