案例功能說明
通過socketTextStream讀取9999端口數據,統計在一定時間內不同類型商品的銷售總額度,如果持續銷售額度為0,則執行定時器通知老板,是不是賣某種類型商品的員工偷懶了(只做功能演示,根據個人業務來使用,比如統計UV等操作)。
ProcessFunction是一個低階的流處理操作,它可以訪問流處理程序的基礎構建模塊: 1.事件(event)(流元素)。 2.狀態(state)(容錯性,一致性,僅在keyed stream中)。 3.定時器(timers)(event time和processing time, 僅在keyed stream中)。
案例代碼
使用ValueState記錄了狀態信息,每次來商品都會進行總額度累加。
商品第一次進入的時候會注冊一個定時器,每隔10秒執行一次,定時器做預警功能,如果十秒內商品銷售等於0,我們則進行預警。

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector object ProcessFuncationScala { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.socketTextStream("localhost", 9999) val typeAndData: DataStream[(String, String)] = stream.map(x => (x.split(",")(0), x.split(",")(1))).setParallelism(4) typeAndData.keyBy(0).process(new MyprocessFunction()).print("結果") env.execute() } /** * 實現: * 根據key分類,統計每個key進來的數據量,定期統計數量,如果數量為0則預警 */ class MyprocessFunction extends KeyedProcessFunction[Tuple,(String,String),String]{ //統計間隔時間 val delayTime : Long = 1000 * 10 lazy val state : ValueState[(String,Long)] = getRuntimeContext.getState[(String,Long)](new ValueStateDescriptor[(String, Long)]("cjcount",classOf[Tuple2[String,Long]])) override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = { printf("定時器觸發,時間為:%d,狀態為:%s,key為:%s\n",timestamp,state.value(),ctx.getCurrentKey) if(state.value()._2==0){ //該時間段數據為0,進行預警 printf("類型為:%s,數據為0,預警\n",state.value()._1) } //定期數據統計完成后,清零 state.update(state.value()._1,0) //再次注冊定時器執行 val currentTime: Long = ctx.timerService().currentProcessingTime() ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) } override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context, out: Collector[String]): Unit = { printf("狀態值:%s,state是否為空:%s\n",state.value(),(state.value()==null)) if(state.value() == null){ //獲取時間 val currentTime: Long = ctx.timerService().currentProcessingTime() //注冊定時器十秒后觸發 ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) printf("定時器注冊時間:%d\n",currentTime+10000L) state.update(value._1,value._2.toInt) } else{ //統計數據 val key: String = state.value()._1 var count: Long = state.value()._2 count += value._2.toInt //更新state值 state.update((key,count)) } println(getRuntimeContext.getTaskNameWithSubtasks+"->"+value) printf("狀態值:%s\n",state.value()) //返回處理后結果 out.collect("處理后返回數據->"+value) } } }
案例測試
10秒內輸入四條數據
帽子,12 帽子,12 鞋,10 鞋,10
通過我們打印我們會發現統計完成
定時器觸發,時間為:1586005420511,狀態為:(鞋,20),key為:(鞋)
定時器觸發,時間為:1586005421080,狀態為:(帽子,24),key為:(帽子)
如果我們10秒內不輸入數據,則會提示數據為0,進行預警
定時器觸發,時間為:1586005406244,狀態為:(帽子,0),key為:(帽子) 類型為:帽子,數據為0,預警 定時器觸發,時間為:1586005406244,狀態為:(鞋,0),key為:(鞋) 類型為:鞋,數據為0,預警
數據傾斜問題
到這里我們已經實現了定期統計功能,但有沒有發現,如果帽子分配在task1執行,鞋在task2執行,鞋一天進來1億條數據,帽子進來1條數據,我們會出現嚴重的數據傾斜問題。
我們實際看一下具體問題,計算結果我們就先不看了,直接看數據分配問題
三個task階段 , Socket是單並行的source,我們將並行度改為4
輸入數據:1條 帽子,10 ;50條 鞋,10
我們看Map階段,數據是均衡的,因為這里還沒有進行keyby
我們再看keyby后的task
我們發現50條數據都在ID為3的subtask中,出現了嚴重數據傾斜問題。
這種問題我們可以進行兩階段keyby解決該問題。
兩階段keyby方法
數據傾斜如左圖所示。而我們期望的是如右圖所示。
但我們的需要根據key進行聚合統計,那么把相同的key放在不同的subtask如何統計?
1.首先將key打散,我們加入將key轉化為 key-隨機數 ,保證數據散列 2.對打散后的數據進行聚合統計,這時我們會得到數據比如 : (key1-12,1),(key1-13,19),(key1-1,20),(key2-123,11),(key2-123,10) 3.將散列key還原成我們之前傳入的key,這時我們的到數據是聚合統計后的結果,不是最初的原數據 4.二次keyby進行結果統計,輸出到addSink
打散key代碼實現

import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.windowing.WindowFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector object ProcessFunctionScalaV2 { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(2000) val stream: DataStream[String] = env.socketTextStream("localhost", 9999) val typeAndData: DataStream[(String, Long)] = stream.map(x => (x.split(",")(0), x.split(",")(1).toLong)) val dataStream: DataStream[(String, Long)] = typeAndData .map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2)) val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1) .timeWindow(Time.seconds(10)) .aggregate(new CountAggregate()) keyByAgg.print("第一次keyby輸出") val result: DataStream[DataJast] = keyByAgg.map(data => { val newKey: String = data.key.substring(0, data.key.indexOf("-")) println(newKey) DataJast(newKey, data.count) }).keyBy(_.key) .process(new MyProcessFunction()) result.print("第二次keyby輸出") env.execute() } case class DataJast(key :String,count:Long) //計算keyby后,每個Window中的數據總和 class CountAggregate extends AggregateFunction[(String, Long),DataJast, DataJast] { override def createAccumulator(): DataJast = { println("初始化") DataJast(null,0) } override def add(value: (String, Long), accumulator: DataJast): DataJast = { if(accumulator.key==null){ printf("第一次加載,key:%s,value:%d\n",value._1,value._2) DataJast(value._1,value._2) }else{ printf("數據累加,key:%s,value:%d\n",value._1,accumulator.count+value._2) DataJast(value._1,accumulator.count + value._2) } } override def getResult(accumulator: DataJast): DataJast = { println("返回結果:"+accumulator) accumulator } override def merge(a: DataJast, b: DataJast): DataJast = { DataJast(a.key,a.count+b.count) } } /** * 實現: * 根據key分類,統計每個key進來的數據量,定期統計數量 */ class MyProcessFunction extends KeyedProcessFunction[String,DataJast,DataJast]{ val delayTime : Long = 1000L * 30 lazy val valueState:ValueState[Long] = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("ccount",classOf[Long])) override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit = { if(valueState.value()==0){ valueState.update(value.count) printf("運行task:%s,第一次初始化數量:%s\n",getRuntimeContext.getIndexOfThisSubtask,value.count) val currentTime: Long = ctx.timerService().currentProcessingTime() //注冊定時器 ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) }else{ valueState.update(valueState.value()+value.count) printf("運行task:%s,更新統計結果:%s\n" ,getRuntimeContext.getIndexOfThisSubtask,valueState.value()) } } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit = { //定時器執行,可加入業務操作 printf("運行task:%s,觸發定時器,30秒內數據一共,key:%s,value:%s\n",getRuntimeContext.getIndexOfThisSubtask,ctx.getCurrentKey,valueState.value()) //定時統計完成,初始化統計數據 valueState.update(0) //注冊定時器 val currentTime: Long = ctx.timerService().currentProcessingTime() ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) } } }
對key進行散列
val dataStream: DataStream[(String, Long)] = typeAndData
.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))
設置窗口滾動時間,每隔十秒統計一次每隔key下的數據總量
val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1) .timeWindow(Time.seconds(10)) .aggregate(new AverageAggregate()) keyByAgg.print("第一次keyby輸出")
還原key,並進行二次keyby,對數據總量進行累加
val result: DataStream[DataJast] = keyByAgg.map(data => { val newKey: String = data.key.substring(0, data.key.indexOf("-")) println(newKey) DataJast(newKey, data.count) }).keyBy(_.key) .process(new MyProcessFunction())
我們看下優化后的狀態
先看下第一map,直接從端口拿數據,這不涉及keyby,所以這個沒影響
再看下第一次keyby后的結果,因為我們散列后,flink根據哈希進行分配,所以數據不是百分之百平均,但是很明顯基本上已經均衡了,不會出現這里1一條,那里1條這種狀況。
再看下第二次keyby,這里會發現我們ID的2的subtask有820條數據,其他的沒有數據;這里是正常現象,因為我們是對第一次聚合后的數據進行keyby統計,所以這里的數據大小會非常小,比如我們原始數據一條數據有1M大小,1000條數據就1個G,業務往往還有其他操作,我們再第一次keyby 散列時處理其他邏輯(比如ETL等等操作),最終將統計結果輸出給第二次keyby,很可能1個G的數據,最終只有1kb,這比我們將1個G的數據放在一個subtask中處理好很多。
上面我們自定義了MyProcessFunction方法,設置每30秒執行一次,實際業務場景,我們可能會設置一小時執行一次。
至此我們既保證了數據定時統計,也保證了數據不傾斜問題。
https://blog.csdn.net/zhangshenghang/article/details/105322423