3.1 Module Creation and Data Preparation
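Under UserBehaviorAnalysis, create a new Maven module as a sub-project and name it NetworkTrafficAnalysis. This sub-module again introduces no additional dependencies, so the pom file does not need to change.
Under src/main/, rename the default source directory java to scala. Copy the Apache server log file apache.log into the resource directory src/main/resources; we will read the data from there.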
3.2 Code Implementation
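The module we implement here is "real-time traffic statistics". For an e-commerce platform, the entry traffic from user logins and the traffic to individual pages are both important data worth analyzing, and this data can be extracted directly from the web server logs. Here we implement the most basic "page view" statistic: read each log line from the server log and count how many times each URL was accessed within a period of time.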
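Concretely: every 5 seconds, output the top N most visited URLs of the last 10 minutes. This requirement is very similar to the earlier "real-time hot items" statistic, so we can reuse most of that code.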
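To make the "10-minute window, sliding every 5 seconds" requirement more concrete, here is a minimal sketch in plain Scala (no Flink dependency) that lists the sliding windows a single event falls into. The object name SlidingWindowSketch, the function windowStarts, and the sample timestamp are hypothetical and only for illustration; the sketch assumes windows are aligned to the epoch with no offset and that event times are non-negative.

```scala
object SlidingWindowSketch {
  val windowSizeMs: Long = 10 * 60 * 1000L // 10-minute window length
  val slideMs: Long = 5 * 1000L            // a new window starts every 5 seconds

  /** Start timestamps of all sliding windows that contain the given event time. */
  def windowStarts(eventTimeMs: Long): Seq[Long] = {
    // Latest window start that is <= eventTimeMs, aligned to the slide interval
    val lastStart = eventTimeMs - (eventTimeMs % slideMs)
    // Walk backwards one slide at a time while the window still covers the event
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > eventTimeMs - windowSizeMs)
      .toList
  }

  def main(args: Array[String]): Unit = {
    // Arbitrary sample event time in epoch milliseconds (an assumption)
    val starts = windowStarts(1431857103000L)
    println(s"windows containing the event: ${starts.size}")
    println(s"latest window: [${starts.head}, ${starts.head + windowSizeMs})")
  }
}
```

Because the window size is 120 times the slide, each event contributes to 120 overlapping windows, which is why the job aggregates incrementally instead of buffering every event per window.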
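Create the file TrafficAnalysis.scala under src/main/scala and define a singleton object in it. Define the case class ApacheLogEvent, which represents the input log data stream, and UrlViewCount, which is the output type of the window aggregation. In the main function, create and configure the StreamExecutionEnvironment, then read the data from apache.log and wrap each line as an ApacheLogEvent.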
Note that the time in the raw log has the form "dd/MM/yyyy:HH:mm:ss", so we need to define a SimpleDateFormat to convert it into the epoch-millisecond timestamp we need:
    .map(line => {
      // Fields are space-separated: 0 = ip, 2 = userId, 3 = time, 5 = method, 6 = url
      val linearray = line.split(" ")
      val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
      val timestamp = sdf.parse(linearray(3)).getTime
      ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
    })
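The parsing step can be tried on its own, outside Flink. The sketch below is only an illustration: the object name LogTimestampSketch, the function parseEventTime, and the sample log line are hypothetical. It also pins the time zone to UTC, which is an assumption on our part; the job's map function uses the JVM default time zone, so the resulting timestamps there depend on the machine.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object LogTimestampSketch {
  /** Parses the time field (index 3) of a space-separated log line into epoch milliseconds. */
  def parseEventTime(line: String): Long = {
    val fields = line.split(" ")
    // SimpleDateFormat is not thread-safe, so a fresh instance is created per call
    val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
    // Assumption: log times are in UTC; without this line the JVM default zone applies
    sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
    sdf.parse(fields(3)).getTime
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample line in the layout the job expects
    val line = "10.0.0.1 - - 17/05/2015:10:05:03 +0000 GET /index.html"
    println(parseEventTime(line))
  }
}
```

If the log contains malformed lines, sdf.parse throws a ParseException; a production job would probably want to filter or handle those lines rather than fail.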
The complete code is as follows:
    import java.sql.Timestamp
    import java.text.SimpleDateFormat

    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    import scala.collection.JavaConverters._

    // Input: one parsed line of the Apache access log
    case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
    // Output of the window aggregation: view count of one URL in one window
    case class UrlViewCount(url: String, windowEnd: Long, count: Long)

    object TrafficAnalysis {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        val stream = env
          // Windows-style example path; replace it with your own
          .readTextFile("YOUR_PATH\\resources\\apache.log")
          .map(line => {
            val linearray = line.split(" ")
            val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
            val timestamp = simpleDateFormat.parse(linearray(3)).getTime
            ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
          })
          // Tolerate events that arrive up to 1 second out of order
          .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.milliseconds(1000)) {
              override def extractTimestamp(t: ApacheLogEvent): Long = t.eventTime
            })
          .keyBy("url")
          // 10-minute window that slides every 5 seconds
          .timeWindow(Time.minutes(10), Time.seconds(5))
          .aggregate(new CountAgg(), new WindowResultFunction())
          // Field 1 of UrlViewCount is windowEnd: group the counts of one window together
          .keyBy(1)
          .process(new TopNHotUrls(5))
          .print()

        env.execute("Traffic Analysis Job")
      }

      // Incremental count: adds 1 for every event in the window
      class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
        override def createAccumulator(): Long = 0L
        override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
        override def getResult(acc: Long): Long = acc
        override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
      }

      // Wraps the count of one key and window into a UrlViewCount
      class WindowResultFunction extends WindowFunction[Long, UrlViewCount, Tuple, TimeWindow] {
        override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
                           collector: Collector[UrlViewCount]): Unit = {
          val url: String = key.asInstanceOf[Tuple1[String]].f0
          val count = aggregateResult.iterator.next
          collector.collect(UrlViewCount(url, window.getEnd, count))
        }
      }

      // Collects all UrlViewCount records of one window and emits the top N as a formatted string
      class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Tuple, UrlViewCount, String] {

        private var urlState: ListState[UrlViewCount] = _

        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state", classOf[UrlViewCount])
          urlState = getRuntimeContext.getListState(urlStateDesc)
        }

        override def processElement(input: UrlViewCount,
                                    context: KeyedProcessFunction[Tuple, UrlViewCount, String]#Context,
                                    collector: Collector[String]): Unit = {
          // Save every record in state
          urlState.add(input)
          // Fire once the watermark passes the window end
          context.timerService.registerEventTimeTimer(input.windowEnd + 1)
        }

        override def onTimer(timestamp: Long,
                             ctx: KeyedProcessFunction[Tuple, UrlViewCount, String]#OnTimerContext,
                             out: Collector[String]): Unit = {
          // Fetch all URL view counts received for this window
          val allUrlViews: List[UrlViewCount] = urlState.get.asScala.toList
          // Clear the state early to free space
          urlState.clear()
          // Sort by view count in descending order and keep the top N
          val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse).take(topSize)

          // Format the ranking as a String for printing
          val result = new StringBuilder
          result.append("====================================\n")
          result.append("Time: ").append(new Timestamp(timestamp - 1)).append("\n")
          for (i <- sortedUrlViews.indices) {
            val currentUrlView: UrlViewCount = sortedUrlViews(i)
            // e.g. No1: URL=/blog/tags/firefox?flav=rss20 Views=55
            result.append("No").append(i + 1).append(":")
              .append(" URL=").append(currentUrlView.url)
              .append(" Views=").append(currentUrlView.count).append("\n")
          }
          result.append("====================================\n\n")

          // Throttle the output to simulate results scrolling in real time (demo only)
          Thread.sleep(1000)
          out.collect(result.toString)
        }
      }
    }
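The ranking step inside onTimer can also be checked in isolation with plain Scala collections. This is a minimal sketch, not part of the job: the object name TopNSketch, the helper topN, and the sample counts are hypothetical, and the case class is redeclared here only so the snippet is self-contained.

```scala
object TopNSketch {
  // Same shape as the UrlViewCount case class used by the job
  case class UrlViewCount(url: String, windowEnd: Long, count: Long)

  /** Returns the n entries with the highest count, largest first. */
  def topN(views: Seq[UrlViewCount], n: Int): Seq[UrlViewCount] =
    views.sortBy(_.count)(Ordering.Long.reverse).take(n)

  def main(args: Array[String]): Unit = {
    // Hypothetical counts that all belong to a single window
    val windowEnd = 1431857105000L
    val views = Seq(
      UrlViewCount("/index.html", windowEnd, 12L),
      UrlViewCount("/blog/tags/firefox", windowEnd, 55L),
      UrlViewCount("/about", windowEnd, 3L)
    )
    topN(views, 2).zipWithIndex.foreach { case (v, i) =>
      println(s"No${i + 1}: URL=${v.url} count=${v.count}")
    }
  }
}
```

Sorting the whole list is fine when the number of distinct URLs per window is modest; for very large key spaces a bounded priority queue of size N would likely be a better fit.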