Troubleshooting Flink Errors


1. Dynamic topic discovery error: Unable to retrieve any partitions with KafkaTopicsDescriptor

  Execution logic: first fetch the complete topic list from Kafka, then apply the regular expression to it to obtain the matched topic list.

  Debugging showed that the call fetching the full Kafka topic list returned null, and a console consumer started against the cluster reported LEADER_NOT_AVAILABLE. Because Flink does not surface LEADER_NOT_AVAILABLE directly, the root cause only became clear after stepping through the source code, which took some time.

  The common online advice is to restart the cluster or adjust server.properties; since we lack permission to restart Kafka, the issue is left unresolved for now. The relevant server.properties entries are:

  advertised.listeners=PLAINTEXT://<your_kafka_server_ip>:9092
  advertised.host.name=node1   # hostname

    val kafkaConsumer = new FlinkKafkaConsumer010(
      // subscribe by regex, so newly created topics matching the pattern are discovered dynamically
      java.util.regex.Pattern.compile("test[0-9]*"),
      new SimpleStringSchema,
      properties)
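The regex subscription above ultimately filters the broker's full topic list through the pattern. A minimal sketch of that matching step in plain Java, independent of Flink (the topic names here are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicRegexDemo {
    // Mimics the discoverer's filtering step: keep only topics that fully match the pattern.
    static List<String> matchTopics(List<String> allTopics, Pattern pattern) {
        return allTopics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic list as it might come back from the broker.
        List<String> allTopics = Arrays.asList("test0", "test12", "testing", "other");
        Pattern pattern = Pattern.compile("test[0-9]*");
        System.out.println(matchTopics(allTopics, pattern)); // prints [test0, test12]
    }
}
```

Note that if the broker metadata call returns null instead of a list, as in the failure above, there is nothing to filter at all, which is what leads to the generic RuntimeException.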
// 1. Start in FlinkKafkaConsumerBase.java: open() fetches allPartitions from the partition discoverer
    public void open(Configuration configuration) throws Exception {

        List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
// 2. Inside AbstractPartitionDiscoverer.discoverPartitions(), getAllTopics() returns null, so nothing is matched and the method throws: new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    ......
    List<String> matchedTopics = getAllTopics();
    ......
    if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
        throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
    }
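The guard above is worth noting: a null topic list and an empty one are reported identically, which is exactly why the broker-side LEADER_NOT_AVAILABLE cause stays hidden behind the generic message. A standalone sketch of that collapse (class and method names here are illustrative, not Flink's):

```java
import java.util.Collections;
import java.util.List;

public class DiscoveryGuardDemo {
    // Same shape as the check in AbstractPartitionDiscoverer: null and empty both
    // collapse into one generic error, masking causes such as LEADER_NOT_AVAILABLE.
    static void checkDiscovered(List<String> newDiscoveredPartitions, String descriptor) {
        if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
            throw new RuntimeException(
                "Unable to retrieve any partitions with KafkaTopicsDescriptor: " + descriptor);
        }
    }

    public static void main(String[] args) {
        for (List<String> discovered : new List[]{null, Collections.emptyList()}) {
            try {
                checkDiscovered(discovered, "Topic Regex Pattern (test[0-9]*)");
            } catch (RuntimeException e) {
                System.out.println(e.getMessage()); // identical message in both cases
            }
        }
    }
}
```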
// For completeness, setAndCheckDiscoveredPartition() determines which subtask each partition is assigned to
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    if (isUndiscoveredPartition(partition)) {
        discoveredPartitions.add(partition);
        return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
    }
    return false;
}
// KafkaTopicPartitionAssigner.assign() maps a partition to a subtask index:
    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }
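The assignment arithmetic can be checked standalone: the topic's hash picks a start subtask, and partition ids then spread round-robin from there, so consecutive partitions of one topic land on consecutive subtasks. A self-contained reproduction (the class name and topic string are illustrative):

```java
public class AssignDemo {
    // Reproduction of the assign() arithmetic shown above: topic hash picks a
    // start index, partition ids offset round-robin from it.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Kafka partition ids ascend from 0, so they serve directly as the
        // clockwise offset from the start index.
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int p = 0; p < 6; p++) {
            System.out.println("test1-" + p + " -> subtask " + assign("test1", p, parallelism));
        }
    }
}
```

With parallelism 4 and 6 partitions, the printout shows each subtask receiving at least one partition, and partitions p and p+1 always landing on adjacent subtask indices modulo the parallelism.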

 2. Flink implicit conversion failures, or no matching FlinkKafkaConsumer constructor: import the following packages

import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._

 

