Flink之API的使用(1):Sink的使用


相關文章鏈接

Flink之API的使用(1):Sink的使用

Flink之API的使用(2):Transform算子的使用

Flink之API的使用(3):Source的使用

具體代碼如下所示:

// Set up the streaming execution environment with a parallelism of 2
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)

// Read the raw text file and map each line into a SensorReading record
val fileStream: DataStream[String] =
    env.readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt")
val sensorStream: DataStream[SensorReading] = fileStream.map(new MyMapToSensorReading)

// 1. Kafka sink: publish every raw line to the "flinkTestTopic" topic as a plain string
val kafkaProducer = new FlinkKafkaProducer[String](
    "cdh1:9092,cdh2:9092,cdh3:9092",
    "flinkTestTopic",
    new SimpleStringSchema())
fileStream.addSink(kafkaProducer)

// 2. Redis sink: a RedisMapper defines which Redis command to run and how to
//    extract the key/value from each record.
// 2.1 Connection settings for the local Redis instance
val conf: FlinkJedisPoolConfig =
    new FlinkJedisPoolConfig.Builder()
        .setHost("localhost")
        .setPort(6379)
        .build()
// 2.2 Mapper implemented as an anonymous class: HSET sensor_temperature <id> <temp>
val redisMapper: RedisMapper[SensorReading] = new RedisMapper[SensorReading] {
    // Write with HSET into the hash named "sensor_temperature"
    override def getCommandDescription: RedisCommandDescription =
        new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")

    // Hash field: the sensor id
    override def getKeyFromData(data: SensorReading): String = data.id

    // Hash value: the temperature rendered as a string
    override def getValueFromData(data: SensorReading): String = data.temperature.toString
}
// 2.3 Build the RedisSink from the pool config and the mapper, then attach it to the stream
sensorStream.addSink(new RedisSink[SensorReading](conf, redisMapper))

// 3. Elasticsearch sink
// 3.1 ES connection addresses (HTTP hosts)
val httpHosts: util.ArrayList[HttpHost] = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))
// 3.2 ElasticsearchSinkFunction: converts each SensorReading into an IndexRequest
val elasticsearchSinkFunction: ElasticsearchSinkFunction[SensorReading] = new ElasticsearchSinkFunction[SensorReading] {
    override def process(sensorReading: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Wrap the record fields as the document source
        val dataSource: util.HashMap[String, String] = new util.HashMap[String, String]()
        dataSource.put("sensor_id", sensorReading.id)
        dataSource.put("temp", sensorReading.temperature.toString)
        dataSource.put("ts", sensorReading.timestamp.toString)
        // Build the IndexRequest (index, doc type, source).
        // BUG FIX: the original called .opType("readingData") — opType only accepts
        // "index"/"create" and would throw IllegalArgumentException at runtime.
        // The document type is set with .`type` (backticks: `type` is a Scala keyword).
        val indexRequest: IndexRequest = Requests.indexRequest()
            .index("sensor_temp")
            .`type`("readingData")
            .source(dataSource)
        // Hand the request to the indexer, which batches and sends it to ES over HTTP
        requestIndexer.add(indexRequest)
        // BUG FIX: log the record that was saved — the original printed the
        // DataStream object (sensorStream) instead of the data itself.
        println(sensorReading + " saved successfully")
    }
}
// 3.3 Build the ElasticsearchSink from the hosts and the sink function, then attach it
sensorStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHosts, elasticsearchSinkFunction).build())

// 4. JDBC sink
// 4.1 There is no dedicated JDBC sink here, so implement a RichSinkFunction that
//     upserts each reading into MySQL (UPDATE first, INSERT when no row matched).
val jdbcSinkFunction: RichSinkFunction[SensorReading] = new RichSinkFunction[SensorReading] {

    /**
     * Connection and prepared statements are shared across invocations:
     * created once in open(), released in close().
     */
    var conn: Connection = _
    var insertStmt: PreparedStatement = _
    var updateStmt: PreparedStatement = _

    /**
     * Open the JDBC connection and prepare both statements once per task.
     * @param parameters Flink configuration (unused here)
     */
    override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
        insertStmt = conn.prepareStatement("insert into temp (sensor, temperature) values (?,?)")
        updateStmt = conn.prepareStatement("update temp set temperature = ? where sensor = ?")
    }

    /**
     * Called once per incoming record: try an UPDATE first; if no row
     * matched the sensor id, fall back to an INSERT.
     * @param value   the record arriving on the stream
     * @param context runtime context for the sink invocation
     */
    override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
        updateStmt.setDouble(1, value.temperature)
        updateStmt.setString(2, value.id)
        updateStmt.execute()
        // getUpdateCount == 0 means the UPDATE touched no row, so the sensor is new
        if (updateStmt.getUpdateCount == 0) {
            insertStmt.setString(1, value.id)
            insertStmt.setDouble(2, value.temperature)
            insertStmt.execute()
        }
    }

    /**
     * Release JDBC resources. ROBUSTNESS FIX: null-guarded so a failure in
     * open() (e.g. unreachable database) does not trigger a secondary NPE here.
     */
    override def close(): Unit = {
        if (insertStmt != null) insertStmt.close()
        if (updateStmt != null) updateStmt.close()
        if (conn != null) conn.close()
    }
}
// 4.2 Attach the JDBC sink to the stream
sensorStream.addSink(jdbcSinkFunction)


// Launch the job: execute() submits the dataflow and blocks until it terminates
env.execute("SinkDemo")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM