相关文章链接
Flink之TableAPI和SQL(2):表和外部系统的连接方式
Flink之TableAPI和SQL(3):通过TableAPI和SQL表的一些操作(包括查询,过滤,聚集等)
Flink之TableAPI和SQL(4):表的Sink实现
flink中表的输出Sink可以分为3种:
1、追加模式(Append Mode):
在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。
2、撤回模式(Retract Mode):
在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。
3、Upsert(更新插入)模式:
在Upsert模式下,动态表和外部连接器交换Upsert和Delete消息。
具体实现如下代码所示:
// 创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) // 获取输入流,并转换为表 val sensorStream: DataStream[SensorReading] = env .readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt") .map(new MyMapToSensorReading) val sensorTable: Table = tableEnv.fromDataStream(sensorStream) // 1、输出到文件 tableEnv .connect(new FileSystem().path("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\out.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) .createTemporaryTable("outputTable") sensorTable.insertInto("outputTable") // 2、输出到Kafka tableEnv .connect(new Kafka() .version("0.11") .topic("flinkTestTopic") .property("zookeeper.connect", "cdh1:2181") .property("bootstrap.servers", "cdh1:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaOutputTable") sensorTable.insertInto("kafkaOutputTable") // 3、输出到es tableEnv .connect(new Elasticsearch() .version("7") .host("cdh1", 9200, "http") .index("sensor") .documentType("temp") ) .inUpsertMode() // 指定为Upsert模式(es支持更新模式) .withFormat(new Json()) // es只支持json,flink下的json格式 .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("count", DataTypes.BIGINT()) ) .createTemporaryTable("esOutputTable") sensorTable.insertInto("esOutputTable") // 输出到 Mysql val sinkDDL: String = """ |create table jdbcOutputTable ( | id varchar(20) not null, | cnt bigint not null |) with ( | 'connector.type' = 'jdbc', | 'connector.url' = 'jdbc:mysql://localhost:3306/test', | 'connector.table' = 'sensor_count', | 'connector.driver' = 'com.mysql.jdbc.Driver', | 'connector.username' = 'root', | 'connector.password' = '123456' |""".stripMargin tableEnv.sqlUpdate(sinkDDL) sensorTable.insertInto("jdbcOutputTable") // 启动执行器,执行任务 env.execute("OutputTableDemo")