Flink之TableAPI和SQL(4):表的Sink实现


相关文章链接

Flink之TableAPI和SQL(1):基本功能描述

Flink之TableAPI和SQL(2):表和外部系统的连接方式

Flink之TableAPI和SQL(3):通过TableAPI和SQL表的一些操作(包括查询,过滤,聚集等)

Flink之TableAPI和SQL(4):表的Sink实现

Flink之TableAPI和SQL(5):表的时间特性

flink中表的输出Sink可以分为3种:

1、追加模式(Append Mode):

  在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。

2、撤回模式(Retract Mode):

  在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。

3、Upsert(更新插入)模式:

  在Upsert模式下,动态表和外部连接器交换Upsert和Delete消息。

 

具体实现如下代码所示:

// 创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

// 获取输入流,并转换为表
val sensorStream: DataStream[SensorReading] = env
    .readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt")
    .map(new MyMapToSensorReading)
val sensorTable: Table = tableEnv.fromDataStream(sensorStream)

// 1、输出到文件
tableEnv
    .connect(new FileSystem().path("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\out.txt"))
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("outputTable")
sensorTable.insertInto("outputTable")

// 2、输出到Kafka
tableEnv
    .connect(new Kafka()
        .version("0.11")
        .topic("flinkTestTopic")
        .property("zookeeper.connect", "cdh1:2181")
        .property("bootstrap.servers", "cdh1:9092")
    )
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("kafkaOutputTable")
sensorTable.insertInto("kafkaOutputTable")

// 3、输出到es
tableEnv
    .connect(new Elasticsearch()
        .version("7")
        .host("cdh1", 9200, "http")
        .index("sensor")
        .documentType("temp")
    )
    .inUpsertMode()                                         // 指定为Upsert模式(es支持更新模式)
    .withFormat(new Json())                                 // es只支持json,flink下的json格式
    .withSchema( new Schema()
        .field("id", DataTypes.STRING())
        .field("count", DataTypes.BIGINT())
    )
    .createTemporaryTable("esOutputTable")
sensorTable.insertInto("esOutputTable")

// 输出到 Mysql
val sinkDDL: String =
    """
      |create table jdbcOutputTable (
      |  id varchar(20) not null,
      |  cnt bigint not null
      |) with (
      |  'connector.type' = 'jdbc',
      |  'connector.url' = 'jdbc:mysql://localhost:3306/test',
      |  'connector.table' = 'sensor_count',
      |  'connector.driver' = 'com.mysql.jdbc.Driver',
      |  'connector.username' = 'root',
      |  'connector.password' = '123456'
      |""".stripMargin
tableEnv.sqlUpdate(sinkDDL)
sensorTable.insertInto("jdbcOutputTable")

// 启动执行器,执行任务
env.execute("OutputTableDemo")

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM