1. Source
Controlling Latency
By default, stream elements are not shipped over the network one by one (that would cause unnecessary network traffic); they are buffered first. The buffer size is set in the Flink configuration file, while the buffer flush timeout can be set on the ExecutionEnvironment or on individual operators (the default timeout is 100 ms).
- Benefit: higher throughput
- Drawback: higher latency
- For maximum throughput, call setBufferTimeout(-1); this removes the timeout, so buffers are flushed only when they are full.
- For minimum latency, set the timeout to 0, at the cost of some throughput (see the sketch below this list).
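A minimal sketch of setting the timeout on the environment (the local job, sequence source, and timeout value are only illustrative, not part of the original example):

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object BufferTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Flush network buffers at most every 5 ms for the whole job
    // (-1 would favor pure throughput, 0 would flush after every record for lowest latency)
    env.setBufferTimeout(5)
    env.generateSequence(1, 100)
      .map(_ * 2)
      .print()
    env.execute("buffer timeout sketch")
  }
}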
1.1 Flink built-in data sources
1. File-based
env.readTextFile("file://path")
env.readFile(inputFormat, "file://path");
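The inputFormat argument of readFile is a FileInputFormat instance. A minimal sketch using TextInputFormat (the path is a placeholder):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val path = "file:///tmp/input"                    // placeholder path
val format = new TextInputFormat(new Path(path))  // reads the file line by line as Strings
val lines: DataStream[String] = env.readFile(format, path)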
2. Socket-based
env.socketTextStream("localhost", 6666, '\n')
3. Collection-based
import org.apache.flink.api.scala._

env.fromCollection(List(1, 2, 3))
env.fromElements(1, 2, 3)
env.generateSequence(0, 1000)
1.2 Custom data sources
1. Implementing SourceFunction
SourceFunction is non-parallel, so a parallelism cannot be set on it, i.e. setParallelism(num) must not be called on such a source.
SocketTextStreamFunction is an implementation of SourceFunction; its source code is a detailed example worth reading.
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
// This implicit import is required, otherwise the flatMap call below will not compile
import org.apache.flink.api.scala._

// SourceFunction is non-parallel, so a parallelism cannot be set, i.e. setParallelism(num) cannot be used
class MySourceFunction extends SourceFunction[String] {
  var num: Long = 0
  var isCancel: Boolean = true

  // Called on cancel; the flag stops the loop in run()
  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  // run() emits records to the downstream operators
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isCancel) {
      ctx.collect(s"xxd\t${num}")
      Thread.sleep(1000)
      num += 1
    }
  }
}

object SourceFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager, otherwise logs are printed to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "3")
    // Get a local execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Set the buffer timeout
    env.setBufferTimeout(3000)
    // Define the source
    val sourceDataStream: DataStream[String] = env.addSource(new MySourceFunction)
    // Adding setParallelism here would throw an exception
    // val sourceDataStream: DataStream[String] = env.addSource(new MySourceFunction).setParallelism(2)
    // Define the operators: parse the records, key by word, and sum the counts
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // Print sink
    wordCountData.print().setParallelism(2)
    // Print the execution plan
    println(env.getExecutionPlan)
    // Run the job
    env.execute("Socket Window WordCount")
  }
}
2. Implementing ParallelSourceFunction
ParallelSourceFunction is a parallel source, so a parallelism can be set on it.
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

// ParallelSourceFunction is a parallel source, so a parallelism can be set on it
class MyParallelSource extends ParallelSourceFunction[String] {
  var num = 0
  var isCancel = true

  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isCancel) {
      ctx.collect(s"xxd\t${num}")
      Thread.sleep(1000)
      num += 1
    }
  }
}

object ParallelSourceWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager, otherwise logs are printed to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source with a parallelism of 4
    val sourceDataStream: DataStream[String] = env.addSource(new MyParallelSource).setParallelism(4)
    // Define the operators: parse the records, key by word, and sum the counts
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // Print sink
    wordCountData.print().setParallelism(2)
    // Print the execution plan
    println(env.getExecutionPlan)
    // Run the job
    env.execute("Socket Window WordCount")
  }
}
3. Extending RichParallelSourceFunction
RichParallelSourceFunction not only implements ParallelSourceFunction, it also extends AbstractRichFunction.
So, besides being parallelizable, it adds the open and close methods and getRuntimeContext on top of what ParallelSourceFunction offers.
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

// RichParallelSourceFunction can not only be parallelized,
// it also adds open/close methods and getRuntimeContext on top of ParallelSourceFunction
class MyRichParallelSource extends RichParallelSourceFunction[String] {
  var num = 0
  var isCancel = true
  // Completion flag: set to true once run() has finished with its resources,
  // so that close() does not release a connection run() is still using
  var isMysql = false

  // Called once when the source is opened, e.g. open a MySQL connection here
  override def open(parameters: Configuration): Unit = {
    println("open")
    num = 100
  }

  // Called once when the source is closed,
  // e.g. return the MySQL connection to the pool here
  override def close(): Unit = {
    // Wait until run() has finished before releasing resources
    while (!isMysql) {
      Thread.sleep(1000)
      println("close sleep")
    }
    println("close")
    num = 0
  }

  // Called on cancel; the flag stops the loop in run().
  // Cancelling is triggered manually, and close() is executed after cancel()
  override def cancel(): Unit = {
    println("cancel")
    isCancel = false
  }

  // run() emits records to the downstream operators.
  // After a manual cancel, Flink does not wait for run() to finish before calling close(),
  // which could close a connection that run() is still using;
  // hence the completion flag that tells close() when it may proceed
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    import scala.util.control.Breaks._
    breakable {
      while (isCancel) {
        println(getRuntimeContext.getIndexOfThisSubtask) // index of this parallel subtask
        ctx.collect(s"xxd\t${num}")
        Thread.sleep(2000)
        num += 1
        if (num > 1200) {
          break()
        }
      }
    }
    isMysql = true
  }
}

object RichParallelSourceWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager, otherwise logs are printed to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source with a parallelism of 4
    val sourceDataStream: DataStream[String] = env.addSource(new MyRichParallelSource).setParallelism(4)
    // Define the operators: parse the records, key by word, and sum the counts
    val wordCountData: DataStream[(String, Int)] = sourceDataStream.flatMap(new FlatMapFunction[String, (String, Int)] {
      override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
        val strings: Array[String] = value.split(" ")
        for (f <- strings) {
          out.collect((f, 1))
        }
      }
    }).setParallelism(2).keyBy(_._1).sum(1).setParallelism(2)
    // Print sink, placed in its own slot sharing group
    wordCountData.slotSharingGroup("xxd").print().setParallelism(2)
    // Print the execution plan
    println(env.getExecutionPlan)
    // Run the job
    env.execute("Socket Window WordCount")
  }
}
2. Sink
2.1 Built-in sinks
1. File-based
// uses TextOutputFormat
stream.writeAsText("/path/to/file")
// uses CsvOutputFormat
stream.writeAsCsv("/path/to/file")
2. Socket-based
stream.writeToSocket(host, port, SerializationSchema)
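For example, a DataStream[String] can be written to a socket with SimpleStringSchema (host and port are placeholders; in older Flink versions the schema class lives in org.apache.flink.streaming.util.serialization instead):

import org.apache.flink.api.common.serialization.SimpleStringSchema

// serialize each String record and send it to the given host/port
stream.writeToSocket("localhost", 9999, new SimpleStringSchema())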
3. Standard output / standard error
// Note: never use these in production; use sampled printing or proper logging instead
stream.print()
stream.printToErr()
2.2 Custom sinks
1. Implementing SinkFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

class MySinkFunction extends SinkFunction[(String, Int)] {
  // invoke() is called for every record the sink receives
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println(s"value:${value}," +
      s"processTime:${context.currentProcessingTime()}," +
      s"waterMark:${context.currentWatermark()}")
  }
}

object SinkFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager, otherwise logs are printed to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
    // Use the custom sink
    output.addSink(new MySinkFunction)
    env.execute()
  }
}
2. Extending RichSinkFunction
package com.xxd.flink.sink

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

class MyRichSinkFunction extends RichSinkFunction[(String, Int)] {
  // Called once when the sink is opened, e.g. open a MySQL connection here
  override def open(parameters: Configuration): Unit = {
    println("open")
  }

  // Called once when the sink is closed,
  // e.g. return the MySQL connection to the pool here
  override def close(): Unit = {
    println("close")
  }

  // invoke() is called for every record and performs the actual output
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    // In a rich function, getRuntimeContext gives access to e.g. broadcast variables and accumulators
    // getRuntimeContext.getBroadcastVariable("")
    println(s"value:${value}," +
      s"processTime:${context.currentProcessingTime()}," +
      s"waterMark:${context.currentWatermark()}")
  }
}

object RichSinkFunctionWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager, otherwise logs are printed to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
    // Use the custom sink
    output.addSink(new MyRichSinkFunction)
    env.execute()
  }
}
3. Implementing a custom OutputFormat and writing with stream.writeUsingOutputFormat(new MyOutPutFormat)
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

class MyOutPutFormat extends OutputFormat[(String, Int)] {
  // Configure the OutputFormat
  override def configure(parameters: Configuration): Unit = {
    println("configure")
  }

  // Called once when the sink is opened, e.g. open a MySQL connection here
  // taskNumber: index of this task, numTasks: total number of parallel tasks
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    println(s"taskNumber:${taskNumber},numTasks:${numTasks}")
  }

  // writeRecord() is called for every record and performs the actual output
  override def writeRecord(record: (String, Int)): Unit = {
    println(record)
  }

  // Called once when the sink is closed,
  // e.g. return the MySQL connection to the pool here
  override def close(): Unit = {
    println("close")
  }
}

object OutputFormatWordCount {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // Enable the Flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager, otherwise logs are printed to the console
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // Number of task slots
    conf.setString("taskmanager.numberOfTaskSlots", "8")
    // Get a local execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Define the source
    val input = env.fromElements("xxd xxd xxd")
    val output: DataStream[(String, Int)] = input.flatMap(f => f.split(" ")).map((_, 1))
    // Use the custom OutputFormat
    output.writeUsingOutputFormat(new MyOutPutFormat)
    env.execute()
  }
}