1. Dynamic topic discovery fails with: Unable to retrieve any partitions with KafkaTopicsDescriptor
How it works: Flink first fetches the full topic list from Kafka, then applies the regular expression to it to obtain the list of matching topics.
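The two steps above can be sketched in plain Java. This is a minimal illustration, not the connector's code: the class `TopicPatternFilter` and the method `matchTopics` are hypothetical names, the topic list is hard-coded instead of being fetched from Kafka, and it assumes the whole topic name must match the pattern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only (not part of Flink).
class TopicPatternFilter {

    /** Keeps only the topics whose full name matches the pattern. */
    static List<String> matchTopics(List<String> allTopics, Pattern topicPattern) {
        List<String> matched = new ArrayList<>();
        if (allTopics == null) {
            // Nothing could be listed, so nothing can match.
            return matched;
        }
        for (String topic : allTopics) {
            if (topicPattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Stand-in for the topic list that would come from Kafka.
        List<String> allTopics = Arrays.asList("test1", "test22", "orders", "test");
        System.out.println(matchTopics(allTopics, Pattern.compile("test[0-9]*")));
    }
}
```

Note that `test[0-9]*` also matches the bare name `test`, because `*` allows zero digits.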
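Debugging showed that the call that fetches the full topic list from Kafka returned null, and that starting a console consumer against the cluster reported LEADER_NOT_AVAILABLE. Because the Flink error does not surface LEADER_NOT_AVAILABLE directly, I had to step through the source code in a debugger to find the problem, which cost some time.
Advice found online is to restart the cluster or to modify server.properties with the following settings. I do not have permission to restart Kafka, so this is left unresolved for now.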
advertised.listeners=PLAINTEXT://<your_kafka_server_ip>:9092
advertised.host.name=node1 //host name
val kafkaConsumer = new FlinkKafkaConsumer010(
  // Specify the topics with a regular expression; this enables dynamic topic discovery
  java.util.regex.Pattern.compile("test[0-9]*"),
  new SimpleStringSchema,
  properties)

// 1. Start at open() in FlinkKafkaConsumerBase.java, where allPartitions is fetched
public void open(Configuration configuration) throws Exception {
    List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

// 2. Step into discoverPartitions() in AbstractPartitionDiscoverer: getAllTopics() returns null, which leads straight to
//    throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    ......
    List<String> matchedTopics = getAllTopics();
    ......
    if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
        throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
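To show why the root cause stays hidden, here is a simplified model of that failure path. It is only a sketch under stated assumptions: `DiscoverySketch`, `discoverTopics` and the `topicLister` supplier are hypothetical stand-ins, and the real discoverer works on partitions, not just topic names. The point is that a missing topic list and a pattern that matches nothing end in the same generic exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.regex.Pattern;

// Hypothetical, simplified model of the discovery failure path (not Flink code).
class DiscoverySketch {

    static List<String> discoverTopics(Supplier<List<String>> topicLister, Pattern topicPattern) {
        // Stand-in for the call that asks Kafka for all topics.
        List<String> allTopics = topicLister.get();

        List<String> matched = new ArrayList<>();
        if (allTopics != null) {
            for (String topic : allTopics) {
                if (topicPattern.matcher(topic).matches()) {
                    matched.add(topic);
                }
            }
        }

        // A null topic list and an empty match both land here, so the
        // message says nothing about the underlying broker error.
        if (matched.isEmpty()) {
            throw new RuntimeException(
                "Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicPattern);
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("test[0-9]*");
        try {
            // Simulate the situation described above: the topic list comes back null.
            discoverTopics(() -> null, pattern);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

When this exception appears, checking the brokers directly (for example with a console consumer, as done above) is a quicker way to reach the real error than reading the Flink stack trace.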
// Side note: setAndCheckDiscoveredPartition() determines which subtask each partition is assigned to
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    if (isUndiscoveredPartition(partition)) {
        discoveredPartitions.add(partition);
        return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
    }
    return false;
}
//KafkaTopicPartitionAssigner:
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
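The assignment formula can be tried out without Flink on the classpath. The sketch below copies the arithmetic from assign() above but takes the topic name and partition id as plain arguments. The class name `PartitionAssignmentSketch`, the topic `test1` and the parallelism of 4 are assumptions chosen for illustration.

```java
// Standalone copy of the assignment arithmetic, for experimentation only.
class PartitionAssignmentSketch {

    static int assign(String topic, int partition, int numParallelSubtasks) {
        // The topic name fixes the starting subtask; the mask keeps the value non-negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partition ids are used as an offset from the start index, wrapping around.
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int numParallelSubtasks = 4; // assumed parallelism
        for (int partition = 0; partition < 6; partition++) {
            System.out.println("test1-" + partition + " -> subtask "
                + assign("test1", partition, numParallelSubtasks));
        }
    }
}
```

Consecutive partitions of one topic go to consecutive subtasks in round-robin order, so a topic with fewer partitions than the parallelism leaves some subtasks without any partition from that topic.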
2. If Flink's implicit conversions fail, or no matching constructor is found for the Kafka consumer, import the following packages
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._