一、自定義生產數據
https://www.cnblogs.com/robots2/p/16048729.html
二、生產轉化數據,導出到mysql
2.1 建表語句
-- Target table for the custom Flink MySQL sink shown in section 2.2.
-- The sink's INSERT supplies every column except `id`.
CREATE TABLE `video_order` (
  -- Surrogate primary key, generated by MySQL on insert.
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  -- Order amount stored as an integer; the unit is not stated here (TODO confirm).
  `money` int(11) DEFAULT NULL,
  `title` varchar(32) DEFAULT NULL,
  `trade_no` varchar(64) DEFAULT NULL,
  -- DATE type: only the calendar date is kept, no time-of-day.
  `create_time` date DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=32 DEFAULT CHARSET=utf8mb4;
2.2 自定義sink,配置mysql數據源
package net.xdclass.class6.sink;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import net.xdclass.model.VideoOrder;
/**
 * Flink sink that writes each incoming {@link VideoOrder} as one row of the
 * MySQL {@code video_order} table using plain JDBC.
 *
 * <p>A single connection and a single prepared INSERT statement are created in
 * {@link #open(Configuration)}, reused by every {@code invoke} call, and
 * released in {@link #close()}.
 */
public class MysqlSink extends RichSinkFunction<VideoOrder> {

    /** JDBC connection; {@code null} until {@link #open(Configuration)} has run. */
    private Connection conn;

    /** Prepared INSERT reused for every record; {@code null} until opened. */
    private PreparedStatement ps;

    /**
     * Opens the MySQL connection and prepares the INSERT statement.
     *
     * @param parameters configuration passed in by the framework (not read here)
     * @throws Exception if the connection cannot be established or the
     *                   statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // The host ("ip地址"), user ("賬號") and password ("密碼") are literal
        // placeholder text and must be replaced with real values before running.
        conn = DriverManager.getConnection("jdbc:mysql://ip地址:3306/flink_test?useUnicode=true" +
                "&characterEncoding=utf8&allowMultiQueries=true&serverTimezone" +
                "=Asia/Shanghai", "賬號", "密碼");
        String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
        ps = conn.prepareStatement(sql);
        System.out.println("自定義sink,open數據庫鏈接 =====");
    }

    /**
     * Releases the JDBC resources.
     *
     * <p>The statement is closed before the connection that owns it, and the
     * connection is closed in a {@code finally} block so that a failure while
     * closing the statement cannot leak the connection.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        System.out.println("自定義sink,close數據庫連接 =====");
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    /**
     * Inserts one order into {@code video_order}; called for each record that
     * reaches the sink.
     *
     * @param value   the order to persist
     * @param context sink context supplied by the framework (not used)
     * @throws Exception if binding a parameter or executing the INSERT fails
     */
    @Override
    public void invoke(VideoOrder value, Context context) throws Exception {
        ps.setInt(1, value.getUserId());
        ps.setInt(2, value.getMoney());
        ps.setString(3, value.getTitle());
        ps.setString(4, value.getTradeNo());
        // create_time is a nullable DATE column: store SQL NULL when the order
        // has no creation time instead of failing with a NullPointerException.
        if (value.getCreateTime() == null) {
            ps.setNull(5, java.sql.Types.DATE);
        } else {
            ps.setDate(5, new Date(value.getCreateTime().getTime()));
        }
        ps.executeUpdate();
    }
}
2.3 導出到mysql
package net.xdclass.class6.app;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import net.xdclass.class6.sink.MysqlSink;
import net.xdclass.class6.source.VideoOrderSource;
import net.xdclass.model.VideoOrder;
/**
 * Demo job: reads orders from {@link VideoOrderSource}, keeps those whose
 * money value is greater than 10, prints them, and writes them to MySQL
 * through {@link MysqlSink}.
 */
public class FLink04CustomSinkApp {

    /**
     * Builds the pipeline and submits it for execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Local environment with the web UI enabled.
        // Alternative: StreamExecutionEnvironment.getExecutionEnvironment()
        Configuration localConf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConf);

        // Unified stream/batch runtime mode, single parallel instance.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // Custom source feeding the pipeline.
        DataStream<VideoOrder> orders = env.addSource(new VideoOrderSource());

        // Keep only orders whose money value exceeds 10.
        FilterFunction<VideoOrder> aboveTen = order -> order.getMoney() > 10;
        DataStream<VideoOrder> keptOrders = orders.filter(aboveTen);

        // Print the surviving orders, then hand them to the custom MySQL sink.
        keptOrders.print();
        keptOrders.addSink(new MysqlSink());

        // Job name as submitted to the runtime.
        env.execute("CustomSourceApp");
    }
}