引入依賴
<!-- fastjson 1.2.33 is affected by publicly documented autoType deserialization
     vulnerabilities; 1.2.83 is the patched release of the 1.x line. -->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.83</version>
</dependency>
// Builds the Kafka-backed stream of epidemic reports.
// NOTE(review): `args` is expected to be the enclosing main method's argument array — confirm.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Print a timestamped start-up message.
println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date) + "flink 代碼開始運行")

// A single helper instance supplies both the date parameter and the Kafka properties.
val streamUtil = new EQTJStreamUtil()

// Lower bound used by the creation-time filter at the end of the pipeline,
// obtained from the program arguments.
val begin_date = streamUtil.getParamDate(ParameterTool.fromArgs(args))
println(begin_date)

// Add the Kafka data source and narrow it down to epidemic_report row changes.
val reportStreamSouce = env
  .addSource(
    new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), streamUtil.getKafkaProps())
      .setStartFromEarliest() // start consuming from the earliest available Kafka offset
  )
  // Parse every message into a fastjson JSONObject.
  .map(message => JSON.parseObject(message))
  // Keep only change records for the epidemic_report table.
  .filter(record => record.get("table") == "epidemic_report")
  // Keep only INSERT / UPDATE events; a record without a "type" field is dropped
  // instead of raising a NullPointerException.
  .filter(record => Option(record.get("type")).exists(_.toString.matches("(INSERT|UPDATE)")))
  // Drop records whose "data" array is missing or empty so the lookup of
  // element 0 below cannot fail.
  .filter(record => Option(record.getJSONArray("data")).exists(!_.isEmpty))
  // Convert the first element of "data" into the report bean. classOf avoids
  // allocating a throwaway bean per record just to obtain its Class.
  .map(record => record.getJSONArray("data").getObject(0, classOf[Dxxbs_epidemic_report]))
  // .filter(_.getSet_id=="1")
  // Keep only reports whose creation time is greater than begin_date.
  .filter(report => report.getCreat_time > begin_date)