kafka生產消費模型
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("shuaige", Integer.toString(i), Integer.toString(i))); producer.close();
- 生產者的緩沖空間池保留尚未發送到服務器的消息,后台I/O線程負責將這些消息轉換成請求發送到集群。如果使用后不關閉生產者,則會泄露這些資源。
-
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
- send()方法是異步的,添加消息到緩沖區等待發送,並立即返回。生產者將單個的消息批量在一起發送來提高效率。回調參數可選。send是異步的,並且一旦消息被保存在等待發送的消息緩存中,此方法就立即返回。這樣並行發送多條消息而不阻塞去等待每一條消息的響應。發送的結果是一個RecordMetadata,它指定了消息發送的分區,分配的offset和消息的時間戳。
- ack是判別請求是否為完整的條件(就是是判斷是不是成功發送了)。我們指定了“all”將會阻塞消息,這種設置性能最低,但是是最可靠的。
- retries,如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重復消息的可能性。(???)
- producer(生產者)緩存每個分區未發送消息。緩存的大小是通過 batch.size 配置指定的。值較大的話將會產生更大的批。並需要更多的內存(因為每個“活躍”的分區都有1個緩沖區)。
- 默認緩沖可立即發送,即遍緩沖空間還沒有滿,但是,如果你想減少請求的數量,可以設置linger.ms大於0。這將指示生產者發送請求之前等待一段時間,希望更多的消息填補到未滿的批中。這類似於TCP的算法,例如上面的代碼段,可能100條消息在一個請求發送,因為我們設置了linger(逗留)時間為1毫秒,然后,如果我們沒有填滿緩沖區,這個設置將增加1毫秒的延遲請求以等待更多的消息。需要注意的是,在高負載下,相近的時間一般也會組成批,即使是 linger.ms=0。在不處於高負載的情況下,如果設置比0大,以少量的延遲代價換取更少的,更有效的請求。
- buffer.memory 控制生產者可用的緩存總量,如果消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其他發送調用將被阻塞,阻塞時間的閾值通過max.block.ms設定,之后它將拋出一個TimeoutException。
- key.serializer和value.serializer示例,將用戶提供的key和value對象ProducerRecord轉換成字節,你可以使用附帶的ByteArraySerializaer或StringSerializer處理簡單的string或byte類型。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test2")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
- 偏移量和消費者的位置
kafka為分區中的每條消息保存一個偏移量(offset),這個偏移量是該分區中一條消息的唯一標示符。也表示消費者在分區的位置。例如,一個位置是5的消費者(說明已經消費了0到4的消息),下一個接收消息的偏移量為5的消息。實際上有兩個與消費者相關的“位置”概念:
消費者的位置給出了下一條記錄的偏移量。它比消費者在該分區中看到的最大偏移量要大一個。 它在每次消費者在調用poll(long)中接收消息時自動增長。
“已提交”的位置是已安全保存的最后偏移量,如果進程失敗或重新啟動時,消費者將恢復到這個偏移量。消費者可以選擇定期自動提交偏移量,也可以選擇通過調用commit API來手動的控制(如:commitSync 和 commitAsync)。上述代碼就是自動提交偏移量。
這個區別是消費者來控制一條消息什么時候才被認為是已被消費的,控制權在消費者,下面我們進一步更詳細地討論。
- 消費者組和主題訂閱
Kafka的消費者組概念,通過進程池瓜分消費和處理消息的工作。這些進程可以在同一台機器運行,也可分布到多台機器上,增加可擴展性和容錯性,相同group.id的消費者將視為同一個消費者組。
分組中的每個消費者通過subscribe API動態的訂閱一個topic列表。kafka將已訂閱topic的消息發送到每個消費者組中。並通過平衡分區在消費者分組中所有成員之間來達到平均。因此每個分區恰好地分配1個消費者(一個消費者組中)。所有如果一個topic有4個分區,並且一個消費者分組有2個消費者。那么每個消費者消費2個分區。
消費者組的成員是動態維護的:如果一個消費者故障。分配給它的分區將重新分配給同一個分組中其他的消費者。同樣的,如果一個新的消費者加入到分組,將從現有消費者中移一個給它。這被稱為重新平衡分組,並在下面更詳細地討論。 當新分區添加到訂閱的topic時,或者當創建與訂閱的正則表達式匹配的新topic時,也將重新平衡。將通過定時刷新自動發現新的分區,並將其分配給分組的成員。
從概念上講,你可以將消費者分組看作是由多個進程組成的單一邏輯訂閱者。作為一個多訂閱系統,Kafka支持對於給定topic任何數量的消費者組,而不重復。
這是在消息系統中常見的功能的略微概括。所有進程都將是單個消費者分組的一部分(類似傳統消息傳遞系統中的隊列的語義),因此消息傳遞就像隊列一樣,在組中平衡。與傳統的消息系統不同的是,雖然,你可以有多個這樣的組。但每個進程都有自己的消費者組(類似於傳統消息系統中pub-sub的語義),因此每個進程都會訂閱到該主題的所有消息。
此外,當分組重新分配自動發生時,可以通過ConsumerRebalanceListener通知消費者,這允許他們完成必要的應用程序級邏輯,例如狀態清除,手動偏移提交等。有關更多詳細信息,請參閱Kafka存儲的偏移。
它也允許消費者通過使用assign(Collection)手動分配指定分區,如果使用手動指定分配分區,那么動態分區分配和協調消費者組將失效。
- 發現消費者故障
訂閱一組topic后,當調用poll(long)時,消費者將自動加入到組中。只要持續的調用poll,消費者將一直保持可用,並繼續從分配的分區中接收消息。此外,消費者向服務器定時發送心跳。 如果消費者崩潰或無法在session.timeout.ms配置的時間內發送心跳,則消費者將被視為死亡,並且其分區將被重新分配。
還有一種可能,消費可能遇到“活鎖”的情況,它持續的發送心跳,但是沒有處理。為了預防消費者在這種情況下一直持有分區,我們使用max.poll.interval.ms活躍檢測機制。 在此基礎上,如果你調用的poll的頻率大於最大間隔,則客戶端將主動地離開組,以便其他消費者接管該分區。 發生這種情況時,你會看到offset提交失敗(調用commitSync()引發的CommitFailedException)。這是一種安全機制,保障只有活動成員能夠提交offset。所以要留在組中,你必須持續調用poll。
消費者提供兩個配置設置來控制poll循環:
-
max.poll.interval.ms:增大poll的間隔,可以為消費者提供更多的時間去處理返回的消息(調用poll(long)返回的消息,通常返回的消息都是一批)。缺點是此值越大將會延遲組重新平衡。
-
max.poll.records:此設置限制每次調用poll返回的消息數,這樣可以更容易的預測每次poll間隔要處理的最大值。通過調整此值,可以減少poll間隔,減少重新平衡分組的
對於消息處理時間不可預測地的情況,這些選項是不夠的。 處理這種情況的推薦方法是將消息處理移到另一個線程中,讓消費者繼續調用poll。 但是必須注意確保已提交的offset不超過實際位置。另外,你必須禁用自動提交,並只有在線程完成處理后才為記錄手動提交偏移量(取決於你)。 還要注意,你需要pause暫停分區,不會從poll接收到新消息,讓線程處理完之前返回的消息(如果你的處理能力比拉取消息的慢,那創建新線程將導致你機器內存溢出)。
- 控制消費的位置
大多數情況下,消費者只是簡單的從頭到尾的消費消息,周期性的提交位置(自動或手動)。kafka也支持消費者去手動的控制消費的位置,可以消費之前的消息也可以跳過最近的消息。
有幾種情況,手動控制消費者的位置可能是有用的。
一種場景是對於時間敏感的消費者處理程序,對足夠落后的消費者,直接跳過,從最近的消費開始消費。
另一個使用場景是本地狀態存儲系統。在這樣的系統中,消費者將要在啟動時初始化它的位置(無論本地存儲是否包含)。同樣,如果本地狀態已被破壞(假設因為磁盤丟失),則可以通過重新消費所有數據並重新創建狀態(假設kafka保留了足夠的歷史)在新的機器上重新創建。
kafka使用seek(TopicPartition, long)指定新的消費位置。用於查找服務器保留的最早和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))。
- 消費者流量控制
如果消費者分配了多個分區,並同時消費所有的分區,這些分區具有相同的優先級。在一些情況下,消費者需要首先消費一些指定的分區,當指定的分區有少量或者已經沒有可消費的數據時,則開始消費其他分區。
例如流處理,當處理器從2個topic獲取消息並把這兩個topic的消息合並,當其中一個topic長時間落后另一個,則暫停消費,以便落后的趕上來。
kafka支持動態控制消費流量,分別在future的poll(long)中使用pause(Collection) 和 resume(Collection) 來暫停消費指定分配的分區,重新開始消費指定暫停的分區。