1. Split and Select (a stream that has already been split with split cannot be split a second time)
DataStream --> SplitStream: splits one DataStream into two or more DataStreams according to some characteristic of the elements.
SplitStream --> DataStream: selects one or more DataStreams out of a SplitStream.
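To make the two steps concrete, here is a minimal, self-contained sketch of split/select. It assumes an older Flink Scala API in which DataStream.split is still available; the Reading case class and the 30-degree threshold simply mirror the complete example at the end of this post, and the object and field names are only for illustration.

import org.apache.flink.streaming.api.scala._

// Hypothetical reading type, mirroring the complete example at the end of the post
case class Reading(id: String, timestamp: Long, temperature: Double)

object SplitSelectSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val readings: DataStream[Reading] = env.fromElements(
      Reading("sensor_1", 1547718199L, 35.8),
      Reading("sensor_7", 1547718202L, 6.7)
    )

    // split tags every element; select pulls the tagged sub-streams back out
    val splitStream: SplitStream[Reading] = readings.split(r =>
      if (r.temperature > 30) Seq("high") else Seq("low")
    )
    splitStream.select("high").print("high")
    splitStream.select("low").print("low")

    env.execute("split-select sketch")
  }
}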
2. Connect and CoMap / CoFlatMap
DataStream, DataStream --> ConnectedStreams: connects two data streams while preserving their types. After the two streams are connected, they are merely placed inside the same stream; internally each keeps its own data and form unchanged, and the two streams stay independent of each other.
ConnectedStreams --> DataStream: applied to a ConnectedStreams, this works like map and flatMap, applying a separate map/flatMap function to each of the two streams inside the ConnectedStreams.
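A minimal sketch of connect plus a CoMap follows, again with a hypothetical reading type; note how the two inputs keep different element types right up until the two map functions produce a common output type.

import org.apache.flink.streaming.api.scala._

// Hypothetical reading type, mirroring the complete example at the end of the post
case class TempReading(id: String, timestamp: Long, temperature: Double)

object ConnectCoMapSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Two streams with *different* element types
    val warnings: DataStream[(String, Double)] =
      env.fromElements(("sensor_1", 35.8), ("sensor_10", 38.1))
    val lowReadings: DataStream[TempReading] =
      env.fromElements(TempReading("sensor_7", 1547718202L, 6.7))

    // connect keeps both types side by side ...
    val connected: ConnectedStreams[(String, Double), TempReading] = warnings.connect(lowReadings)

    // ... and the CoMap supplies one function per input, producing one common output type
    val unified: DataStream[(String, Double, String)] = connected.map(
      warning => (warning._1, warning._2, "high temperature alarm"),
      reading => (reading.id, reading.temperature, "low temperature alarm")
    )

    unified.print("connected")
    env.execute("connect-comap sketch")
  }
}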
3. Union
DataStream --> DataStream: unions two or more DataStreams, producing a new DataStream that contains all elements of the input streams (a short sketch follows the note below).
Note: differences between Connect and Union:
1. Union requires the input streams to have the same type; Connect allows different types, which are then unified inside the subsequent coMap.
2. Connect can only operate on two streams; Union can operate on many.
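Under the same assumptions as the sketches above (hypothetical names, all elements sharing one type), a minimal union sketch:

import org.apache.flink.streaming.api.scala._

// Hypothetical reading type; union requires every input stream to use it
case class UnionReading(id: String, timestamp: Long, temperature: Double)

object UnionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val high   = env.fromElements(UnionReading("sensor_1", 1547718199L, 35.8))
    val low    = env.fromElements(UnionReading("sensor_7", 1547718202L, 6.7))
    val middle = env.fromElements(UnionReading("sensor_6", 1547718201L, 15.4))

    // union is variadic: any number of streams of the same type can be merged in one call
    val all: DataStream[UnionReading] = high.union(low, middle)

    all.print("union")
    env.execute("union sketch")
  }
}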
Complete example (runnable as-is; the sample input data is listed in the comments):
package com.wyh.streamingApi.Transform

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

// Temperature sensor reading case class
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    /**
     * sensor_1,1547718199,35.80018327300259
     * sensor_6,1547718201,15.402984393403084
     * sensor_7,1547718202,6.720945201171228
     * sensor_10,1547718205,38.1010676048934444
     * sensor_1,1547718199,35.1
     * sensor_1,1547718199,31.0
     * sensor_1,1547718199,39
     */
    val streamFromFile = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")

    // Basic transformation operators and rolling aggregation operators ==============================

    /**
     * map keyBy sum
     */
    val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

    // dataStream.keyBy(0).sum(2).printToErr("keyBy test")

    // Scala is strongly typed: keying with _.id is what gives the KeyedStream its String key type
    val aggStream: KeyedStream[SensorReading, String] = dataStream.keyBy(_.id)
    val stream1: DataStream[SensorReading] = aggStream.sum("temperature")
    // stream1.printToErr("strongly typed keyBy")

    /**
     * reduce
     *
     * Emit the sensor's latest temperature plus 10, with a timestamp equal to the previous record's timestamp plus 1
     */
    aggStream.reduce(new ReduceFunction[SensorReading] {
      override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
        SensorReading(t.id, t.timestamp + 1, t1.temperature + 10)
      }
    }) //.printToErr("reduce test")

    // Multi-stream transformation operators =========================================================

    /**
     * Splitting a stream
     * split select
     * DataStream --> SplitStream --> DataStream
     *
     * Requirement: split the sensor data into separate streams by temperature, with 30 degrees as the boundary
     */
    val splitStream = dataStream.split(data => {
      // Tag each element; the tags are used for selection below
      if (data.temperature > 30) {
        Seq("high")
      } else if (data.temperature < 10) {
        Seq("low")
      } else {
        Seq("health")
      }
    })

    // Pick out the sub-streams by tag
    val highStream = splitStream.select("high")
    val lowStream = splitStream.select("low")
    val healthStream = splitStream.select("health")
    // Several tags can be passed at once to select them together
    val allStream = splitStream.select("high", "low")

    // highStream.printToErr("high")
    // lowStream.printToErr("low")
    // allStream.printToErr("all")
    // healthStream.printToErr("healthStream")

    /**
     * Merging streams. Note: Connect only merges two streams, but it is flexible -- the two streams may have different data types
     * Connect CoMap/CoFlatMap
     *
     * DataStream --> ConnectedStreams --> DataStream
     */
    val warningStream = highStream.map(data => (data.id, data.temperature))
    val connectedStream = warningStream.connect(lowStream)
    val coMapDataStream = connectedStream.map(
      warningData => (warningData._1, warningData._2, "High temperature alarm!!"),
      lowData => (lowData.id, lowData.temperature, "Low temperature alarm===")
    )
    // coMapDataStream.printToErr("merged stream")

    /**
     * Merging multiple streams. Note: the data types of all inputs must be identical
     *
     * Union: DataStream --> DataStream, with no intermediate conversion step
     */
    val highS = highStream.map(h => (h.id, h.timestamp, h.temperature, "High temperature alarm!!"))
    val lowS = lowStream.map(l => (l.id, l.timestamp, l.temperature, "Low temperature alarm==="))
    val healthS = healthStream.map(l => (l.id, l.timestamp, l.temperature, "Healthy"))
    val unionStream = highS.union(lowS).union(healthS)
    unionStream.printToErr("union")

    env.execute("transform test")
  }
}