消費者不需要自行管理 offset(分組+topic+分區),系統通過 broker 將 offset 存放在本地。低版本通過 zk 自行管理。系統自行管理分區和副本情況。消費者斷線后會自動根據上一次記錄的 offset 去獲取數據(默認一分鍾更新一次 offset),同一個分組中的客戶不能同時消費同一個分片。不同的 group 記錄不同的 offset,這樣不同程序讀取同一個 topic 才不會因為 offset 互相影響。
一、消費者組
分區的所有權從一個消費者轉移到另一個消費者,這樣的行為稱為再均衡,再均衡非常重要,它為消費者群組帶來了高可用性和伸縮性(可以放心的添加和刪除消費者),再均衡期間消費者無法讀取消息,造成整個群組一小段時間的不可用。另外,當分區被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程序。通過心跳機制檢測消費者是否活躍。
二、消費方式
如果消費者提交了偏移量卻未能處理完消息,那么就有可能造成消息丟失,這也是消費者丟失消息的主要原因。
【1】Consumer 通過拉(pull)的模式從 broker 中讀取數據【另一種是broker 向消費者推(push)數據】。推(push)模式存在一個問題:消費者的速率與 broker 推的速率不相同時,會導致資源浪費(消費者性能優於broker)和系統崩潰(消費者性能次於broker)。
【2】它的目標是盡可能以最快的速度傳遞消息,但是這樣容易造成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull 模式則可以根據 consumer 的消費能力以適當的速率消費消息。
【3】pull 模式的不足之處是,如果 Kafka 沒有數據,消費者可能會陷入循環中,一直返回空數據。針對此,Kafka 的消費者在消息數據時會傳入一個時長參數,如果當前沒有數據可供消費,Consumer 會等待一段時間之后再返回,這段時長即為 timeout。
三、分區分配策略
【1】一個 Consumer Group 中有多個 consumer,一個 Topic 有多個 partition,所以就會涉及到 partition 的分配問題,即確定那個 partition 由那個 consumer 消費。由 partition.assignment.strategy 屬性決定。Kafka 有兩種策略:RoundRobin 根據組進行輪詢分配【循環】、Range 根據 Topic 進行划分【范圍:默認策略】。
■ Range:該策略會把主題的若干個連續的分區分配給消費者。假設消費者 C1 和消費者 C2 同時訂閱主題 T1 和 T2,並且每個主題有 3個分區。那么消費者 C1 有可能分配到這兩個主題的分區0和分區1,而消費者 C2分配到這兩個主題的分區2.因為每個主題擁有計數個分區,而分區是在主題內獨立完成的,第一個消費者最后分配到的分區比第二個消費者多很多分區。當分區數量不能被消費者數量整除,就會出現這種情況。
■ RoundRobin:把主題的所有分區逐個分配給消費者(交叉分配),消費者之間最多差一個分區。使用此分區策略來給消費者 C1 和 C2 分配分區是,C1 將分到主題T1的分區0和分區2以及主題2的分區1,消費者C2將分配到主題T1的分區1以及主題2的分區0和分區2。
【2】分配分區的過程:在消費者要加入群組時,它會向群組協調器發送一個 JoinGroup 請求。第一個加入群組的消費者將成為“群主”。群主從協調器那里獲得群組的成員列表(列表中包含了所有最近發送過心跳的消費者,它們被認為是活躍的),並負責給每一個消費者分配分區。它使用一個實現了讓 PartitionAssignor 接口的類來決定那個分區應該分配給哪個消費者。分配完畢后,群主把分配情況列表發送給群主協調器,協調器再把這些信息發送給所有消費者,每個消費者只能看到自己的分配信息,只有群主知道群組里所有消費者的分配信息,這個過程會在每次再鈞衡時重復發送。
四、消費者的配置
【1】group.id:如果兩個消費者具有相同的group.id,並且訂閱了同一個主題,那么每個消費者會分到主題分區的一個子集(群組會讀取主題所有的消息)。如果你希望消費者看到主題的所有消息,那么需要為他們設置唯一的 group.id。
【2】auto.offset.reset:這個參數指定了在沒有偏移量可提交時(比如消費者第1次啟動時)或者請求的偏移量在 broker上不存在時,消費者會做些什么。這個參數一般兩種配置,一種是 earliest,如果選擇了這個配置,消費者會從分區的開始位置讀取數據,不管偏移量是否有效,這樣會導致消費者讀取大量的重復數據,但可以保證最少的數據丟失。另一種是 latest(默認值),如果選擇這個配置,消費者會從分區的末尾開始讀取數據,這樣可以減少重復處理消息,但很有可能會錯過一些消息。
【3】enable.auto.commit:這是一個非常重要的配置參數,可以讓消費者基於任務調度自動提交偏移量,也可以在代碼里手動提交偏移量。自動提交的好處是,在實現消費者邏輯時少考慮一些問題。如果在消息者輪詢操作所有的數據,那么自動提交可以保證只提交已經處理過的偏移量。缺點是:無法控制重復處理消息。
【4】auto.commit.interval.ms:與3中的參數有直接聯系。如果選擇了自動提交偏移量,可以通過該參數配置提交的頻度,默認是每5秒提交一次。依賴來說,頻繁提交會增加額外的開銷,但也會降低重復處理消息的概率。
【5】fetch.min.byte:消費者從服務器獲取記錄的最小字節數,broker 在收到消費者的數據請求時,如果可用的數據量小於該配置,那么它會等到有足夠的可用數據時才把它返回給消費者。
【6】fetch.max.wait.ms:指定 broker 的等待時間,默認是 500ms。如果沒有足夠的數據流入 Kafka,消費者獲取最小數據量就得不到滿足,最終導致 500ms 延遲。fetch.min.byte 被設置為 1MB,那么Kafka 在收到消費者的請求后,要么返回 1MB數據,要么在 500ms 后返回所有可用的數據,就看那個條件先得到滿足。
【7】max.partition.fetch.bytes:該屬性指定了服務器從每個分區里返回給消費者的最大字節數。它的默認值是 1MB,如果一個主題有20個分區和5個消費者,那么每個消費者需要至少 4MB 的可用內存來接收記錄。在為消費者分配內存時,可以給他們多分配一些,因為群組里有消費者發生崩潰,剩下的消費者需要處理更多的分區。
【8】session.timeout.ms:指定了消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是3s。如果消費者沒有在指定的時間內發送心跳給群組協調器,就被認為已經死亡,協調器就會觸發再均衡,把它的分區分配給群組里的其他消費者。該屬性與 heartbeat.interval.ms 緊密相關。 heartbeat.interval.ms 指定了 poll() 方法向協調器發送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發送心跳。一般 hearbeat.interval.ms 是 session.timeout.ms 的三分之一。
【9】client.id:任意字符串,broker 用它來表示從客戶端發送過來的消息,通常被用在日志、度量指標和配額里。
五、消費者組案列
需求:測試同一個消費者組中的消費者,同一時刻只能有一個消費者消費。
【1】在 hadoop102、hadoop103 上修改 kafka/config/consumer.properties 配置文件中的 group.id 屬性為任意組名。
【2】在hadoop102、hadoop103上分別啟動消費者,啟動命令如下:
【3】在 hadoop104 上啟動生產者:
【4】查看 hadoop102 和 hadoop103 的接收者:同一時刻只有一個消費者接收到消息。
六、Kafka 消費者 Java API
創建消費者:在讀取消息之前,需要先創建一個 KafkaConsumer 對象。創建 KafkaConsumer 與創建 KafkaProducer 對象非常相似,將需要傳送的屬性放入 Properties 對象里。有三個必要的屬性:boostrap.servers、key.deserializer 和 value.deserializer props.put 中的 key 值,可以通過 ConsumerConfig.xxx 獲取常量值(推薦)
【1】bootstrap.servers:Kafka 集群的連接字符串。與在 KafkaProducer 中的用途是一樣的。
【2】key.deserializer 與 value.deserializer :使用指定的類把字節數組轉成 Java 對象。
【3】group.id:不是必須的,但是很重要。它指定了 KafkaConsumer 屬於哪一個消息群組。創建不屬於任何一個群組的消費者也是可以的,只是這樣不太常見。
訂閱主題:創建好消費者之后,下一步就是訂閱主題。subscribe() 方法接收一個主題列表作為參數。我們也可以在調用 subscribe() 方法時傳入一個正則表達式。正則表達式可以匹配多個主題,如果有人創建一個新的主題,並且主題的名字與正則表達式匹配,那么會立即觸發一次再均衡,消費者就可以讀取新添加的主題。如果應用程序需要讀取多個主題,並且可以處理不同類型的數據,那么這種方式就很管用。在 KIafka 和其他系統之間復制數據時,使用正則表達式的方式訂閱多個主題是很常見的做法。例如:cust.subscribe("test.*);
輪詢:輪詢(while循環)不只是獲取數據那么簡單。在第一調用一個新消費者的 poll() 方法時,它會負責查找 GroupCoordinator 協調器,然后加入群組,接收分配的分區。如果發生了再均衡,這個過程也是在輪詢期間發生的。當然,心跳也是從輪詢里發送出來的,所以,我們要確保在輪詢期間所做的任何處理工作都應該盡快完成。
七、偏移量
消費者可以使用 Kafka 來追蹤消息在分區里的位置(偏移量),消費者往一個叫做 _consumer_offset 的特殊主題發送消息,消息里包含每個分區的偏移量。如果消費者一直處於運行狀態,那么偏移量就沒有什么用處。如果消費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡,再均衡之后,每個消費者可能分配到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最后一次提交的偏移量,然后從偏移量指定的地方繼續處理。如果提交的偏移量小於客戶端處理的最后一個消息的偏移量,就會導致重復處理。如果提交的偏移量大於客戶單處理的最后一個消息的偏移量,就會導致數據丟失。所以,處理偏移量的方式對客戶端有很大的影響。KafkaConsumer API 提供了多種方式來提交偏移量:
【1】自動提交:最簡單的提交方式是讓消費者自動提交偏移量。enable.auto.commit 設置為 true,那么每過 5s,消費者會自動把從 poll() 方法接收到的最大偏移量提交上去。提交的時間間隔由 auto.commit.interval.ms 控制,默認5s。自動提交也是在輪詢中完成。消費者每次在進行輪詢時會檢查是否該提交偏移量了,如果是,就會提交從上一次輪詢返回的偏移量。但可能會造成5秒內的數據被重復處理。
【2】手動提交當前偏移量:開發者通過控制偏移量提交時間來消除丟失消息的可能性。並在發生再均衡時減少重復消息的數量。開發者可以在必要的時候提交當前偏移量,而不是基於時間間隔。把 enable.auto.commit 設為 false,讓應用程序決定何時提交偏移量,使用 commitSync() 提交偏移量最簡單也可靠。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出錯誤。
【3】異步提交:手動提交有一個不足之處,在broker 對提交請求作出回應之前,應用程序會一直阻塞,這樣會限制吞吐量。可以通過降低提交頻率來提升吞吐量,但如果發生再均衡,會增加重復消息的數量。這時就可以使用異步提交 API,只管發送提交,無需等待broker 的響應。commitAsync() 提交最后一個偏移量,然后繼續做其他事。在碰到無法恢復的錯誤之前,commitSync() 會一直重試,但 commitAsync() 不會。原因是因為重試的過程中,可能有一個更大的偏移量提交成功了。但它提供了回調,當 broker 作出響應時會執行回調。回調經常用於記錄提交錯誤或生成度量指標,不過如果要進行重試,那一定要注意順序(對比回調中的偏移量是否與提交的偏移量相等,相等說明沒有新的提交)。
【4】同步和異步組合提交:針對偶爾出現提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的,但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。因此,在消費者關閉前一般會組合使用 commitSync() 和 commitAsync():
【5】提交特定的偏移量:提交偏移量的頻率與處理消息批次的頻率是一樣的。如果 poll() 方法返回一大批數據,為了避免因再均衡引起的重復處理整批消息,想要在批次中間提交偏移量該怎么辦,這種情況無法通過 commitSync() 或 commitAsync() 來實現,因為它們只會提交最后一個偏移量,而此時該批次里的消息還沒處理完。幸運的是,消費者 API 允許在調用 commitSync() 和 commitAsync() 方法時傳進去希望提交的分區的偏移量的 map。舉例:
八、再均衡監聽器
消費者在退出和進行分區再均衡之前,會做一些清理工作。我們希望在消費者失去對一個分區的所有權之前提交最后一個已處理記錄的偏移量。在為消費者分配新分區或移除舊分區時,可以通過消費者API 執行一些應用程序代碼,在調用 subscribe() 方法時傳入一個 ConsumerRebalanceListener 實例就可以了。該接口有兩個需要實現的方法:
【1】onPartitionsRevoked(Collection<TopicPartition> partitions):會在再均衡開始之前和消費者停止讀取消息之后被調用。如果在這里提交偏移量,下一個接管分區的消費者就知道該從哪里開始讀取消息了。
【2】onPartitionsAssigned(Collection<TopicPartition> partitions):會在重新分配分區之后和消費者開始讀取消息之前調用。
九、如何退出
如果確定要退出循環,需要通過另一個線程調用 consumer.wakeup() 方法。如果循環運行在主線程里,可以在 ShutdownHook 里調用該方法。要記住,consumer.wakeup() 是消費者唯一一個可以從其他線程中安全調用的方法。調用 consumer.wakeup() 可以退出 poll(),並拋出 WakeupException 異常,或者如果調用 consumer.wakeup() 時線程沒有等待輪詢,那么異常會等到下一輪 poll() 時拋出。我們不需要處理 WakeupException,因為它只是調出循環的一種方式。不過,在退出線程之前調用客戶端的關閉 close() 方法時很有必要的。它會提交任何還沒有提交的東西,並向群組協調器發送消息,告知自己要離開群組,接下來就會觸發再均衡,而不需要等到會話超時。
十、反序列化器
【1】生成消息使用的序列化器與讀取消息使用的反序列化器應該是一一對應的。下面就自定義一個反序列化器:
【2】Thrift 序列化框架的使用:鏈接
十一、獨立消費者
現在我們只需要一個消費者從一個主題的所有分區或者某個特定分區讀取數據。此時就不需要消費者群組和再均衡,只需要把主題或者分區分配給消費者,然后開始讀取消息並提交偏移量。如果這樣就不需要訂閱主題,取而代之的是為自己分配分區。一個消費者可以訂閱主題(並加入消費者組),或者為自己分配分區,但不能同時做這兩件事。當確定拉去的分區之后,通過 assign 方法獲取消息。


