We all know that when Flink connects to Kafka, each Kafka partition is by default assigned to exactly one subtask (thread). How is this actually implemented? And how can we borrow from this code when defining our own RichParallelSourceFunction?
Let's take a look together (based on flink-1.8). Anyone who has read the Flink Kafka connector source code should be familiar with FlinkKafkaConsumerBase (if you haven't, no worries, we'll walk through it together). Here is the key part of the open method of FlinkKafkaConsumerBase:
```java
// get the partitions of the fixed topics or topic pattern that belong to this subtask
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
```
That's it: this call is the entry point through which the Flink consumer guarantees that one partition corresponds to one subtask.
```java
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    if (!closed && !wakeup) {
        try {
            ...
            // (2) eliminate partitions that are old partitions or should not be subscribed by this subtask
            if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
            } else {
                Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                KafkaTopicPartition nextPartition;
                while (iter.hasNext()) {
                    nextPartition = iter.next();
                    // removes partitions that were already discovered before, and also
                    // ensures that only the partitions belonging to this subtask are kept
                    if (!setAndCheckDiscoveredPartition(nextPartition)) {
                        iter.remove();
                    }
                }
            }
            return newDiscoveredPartitions;
            ...
}
```
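To make that filtering step concrete, here is a minimal, self-contained sketch of the same iterator.remove() pattern. The simple modulo check stands in for the real KafkaTopicPartitionAssigner.assign call (which also hashes the topic name, as we will see below); FilterDemo, keepOwnPartitions, and the plain integer partition ids are illustrative names, not Flink APIs:

```java
import java.util.*;

public class FilterDemo {

    // keep only the partitions assigned to this subtask; a plain modulo
    // stands in for KafkaTopicPartitionAssigner.assign here
    static List<Integer> keepOwnPartitions(List<Integer> discovered,
                                           int indexOfThisSubtask,
                                           int numParallelSubtasks) {
        Iterator<Integer> iter = discovered.iterator();
        while (iter.hasNext()) {
            // mirrors the iter.remove() loop in discoverPartitions()
            if (iter.next() % numParallelSubtasks != indexOfThisSubtask) {
                iter.remove();
            }
        }
        return discovered;
    }

    public static void main(String[] args) {
        List<Integer> discovered = new ArrayList<>(Arrays.asList(0, 1, 2, 3, 4, 5));
        // subtask 1 of 3 keeps partitions 1 and 4
        System.out.println(keepOwnPartitions(discovered, 1, 3)); // [1, 4]
    }
}
```

Every subtask runs this same loop over the full list of discovered partitions, but each one keeps a disjoint subset, so no partition is read twice.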
The key part is the setAndCheckDiscoveredPartition method. Stepping into it:
```java
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    if (isUndiscoveredPartition(partition)) {
        discoveredPartitions.add(partition);
        // maps each Kafka partition to exactly one indexOfThisSubtask
        return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
    }
    return false;
}
```
indexOfThisSubtask is the index of the current subtask, and numParallelSubtasks is the total number of parallel subtasks. When the method returns true, the partition belongs to this indexOfThisSubtask. Let's see how the assignment is actually computed:
```java
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
```
A start index is derived from the topic's hash, the partition id is added as an offset, and the result is taken modulo numParallelSubtasks. Consecutive partitions of the same topic are therefore spread round-robin across the subtasks.
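A quick way to see this round-robin behavior is to copy the formula into a standalone class and print which subtask each partition lands on. AssignDemo below is just an illustrative re-implementation for experimentation, not the actual Flink class:

```java
import java.util.*;

public class AssignDemo {

    // standalone copy of the KafkaTopicPartitionAssigner.assign logic
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // partition ids ascend from 0, so they act as a clockwise offset from the start index
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int numSubtasks = 3;
        Map<Integer, List<Integer>> partitionsBySubtask = new TreeMap<>();
        for (int p = 0; p < 6; p++) {
            int subtask = assign("demo-topic", p, numSubtasks);
            partitionsBySubtask.computeIfAbsent(subtask, k -> new ArrayList<>()).add(p);
        }
        // with 6 partitions and 3 subtasks, each subtask owns exactly 2 partitions
        System.out.println(partitionsBySubtask);
    }
}
```

Note that the topic hash only shifts the starting point; the partition id does the round-robin stepping, so the distribution stays balanced regardless of the topic name.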
So, how can we borrow this idea when defining our own RichParallelSourceFunction? Straight to the code:
```java
public class WordSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {

    // volatile, because cancel() is called from a different thread than run()
    private volatile boolean isRun = true;

    @Override
    public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
        int start = 0;
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        while (isRun) {
            start += 1;
            // each subtask only emits the values assigned to it, mirroring the
            // partition-to-subtask assignment of the Kafka consumer
            if (start % numberOfParallelSubtasks == indexOfThisSubtask) {
                ctx.collect(new Tuple2<>((long) start, 1L));
                Thread.sleep(1000);
                System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
            }
        }
    }

    @Override
    public void cancel() {
        isRun = false;
    }
}
```
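Stripped of the Flink API, the splitting idea in the source above can be verified in plain Java: each parallel instance keeps only the values whose modulo matches its own index, so together the instances emit every value exactly once, with no duplicates. SplitDemo and valuesForSubtask are illustrative names for this sketch:

```java
import java.util.*;

public class SplitDemo {

    // the values a subtask with the given index would emit, for start = 1..maxValue
    static List<Integer> valuesForSubtask(int index, int numberOfParallelSubtasks, int maxValue) {
        List<Integer> values = new ArrayList<>();
        for (int start = 1; start <= maxValue; start++) {
            // same check as in WordSource.run()
            if (start % numberOfParallelSubtasks == index) {
                values.add(start);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // with parallelism 3, the three instances partition the stream of integers
        for (int index = 0; index < 3; index++) {
            System.out.println("subtask " + index + " emits " + valuesForSubtask(index, 3, 9));
        }
        // subtask 0 emits [3, 6, 9], subtask 1 emits [1, 4, 7], subtask 2 emits [2, 5, 8]
    }
}
```

The union of the three lists is 1..9 and the lists are pairwise disjoint, which is exactly the property the Kafka consumer's partition assignment provides.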
Ta-da! With that, our own RichParallelSourceFunction can emit data in parallel, too.
Reposted from: https://cloud.tencent.com/developer/article/1446321