Reminders:
1. When connecting to Kafka, pay close attention to the dependency versions; otherwise the program may start without any errors and still receive no data.
2. Besides the Flink and Kafka dependencies, it is recommended to also add the kafka-clients dependency, with a matching version.
-----------------------------------------------------------------------
1. To connect to Elasticsearch, you need flink-connector-elasticsearch6_2.11, elasticsearch-rest-high-level-client, elasticsearch-rest-client, and elasticsearch; otherwise the job will not start.
Data format
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
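All of the examples below map these lines onto a SensorReading case class imported from com.yangwj.api, which is not shown in this post. A minimal sketch, inferred from how the fields are parsed and used (id, timestamp, temperature), might look like this:

package com.yangwj.api

// Minimal sketch of the SensorReading case class assumed by the examples below;
// field names and types are inferred from arr(0), arr(1).toLong, arr(2).toDouble
case class SensorReading(id: String, timestamp: Long, temperature: Double)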
##########Save to File##############
1. Main class
package sink

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._

/**
 * @author yangwj
 * @date 2021/1/6 21:17
 * @version 1.0
 */
object FileSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val inputFile: String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // 1. Deprecated approach: writeAsCsv
    val savePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensorToFile"
    dataStream.writeAsCsv(savePath)

    // 2. Distributed approach: StreamingFileSink writes partitioned part files
    val saveDistributePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\saveDistributePath"
    dataStream.addSink(StreamingFileSink.forRowFormat(
      new Path(saveDistributePath),
      new SimpleStringEncoder[SensorReading]())
      .build()
    )

    env.execute("FileSink Test")
  }
}
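By default StreamingFileSink rolls part files based on time and size. If you want to control that explicitly, a rolling policy can be attached to the same builder used inside main above; a minimal sketch (the interval and size values are only illustrative) could look like this:

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Illustrative only: roll a new part file every 15 minutes, after 5 minutes of
// inactivity, or once a part file reaches 128 MB
val rollingSink = StreamingFileSink
  .forRowFormat(new Path(saveDistributePath), new SimpleStringEncoder[SensorReading]())
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
      .withMaxPartSize(128 * 1024 * 1024)
      .build()
  )
  .build()
dataStream.addSink(rollingSink)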
##########Save to Elasticsearch##############
1. Dependencies
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.3.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.3.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.6.0</version>
</dependency>
2. Main class
package sink

import java.util

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

/**
 * @author yangwj
 * @date 2021/1/6 22:05
 * @version 1.0
 */
object EsSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val inputFile: String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Custom function that writes each record to Elasticsearch
    val myEsSinkFunc: ElasticsearchSinkFunction[SensorReading] = new ElasticsearchSinkFunction[SensorReading] {
      // process: build an index request for each record and hand it to the indexer
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Wrap the fields in a Map as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.id)
        dataSource.put("temperature", t.temperature.toString)
        dataSource.put("ts", t.timestamp.toString)

        // Create the IndexRequest used for the HTTP call
        val request: IndexRequest = Requests.indexRequest().index("sensor").`type`("_doc").source(dataSource)
        requestIndexer.add(request)
      }
    }

    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc)
    esSinkBuilder.setBulkFlushMaxActions(1) // flush the buffer to ES after every record

    dataStream.addSink(esSinkBuilder.build())

    env.execute("Es Sink Test")
  }
}
##########Save to Kafka##############
1. Dependencies (note: be sure to use matching versions; otherwise the program will start without errors but receive no data from Kafka)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
2. Main class
package sink

import java.util.Properties

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

/**
 * @author yangwj
 * @date 2021/1/6 21:32
 * @version 1.0
 */
object KafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")

    val stream = env.addSource( new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties) )

    // Convert to the case class first (simple transformation), then back to String for the producer
    val dataStream = stream
      .map( data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      } )

    dataStream.addSink( new FlinkKafkaProducer011[String]("localhost:9092", "sinktest", new SimpleStringSchema()) )

    env.execute("kafka sink test")
  }
}
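The producer above is created with just a broker string. If you need to set producer options, FlinkKafkaProducer011 also has a constructor that takes a Properties object; a minimal sketch to drop in place of the addSink call inside main (the property values are only illustrative):

// Illustrative producer configuration passed via Properties
val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
producerProps.setProperty("retries", "3")

dataStream.addSink(
  new FlinkKafkaProducer011[String]("sinktest", new SimpleStringSchema(), producerProps)
)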
##########Save to MySQL##############
1. Dependencies
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
2. Main class
package sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.yangwj.api.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._

/**
 * @author yangwj
 * @date 2021/1/6 22:27
 * @version 1.0
 * Since there is no official connector for this, data consistency cannot be guaranteed
 */
object MysqlSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val inputFile: String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    dataStream.addSink( new MyJdbcSinkFunc() )

    env.execute("mysql Sink Test")
  }
}

class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading] {
  // Connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading): Unit = {
    // Try the update first; if the id already exists, it gets updated
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // If the update touched no rows, insert instead
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
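The update-then-insert logic above works, but if the sensor_temp table declares id as a primary (or unique) key, the same upsert can be done with a single MySQL statement using INSERT ... ON DUPLICATE KEY UPDATE. A hedged sketch of an alternative sink function, reusing the imports from the file above and assuming that key exists:

// Hedged alternative to MyJdbcSinkFunc: single-statement upsert.
// Assumes sensor_temp has a PRIMARY KEY (or UNIQUE constraint) on id;
// otherwise "on duplicate key update" never fires and rows keep being inserted.
class MyJdbcUpsertSinkFunc() extends RichSinkFunction[SensorReading] {
  var conn: Connection = _
  var upsertStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    upsertStmt = conn.prepareStatement(
      "insert into sensor_temp (id, temp) values (?, ?) on duplicate key update temp = ?")
  }

  override def invoke(value: SensorReading): Unit = {
    upsertStmt.setString(1, value.id)
    upsertStmt.setDouble(2, value.temperature)
    upsertStmt.setDouble(3, value.temperature)
    upsertStmt.execute()
  }

  override def close(): Unit = {
    upsertStmt.close()
    conn.close()
  }
}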
##########Save to Redis##############
1. Dependencies
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
2. Main class
package sink

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * @author yangwj
 * @date 2021/1/6 21:48
 * @version 1.0
 */
// Named RedisSinkTest so it does not clash with the connector's RedisSink class
object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val inputFile: String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // 1. Redis connection config
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    dataStream.addSink(new RedisSink[SensorReading](config, new MyRedisMapper()))

    env.execute("Redis Sink Test")
  }
}

// 2. Define a RedisMapper
class MyRedisMapper extends RedisMapper[SensorReading] {
  // The command used to write data to redis: HSET table key value
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
  }

  // Use the temperature as the value
  override def getValueFromData(data: SensorReading): String = data.temperature.toString

  // Use the id as the key
  override def getKeyFromData(data: SensorReading): String = data.id
}
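If the Redis instance requires a password or you want a non-default database, the FlinkJedisPoolConfig builder exposes a few more options. A hedged sketch as a drop-in replacement for the config built inside main above (the values are only placeholders):

// Illustrative only: extra options on FlinkJedisPoolConfig.Builder
val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
  .setHost("localhost")
  .setPort(6379)
  .setPassword("yourPassword") // placeholder, not a real credential
  .setDatabase(0)              // redis logical database index
  .setTimeout(5000)            // connection/read timeout in milliseconds
  .build()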