Related articles:
Using the Flink API (2): Transform Operators
The complete code is shown below:
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object SinkDemo {

  def main(args: Array[String]): Unit = {
    // Execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // Read the source data
    val fileStream: DataStream[String] = env
      .readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt")
    val sensorStream: DataStream[SensorReading] = fileStream.map(new MyMapToSensorReading)

    // 1. Kafka sink
    fileStream.addSink(new FlinkKafkaProducer[String]("cdh1:9092,cdh2:9092,cdh3:9092", "flinkTestTopic", new SimpleStringSchema()))

    // 2. Redis sink (Flink needs a RedisMapper that defines the command used when saving to Redis)
    // 2.1 Define the Redis connection settings
    val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()
    // 2.2 Build the RedisMapper (by implementing the interface)
    val redisMapper: RedisMapper[SensorReading] = new RedisMapper[SensorReading] {
      // HSET sensor_temperature <id> <temperature>
      override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
      }

      override def getKeyFromData(data: SensorReading): String = data.id

      override def getValueFromData(data: SensorReading): String = data.temperature.toString
    }
    // 2.3 Create the RedisSink from the FlinkJedisPoolConfig and RedisMapper, and add it to the stream
    sensorStream.addSink(new RedisSink[SensorReading](conf, redisMapper))

    // 3. Elasticsearch sink
    // 3.1 Define the ES connection addresses (httpHosts)
    val httpHosts: util.ArrayList[HttpHost] = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))
    // 3.2 Define an ElasticsearchSinkFunction (by implementing the interface)
    val elasticsearchSinkFunction: ElasticsearchSinkFunction[SensorReading] = new ElasticsearchSinkFunction[SensorReading] {
      override def process(sensorReading: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Wrap the data to write to ES
        val dataSource: util.HashMap[String, String] = new util.HashMap[String, String]()
        dataSource.put("sensor_id", sensorReading.id)
        dataSource.put("temp", sensorReading.temperature.toString)
        dataSource.put("ts", sensorReading.timestamp.toString)
        // Create an IndexRequest (containing the index, type and source data)
        val indexRequest: IndexRequest = Requests.indexRequest()
          .index("sensor_temp")
          .`type`("readingData")
          .source(dataSource)
        // Use the RequestIndexer to send the wrapped IndexRequest to ES (over HTTP)
        requestIndexer.add(indexRequest)
        println(sensorReading + " saved successfully")
      }
    }
    // 3.3 Build the ElasticsearchSink from httpHosts and the ElasticsearchSinkFunction, and add it to the stream
    sensorStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHosts, elasticsearchSinkFunction).build())

    // 4. JDBC sink
    // 4.1 There is no dedicated JDBC sink, so implement a RichSinkFunction that performs the JDBC operations directly
    val jdbcSinkFunction: RichSinkFunction[SensorReading] = new RichSinkFunction[SensorReading] {

      /**
       * Connection and prepared statements (needed across invocations,
       * so they are initialized in open() and released in close())
       */
      var conn: Connection = _
      var insertStmt: PreparedStatement = _
      var updateStmt: PreparedStatement = _

      /**
       * Create the connection and the prepared statements in the open() lifecycle method
       * @param parameters configuration
       */
      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
        insertStmt = conn.prepareStatement("insert into temp (sensor, temperature) values (?, ?)")
        updateStmt = conn.prepareStatement("update temp set temperature = ? where sensor = ?")
      }

      /**
       * Called once for each record that enters the stream
       * @param value the incoming record
       * @param context sink context
       */
      override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
        // Run the update statement first
        updateStmt.setDouble(1, value.temperature)
        updateStmt.setString(2, value.id)
        updateStmt.execute()
        // If nothing was updated, insert instead
        if (updateStmt.getUpdateCount == 0) {
          insertStmt.setString(1, value.id)
          insertStmt.setDouble(2, value.temperature)
          insertStmt.execute()
        }
      }

      /**
       * Release the resources
       */
      override def close(): Unit = {
        insertStmt.close()
        updateStmt.close()
        conn.close()
      }
    }
    // 4.2 Add the sink to the stream
    sensorStream.addSink(jdbcSinkFunction)

    // Launch the environment and execute the job
    env.execute("SinkDemo")
  }
}
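
The code above references a SensorReading case class and a MyMapToSensorReading MapFunction that come from earlier posts in this series. For readers following along standalone, here is a minimal sketch consistent with the fields used above (id, timestamp, temperature); the comma-separated input format is an assumption:

// Hypothetical sketch; the real definitions live in earlier posts of this series.
import org.apache.flink.api.common.functions.MapFunction

// Field names and types inferred from how the sinks above use them
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Parses one line of source.txt, e.g. "sensor_1,1547718199,35.8" (format assumed)
class MyMapToSensorReading extends MapFunction[String, SensorReading] {
  override def map(line: String): SensorReading = {
    val fields: Array[String] = line.split(",")
    SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
  }
}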

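None of these sinks ships with the core Flink dependency; each connector comes from its own artifact, and the MySQL sink additionally needs a JDBC driver. A sketch of the sbt coordinates, assuming Flink 1.10 with Scala 2.11 (all version numbers here are assumptions; match them to your cluster):

// Assumed versions and coordinates; verify against your Flink/Scala setup
val flinkVersion = "1.10.0"
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-connector-kafka"           % flinkVersion, // FlinkKafkaProducer
  "org.apache.bahir" %% "flink-connector-redis"           % "1.0",        // RedisSink (Apache Bahir)
  "org.apache.flink" %% "flink-connector-elasticsearch6"  % flinkVersion, // ElasticsearchSink
  "mysql"            %  "mysql-connector-java"            % "8.0.19"      // JDBC driver for the MySQL sink
)

The JDBC sink also assumes the target table already exists: a temp table in the test database with a sensor (string) column and a temperature (double) column, matching the two prepared statements above.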