Requirement
Compute the TOP 3 items by user click count within each hour, updating the result every five minutes.
The UserBehavior and ItemViewCount beans are defined at the top of the code below.
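For reference, each input line is expected to be a comma-separated record whose five fields map onto the UserBehavior bean: userId, itemId, categoryId, behavior, and a timestamp in seconds (the code multiplies it by 1000 to get milliseconds). The concrete values below are made up for illustration; non-"pv" records such as the last one are dropped by the filter:

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658060
561558,3611281,965809,buy,1511658120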
Implementation with the low-level DataStream API
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

object TopHotItems {

  case class UserBehavior(userId: Long,     // user id
                          itemId: Long,     // item id
                          categoryId: Long, // category id
                          behavior: String, // behavior type; "pv" means page view
                          timestamp: Long)

  case class ItemViewCount(itemId: Long,
                           windowStart: Long, // window start time
                           windowEnd: Long,   // window end time
                           count: Long)       // view count

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env
      .readTextFile("filePath")
      // parse each line into a UserBehavior and keep only "pv" records
      .map(line => {
        val arr = line.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toLong, arr(3), arr(4).toLong * 1000L)
      })
      .filter(_.behavior.equals("pv"))
      .assignAscendingTimestamps(_.timestamp) // assign ascending event-time timestamps
      .keyBy(_.itemId)                        // partition by item id
      .timeWindow(Time.hours(1), Time.minutes(5)) // 1-hour window sliding every 5 minutes, as required
      // combine incremental aggregation with a window function to keep window state minimal
      .aggregate(new CountAgg, new WindowResult)
      // key by window end time, so every record in a partition belongs to the same window
      .keyBy(_.windowEnd)
      .process(new TopN(3))

    stream.print()
    env.execute()
  }

  // incremental counter: the accumulator is just a Long
  class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
    override def getResult(accumulator: Long): Long = accumulator
    override def merge(a: Long, b: Long): Long = a + b
  }

  // wrap the pre-aggregated count together with the window's start and end times
  class WindowResult extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow] {
    override def process(key: Long,
                         context: Context,
                         elements: Iterable[Long],
                         out: Collector[ItemViewCount]): Unit = {
      out.collect(ItemViewCount(key, context.window.getStart, context.window.getEnd, elements.head))
    }
  }

  class TopN(val topSize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {

    // visible only for the current key (the window end time)
    lazy val listState: ListState[ItemViewCount] = getRuntimeContext.getListState(
      new ListStateDescriptor[ItemViewCount]("list-state", Types.of[ItemViewCount])
    )

    override def processElement(value: ItemViewCount,
                                ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context,
                                out: Collector[String]): Unit = {
      listState.add(value)
      // timers are deduplicated per timestamp, so repeated registration is a no-op
      ctx.timerService().registerEventTimeTimer(value.windowEnd + 100)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      // if memory is a concern a TreeSet could be used instead, but the volume is rarely that large
      val allItems: ListBuffer[ItemViewCount] = ListBuffer()
      // copy the list state into memory
      for (item <- listState.get.asScala) {
        allItems += item
      }
      // the state is no longer needed once this window's result is finalized
      listState.clear()

      // sort by view count in descending order and keep the top N
      val sortedItems = allItems.sortBy(-_.count).take(topSize)

      val result = new StringBuilder()
      result.append("----------------------\n")
        .append(s"time: ${timestamp - 100}\n")
      for (i <- sortedItems.indices) {
        result.append(s"NO.${i + 1} itemId ${sortedItems(i).itemId} clicks ${sortedItems(i).count}\n")
      }
      result.append("----------------------\n")
      Thread.sleep(5000) // slow the output down so it is readable in a demo
      out.collect(result.toString)
    }
  }
}
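With the loop above, every firing of the event-time timer emits one ranking block per window (the printed time is the window end in epoch milliseconds). A hypothetical sample of what TopN(3) prints, with made-up item ids and counts:

----------------------
time: 1511661600000
NO.1 itemId 2338453 clicks 27
NO.2 itemId 812879 clicks 18
NO.3 itemId 4443059 clicks 12
----------------------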
Implementation with Flink SQL
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// real-time top-n implemented with SQL
object HotItemsSQL {

  case class UserBehavior(userId: Long,
                          itemId: Long,
                          categoryId: Long,
                          behavior: String,
                          timestamp: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // create a table environment; top-n is only supported by the Blink planner
    val settings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val stream = env
      .readTextFile("/Users/yuanzuo/Desktop/flink-tutorial/FlinkSZ1128/src/main/resources/UserBehavior.csv")
      .map(line => {
        val arr = line.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toLong, arr(3), arr(4).toLong * 1000L)
      })
      .filter(_.behavior.equals("pv"))
      .assignAscendingTimestamps(_.timestamp) // assign ascending event-time timestamps

    // register a temporary view; 'ts is the event-time (rowtime) attribute
    tableEnv.createTemporaryView("t", stream, 'itemId, 'timestamp.rowtime as 'ts)

    // innermost subquery: equivalent to stream.keyBy(_.itemId).timeWindow(Time.hours(1), Time.minutes(5)).aggregate(new CountAgg, new WindowResult)
    // middle subquery: equivalent to .keyBy(_.windowEnd) followed by sorting within each window
    // outermost query: keep only the top three rows
    val result = tableEnv
      .sqlQuery(
        """
          |SELECT *
          |FROM (
          |    SELECT *,
          |           ROW_NUMBER() OVER (PARTITION BY windowEnd ORDER BY icount DESC) as row_num
          |    FROM (
          |        SELECT itemId, count(itemId) as icount,
          |               HOP_END(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as windowEnd
          |        FROM t GROUP BY itemId, HOP(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
          |    )
          |)
          |WHERE row_num <= 3
          |""".stripMargin)

    result
      .toRetractStream[Row]
      .filter(_._1) // keep only the "add" messages of the retract stream
      .print()

    env.execute()
  }
}
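A note on the last few lines: toRetractStream[Row] turns the continuously updated top-n table into a stream of (Boolean, Row) pairs, where true marks an insertion and false marks the retraction of a row emitted earlier, so filtering on the flag keeps only the current additions. A hypothetical fragment of the printed output, with made-up values (columns: itemId, icount, windowEnd, row_num):

(true,2338453,27,2017-11-26 01:00:00.0,1)
(true,812879,18,2017-11-26 01:00:00.0,2)
(true,4443059,12,2017-11-26 01:00:00.0,3)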