15.如何消費內部topic: __consumer_offsets
- 主要是要讓它來格式化:
GroupMetadataManager.OffsetsMessageFormatter
- 最后用看了它的源碼,把這部分挑選出來,自己解析了得到的byte[]。核心代碼如下:
// com.sina.mis.app.ConsumerInnerTopic
ConsumerRecords<byte[], byte[]> records = consumer.poll(512);
for (ConsumerRecord<byte[], byte[]> record : records) {
Object offsetKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
if (offsetKey instanceof OffsetKey) {
GroupTopicPartition groupTopicPartition = ((OffsetKey) offsetKey).key();
OffsetAndMetadata value = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
LOG.info(groupTopicPartition.toString() + "---:---" + value);
} else {
LOG.info("############:{}", offsetKey);
}
}
1.For Kafka 0.8.2.x
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Only consume the latest consumer offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets
2.For Kafka 0.9.x.x and 0.10.0.0
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Only consume the latest consumer offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--zookeeper 10.39.40.98:2181/kafka10 --topic __consumer_offsets
- Committing and fetching consumer offsets in Kafka : 使用Java API獲取Consumer Offset。
Since version 0.8.2, kafka has had the ability to store consumer offsets in an internal compacted topic called __consumer_offsets
14.Kafka metrics
Kafka 用 Yammer Metrics來存儲Server、Client的數據。可以使用插件式的方式獲取這些數據,寫入到CSV文件。
Kafka 實現了 KafkaCSVMetricsReporter.scala
,可以 將metrics寫入到CSV文件。
由於沒有實現寫入ganglia的實現類,所以無法直接從Kafka將metrics寫入到ganglia。
13.snappy的壓縮率
為什么某個topic的HDFS的數據多余Kafka自己統計的流量40%左右。
sina的KafkaProxy都使用了snappy壓縮后入kafka。
- 猜想 30%-40%
- 需要測試一下:找一批HDFS的文件,寫入Kafka,消費出來,寫成文件,看看大小差別。
12.Kafka的Consumer讀取數據的時候,讀哪個partition
High level Consumer的API,默認以Range的方式分配,還有另外一個是RoundRobin。
11.Kafka的Producer發送數據的時候,發送給哪個partition
這是有DefaultPartitioner
決定的。
If a partition is specified in the record, use it.
- If no partition is specified but a key is present choose a partition based on a hash of the key
- If no partition or key is present choose a partition in a round-robin fashion
中文:
- 有key就hash
- 沒key就Round-robin
0.8.0 版本在沒key的時候,是Random的方式。
10.Linkedin的集群GC情況
90%的broker GC暫停時間為21ms左右。每秒進行的young GC小於1次
9.解釋一下什么是ZoroCopy(sendfile技術)
傳統網絡IO流程,一次傳送過程:
- 從Disk把數據讀到內核區的Read Buffer。
- 把數據從內核區到用戶區Buffer。
- 再把數據寫入到內核區的Socket Buffer上。
- 把數據從Socket Buffer復制到網卡的NIC Buffer上。
Kafka少了中間兩步,這就是sendfile技術:
8.kafka如何做到大吞吐量、強大消息堆積能力等特性
- 依賴OS文件系統的頁緩存 (當上層有寫操作時,操作系統只是將數據寫入PageCache,同時標記Page屬性為Dirty。當讀操作發生時,先從PageCache中查找,如果發生缺頁才進行磁盤調度,最終返回需要的數據。實際上PageCache是把盡可能多的空閑內存都當做了磁盤緩存來使用。同時如果有其他進程申請內存,回收PageCache的代價又很小。
總結:依賴OS的頁緩存能大量減少IO,高效利用內存來作為緩存) - 不使用JVM緩存數據, 內存利用率高
- 順序IO以及O(1)常量時間get、put消息
- sendfile技術(零拷貝)
7.一個隊列最重要的就是消息丟失問題,kafka是如何處理的
每次發送數據時,Producer都是send()
之后就認為已經發送出去了,但其實大多數情況下消息還在內存的MessageSet當中,尚未發送到網絡,這時候如果Producer掛掉,那就會出現丟數據的情況。
解決辦法: ack機制,一般設置為acks=1,消息只需要被Leader接受並確認即可,這樣同時保證了可靠性和效率。
6.Kafka 0.10的Producer做了什么優化
- MessageSet手段批量順序寫入
- 數據支持壓縮
- 異步發送
5.為什么kafka是pull模型
push模式的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費消息。
4.底層的LogSegment、Index是怎么存儲的
- log:字節流,sendfile、zero copy技術
- index:稀疏索引,mmap的數據結構-本質是個類,二分查找尋找到offset。
3.一個很重要的問題是當Leader宕機了,怎樣在Follower中選舉出新的Leader
一種非常常用的選舉leader的方式是“Majority Vote”(“少數服從多數”)。
剛創建的topic一般"preferred replica"是leader。在ZooKeeper中動態維護了一個ISR(in-sync replicas),這個ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為Leader的可能。
所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式,通知需要做此修改的Broker。
那么Controller是如何選舉leader的?
- 如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader。
- 如果replica都不在ISR列表里面,選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)。
- 如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1。
2. Partition的leader選舉是怎么樣的
- Producer在發布消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然后寫入數據。
- Consumer(0.8)通過zk找到leader,讀取數據。
- Consumer(0.10)通過Coordinator找到Leader,讀取數據。
1. reassign一個topic,Producer、Consumer是否會丟失數據
不會。擴容的時候,新的leader需要從舊有的broker復制數據,跟上以后,會切換成leader。
這個時間期間,Producer、Consumer會向舊有的leader通信。
內部topic:__consumer_offsets
這個topic是用來管理所有的consumer的進度的,這樣避免了把消費進度存zk上面影響擴展性。它是由Coordinator來管理的。
如果請求過來的topic是__consumer_offsets,那就啟動OffsetManager的異步讀
這個異步讀會一直讀取__consumer_offsets並把消息解碼成消費進度放入緩存
queued.max.requests=16
I/O線程可以處理請求的隊列大小,若實際請求數超過此大小,網絡線程將停止接收新的請求。
confluence
- 單機partition數的最大值:
100 * broker * replica
(if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor.)