Flink統計日活



.keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
      .evictor(TimeEvictor.of(Time.seconds(0), true))
      .process(new ProcessWindowFunction[(String, String), (String, String, Long), Tuple, TimeWindow] {
        /*
        這是使用state是因為,窗口默認只會在創建結束的時候觸發一次計算,然后數據結果,
        如果長時間的窗口,比如:一天的窗口,要是等到一天結束在輸出結果,那還不如跑批。
        所有大窗口會添加trigger,以一定的頻率輸出中間結果。
        加evictor 是因為,每次trigger,觸發計算是,窗口中的所有數據都會參與,所以數據會觸發很多次,比較浪費,加evictor 驅逐已經計算過的數據,就不會重復計算了
        驅逐了已經計算過的數據,導致窗口數據不完全,所以需要state 存儲我們需要的中間結果
         */
        var wordState: MapState[String, String] = _
        var pvCount: ValueState[Long] = _

        override def open(parameters: Configuration): Unit = {
          // new MapStateDescriptor[String, String]("word", classOf[String], classOf[String])
          wordState = getRuntimeContext.getMapState(new MapStateDescriptor[String, String]("word", classOf[String], classOf[String]))
          pvCount = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pvCount", classOf[Long]))
        }

        override def process(key: Tuple, context: Context, elements: Iterable[(String, String)], out: Collector[(String, String, Long)]): Unit = {


          var pv = 0;
          val elementsIterator = elements.iterator
          // 遍歷窗口數據,獲取唯一word
          while (elementsIterator.hasNext) {
            pv += 1
            val word = elementsIterator.next()._2
            wordState.put(word, null)
          }
          // add current
          pvCount.update(pvCount.value() + pv)
          var count: Long = 0
          val wordIterator = wordState.keys().iterator()
          while (wordIterator.hasNext) {
            wordIterator.next()
            count += 1
          }
          // uv
          out.collect((key.getField(0), "uv", count))
          out.collect(key.getField(0), "pv", pv)

        }
      })


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM