Flink之Transform操作


import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Demonstrates common Flink DataStream transformation operators on sensor readings:
 * map / keyBy with rolling aggregations, reduce, split/select, connect + CoMap, union,
 * and a user-defined FilterFunction.
 *
 * Usage: TransformTest [inputPath]
 * When no argument is given, readings are read from "sensor1.txt".
 * Each input line is parsed as "id,timestamp,temperature" (comma-separated, values trimmed).
 */
object TransformTest {
  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath = "sensor1.txt"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // set the global parallelism to 1 for every operator

    // Implicit TypeInformation derivation for the Scala DataStream (streaming) API.
    import org.apache.flink.streaming.api.scala._

    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val streamFromFile = env.readTextFile(inputPath)

    // 1. Basic transformations and simple rolling aggregations.
    val dataStream = streamFromFile
      // A blank line would make fields(1) below throw and fail the whole job.
      .filter(_.trim.nonEmpty)
      .map { line =>
        val fields = line.split(",")
        SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
      }

    // Flink processes records one at a time, so sum() emits an updated running total
    // for every incoming record rather than only a final result.
    // Both keying styles below (field index / field name) are deprecated in newer Flink
    // releases in favour of key selector functions such as keyBy(_.id).
    dataStream.keyBy(0).sum(2).print()                // way 1: field positions
    dataStream.keyBy("id").sum("temperature").print() // way 2: field names

    // Example: emit the sensor's latest temperature + 10, with the timestamp of the
    // previously reduced record + 1. In reduce, x is the value reduced so far for the key
    // and y is the newly arrived record.
    dataStream
      .keyBy(_.id)
      .reduce((x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10))
      .print()

    // 2. Multi-stream transformations.
    // split: tag every reading "high" (temperature > 30) or "low"; select() picks streams by tag.
    // NOTE: split/select is deprecated and was removed in Flink 1.12; side outputs
    // (OutputTag + ProcessFunction) are the replacement there.
    val splitStream = dataStream.split { reading =>
      if (reading.temperature > 30) Seq("high") else Seq("low")
    }
    val high = splitStream.select("high")
    val low = splitStream.select("low")
    val all = splitStream.select("high", "low")
    high.print("high temperature")
    low.print("low temperature")
    all.print("all")

    // Merging streams: connect vs. union
    //  1. union requires all streams to have the same element type; connect allows two
    //     different types, which are then brought to one common type in the CoMap.
    //  2. connect combines exactly two streams; union can merge more than two.
    val warning = high.map(reading => (reading.id, reading.temperature))
    val connectedStream = warning.connect(low)
    // Both branches return (String, Double, String) so the result has a single concrete
    // tuple type; differently shaped tuples would widen it to Product with Serializable.
    val coMap = connectedStream.map(
      warningData => (warningData._1, warningData._2, "warning"),
      safeData => (safeData.id, safeData.temperature, "safe")
    )
    coMap.print()

    val unionStream = high.union(low)
    unionStream.print()

    // User-defined function class.
    dataStream.filter(new MyFilter).print()

    env.execute("transform test")
  }
}

/**
 * Flink filter that keeps only readings whose sensor id begins with "sensor_1".
 *
 * This is a plain prefix match, so ids such as "sensor_10" pass as well.
 */
class MyFilter() extends FilterFunction[SensorReading] {
  /** Id prefix a reading must start with to be kept. */
  private val idPrefix: String = "sensor_1"

  override def filter(t: SensorReading): Boolean =
    t.id.startsWith(idPrefix)
}

有幫助的話歡迎評論打賞，謝謝！


免責聲明!

本站轉載的文章為個人學習借鑒使用，本站對版權不負任何法律責任。如果侵犯了您的隱私權益，請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM