消息隊列的作用:解耦,削峰,異步,順序性(在一定程度上保證)。
解偶
快遞員可以選擇自己的時間,把快遞放到櫃子里,不需要關心小明是否在家。
小明也不需要一直等待給快遞員開門,兩個人解耦了。
異步
快遞員把快遞放到櫃子里發個信息就可以去送下一件,不需同步等待結果。
削峰
到了雙十一,小明一天要到100個快遞,由於小明一天只能消化10個快遞,剩下的就放在了櫃子里,等10天后才拿完。
Kafka架構總覽
kafka如何支撐多線程消費?
設置一個組,這個組消費某個主題的消息,組內設置和分區數一樣多的消費者,每個消費者消費一個分區。分區數決定了同組消費者個數的上限。如果你的分區數是N,那么最好線程數也保持為N,這樣通常能夠達到最大的吞吐量。超過N的配置只是浪費系統資源,因為多出的線程不會被分配到任何分區。
kafka 會不會出現不按順序消費的情況?
亂序場景一
因為一個topic可以有多個partition,kafka只能保證partition內部有序,partition數量=同一個消費者組中消費者數量時,可能需要順序的數據分布到了不同的partition,導致處理時亂序
解決方案
1、可以設置topic 有且只有一個partition。
2、同一topic下,再分為各需要順序消費的業務小類,每個小類消息指定到同一個partition。
亂序場景二
對於同一業務進入了一個消費者組之后,用了多線程來處理消息,會導致消息的亂序
解決方案
消費者內部保持與線程等量的內存隊列,相同的業務數據,放到同一個內存隊列中,然后線程從對應的內存隊列中取出並操作。
組並發:一個組可以並發消費多個分區。
分區負載均衡:一個分區可以被多個組消費。
組內分工明確:組內消費者只能消費不同的分區。
一個消息對一個組,只會發一次。
broker到消費者采用pull模式:
push模式的目標是盡可能快地傳遞消息,這樣很容易造成消費者來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式可以根據consumer的消費能力以適當的速率消費消息。
pull模式缺點在於,如果broker沒有數據,消費者會輪詢,忙等待數據直到數據到達,為了避免這種情況,我們允許消費者在pull請求時候使用“long poll”進行阻塞,直到數據到達 。
一個組對每條休息只能消費一次,
Leader和Follower分區
producer 只能往 leader 分區上寫數據,消費也只能從leader分區上讀,followers 只按順序從 leader 上復制日志。
3.2 冪等性發送
為實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。對於每個PID,該Producer發送消息的每個<Topic, Partition>都對應一個單調遞增的Sequence Number。同樣,Broker端也會為每個<PID, Topic, Partition>維護一個序號,並且每Commit一條消息時將其對應序號遞增。對於接收的每條消息,如果其序號比Broker維護的序號大1,則Broker會接受它,否則將其丟棄。
如果消息序號比Broker維護的序號差值比一大,說明中間有數據尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
如果消息序號小於等於Broker維護的序號,說明該消息已被保存,即為重復消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
Sender發送失敗后會重試,這樣可以保證每個消息都被發送到broker。
3.3 生產者
生產者發送消息的三種方式:
1. producer.send(record);//Fire-and-forget異步發送,不管發送結果。 2. RecordMetadata result = producer.send(record).get(); //同步發送。 3. producer.send(record,new MyProducerCallBack());//帶回調的異步發送 一定需要處理callback的成功或失敗邏輯: private static class MyProducerCallBack implements Callback{ @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(null != e){ e.printStackTrace(); return; } System.out.println("時間戳,主題,分區,位移: " + recordMetadata.timestamp() + ", " + recordMetadata.topic() + "," + recordMetadata.partition() + " " + recordMetadata.offset()); } }
如果ProducerRecord中指定了Partition,則Partitioner不做任何事情;否則,Partitioner根據消息的key得到一個Partition。這是生產者就知道向哪個Topic下的哪個Partition發送這條消息。
3.4 kafka消費如何保證順序性
一個 topic,一個 partition,一個 consumer,內部單線程消費,這樣的狀態數據消費是有序的。但由於單線程吞吐量太低,在數據龐大的實際場景很少采用。
1、生產者在寫的時候,可以指定一個 key,比如說我們指定了某個訂單 id 作為 key,那么這個訂單相關的數據,一定會被分發到同一個 partition 中去,而且這個 partition 中的數據一定是有順序的。(需要保證順序的幾條消息,發送到同一個分區。局部有序)
2、消費者從 partition 中取出來數據的時候,也一定是有順序的。
3、但是消費者里可能會有多個線程來並發來處理消息。因為如果消費者是單線程消費數據,那么這個吞吐量太低了。而多個線程並發的話,順序可能就亂掉了。
4、寫N個queue,將具有相同key的數據都存儲在同一個queue,然后對於N個線程,每個線程分別消費一個queue即可。
producer發送消息時具體到topic的哪一個partition分區,提供了三種方式
1)指定分區
2)不指定分區,有指定key 則根據key的hash值與分區數進行運算后確定發送到哪個partition分區
3)不指定分區,不指定key,則輪詢各分區發送