flink 讀取kafka 數據,partition分配


每個並發有個編號,只會讀取kafka partition  % 總並發數 == 編號 的分區
 
如: 6 分區, 4個並發
分區: p0 p1 p2 p3 p4 p5
並發: 0 1 2 3 
 
分區 p0 分配給並發 0 :    0 % 4 = 0
分區 p1分配給並發1:    1 % 4 = 1
分區 p2分配給並發2:    2 % 4 = 2
分區 p3 分配給並發 3:    3 % 4 = 3
分區 p4 分配給並發 0 :    4 % 4 = 0
分區 p5 分配給並發 5 :    5 % 4 = 1
 
源碼解析:
FlinkKafkaConsumerBase.java  458 行 open 方法:
public void open(Configuration configuration) throws Exception {
調用 AbstractPartitionDiscoverer 類的方法 allPartitions ,發現並分配分區:
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
發現訂閱的全部分區,並移除部分分區:
Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
KafkaTopicPartition nextPartition;
while (iter.hasNext()) {
nextPartition = iter.next();
if (!setAndCheckDiscoveredPartition(nextPartition)) {
iter.remove();
}
}
添加還未發現的分區
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
if (isUndiscoveredPartition(partition)) {
discoveredPartitions.add(partition);

return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
}

return false;
}

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

// here, the assumption is that the id of Kafka partitions are always ascending
// starting from 0, and therefore can be used directly as the offset clockwise from the start index
return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

先發現訂閱的主題的所有新分區,循環所有新分區,把還未發現的新分區添加到發現的分區列表,把還未發現的新分區的
startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
求 startIndex(不知道干嘛), startIndex + 分區編號,對 並發數取余,
返回的結果 與 當前sub task 的並發編號相比,如果不相等,就把分區從新分區列表中移除,最后剩下的分區就是當前並發
讀取的分區。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM