每個並發有個編號,只會讀取kafka partition % 總並發數 == 編號 的分區
如: 6 分區, 4個並發
分區: p0 p1 p2 p3 p4 p5
並發: 0 1 2 3
分區 p0 分配給並發 0 : 0 % 4 = 0
分區 p1分配給並發1: 1 % 4 = 1
分區 p2分配給並發2: 2 % 4 = 2
分區 p3 分配給並發 3: 3 % 4 = 3
分區 p4 分配給並發 0 : 4 % 4 = 0
分區 p5 分配給並發 5 : 5 % 4 = 1
源碼解析:
FlinkKafkaConsumerBase.java 458 行 open 方法:
public void open(Configuration configuration) throws Exception {
調用 AbstractPartitionDiscoverer 類的方法 allPartitions ,發現並分配分區:
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
發現訂閱的全部分區,並移除部分分區:
Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
KafkaTopicPartition nextPartition;
while (iter.hasNext()) {
nextPartition = iter.next();
if (!setAndCheckDiscoveredPartition(nextPartition)) {
iter.remove();
}
}
添加還未發現的分區
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
if (isUndiscoveredPartition(partition)) {
discoveredPartitions.add(partition);
return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
}
return false;
}
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
// here, the assumption is that the id of Kafka partitions are always ascending
// starting from 0, and therefore can be used directly as the offset clockwise from the start index
return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
先發現訂閱的主題的所有新分區,循環所有新分區,把還未發現的新分區添加到發現的分區列表,把還未發現的新分區的
startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
求 startIndex(不知道干嘛), startIndex + 分區編號,對 並發數取余,
返回的結果 與 當前sub task 的並發編號相比,如果不相等,就把分區從新分區列表中移除,最后剩下的分區就是當前並發
讀取的分區。