Flink從Kafka 0.8中讀取多個Topic時的問題


    Flink提供了FlinkKafkaConsumer08,使用Kafka的High-level接口,從Kafka中讀取指定Topic的數據,如果要從多個Topic讀取數據,可以如下操作:

1.application.conf中配置

   如果使用了配置管理庫typesafe.config,可以在其application.conf按如下方式配置List類型的元素

myToicList:["t1","t2","t3"]

2.讀取配置文件

object MyFlinkConfig {
  import com.typesafe.config.{ Config, ConfigFactory }
  import net.ceedubs.ficus.Ficus._


  def apply(): MyFlinkConfig = apply(ConfigFactory.load)

  def apply(applicationConfig: Config): MyFlinkConfig = {

    val config = applicationConfig.getConfig("MyFlinkConfig")

    new MyFlinkConfig (config.as[List[String]]("myTopicList"))
  }
}

case class MyFlinkConfig (myTopicList: List[String]) extends Serializable {}

3.讀取多個Topic

 因為FlinkKafkaConsumer08使用Java實現的,而MyFlinkConfig 中的List是Scala的List,所以要將Scala的List轉為Java的List

val config =MyFlinkConfig()
import scala.collection.JavaConversions._
val kafkaConsumer=new FlinkKafkaConsumer08[MonitorDataRecord](config.myTopicList, new SimpleStringSchema(), kafkaProps)

 

4.遇到的問題

4.1 如果要讀取的Topic不存在,則應用程序直接報錯,因此Topic在配置文件中配置時一定要正確

4.2 如果要讀取的Topic列表中,其中一個在Kafka中沒有數據,而你又基於Event Time提取Timestamp並且設置Watermark,會導致整個Topic列表都沒法基於時間窗口觸發操作,解決方案:

    先rebalance,然后再設置水位:   

    val monitorSampling = env
      .addSource(kafkaConsumer)
      .rebalance
      .assignTimestampsAndWatermarks(new MyWatermarkGenerator[MyRecord](Time.seconds(config.latencyDuration)))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM