一、
1、原理圖
2、原理描述
一個topic 可以配置幾個partition,produce發送的消息分發到不同的partition中,consumer接受數據的時候是按照group來接受,kafka確保每個partition只能同一個group中的同一個consumer消費,如果想要重復消費,那么需要其他的組來消費。Zookeerper中保存這每個topic下的每個partition在每個group中消費的offset
新版kafka把這個offsert保存到了一個__consumer_offsert的topic下
這個__consumer_offsert 有50個分區,通過將group的id哈希值%50的值來確定要保存到那一個分區. 這樣也是為了考慮到zookeeper不擅長大量讀寫的原因。
所以,如果要一個group用幾個consumer來同時讀取的話,需要多線程來讀取,一個線程相當於一個consumer實例。當consumer的數量大於分區的數量的時候,有的consumer線程會讀取不到數據。
假設一個topic test 被groupA消費了,現在啟動另外一個新的groupB來消費test,默認test-groupB的offset不是0,而是沒有新建立,除非當test有數據的時候,groupB會收到該數據,該條數據也是第一條數據,groupB的offset也是剛初始化的ofsert, 除非用顯式的用–from-beginnging 來獲取從0開始數據
3、查看topic-group的offsert
位置:zookeeper
路徑:[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets/partitions
在zookeeper的topic中有一個特殊的topic __consumer_offserts
計算方法:(放入哪個partitions)
int hashCode = Math.abs("ttt".hashCode());
int partition = hashCode % 50;
先計算group的hashCode,再除以分區數(50),可以得到partition的值
使用命令查看: kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
4.參數
auto.offset.reset:默認值為largest,代表最新的消息,smallest代表從最早的消息開始讀取,當consumer剛開始創建的時候沒有offset這種情況,如果設置了largest,則為當收到最新的一條消息的時候開始記錄offsert,若設置為smalert,那么會從頭開始讀partition
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class JasonPartitioner<T> implements Partitioner { public JasonPartitioner(VerifiableProperties verifiableProperties) {} @Override public int partition(Object key, int numPartitions) { try { int partitionNum = Integer.parseInt((String) key); return Math.abs(Integer.parseInt((String) key) % numPartitions); } catch (Exception e) { return Math.abs(key.hashCode() % numPartitions); } } }
如果將上例中的類作為partition.class,並通過如下代碼發送20條消息(key分別為0,1,2,3)至topic3(包含4個Partition)。
public void sendMessage() throws InterruptedException{ for(int i = 1; i <= 5; i++){ List messageList = new ArrayList<KeyedMessage<String, String>>(); for(int j = 0; j < 4; j++){ messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j)); } producer.send(messageList); } producer.close(); }
則key相同的消息會被發送並存儲到同一個partition里,而且key的序號正好和Partition序號相同。(Partition序號從0開始,本例中的key也從0開始)。下圖所示是通過Java程序調用Consumer后打印出的消息列表。
4、consumer group (本節所有描述都是基於Consumer hight level API而非low level API)。
使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。
這是Kafka用來實現一個Topic消息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段。一個Topic可以對應多個Consumer Group。如果需要實現廣播,只要每個Consumer有一個獨立的Group就可以了。要實現單播只要所有的Consumer在同一個Group里。用Consumer Group還可以將Consumer進行自由的分組而不需要多次發送消息到不同的Topic。
實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將數據實時備份到另一個數據中心,只需要保證這三個操作所使用的Consumer屬於不同的Consumer Group即可。
下面這個例子更清晰地展示了Kafka Consumer Group的特性。首先創建一個Topic (名為topic1,包含3個Partition),然后創建一個屬於group1的Consumer實例,並創建三個屬於group2的Consumer實例,最后通過Producer向topic1發送key分別為1,2,3的消息。結果發現屬於group1的Consumer收到了所有的這三條消息,同時group2中的3個Consumer分別收到了key為1,2,3的消息。