Flink之TableAPI和SQL(4):表的Sink實現


相關文章鏈接

Flink之TableAPI和SQL(1):基本功能描述

Flink之TableAPI和SQL(2):表和外部系統的連接方式

Flink之TableAPI和SQL(3):通過TableAPI和SQL表的一些操作(包括查詢,過濾,聚集等)

Flink之TableAPI和SQL(4):表的Sink實現

Flink之TableAPI和SQL(5):表的時間特性

flink中表的輸出Sink可以分為3種:

1、追加模式(Append Mode):

  在追加模式下,表(動態表)和外部連接器只交換插入(Insert)消息。

2、撤回模式(Retract Mode):

  在撤回模式下,表和外部連接器交換的是:添加(Add)和撤回(Retract)消息。

3、Upsert(更新插入)模式:

  在Upsert模式下,動態表和外部連接器交換Upsert和Delete消息。

 

具體實現如下代碼所示:

// 創建執行環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

// 獲取輸入流,並轉換為表
val sensorStream: DataStream[SensorReading] = env
    .readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt")
    .map(new MyMapToSensorReading)
val sensorTable: Table = tableEnv.fromDataStream(sensorStream)

// 1、輸出到文件
tableEnv
    .connect(new FileSystem().path("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\out.txt"))
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("outputTable")
sensorTable.insertInto("outputTable")

// 2、輸出到Kafka
tableEnv
    .connect(new Kafka()
        .version("0.11")
        .topic("flinkTestTopic")
        .property("zookeeper.connect", "cdh1:2181")
        .property("bootstrap.servers", "cdh1:9092")
    )
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("kafkaOutputTable")
sensorTable.insertInto("kafkaOutputTable")

// 3、輸出到es
tableEnv
    .connect(new Elasticsearch()
        .version("7")
        .host("cdh1", 9200, "http")
        .index("sensor")
        .documentType("temp")
    )
    .inUpsertMode()                                         // 指定為Upsert模式(es支持更新模式)
    .withFormat(new Json())                                 // es只支持json,flink下的json格式
    .withSchema( new Schema()
        .field("id", DataTypes.STRING())
        .field("count", DataTypes.BIGINT())
    )
    .createTemporaryTable("esOutputTable")
sensorTable.insertInto("esOutputTable")

// 輸出到 Mysql
val sinkDDL: String =
    """
      |create table jdbcOutputTable (
      |  id varchar(20) not null,
      |  cnt bigint not null
      |) with (
      |  'connector.type' = 'jdbc',
      |  'connector.url' = 'jdbc:mysql://localhost:3306/test',
      |  'connector.table' = 'sensor_count',
      |  'connector.driver' = 'com.mysql.jdbc.Driver',
      |  'connector.username' = 'root',
      |  'connector.password' = '123456'
      |""".stripMargin
tableEnv.sqlUpdate(sinkDDL)
sensorTable.insertInto("jdbcOutputTable")

// 啟動執行器,執行任務
env.execute("OutputTableDemo")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM