Flink學習(七) 多流轉換算子 拆分合並流


一、Split 和 Select (使用split切分過的流是不能被二次切分的)

 

 

 DataStream --> SplitStream : 根據特征把一個DataSteam 拆分成兩個或者多個DataStream.

 

 

 SplitStream --> DataStream:從一個SplitStream中獲取一個或者多個DataStream。

 

二、Connect 和 CoMap / CoFlatMap

 

 

 DataStream,DataStream --> ConnectedStream:連接兩個保持他們類型的數據流,兩個數據流被Connect之后,只是被放在了一個同一個流中,內部依然保持着各自的數據和形式,不發生變化,兩個流相互獨立。

 

 

 ConnectedStream --> DataStream:作用與 ConnectedStream上,功能與map和Flatmap一樣,對 ConnectedStream中的每一個Stream分別進行map和flatmap處理。

 

三、Union

 

 

 DataStream --> DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream

 

注意:Connect 與 Union區別:

1、Union之前兩個流的類型必須是一樣的,Conect可以不一樣,並且Connect之后進行coMap中調整為一樣的。

2、Connect只能操作兩個流,Union可以操作多個。

 

綜合代碼:(可直接運行,數據在注釋中)

package com.wyh.streamingApi.Transform

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._


//溫度傳感器讀數樣例類
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    /**
      * sensor_1,1547718199,35.80018327300259
      * sensor_6,1547718201,15.402984393403084
      * sensor_7,1547718202,6.720945201171228
      * sensor_10,1547718205,38.1010676048934444
      * sensor_1,1547718199,35.1
      * sensor_1,1547718199,31.0
      * sensor_1,1547718199,39
      */
    val streamFromFile = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")


    //基本轉換算子和滾動聚合算子=======================================================================================
    /**
      * map keyBy sum
      */
    val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

    //    dataStream.keyBy(0).sum(2).printToErr("keyBy test")

    //scala強類型語言 只有_.id 可以指定返回類型
    val aggStream: KeyedStream[SensorReading, String] = dataStream.keyBy(_.id)
    val stream1: DataStream[SensorReading] = aggStream.sum("temperature")
    //    stream1.printToErr("scala強類型語言")


    /**
      * reduce
      *
      * 輸出當前傳感器最新的溫度要加10,時間戳是上一次數據的時間加1
      */
    aggStream.reduce(new ReduceFunction[SensorReading] {
      override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
        SensorReading(t.id, t.timestamp + 1, t1.temperature + 10)
      }
    }) //.printToErr("reduce test")


    //多流轉換算子====================================================================================================
    /**
      * 分流
      * split select
      * DataStream --> SplitStream --> DataStream
      *
      * 需求:傳感器數據按照溫度高低(以30度為界),拆分成兩個流
      */
    val splitStream = dataStream.split(data => {
      //蓋上戳 后面進行分揀
      if (data.temperature > 30) {
        Seq("high")
      } else if (data.temperature < 10) {
        Seq("low")
      } else {
        Seq("health")
      }
    })

    //根據戳進行分揀
    val highStream = splitStream.select("high")
    val lowStream = splitStream.select("low")
    val healthStream = splitStream.select("health")

    //可以傳多個參數,一起分揀出來
    val allStream = splitStream.select("high", "low")


    //    highStream.printToErr("high")
    //    lowStream.printToErr("low")
    //    allStream.printToErr("all")
    //    healthStream.printToErr("healthStream")

    /**
      * 合並      注意: Connect 只能進行兩條流進行合並,但是比較靈活,不同流的數據結構可以不一樣
      * Connect CoMap/CoFlatMap
      *
      * DataStream --> ConnectedStream --> DataStream
      */
    val warningStream = highStream.map(data => (data.id, data.temperature))
    val connectedStream = warningStream.connect(lowStream)

    val coMapDataStream = connectedStream.map(
      warningData => (warningData._1, warningData._2, "溫度過高報警!!"),
      lowData => (lowData.id, lowData.temperature, "溫度過低報警===")
    )

    //    coMapDataStream.printToErr("合並流")

    /**
      * 合並多條流  注意: 要求數據結構必須要一致,一樣
      *
      * Union   DataStream --> DataSteam    就沒有一個中間轉換操作了
      *
      */

    val highS = highStream.map(h => (h.id, h.timestamp, h.temperature, "溫度過高報警!!"))
    val lowS = lowStream.map(l => (l.id, l.timestamp, l.temperature, "溫度過低報警==="))
    val healthS = healthStream.map(l => (l.id, l.timestamp, l.temperature, "健康"))

    val unionStream = highS.union(lowS).union(healthS)

    unionStream.printToErr("union合並")


    env.execute("transform test")
  }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM