flink-Stream解析canal-json数据


引入依赖

    <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.33</version>
        </dependency>

  

val env = StreamExecutionEnvironment.getExecutionEnvironment
    println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)+"flink 代码开始运行")
    val begin_date = new EQTJStreamUtil().getParamDate(ParameterTool.fromArgs(args))
    println(begin_date)

    //添加kakka数据源

    val reportStreamSouce = env.addSource(new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), new EQTJStreamUtil().getKafkaProps())
      .setStartFromEarliest())  //设置消费kafka位置
      .map(JSON.parseObject(_))
      .filter(_.get("table")=="epidemic_report")
      .filter(_.get("type").toString.matches("(INSERT|UPDATE)"))
      .map(_.getJSONArray("data").getObject(0,new Dxxbs_epidemic_report().getClass))
//      .filter(_.getSet_id=="1")
      .filter(_.getCreat_time > begin_date)

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM