Flink's One-to-One Mapping Between Parallel Subtasks and Kafka Partitions, Explained (Part 1)


We all know that when Flink connects to Kafka, one partition maps to one thread by default. How is this actually implemented? And how can we borrow from this code when we define our own RichParallelSourceFunction?

Let's take a look (based on flink-1.8). If you have read the Flink Kafka connector source code, FlinkKafkaConsumerBase should be familiar (and if not, no problem, we will walk through it together). Here is the key part of the open method of FlinkKafkaConsumerBase:

// fetch the partitions (for the fixed topics or the topic pattern) that this subtask should read
        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

This is the entry point where the Flink consumer establishes the one-partition-one-thread mapping:

public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
        if (!closed && !wakeup) {
            try {
            ...
                // (2) eliminate partitions that are old or that should not be subscribed by this subtask
                if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                    throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
                } else {
                    Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                    KafkaTopicPartition nextPartition;
                    while (iter.hasNext()) {
                        nextPartition = iter.next();
                        // drop partitions that were already discovered earlier; this also
                        // ensures that only the partitions belonging to this subtask are kept
                        if (!setAndCheckDiscoveredPartition(nextPartition)) {
                            iter.remove();
                        }
                    }
                }

                return newDiscoveredPartitions;
            ...
    }

The key part is the setAndCheckDiscoveredPartition method. Stepping into it:

public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
        if (isUndiscoveredPartition(partition)) {
            discoveredPartitions.add(partition);
            
            // each Kafka partition maps to exactly one indexOfThisSubtask
            return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
        }
        return false;
    }

indexOfThisSubtask is the index of the current subtask, and numParallelSubtasks is the total number of parallel subtasks. When the comparison returns true, the partition belongs to this indexOfThisSubtask. Now let's look at how the assignment is actually computed:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }

The start index is derived from the hash of the topic name; the partition id is then added and the sum is taken modulo numParallelSubtasks, so consecutive partitions are spread round-robin across subtasks.
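As a sanity check, here is a small standalone sketch that re-implements the formula above. The class name AssignDemo and the topic name "demo-topic" are made up for illustration; only the arithmetic mirrors Flink's KafkaTopicPartitionAssigner.assign.

```java
public class AssignDemo {

    // same arithmetic as Flink's KafkaTopicPartitionAssigner.assign(...)
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // partition ids start at 0 and ascend, so they act as a clockwise
        // offset from the start index
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        // 6 partitions of a hypothetical topic spread over 3 subtasks:
        // each partition lands on exactly one subtask, and subtasks get
        // contiguous round-robin slices
        for (int p = 0; p < 6; p++) {
            System.out.println("partition " + p + " -> subtask "
                    + assign("demo-topic", p, parallelism));
        }
    }
}
```

Because the offset is added modulo the parallelism, partitions p and p + parallelism always land on the same subtask, and any window of `parallelism` consecutive partitions covers every subtask exactly once.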

So how can we borrow this idea when defining our own RichParallelSourceFunction? Straight to the code:

public class WordSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {

    // volatile so that cancel(), called from another thread, is visible to run()
    private volatile boolean isRun = true;

    @Override
    public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
        long start = 0;
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        while (isRun) {
            start += 1;
            // emit each value from exactly one subtask, mirroring the modulo
            // assignment Flink uses for Kafka partitions
            if (start % numberOfParallelSubtasks == indexOfThisSubtask) {
                ctx.collect(new Tuple2<>(start, 1L));
                Thread.sleep(1000);
                System.out.println("emitted from " + Thread.currentThread().getName());
            }
        }
    }

    @Override
    public void cancel() {
        isRun = false;
    }
}

And that's it: a custom RichParallelSourceFunction can now emit data in parallel.
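To see the splitting rule in isolation, here is a Flink-free sketch that simulates what each subtask of WordSource would emit. RoundRobinDemo and valuesFor are hypothetical helpers for illustration, not Flink API; only the modulo condition is taken from the source function above.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinDemo {

    // collects the values 1..max that the given subtask would emit,
    // using the same condition as WordSource:
    // value % numberOfParallelSubtasks == indexOfThisSubtask
    static List<Integer> valuesFor(int indexOfThisSubtask,
                                   int numberOfParallelSubtasks,
                                   int max) {
        List<Integer> out = new ArrayList<>();
        for (int start = 1; start <= max; start++) {
            if (start % numberOfParallelSubtasks == indexOfThisSubtask) {
                out.add(start);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        // with parallelism 2, subtask 0 gets the even values and
        // subtask 1 gets the odd values; no value is emitted twice
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " emits "
                    + valuesFor(subtask, parallelism, 10));
        }
    }
}
```

Each value satisfies the modulo condition for exactly one subtask index, so the subtasks partition the stream with no duplicates and no gaps, which is the same guarantee the Kafka partition assignment provides.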

 

Reposted from: https://cloud.tencent.com/developer/article/1446321

