flink-Stream解析canal-json數據


引入依賴

    <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.33</version>
        </dependency>

  

val env = StreamExecutionEnvironment.getExecutionEnvironment
    println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)+"flink 代碼開始運行")
    val begin_date = new EQTJStreamUtil().getParamDate(ParameterTool.fromArgs(args))
    println(begin_date)

    //添加kakka數據源

    val reportStreamSouce = env.addSource(new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), new EQTJStreamUtil().getKafkaProps())
      .setStartFromEarliest())  //設置消費kafka位置
      .map(JSON.parseObject(_))
      .filter(_.get("table")=="epidemic_report")
      .filter(_.get("type").toString.matches("(INSERT|UPDATE)"))
      .map(_.getJSONArray("data").getObject(0,new Dxxbs_epidemic_report().getClass))
//      .filter(_.getSet_id=="1")
      .filter(_.getCreat_time > begin_date)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM