kafka學習筆記(二)kafka的基本使用


概述

 第一篇隨筆從消息隊列的定義和各種應用,以及kafka的分類定義和基本知識,第二篇就寫一篇關於kafka的基本實際配置和使用的隨筆,包括kafka的集群參數的配置,生產者使用機制,消費者使用機制。總之我會使用由淺到深,由概括到具體的介紹kafka的每個功能。

kafka集群的配置

這里我只是介紹我認為比較重要的參數,當然也借鑒了網上多次提到的參數配置。嚴格來說這些配置並不單單指 Kafka 服務器端的配置,其中既有 Broker 端參數,也有主題(后面我用我們更熟悉的 Topic 表示)級別的參數、JVM 端參數和操作系統級別的參數。這里所說的 Broker 端參數也被稱為靜態參數(Static Configs),所謂靜態參數,是指你必須在 Kafka 的配置文件 server.properties 中進行設置的參數,不管你是新增、修改還是刪除。同時,你必須重啟 Broker 進程才能令它們生效。而主題級別參數的設置則有所不同,Kafka 提供了專門的 kafka-configs 命令來修改它們。至於 JVM 和操作系統級別參數,它們的設置方法比較通用化,我介紹的也都是標准的配置參數,因此,你應該很容易就能夠對它們進行設置。下面從kafka的broker參數說起。

broker端參數

目前 Kafka Broker 提供了近 200 個參數,這其中絕大部分參數都不用你親自過問。不過今天我打算換個方法,按照大的用途類別一組一組地介紹它們,希望可以更有針對性,也更方便你記憶。

首先 Broker 是需要配置存儲信息的,即 Broker 使用哪些磁盤。那么針對存儲信息的重要參數有以下這么幾個:

log.dirs:這是非常重要的參數,指定了 Broker 需要使用的若干個文件目錄路徑。要知道這個參數是沒有默認值的,這說明什么?這說明它必須由你親自指定。

log.dir:注意這是 dir,結尾沒有 s,說明它只能表示單個路徑,它是補充上一個參數用的。

這兩個參數應該怎么設置呢?很簡單,你只要設置log.dirs,即第一個參數就好了,不要設置log.dir。而且更重要的是,在線上生產環境中一定要為log.dirs配置多個路徑,具體格式是一個 CSV 格式,也就是用逗號分隔的多個路徑,比如/home/kafka1,/home/kafka2,/home/kafka3這樣。如果有條件的話你最好保證這些目錄掛載到不同的物理磁盤上。這樣做有兩個好處:

1,提升讀寫性能:比起單塊磁盤,多塊物理磁盤同時讀寫數據有更高的吞吐量。

2,能夠實現故障轉移:即 Failover。這是 Kafka 1.1 版本新引入的強大功能。要知道在以前,只要 Kafka Broker 使用的任何一塊磁盤掛掉了,整個 Broker 進程都會關閉。但是自 1.1 開始,這種情況被修正了,壞掉的磁盤上的數據會自動地轉移到其他正常的磁盤上,而且 Broker 還能正常工作。還記得上一期我們關於 Kafka 是否需要使用 RAID 的討論嗎?這個改進正是我們舍棄 RAID 方案的基礎:沒有這種 Failover 的話,我們只能依靠 RAID 來提供保障。

下面說一說kafka和zookeeper相關的重要的參數當屬zookeeper.connect。這也是一個 CSV 格式的參數,比如我可以指定它的值為zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper 的默認端口。現在問題來了,如果我讓多個 Kafka 集群使用同一套 ZooKeeper 集群,那么這個參數應該怎么設置呢?這時候 chroot 就派上用場了。這個 chroot 是 ZooKeeper 的概念,類似於別名。如果你有兩套 Kafka 集群,假設分別叫它們 kafka1 和 kafka2,那么兩套集群的zookeeper.connect參數可以這樣指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2。

第三組參數就是與kafka連接相關的參數,即客戶端程序或者其他Broker如何與該broker進行通信的設置。有一下三個參數:

listeners:學名叫監聽器,其實就是告訴外部連接者要通過什么協議訪問指定主機名和端口開放的 Kafka 服務。

advertised.listeners:和 listeners 相比多了個 advertised。Advertised 的含義表示宣稱的、公布的,就是說這組監聽器是 Broker 用於對外發布的。

host.name/port:列出這兩個參數就是想說你把它們忘掉吧,壓根不要為它們指定值,畢竟都是過期的參數了。

我們具體說說監聽器的概念,從構成上來說,它是若干個逗號分隔的三元組,每個三元組的格式為<協議名稱,主機名,端口號>。這里的協議名稱可能是標准的名字,比如 PLAINTEXT 表示明文傳輸、SSL 表示使用 SSL 或 TLS 加密傳輸等;也可能是你自己定義的協議名字,比如CONTROLLER: //localhost:9092。一旦你自己定義了協議名稱,你必須還要指定listener.security.protocol.map參數告訴這個協議底層使用了哪種安全協議,比如指定listener.security.protocol.map=CONTROLLER:PLAINTEXT表示CONTROLLER這個自定義協議底層使用明文不加密傳輸數據。

常見的玩法是:你的Kafka Broker機器上配置了雙網卡,一塊網卡用於內網訪問(即我們常說的內網IP);另一個塊用於外網訪問。那么你可以配置listeners為內網IP,advertised.listeners為外網IP。

第四組參數是關於Topic管理的,我來講講下面這三個參數:

auto.create.topics.enable:是否允許自動創建 Topic。auto.create.topics.enable參數我建議最好設置成 false,即不允許自動創建 Topic。在我們的線上環境里面有很多名字稀奇古怪的 Topic,我想大概都是因為該參數被設置成了 true 的緣故。

unclean.leader.election.enable:是否允許 Unclean Leader 選舉。unclean.leader.election.enable是關閉 Unclean Leader 選舉的。何謂 Unclean?還記得 Kafka 有多個副本這件事嗎?每個分區都有多個副本來提供高可用。在這些副本中只能有一個副本對外提供服務,即所謂的 Leader 副本。那么問題來了,這些副本都有資格競爭 Leader 嗎?顯然不是,只有保存數據比較多的那些副本才有資格競選,那些落后進度太多的副本沒資格做這件事。反之如果是 true,那么 Kafka 允許你從那些“跑得慢”的副本中選一個出來當 Leader。這樣做的后果是數據有可能就丟失了,因為這些副本保存的數據本來就不全,當了 Leader 之后它本人就變得膨脹了,認為自己的數據才是權威的。這個參數在最新版的 Kafka 中默認就是 false。

auto.leader.rebalance.enable:是否允許定期進行 Leader 選舉。但其實對生產環境影響非常大。設置它的值為 true 表示允許 Kafka 定期地對一些 Topic 分區進行 Leader 重選舉,當然這個重選舉不是無腦進行的,它要滿足一定的條件才會發生。嚴格來說它與上一個參數中 Leader 選舉的最大不同在於,它不是選 Leader,而是換 Leader!比如 Leader A 一直表現得很好,但若auto.leader.rebalance.enable=true,那么有可能一段時間后 Leader A 就要被強行卸任換成 Leader B。你要知道換一次 Leader 代價很高的,原本向 A 發送請求的所有客戶端都要切換成向 B 發送請求,而且這種換 Leader 本質上沒有任何性能收益,因此我建議你在生產環境中把這個參數設置成 false。

最后一組參數是數據留存方面的,即:

log.retention.{hours|minutes|ms}:這是個“三兄弟”,都是控制一條消息數據被保存多長時間。從優先級上來說 ms 設置最高、minutes 次之、hours 最低。

log.retention.bytes:這是指定 Broker 為消息保存的總磁盤容量大小。

message.max.bytes:控制 Broker 能夠接收的最大消息大小。

Topic參數設置

Topic 級別參數會覆蓋全局 Broker 參數的值,而每個 Topic 都能設置自己的參數值,這就是所謂的 Topic 級別參數。

下面我們依然按照用途分組的方式引出重要的 Topic 級別參數。從保存消息方面來考量的話,下面這組參數是非常重要的:

retention.ms:規定了該 Topic 消息被保存的時長。默認是 7 天,即該 Topic 只保存最近 7 天的消息。一旦設置了這個值,它會覆蓋掉 Broker 端的全局參數值。

retention.bytes:規定了要為該 Topic 預留多大的磁盤空間。和全局參數作用相似,這個值通常在多租戶的 Kafka 集群中會有用武之地。當前默認值是 -1,表示可以無限使用磁盤空間。

topic級別的參數有很多種,現在我們根據創建topic時設置的參數和修改 topic時設置的參數來學習一下這個級別的參數的設置:

我們先來看看如何在創建 Topic 時設置這些參數。我用上面提到的retention.ms和max.message.bytes舉例。設想你的部門需要將交易數據發送到 Kafka 進行處理,需要保存最近半年的交易數據,同時這些數據很大,通常都有幾 MB,但一般不會超過 5MB。現在讓我們用以下命令來創建 Topic:

1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

下面看看使用另一個自帶的命令kafka-configs來修改 Topic 級別參數。假設我們現在要發送最大值是 10MB 的消息,該如何修改呢?命令如下:

1 bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

JVM級別參數設置

說到 JVM 端設置,堆大小這個參數至關重要。雖然在后面我們還會討論如何調優 Kafka 性能的問題,但現在我想無腦給出一個通用的建議:將你的 JVM 堆大小設置成 6GB 吧,這是目前業界比較公認的一個合理值。JVM 端配置的另一個重要參數就是垃圾回收器的設置,也就是平時常說的 GC 設置。如果你依然在使用 Java 7,那么可以根據以下法則選擇合適的垃圾回收器:

如果 Broker 所在機器的 CPU 資源非常充裕,建議使用 CMS 收集器。啟用方法是指定-XX:+UseCurrentMarkSweepGC。否則,使用吞吐量收集器。開啟方法是指定-XX:+UseParallelGC。

當然了,如果你已經在使用 Java 8 了,那么就用默認的 G1 收集器就好了。在沒有任何調優的情況下,G1 表現得要比 CMS 出色,主要體現在更少的 Full GC,需要調整的參數更少等,所以使用 G1 就好了。

操作系統參數

最后我們來聊聊 Kafka 集群通常都需要設置哪些操作系統參數。通常情況下,Kafka 並不需要設置太多的 OS 參數,但有些因素最好還是關注一下,比如下面這幾個:

文件描述符限制

文件系統類型

Swappiness

提交時間

首先是ulimit -n。我覺得任何一個 Java 項目最好都調整下這個值。實際上,文件描述符系統資源並不像我們想象的那樣昂貴,你不用太擔心調大此值會有什么不利的影響。通常情況下將它設置成一個超大的值是合理的做法,比如ulimit -n 1000000。其次是文件系統類型的選擇。這里所說的文件系統指的是如 ext3、ext4 或 XFS 這樣的日志型文件系統。根據官網的測試報告,XFS 的性能要強於 ext4,所以生產環境最好還是使用 XFS。第三是 swap 的調優。網上很多文章都提到設置其為 0,將 swap 完全禁掉以防止 Kafka 進程使用 swap 空間。我個人反倒覺得還是不要設置成 0 比較好,我們可以設置成一個較小的值。為什么呢?因為一旦設置成 0,當物理內存耗盡時,操作系統會觸發 OOM killer 這個組件,它會隨機挑選一個進程然后 kill 掉,即根本不給用戶任何的預警。但如果設置成一個比較小的值,當開始使用 swap 空間時,你至少能夠觀測到 Broker 性能開始出現急劇下降,從而給你進一步調優和診斷問題的時間。基於這個考慮,我個人建議將 swappniess 配置成一個接近 0 但不為 0 的值,比如 1。最后是提交時間或者說是 Flush 落盤時間。向 Kafka 發送數據並不是真要等數據被寫入磁盤才會認為成功,而是只要數據被寫入到操作系統的頁緩存(Page Cache)上就可以了,隨后操作系統根據 LRU 算法會定期將頁緩存上的“臟”數據落盤到物理磁盤上。這個定期就是由提交時間來確定的,默認是 5 秒。一般情況下我們會認為這個時間太頻繁了,可以適當地增加提交間隔來降低物理磁盤的寫操作。當然你可能會有這樣的疑問:如果在頁緩存中的數據在寫入到磁盤前機器宕機了,那豈不是數據就丟失了。的確,這種情況數據確實就丟失了,但鑒於 Kafka 在軟件層面已經提供了多副本的冗余機制,因此這里稍微拉大提交間隔去換取性能還是一個合理的做法。

生產者基本使用

kafka發送消息的步驟:

大體上來說,用戶首先構建待發送的消息對象ProducerRecord,然后調用KafkaProducer#send方法進行發送。KafkaProducer接收到消息后首先對其進行序列化,然后結合本地緩存的元數據信息一起發送給partitioner去確定目標分區,最后追加寫入到內存中的消息緩沖池(accumulator)。此時KafkaProducer#send方法成功返回。

KafkaProducer中還有一個專門的Sender IO線程負責將緩沖池中的消息分批次發送給對應的broker,完成真正的消息發送邏輯。

結合源代碼,筆者認為新版本的producer從設計上來說具有以下幾個特點(或者說是優勢):

總共創建兩個線程:執行KafkaPrducer#send邏輯的線程——我們稱之為“用戶主線程”;執行發送邏輯的IO線程——我們稱之為“Sender線程”

不同於Scala老版本的producer,新版本producer完全異步發送消息,並提供了回調機制(callback)供用戶判斷消息是否成功發送

batching機制——“分批發送“機制。每個批次(batch)中包含了若干個PRODUCE請求,因此具有更高的吞吐量

更加合理的默認分區策略:對於無key消息而言,Scala版本分區策略是一段時間內(默認是10分鍾)將消息發往固定的目標分區,這容易造成消息分布的不均勻,而新版本的producer采用輪詢的方式均勻地將消息分發到不同的分區

底層統一使用基於Selector的網絡客戶端實現,結合Java提供的Future實現完整地提供了更加健壯和優雅的生命周期管理。

producer創建時會創建一個默認32MB(由buffer.memory參數指定)的accumulator緩沖區,專門保存待發送的消息。除了之前在“關鍵參數”段落中提到的linger.ms和batch.size等參數之外,該數據結構中還包含了一個特別重要的集合信息:消息批次信息(batches)。該集合本質上是一個HashMap,里面分別保存了每個topic分區下的batch隊列,即前面說的批次是按照topic分區進行分組的。這樣發往不同分區的消息保存在對應分區下的batch隊列中。舉個簡單的例子,假設消息M1, M2被發送到test的0分區但屬於不同的batch,M3分送到test的1分區,那么batches中包含的信息就是:{"test-0" -> [batch1, batch2], "test-1" -> [batch3]}

單個topic分區下的batch隊列中保存的是若干個消息批次。每個batch中最重要的3個組件包括:

compressor: 負責執行追加寫入操作

batch緩沖區:由batch.size參數控制,消息被真正追加寫入到的地方

thunks:保存消息回調邏輯的集合

 這一步的目的就是將待發送的消息寫入消息緩沖池中,具體流程如下圖所示:

okay!這一步執行完畢之后理論上講KafkaProducer.send方法就執行完畢了,用戶主線程所做的事情就是等待Sender線程發送消息並執行返回結果了。

此時,該Sender線程登場了。嚴格來說,Sender線程自KafkaProducer創建后就一直都在運行着 。它的工作流程基本上是這樣的:

不斷輪詢緩沖區尋找已做好發送准備的分區 

將輪詢獲得的各個batch按照目標分區所在的leader broker進行分組

將分組后的batch通過底層創建的Socket連接發送給各個broker

等待服務器端發送response回來

為了說明上的方便,我還是基於圖的方式來解釋Sender線程的工作原理:

上圖中Sender線程會發送PRODUCE請求給對應的broker,broker處理完畢之后發送對應的PRODUCE response。一旦Sender線程接收到response將依次(按照消息發送順序)調用batch中的回調方法,如下圖所示:

生產者基本參數設置

acks和timeout.ms

timeout.ms參數0.9.0已經被棄用,本質上與acks的配置相匹配---如果在指定時間內沒有收到同步副本的確認,那么broker就會返回一個錯誤。

acks=0,生產者在成功寫入消息之前不會等待任何來自服務器的響應。也就是說,如果當中出現了問題,導致服務器沒有收到消息,那么生產者就無從得知,消息也就丟失了。不過,因為生產者不需要等待服務器的響應,所以它可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。

acks=1,只要集群的 Leader 節點收到消息,生產者就會收到一個來自服務器的成功響應。如果消息無法到達 Leader 節點(比如首領節點崩潰,新的 Leader 還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新Leader,消息還是會丟失。這個時候的吞吐量取決於使用的是同步發送還是異步發送。如果讓發送客戶端等待服務器的響應(通過調用 Future 對象的 get() 方法),顯然會增加延遲(在網絡上傳輸一個來回的延遲)。如果客戶端使用回調,延遲問題就可以得到緩解,不過吞吐量還是會受發送中消息數量的限制(比如,生產者在收到服務器響應之前可以發送多少個消息)。

如果 acks=all,只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發生崩潰,整個集群仍然可以運行。不過,它的延遲比 acks=1 時更高,因為我們要等待不只一個服務器節點接收消息。

buffer.memory=33554432

該參數用來設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。如果生產消息的速度超過發送的速度,會導致生產者空間不足。這個時候, send()方法調用要么被阻塞,要么拋出異常,取決於如何設置 block.on.buffer.full 參數(在 0.9.0.0 版本里被替換成了max.block.ms,表示在拋出異常之前可以阻塞一段時間)。

compression.type=none
默認情況下,消息發送時不會被壓縮。該參數可以設置為 snappygziplz4,它指定了消息被發送給 broker 之前使用哪一種壓縮算法進行壓縮。

snappy 壓縮算法由 Google 發明,占用較少的 CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網絡帶寬,可以使用這種算法。

gzip 壓縮算法一般會占用較多的 CPU,但會提供更高的壓縮比,所以如果網絡帶寬比較有限,可以使用這種算法。

使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往是向 Kafka 發送消息的瓶頸所在。

retries 和 retry.backoff.ms

retries=0
生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到 Leader)。在這種情況下,retries
參數的值決定了生產者可以重發消息的次數,如果達到這個次數,生產者會放棄重試並返回錯誤。

retry.backoff.ms=100
默認情況下,生產者會在每次重試之間等待 100ms,不過可以通過 retry.backoff.ms 參數來改變這個時間間隔。建議在設置重試次數和重試時間間隔之前,先測試一下恢復一個崩潰節點需要多少時間(比如所有分區選舉出 Leader 需要多長時間),讓總的重試時間比 Kafka 集群從崩潰中恢復的時間長,否則生產者會過早地放棄重試。

不過有些錯誤不是臨時性錯誤,沒辦法通過重試來解決(比如“消息太大”錯誤)。一般情況下,因為生產者會自動進行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯誤。你只需要處理那些不可重試的錯誤和重試次數超出上限的情況。

batch.size 和 linger.ms

batch.size:=16384
當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算(而不是消息個數)。

linger.ms:=0
指定了生產者在每次發送消息的時間間隔

當批次被填滿 或者 等待時間達到 linger.ms設置的間隔時間,批次里的所有消息會被發送出去,哪怕此時該批次只有一條消息。

所以就算把

批次大小設置得很大,也不會造成延遲,只是會占用更多的內存而已。但如果設置得太小,因為生產者需要更頻繁地發送消息,會增加一些額外的開銷。

client.id=''
該參數可以是任意的字符串,服務器會用它來識別消息的來源

max.in.flight.requests.per.connection=5
該參數指定了生產者在收到服務器響應之前可以發送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。把它設為 1 可以保證消息是按照發送的順序寫入服務器的,即使發生了重試。

如何保證順序性:如果把 retries 設為非零整數,同時把 max.in.flight.requests.per.connection 設為比 1 大的數,那么,如果第一個批次消息寫入失敗,而第二個批次寫入成功,broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功,那么兩個批次的順序就反過來了。

一般來說,如果某些場景要求消息是有序的,那么消息是否寫入成功也是很關鍵的,所以不建議把retries設為 0。可以把 max.in.flight.requests.per.connection 設為 1,這樣在生產者嘗試發送第一批消息時,就不會有其他的消息發送給broker。不過這樣會嚴重影響生產者的吞吐量,所以只有在對消息的順序有嚴格要求的情況下才能這么做。

request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms=305000
指定了生產者在發送數據時等待服務器返回響應的時間

metadata.fetch.timeout.ms (0.9.0.0版本中就被棄用)
指定了生產者在獲取元數據(比如目標分區的 Leader 是誰)時等待服務器返回響應的時間。如果等待響應超時,那么生產者要么重試發送數據,要么返回一個錯誤(拋出異常或執行回調)。

max.request.size=1048576
該參數用於控制生產者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1MB,那么可以發送的單個最大消息為 1MB,或者生產者可以在單個請求里發送一個批次,該批次包含了 1000 個消息,每個消息大小為 1KB。另外,broker 對可接收的消息最大值也有自己的限制(message.max.bytes),所以兩邊的配置最好可以匹配,避免生產者發送的消息被 broker 拒絕。

注意區分 batch.size只是針對一個 topic 的 partition,而 max.request.size針對單次請求的。

receive.buffer.bytes=32768 和 send.buffer.bytes=131072
這兩個參數分別指定了 TCP socket 接收和發送數據包的緩沖區大小。如果它們被設為 -1,就使用操作系統的默認值。如果生產者或消費者與 broker 處於不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

關於更多的配置信息,可以查看:http://kafka.apachecn.org/documentation.html#configuration

消費者基本使用

consumer group是kafka提供的可擴展且具有容錯性的消費者機制。它是由一個或者多個消費者組成,它們共享同一個Group ID.
組內的所有消費者協調在一起來消費訂閱主題(subscribed topics)的所有分區(partition)。當然,每個分區只能由同一個消費組內的一個consumer來消費。

consummer group有以下的特性:

consumer group下可以有一個或多個consumer instance,consumer
instance可以是一個進程,也可以是一個線程(所以消費者可以采用多線程的方式去消費消息)

group.id是一個字符串,唯一標識一個consumer group

consumer group下訂閱的topic下的每個分區只能分配給某個group下的一個consumer(當然該分區還可以被分配給其他group)

消費者位置
消費者位置,即位移。 消費者在消費的過程中需要記錄自己消費了多少數據。
位移提交有自動、手動兩種方式進行位移提交。

自動提交:在kafka拉取到數據之后就直接提交,這樣很容易丟失數據

手動提交:成功拉取數據之后,對數據進行相應的處理之后再進行提交。如拉取數據之后進行寫入mysql這種 (存在數據處理失敗的可能性),
所以這時我們就需要進行手動提交kafka的offset下標。

reblance機制

rebalance本質上是一種協議,規定了一個consumer group下的所有consumer如何達成一致來分配訂閱topic的每個分區。

Kafka提供了一個角色:coordinator來執行對於consumer group的管理。
Group Coordinator是一個服務,每個Broker在啟動的時候都會啟動一個該服務。Group Coordinator的作用是用來存儲Group的相關Meta信息,並將對應Partition的Offset信息記錄到Kafka內置Topic(__consumer_offsets)中。

Rebalance 過程分為兩步:Join 和 Sync。
Join 顧名思義就是加入組。這一步中,所有成員都向coordinator發送JoinGroup請求,請求加入消費組。一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員信息以及訂閱信息發給leader——注意leader和coordinator不是一個概念。leader負責消費分配方案的制定。

 Sync,這一步leader開始分配消費方案,即哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空。coordinator接收到分配方案之后會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的所有成員就都知道自己應該消費哪些分區了。

消費者常用配置

我們都應該知道最全最全的文檔應該是來自官網雖然有時候可能官網找不到):http://kafka.apachecn.org/documentation.html#newconsumerconfigs

fetch.min.bytes

該屬性指定了消費者從服務器獲取記錄的最小字節數。

fetch.max.wait.ms

等到有足夠的數據時才把它返回給消費者。feth.max.wait.ms 則用於指定 broker 的等待時間,默認是 500ms。如果沒有足夠的數據流入 Kafka,消費者獲取最小數據量的要求就得不到滿足,最終導致 500ms 的延遲。如果要降低潛在的延遲(為了滿足 SLA),可以把該參數值設置得小一些。如果 fetch.max.wait.ms 被設為 100ms,並且fetch.min.bytes 被設為 1MB,那么 Kafka 在收到消費者的請求后,要么返回 1MB 數據,要么在100ms 后返回所有可用的數據,就看哪個條件先得到滿足。

max.partition.fetch.bytes

該屬性指定了服務器從每個分區里返回給消費者的最大字節數。它的默認值是 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區里返回的記錄最多不超過 max.partition.fetch.bytes指定的字節。如果一個主題有 20 個分區和 5 個消費者,那么每個消費者需要至少 4MB 的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組里有消費者發生崩潰,剩下的消費者需要處理更多的分區。
max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節數(通過 max.message.size 屬性配置)大,否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。在設置該屬性時,另一個需要考慮的因素是消費者處理數據的時間。消費者需要頻繁調用poll() 方法來避免會話過期和發生分區再均衡,如果單次調用 poll() 返回的數據太多,消費者需要更多的時間來處理,可能無法及時進行下一個輪詢來避免會話過期。
如果出現這種情況,可以把max.partition.fetch.bytes 值改小,或者延長會話過期時間。

session.timeout.ms
該屬性指定了消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是 3s。如果消費者沒有在session.timeout.ms 指定的時間內發送心跳給群組協調器,就被認為已經死亡,協調器就會觸發再均衡,把它的分區分配給群組里的其他消費者。該屬性與 heartbeat.interval.ms 緊密相關。

heartbeat.interval.ms

指定了 poll() 方法向協調器發送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發送心跳。所以,一般需要同時修改這兩個屬性,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應該是 1s。把session.timeout.ms 值設得比默認值小,可以更快地檢測和恢復崩潰的節點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡。把該屬性的值設置得大一些,可以減少意外的再均衡,不過檢測節點崩潰需要更長的時間。

auto.offset.reset

(因消費者長時間失效,包含偏移量的記錄已經過時並被刪除)該作何處理。它的默認值是 latest,意思是說,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)。另一個值是earliest,意思是說,在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。

enable.auto.commit

我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是true。為了盡量避免出現重復數據和數據丟失,可以把它設為 false,由自己控制何時提交偏移量。如果把它設為true,還可以通過配置 auto.commit.interval.ms 屬性來控制提交的頻率。

partition.assignment.strategy(這部分好像重復了 ~~~)

我們知道,分區會被分配給群組里的消費者。PartitionAssignor 根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者。

Kafka 有兩個默認的分配策略。

Range
  該策略會把主題的若干個連續的分區分配給消費者。假設消費者 C1 和消費者 C2 同時訂閱了主題T1 和 主題 T2,並且每個主題有 3 個分區。那么消費者 C1 有可能分配到這兩個主題的分區 0 和分區 1,而消費者 C2 分配到這兩個主題的分區 2。因為每個主題擁有奇數個分區,而分配是在主題內獨立完成的,第一個消費者最后分配到比第二個消費者更多的分區。只要使用了 Range 策略,而且分區數量無法被消費者數量整除,就會出現這種情況。

RoundRobin
  該策略把主題的所有分區逐個分配給消費者。如果使用 RoundRobin 策略來給消費者 C1 和消費者C2 分配分區,那么消費者 C1 將分到主題 T1 的分區 0 和分區 2 以及主題 T2 的分區 1,消費者 C2 將分配到主題 T1 的分區 1 以及主題 T2 的分區 0 和分區 2。一般來說,如果所有消費者都訂閱相同的主題(這種情況很常見),RoundRobin 策略會給所有消費者分配相同數量的分區(或最多就差一個分區)。可以通過設置 partition.assignment.strategy 來選擇分區策略。
默認使用的是org.apache.kafka.clients.consumer.RangeAssignor,這個類實現了 Range 策略,不過也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我們還可以使用自定義策略,在這種情況下,partition.assignment.strategy 屬性的值就是自定義類的名字。

client.id

該屬性可以是任意字符串,broker 用它來標識從客戶端發送過來的消息,通常被用在日志、度量指標和配額里。

max.poll.records

該屬性用於控制單次調用 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢里需要處理的數據量。

receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數據時用到的 TCP 緩沖區也可以設置大小。如果它們被設為 -1,就使用操作系統的默認值。如果生產者或消費者與 broker 處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

總結

以后關於kafka系列的總結大部分來自Geek Time的課件,大家可以自行關鍵字搜索。

https://www.cnblogs.com/huxi2b/p/6364613.html

https://blog.csdn.net/weixin_40990818/article/details/107848167

https://blog.csdn.net/u012501054/article/details/80241921

https://zhuanlan.zhihu.com/p/154446216

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM