Flink Error Troubleshooting


1. Dynamic topic discovery error: Unable to retrieve any partitions with KafkaTopicsDescriptor

  Execution logic: first fetch the full topic list from Kafka, then apply the regular expression to it to obtain the subscribed topic list.
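
  This discovery step can be sketched as follows. This is a minimal reimplementation for illustration, not Flink's own code; the topic list here is a hypothetical stand-in for what Kafka would return:

```scala
import java.util.regex.Pattern
import scala.collection.JavaConverters._

object TopicMatchSketch {
  // Keep only the topics that fully match the pattern, mirroring how the
  // discoverer narrows the full Kafka topic list down to the subscribed set.
  def matchTopics(allTopics: java.util.List[String], pattern: Pattern): List[String] =
    allTopics.asScala.filter(t => pattern.matcher(t).matches()).toList
}

// Hypothetical topic list standing in for Kafka's full topic list
val topics = java.util.Arrays.asList("test0", "test12", "orders", "test")
val matched = TopicMatchSketch.matchTopics(topics, Pattern.compile("test[0-9]*"))
// note: "test" also matches, since [0-9]* allows zero digits
```

  If the fetched topic list is null (as in the failure below), there is nothing to match against, which is what ultimately surfaces as the RuntimeException.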

  Debugging revealed that fetching the full Kafka topic list returned null, and a console consumer started against the cluster reported LEADER_NOT_AVAILABLE. Because Flink's exception did not surface LEADER_NOT_AVAILABLE directly, the root cause only became apparent after stepping through the source code, which cost some time.

  Advice online suggests restarting the cluster or modifying server.properties; since we lack permission to restart Kafka, this is left unresolved for now.

  advertised.listeners=PLAINTEXT://<your_kafka_server_ip>:9092
  advertised.host.name=node1   # hostname

    val kafkaConsumer = new FlinkKafkaConsumer010(
      // specify topics via a regular expression, enabling dynamic topic discovery
      java.util.regex.Pattern.compile("test[0-9]*"),
      new SimpleStringSchema,
      properties)
// 1. Locate where allPartitions is obtained in the open() method of FlinkKafkaConsumerBase.java
    public void open(Configuration configuration) throws Exception {

        List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
// 2. In AbstractPartitionDiscoverer.discoverPartitions(), getAllTopics() returns null,
//    which leads directly to the RuntimeException below
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    ......
    List<String> matchedTopics = getAllTopics();
    ......
    if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
        throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
    }
// In addition, setAndCheckDiscoveredPartition() establishes the mapping between subtasks and partitions
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    if (isUndiscoveredPartition(partition)) {
        discoveredPartitions.add(partition);
        return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
    }
    return false;
}
//KafkaTopicPartitionAssigner:
    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }
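
  A minimal sketch of the same arithmetic (reimplemented here for illustration, not the Flink class itself): the topic's hash fixes a start subtask, and partition ids are then distributed round-robin from that index, so consecutive partitions of one topic land on consecutive subtasks:

```scala
object AssignSketch {
  // Same arithmetic as KafkaTopicPartitionAssigner.assign: the topic hash
  // picks a start index, partition ids then rotate round-robin from it.
  // The & 0x7FFFFFFF masks off the sign bit so the modulo is non-negative.
  def assign(topic: String, partition: Int, numParallelSubtasks: Int): Int = {
    val startIndex = ((topic.hashCode * 31) & 0x7FFFFFFF) % numParallelSubtasks
    (startIndex + partition) % numParallelSubtasks
  }
}

// With 4 subtasks, partitions 0..3 of one topic cover all four subtasks
// exactly once, whatever start index the topic hash produces
val owners = (0 until 4).map(p => AssignSketch.assign("test0", p, 4))
```

  This is why setAndCheckDiscoveredPartition() can decide locally whether a newly discovered partition belongs to the current subtask: the assignment is a pure function of topic, partition id, and parallelism.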

 2. Flink implicit conversion fails, or no matching FlinkKafkaConsumer constructor is found: the following imports are required

import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._

 

