我們在生產實踐中經常會遇到這樣的場景,需把輸入源按照需要進行拆分,比如我期望把訂單流按照金額大小進行拆分,或者把用戶訪問日志按照訪問者的地理位置進行拆分等。面對這樣的需求該如何操作呢?
大部分的DataStream API的算子的輸出時單一輸出,也就是某種數據類型的流。除了split算子(使用split切分過的流是不能被二次切分的),可以將一條流分成多條流,這些流的數據類型也都相同。processfunction的side outputs功能可以產生多條流,並且這些流的數據類型可以不一樣。一個side output可以定義為OutputTag[X]對象,X是輸出流的數據類型。processfunction可以通過Context對象發送一個事件到一個或者多個sideouputs.
SideOutPut 分流
SideOutPut 是 Flink 框架為我們提供的最新的也是最為推薦的分流方法,在使用 SideOutPut 時,需要按照以下步驟進行:
定義 OutputTag
調用特定函數進行數據拆分
ProcessFunction
KeyedProcessFunction
CoProcessFunction
KeyedCoProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
在這里我們使用 ProcessFunction 來講解如何使用 SideOutPut:
package com.wyh.processFunctionApi
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object SideOutputTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.socketTextStream("localhost", 7777)
//Transform操作
val dataStream: DataStream[SensorReading] = stream.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
//===到來的數據是升序的,准時發車,用assignAscendingTimestamps
//指定哪個字段是時間戳 需要的是毫秒 * 1000
// .assignAscendingTimestamps(_.timestamp * 1000)
//===處理亂序數據
// .assignTimestampsAndWatermarks(new MyAssignerPeriodic())
//==底層也是周期性生成的一個方法 處理亂序數據 延遲1秒種生成水位 同時分配水位和時間戳 括號里傳的是等待延遲的時間
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(t: SensorReading): Long = {
t.timestamp * 1000
}
})
val processedStream = dataStream.process(new FreezingAlert())
//這里打印的是主流
processedStream.print("process data")
//打印側輸出流
processedStream.getSideOutput(new OutputTag[String]("Freezing alert")).print()
processedStream.getSideOutput(new OutputTag[String]("commen data")).print()
//dataStream.print("input data")
env.execute("window Test")
}
}
/**
* 冰點報警 如果小於32F,輸出報警信息到側輸出流
*/
//輸出的類型是主輸出流的數據類型
class FreezingAlert() extends ProcessFunction[SensorReading, SensorReading] {
lazy val alertOutput: OutputTag[String] = new OutputTag[String]("Freezing alert")
lazy val commenOutput: OutputTag[String] = new OutputTag[String]("commen data")
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if (value.temperature < 32.0) {
//側輸出流
ctx.output(alertOutput, value.id + "低溫報警!!!此時溫度為:" + value.temperature)
} else if (value.temperature >= 32.0) {
ctx.output(commenOutput, value.id + "正常溫度。。此時溫度為:" + value.temperature)
} else {
//主流
out.collect(value)
}
}
}
在Linux命令行中輸入 nc -lk 7777開啟一個服務
輸入數據:
注意:在主程序中,直接print()打印的主輸出流,想要打印側輸出流:
//這里打印的是主流 processedStream.print("process data") //打印側輸出流 processedStream.getSideOutput(new OutputTag[String]("Freezing alert")).print() processedStream.getSideOutput(new OutputTag[String]("commen data")).print()