Flink模擬項目: 實時流量統計


3.1 模塊創建和數據准備

在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為NetworkTrafficAnalysis。在這個子模塊中,我們同樣並沒有引入更多的依賴,所以也不需要改動pom文件。

在src/main/目錄下,將默認源文件目錄java改名為scala。將apache服務器的日志文件apache.log復制到資源文件目錄src/main/resources下,我們將從這里讀取數據

3.2 代碼實現

我們現在要實現的模塊是 “實時流量統計”。對於一個電商平台而言,用戶登錄的入口流量、不同頁面的訪問流量都是值得分析的重要數據,而這些數據,可以簡單地從web服務器的日志中提取出來。我們在這里實現最基本的“頁面瀏覽數”的統計,也就是讀取服務器日志中的每一行log,統計在一段時間內用戶訪問url的次數。

具體做法為:每隔5秒,輸出最近10分鍾內訪問量最多的前N個URL。可以看出,這個需求與之前“實時熱門商品統計”非常類似,所以我們完全可以借鑒此前的代碼。

在src/main/scala下創建TrafficAnalysis.scala文件,新建一個單例對象。定義樣例類ApacheLogEvent,這是輸入的日志數據流;另外還有UrlViewCount,這是窗口操作統計的輸出數據類型。在main函數中創建StreamExecutionEnvironment 並做配置,然后從apache.log文件中讀取數據,並包裝成ApacheLogEvent類型。

需要注意的是,原始日志中的時間是“dd/MM/yyyy:HH:mm:ss”的形式,需要定義一個DateTimeFormat將其轉換為我們需要的時間戳格式:

.map(line => {
val linearray = line.split(" ")
val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = sdf.parse(linearray(3)).getTime
ApacheLogEvent(linearray(0), linearray(2), timestamp, 
linearray(5), linearray(6))
})

  

完整代碼如下:

 

case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
case class UrlViewCount(url: String, windowEnd: Long, count: Long)

object TrafficAnalysis {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val stream = env
// 以window下為例,需替換成自己的路徑
      .readTextFile("YOUR_PATH\\resources\\apache.log")
      .map(line => {
val linearray = line.split(" ")
val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = simpleDateFormat.parse(linearray(3)).getTime
        ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
      })
      .assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent]
(Time.milliseconds(1000)) {
override def extractTimestamp(t: ApacheLogEvent): Long = {
t.eventTime
}
})
      .keyBy("url")
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new CountAgg(), new WindowResultFunction())
      .keyBy(1)
      .process(new TopNHotUrls(5))
      .print()
    env.execute("Traffic Analysis Job")
  }

  class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }

  class WindowResultFunction extends WindowFunction[Long, UrlViewCount, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[UrlViewCount]) : Unit = {
      val url: String = key.asInstanceOf[Tuple1[String]].f0
      val count = aggregateResult.iterator.next
      collector.collect(UrlViewCount(url, window.getEnd, count))
    }
  }

  class TopNHotUrls(topsize: Int) extends KeyedProcessFunction[Tuple, UrlViewCount, String] {
    private var urlState : ListState[UrlViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state", classOf[UrlViewCount])
      urlState = getRuntimeContext.getListState(urlStateDesc)
    }

    override def processElement(input: UrlViewCount, context: KeyedProcessFunction[Tuple, UrlViewCount, String]#Context, collector: Collector[String]): Unit = { 
// 每條數據都保存到狀態中
      urlState.add(input)
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = { 
// 獲取收到的所有URL訪問量
      val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
      import scala.collection.JavaConversions._
      for (urlView <- urlState.get) {
        allUrlViews += urlView
      }
      // 提前清除狀態中的數據,釋放空間
      urlState.clear()
      // 按照訪問量從大到小排序
      val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse)
.take(topSize)
      // 將排名信息格式化成 String, 便於打印
      var result: StringBuilder = new StringBuilder
      result.append("====================================\n")
      result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")

      for (i <- sortedUrlViews.indices) {
        val currentUrlView: UrlViewCount = sortedUrlViews(i)
// e.g.  No1:  URL=/blog/tags/firefox?flav=rss20  流量=55
        result.append("No").append(i+1).append(":")
.append("  URL=").append(currentUrlView.url)
.append("  流量=").append(currentUrlView.count).append("\n")
      }
      result.append("====================================\n\n")
      // 控制輸出頻率,模擬實時滾動結果
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM