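Most operators in the DataStream API produce a single output, that is, one stream of a single data type. The split operator is an exception in that it can divide one stream into several, but those streams all still share the same data type. The side outputs feature of ProcessFunction can produce multiple streams, and the data types of those streams may differ from one another. A side output is identified by an OutputTag[X] object, where X is the data type of that output stream. A ProcessFunction emits an event to one or more side outputs through its Context object.
The following code routes temperature readings below 32°F to the side output "freezing alert".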
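import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// SensorReading is assumed to be defined elsewhere as a case class holding
// a String sensor id, a Long `timestamp` (in seconds) and a Double `temperature`.
object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read comma-separated lines from the socket and parse them into SensorReading
    val socketStream = env.socketTextStream("hadoop102", 7777)
    val dataStream: DataStream[SensorReading] = socketStream
      .map(d => {
        val arr = d.split(",")
        SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
      })
      // Allow events to arrive up to 1 second out of order
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          // The timestamp field is in seconds; Flink expects milliseconds
          override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
        })

    // Low-temperature alert processing
    val processStream = dataStream.process(new FreezingAlert)

    // Print the main output stream
    processStream.print("process stream")

    // Print the side output: first obtain it with a tag that has the same id and type
    processStream.getSideOutput(new OutputTag[String]("freezing alert")).print("freezing alert")

    env.execute("side output test")
  }
}

class FreezingAlert extends ProcessFunction[SensorReading, SensorReading] {
  // Tag identifying the side output; it must match the tag passed to getSideOutput
  lazy val tag = new OutputTag[String]("freezing alert")

  override def processElement(value: SensorReading,
                              ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                              collector: Collector[SensorReading]): Unit = {
    if (value.temperature < 32) {
      // Side output
      ctx.output(tag, "freezing alert for " + value.temperature)
    } else {
      // Main output
      collector.collect(value)
    }
  }
}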
Port input
[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30
sensor_1, 1547718200, 25
sensor_1, 1547718200, 35
Console output
freezing alert> freezing alert for 30.0
freezing alert> freezing alert for 25.0
process stream> SensorReading(sensor_1,1547718200,35.0)
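The example above uses a single side output of type String. As noted at the start of this section, one ProcessFunction can feed several side outputs whose element types differ. The following is a minimal sketch of that idea, assuming the same Flink Scala DataStream API as above. The names TemperatureRouter, MultiSideOutputTags and MultiSideOutputSketch, the "overheat" tag, the 100-degree threshold, and the SensorReading field name `id` are all hypothetical and not taken from the original code.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Assumed shape of the reading type used in this section; the field name `id` is a guess.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object MultiSideOutputTags {
  // Each side output has its own tag and its own element type.
  val freezingTag = new OutputTag[String]("freezing alert")
  val overheatTag = new OutputTag[(String, Double)]("overheat") // hypothetical second output
}

object TemperatureRouter {
  // Pure routing decision, kept separate so it can be checked without running a job.
  // The upper threshold of 100 is an illustrative assumption.
  def route(temperature: Double): String =
    if (temperature < 32) "freezing"
    else if (temperature > 100) "overheat"
    else "main"
}

class TemperatureRouter extends ProcessFunction[SensorReading, SensorReading] {
  override def processElement(value: SensorReading,
                              ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                              out: Collector[SensorReading]): Unit = {
    TemperatureRouter.route(value.temperature) match {
      case "freezing" =>
        // String-typed side output
        ctx.output(MultiSideOutputTags.freezingTag, "freezing alert for " + value.temperature)
      case "overheat" =>
        // Tuple-typed side output
        ctx.output(MultiSideOutputTags.overheatTag, (value.id, value.temperature))
      case _ =>
        // Main output
        out.collect(value)
    }
  }
}

object MultiSideOutputSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // A small in-memory source instead of the socket, for illustration only
    val readings = env.fromElements(
      SensorReading("sensor_1", 1547718200L, 30.0),
      SensorReading("sensor_1", 1547718201L, 120.0),
      SensorReading("sensor_1", 1547718202L, 35.0))

    val mainStream = readings.process(new TemperatureRouter)
    mainStream.print("main")
    mainStream.getSideOutput(MultiSideOutputTags.freezingTag).print("freezing")
    mainStream.getSideOutput(MultiSideOutputTags.overheatTag).print("overheat")

    env.execute("multi side output sketch")
  }
}
```

Keeping the tags in one shared object means the function that emits and the code that calls getSideOutput refer to the same id and type, rather than constructing matching tags in two places.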
