基於spark和flink的電商數據分析項目


本文是原項目的一次重寫。主要是用DataFrame代替原來的RDD,並在一些實現上進行優化,還有就是實時流計算改用Flink進行實現。
項目分為用戶訪問session模塊、頁面轉跳轉化率統計、熱門商品離線統計和廣告流量實時統計四部分組成。

業務需求

用戶訪問session

該模塊主要是對用戶訪問session進行統計分析,包括session的聚合指標計算、按時間比例隨機抽取session、獲取每天點擊、下單和購買排名前10的品類、並獲取top10品類的點擊量排名前10的session。主要使用Spark DataFrame。

頁面單跳轉化率統計

該模塊主要是計算關鍵頁面之間的單步跳轉轉化率,涉及到頁面切片算法以及頁面流匹配算法。主要使用Spark DataFrame。

熱門商品離線統計

該模塊主要實現每天統計出各個區域的top3熱門商品。主要使用Spark DataFrame。

廣告流量實時統計

經過實時黑名單過濾的每天各省各城市廣告點擊實時統計、每天各省topn熱門廣告、各廣告近1小時內每分鍾的點擊趨勢。主要使用Spark streaming或Flink。

業務數據源

輸入表

# 用戶表
user_id      用戶的ID
username     用戶的名稱
name         用戶的名字
age          用戶的年齡
professional 用戶的職業
city         用戶所在的城市
sex          用戶的性別

# 商品表
product_id   商品的ID
product_name 商品的名稱
extend_info  商品額外的信息

# 用戶訪問動作表
date               用戶點擊行為的日期
user_id            用戶的ID
session_id         Session的ID
page_id            某個頁面的ID
action_time        點擊行為的時間點
search_keyword     用戶搜索的關鍵詞
click_category_id  某一個商品品類的ID
click_product_id   某一個商品的ID
order_category_ids 一次訂單中所有品類的ID集合
order_product_ids  一次訂單中所有商品的ID集合
pay_category_ids   一次支付中所有品類的ID集合
pay_product_ids    一次支付中所有商品的ID集合
city_id            城市ID

輸出表

# 聚合統計表
taskid                       當前計算批次的ID
session_count                所有Session的總和
visit_length_1s_3s_ratio     1-3sSession訪問時長占比
visit_length_4s_6s_ratio     4-6sSession訪問時長占比
visit_length_7s_9s_ratio     7-9sSession訪問時長占比
visit_length_10s_30s_ratio   10-30sSession訪問時長占比
visit_length_30s_60s_ratio   30-60sSession訪問時長占比
visit_length_1m_3m_ratio     1-3mSession訪問時長占比
visit_length_3m_10m_ratio    3-10mSession訪問時長占比
visit_length_10m_30m_ratio   10-30mSession訪問時長占比
visit_length_30m_ratio       30mSession訪問時長占比
step_length_1_3_ratio        1-3步長占比
step_length_4_6_ratio        4-6步長占比
step_length_7_9_ratio        7-9步長占比
step_length_10_30_ratio      10-30步長占比
step_length_30_60_ratio      30-60步長占比
step_length_60_ratio         大於60步長占比

# 品類Top10表
taskid
categoryid
clickCount
orderCount
payCount

# Top10 Session
taskid
categoryid
sessionid
clickCount

用戶訪問Session分析

Session聚合統計

統計出符合條件的session中,各訪問時長、步長的占比,並將結果保存到MySQL中。符合條件的session指搜索過某些關鍵詞的用戶、訪問時間在某個時間段內的用戶、年齡在某個范圍內的用戶、職業在某個范圍內的用戶、所在某個城市的用戶,所發起的session。

除了將原rdd的實現改為DF外,本文還在兩方面進行了優化。第一是join前提前filter。原實現是先從用戶動作表中計算出訪問時長、步長后和用戶信息表進行關聯后再filter的,這無疑是對一些無關的用戶多余地計算了訪問時長和步長,也增加了join是shuffle的數據量。第二點是原實現采用accumulator實現個訪問時長人數和各步長人數的統計,這會增加driver的負擔。而重寫后的代碼基於DF,且利用when函數對訪問時長和步長進行離散化,最后利用聚合函數得出統計結果,讓所有統計都在executors中並行執行。

// 原始數據包含“用戶訪問動作表”中的信息
// 先根據時間范圍篩選“用戶訪問動作表”,然后將它和“UserInfo表”進行inner join,補充用於進一步篩選的信息:age、professional、city、sex
// 根據searchKeywords、clickCategoryIds和上面4個條件對數據進行篩選,得出所需的session。

// 利用spark sql篩選特定時間段的session
spark.sql("select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'")

// 下面代碼用於合成SQL語句並用於filter特定類型的session,但有一定的安全隱患,要對輸入的參數進行嚴格的校驗,防止SQL注入。
val selectClause = new StringBuilder("SELECT * FROM user_visit_action_to_user_info WHERE 1=1 ")
if (ValidUtils.equal(Constants.PARAM_SEX, sex)){
  selectClause append ("AND sex == '" + sex + "'")
}
if (ValidUtils.in(Constants.PARAM_PROFESSIONALS, professionals)){
  selectClause append ("AND professional in (" + professionals + ")")
}
if (ValidUtils.in(Constants.PARAM_CITIES, cities)){
  selectClause append ("AND cities in (" + cities + ")")
}
if (ValidUtils.in(Constants.PARAM_KEYWORDS, keywords)){
  selectClause append ("AND search_keyword in (" + keywords + ")")
}
if (ValidUtils.in(Constants.PARAM_CATEGORY_IDS, categoryIds)){
  selectClause append ("AND click_category_id in (" + categoryIds + ")")
}
if (ValidUtils.between(Constants.FIELD_AGE, startAge, endAge)){
  selectClause append  ("AND age BETWEEN " + startAge + " AND " + endAge)
}
val sqlQuery = selectClause.toString()

// filter完后與“用戶表”建立連接

// 下面進行session聚合計算,結果得到的信息包括sessionid、search_keyword、click_category_id、stepLength、visitLength、session開始時間start、AGE、PROFESSIONAL、CITY、SEX
val timeFmt = "yyyy-MM-dd HH:mm:ss"
val sessionid2ActionsRDD2 = UserVisitActionDF
  .withColumn("action_time", unix_timestamp($"action_time", timeFmt))
  .groupBy("session_id")
  .agg(min("action_time") as "start",
    max("action_time") as "end",
    count("*") as "stepLength")
  .withColumn("visitLength", $"start" - $"end")
  .withColumn("discrete_VL", discretiseVisitLength)
  .withColumn("discrete_SL", discretiseStepLength)

// 離散化 visitLength 和 stepLength
val discretiseVisitLength = when($"visitLength" >= 1 && $"visitLength" <= 3 , Constants.TIME_PERIOD_1s_3s)
.when($"visitLength" >= 4 && $"visitLength" <= 6 , Constants.TIME_PERIOD_4s_6s)
...
.when($"visitLength" >= 1800, Constants.TIME_PERIOD_30m)

// 統計信息,獲得每種訪問時長的人數。將下面discrete_VL換成stepLength就是每種步長的人數了
val statisticVisitLength = sessionid2ActionsRDD2.groupBy("discrete_VL").agg(count("discrete_VL")).collect()

Session分層抽樣

根據各時長、步長的比例抽樣。原實現利用rdd和scala自身數據結構和方法來實現,新的實現直接利用dataframe的統計函數sampleBy實現。

df.stat.sampleBy("colName", fractions, seed),其中fractions為Map,是每distinct key和其需要抽取的比例,如("a" -> 0.8)就是從key為a的數據中抽80%條

val fractions = HashMap(
  TIME_PERIOD_1s_3s -> 0.1,
  TIME_PERIOD_4s_6s -> 0.1,
  TIME_PERIOD_7s_9s -> 0.1,
  TIME_PERIOD_10s_30s -> 0.1,
  TIME_PERIOD_30s_60s -> 0.1,
  TIME_PERIOD_1m_3m -> 0.1,
  TIME_PERIOD_3m_10m -> 0.1,
  TIME_PERIOD_10m_30m -> 0.1,
  TIME_PERIOD_30m -> 0.1
)
df.stat.sampleBy("time_period", fractions, 2L)

// 如果time_period未知,用下面方式得出map
df.select("time_period")
  .distinct
  .map(x=> (x, 0.8))
  .collectAsMap

Top10熱門品類

分別計算出各商品的點擊數、下單數、支付次數,然后將三個結果進行連接,並排序。排序規則是點擊數大的排前面,相同時下單數大的排前面,然后再相同時支付次數大的排前面。這里的優化點是采用rdd的takeOrdered取前十,它的底層是每個分區一個最小堆,取出每個分區的前10,然后再匯總。這樣省去了原來實現當中的sortbykey+take,該方法進行了全排序,效率較低。

// 分別計算出各商品的點擊數、下單數、支付次數,然后將三個結果進行連接,並排序。
val clickCategoryId2CountDF = sessionid2detailDF
  .select("clickCategoryId")
  .na.drop()
  .groupBy("clickCategoryId")
  .agg(count("clickCategoryId"))
  .withColumnRenamed("clickCategoryId", "categoryId")

val orderCategoryId2CountDF = sessionid2detailDF
  .select("order_category_ids")
  .na.drop()
  .withColumn("splitted_order_category_ids", split($"order_category_ids", ","))
  .withColumn("single_order_category_ids", explode($"splitted_order_category_ids"))
  .groupBy("single_order_category_ids")
  .agg(count("single_order_category_ids"))
  .withColumnRenamed("single_order_category_ids", "categoryId")

val payCategoryId2Count = sessionid2detailDF
  .select("pay_category_ids")
  .na.drop()
  .withColumn("splitted_pay_category_ids", split($"pay_category_ids", ","))
  .withColumn("single_pay_category_ids", explode($"splitted_pay_category_ids"))
  .groupBy("single_pay_category_ids")
  .agg(count("single_pay_category_ids"))
  .withColumnRenamed("single_pay_category_ids", "categoryId")

val top10CategoryId = clickCategoryId2CountDF.join(orderCategoryId2CountDF, Seq("categoryId"), "left")
  .join(payCategoryId2Count, Seq("categoryId"), "left")
  .na.fill(0L, Seq(""))
  .map(row => {
    (row.getAs[Int]("categoryId"), 
     row.getAs[Int]("count(clickCategoryId)"), 
     row.getAs[Int]("count(single_order_category_ids)"),
     row.getAs[Int]("count(single_pay_category_ids)"))
  })
  .rdd
  .takeOrdered(10)(ordering)

// 補充
implicit val ordering = new Ordering[(Int, Int, Int, Int)] {
  override def compare(x: (Int, Int, Int, Int), y: (Int, Int, Int, Int)): Int = {
    val compare2 = x._2.compareTo(y._2)
    if (compare2 != 0) return compare2
    val compare3 = x._3.compareTo(y._3)
    if (compare3 != 0) return compare3
    val compare4 = x._4.compareTo(y._4)
    if (compare4 != 0) return compare4
    0
  }
}.reverse

Top10活躍Session

對於top10的品類,每一個都要獲取對它點擊次數排名前10的session。
原代碼的實現是先groupByKey,統計出每個sessionid對各品類的點擊次數,然后再跟前10熱門品類連接來減少數據,然后再用groupBuKey,對每個分組數據toList后排序取前10。這個實現並不太好,首先它一開始的groupByKey對非Top10熱門品類的數據進行了統計,這是一種浪費。更好的做法是提前filter,即先利用熱門品類這個名單進行filter。然后,原代碼在實現filter使用的是將熱門品類名單parallelise到集群然后利用join實現過濾。這會觸發不必要的shuffle,更好的實現進行broadcast join,將名單廣播出去后進行join。然后groupByKey的統計也是一個問題,它沒有map side聚合,容易OOM,更好的實現是采用DF的groupby + agg。得出統計數據后利用windowfunction取得各熱門品類的前十session。即一次shuffle就可以完成需求,windowfunction在這個並不需要shuffle,因為經過前面的shuffle聚合,df已經具有partitioner了,在原節點就可以計算出topn。

// 把top10CategoryId的名單發到集群
val top10CategoryIdRDD = spark.sparkContext.parallelize(top10CategoryId.map(_._1)).toDF("top10CategoryId")

// 利用broadcast實現過濾,然后進行分組統計
val top10Category2SessionAndCount =     filteredUserVisitActionDF.join(broadcast(top10CategoryIdRDD), $"click_category_id" ===  $"top10CategoryId")
      .groupBy("top10CategoryId", "sessionId")
      .agg(count("click_category_id") as "count")

// 分組取前10
// windowfunction在這個並不需要shuffle,因為經過前面的shuffle聚合,df已經具有partitioner了,在原節點就可以計算出topn。
val windowSpec = Window.partitionBy("top10CategoryId", "sessionId").orderBy(desc("count"))
val top10SessionWithinTop10Category = top10Category2SessionAndCount.select(expr("*"), rank().over(windowSpec).as("rank"))
      .filter($"rank" <= 10)

頁面單跳轉化率分析

計算關鍵頁面之間的單步跳轉轉化率。方法是先獲取目標頁面,如1,2,3,將它們拼接成1_2, 2_3得出兩個目標轉跳形式。同樣需要在df的數據中產生頁面轉跳。方法是利用windowfunction將數據按sessionid分組,訪問時間升序排序,然后利用concat_ws和window的lag函數實現當前頁面id與前一條數據的頁面id的拼接。集群數據中產生轉跳數據后,利用filter篩選出之前的目標轉跳形式。最后按這些形式分組統計數量,便得出每種轉跳的數量,將它collect為map。另外還需要計算起始頁1的數量,簡單的filter和count實現。接下來就可以根據這些數據計算轉跳率了。遍歷目標轉跳形式,從map中獲取相應的數量,然后除以起始頁/上一頁的數量,進而得出結果。

// 獲取需要查詢的頁面id,結果如"3,1,4,5,2"
val targetPageFlow = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)
// 對需要查詢的頁面id進行分割,結果如Array("3","1","4","5","2")
val targetPages = targetPageFlow.split(",")

// 構建目標轉跳頁面id,結果如Array(3_1,1_4,4_5,5_2)
val targetPagePairs = targetPages
  .zip(targetPages.tail)
  .map(item => item._1 + "_" + item._2)
val targetPageFlowBroadcast = spark.sparkContext.broadcast(targetPagePairs)

// 設置將要用到的時間格式和window函數
val timeFmt = "yyyy-MM-dd HH:mm:ss"
val windowSpec = Window
  .partitionBy("session_id")
  .orderBy($"action_time")
val pagesPairFun = concat_ws("_", col("page_id"), lag("page_id", -1).over(windowSpec))

// 計算各目標轉跳id的數量
val pageSplitPvMap = df.na.drop(Seq("session_id"))
  .withColumn("action_time", to_timestamp($"action_time", timeFmt))
  .withColumn("pagePairs", pagesPairFun)
  // 下面filter方式,條件少時可行,多時用broadcast jion
  .filter($"pagePairs" isin (targetPageFlowBroadcast.value: _*))
  .groupBy("pagePairs")
  .agg(count("pagePairs"))
  .as[(String, Long)]
  .collect().toMap

// 計算起始頁面的點擊數
val startPage = targetPages(0)
val startPagePv = df.filter($"page_id" === startPage).count().toDouble
var lastPageSplitPv = startPagePv

// 存儲結果的map
val convertRateMap = new mutable.HashMap[String, Double]()

for(targetPage <- targetPagePairs){
  val targetPageSplitPv = pageSplitPvMap(targetPage).toDouble
  val convertRate = "%.2f".format(targetPageSplitPv / lastPageSplitPv).toDouble
  convertRateMap.put(targetPage, convertRate)
  lastPageSplitPv = targetPageSplitPv
}

各區域熱門商品統計分析

原數據沒有地區列和城市列(有城市id),所以先廣播一個地區城市表,然后根據城市id進行join。之后按照地區和商品分組進行計數。最后利用windowfunction取各地區topn。

val cityInfo = Array((0L, "北京", "華北"), (1L, "上海", "華東"),
  (2L, "南京", "華東"), (3L, "廣州", "華南"),
  (4L, "三亞", "華南"), (5L, "武漢", "華中"),
  (6L, "長沙", "華中"), (7L, "西安", "西北"),
  (8L, "成都", "西南"), (9L, "哈爾濱", "東北"))

// Row(city_id, city_name, area)
val cityInfoDF = spark.sparkContext.makeRDD(cityInfo).toDF("city_id", "city_name", "area")

// 提取 cityid 和 productid
val cityid2clickActionDF = df.select("city_id", "product_id")
  .na.drop(Seq("product_id"))
  .filter($"product_id" =!= -1L)

// (cityid, cityName, area, productid)
val area_product_clickCount_cityListDF = cityid2clickActionDF.join(broadcast(cityInfoDF), Seq("city_id"), "inner")
  .withColumn("cityId_cityName", concat_ws(":", $"city_id", $"city_name"))
  .groupBy($"area", $"product_id")
  .agg(count("*") as "click_count", collect_set("cityId_cityName") as "city_list")


// 和top10熱門session類似,利用window求topn
val windowSpec = Window
  .partitionBy("area", "product_id")
  .orderBy($"click_count".desc)

// 每個地區前三熱門商品
val areaTop3ProductDF = area_product_clickCount_cityListDF.withColumn("rank", $"click_count".over(windowSpec))
  .filter($"rank" <= 3)


// productInfo表(對json的操作)
val productInfoDF = df.select("product_id", "product_name", "extend_info")
    .withColumn("product_status", get_json_object($"extend_info", "$.product_status"))
    .withColumn("product_status", when($"product_status" === 0, "Self").otherwise("Third Party"))
    .drop("extend_info")

// 補充信息
val areaTop3ProducFullInfoDF = areaTop3ProductDF.join(productInfoDF, Seq("product_id"), "inner")

廣告點擊流量實時統計分析

經過實時黑名單過濾的每天各省各城市廣告點擊實時統計、每天各省topn熱門廣告、各廣告近1小時內每分鍾的點擊趨勢。這部分原代碼采用Spark Streaming實現,我將之改為基於Flink的實現。下面會首先介紹Spark Streaming的實現,然后到Flink。

流式數據的格式為:
timestamp	1450702800
province 	Jiangsu	
city 	Nanjing
userid 	100001
adid 	100001

總體流程

創建流,利用預先廣播的黑名單過濾信息,然后利用過濾后的信息更新黑名單、計算廣告點擊流量、統計每天每個省份top3熱門廣告、統計一個小時窗口內每分鍾各廣告的點擊量。

// 構建Spark上下文
val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]")

// 創建Spark客戶端
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))

// 設置檢查點目錄
ssc.checkpoint("./streaming_checkpoint")

// --- 此處省略Kafka配置 --- //

// 創建DStream
val adRealTimeLogDStream = KafkaUtils.createDirectStream[String,String](...)

var adRealTimeValueDStream = adRealTimeLogDStream.map(_.value)

// 用於Kafka Stream的線程非安全問題,重新分區切斷血統
adRealTimeValueDStream = adRealTimeValueDStream.repartition(400)
// 根據動態黑名單過濾數據。利用findAll來查找MySQL中所有的黑名單用戶,然后通過join實現過濾。
val filteredAdRealTimeLogDStream = filterByBlacklist(spark, adRealTimeValueDStream)

// 業務功能一:生成動態黑名單
generateDynamicBlacklist(filteredAdRealTimeLogDStream)

// 業務功能二:計算廣告點擊流量實時統計結果(yyyyMMdd_province_city_adid,clickCount)
val adRealTimeStatDStream = calculateRealTimeStat(filteredAdRealTimeLogDStream)

// 業務功能三:實時統計每天每個省份top3熱門廣告
calculateProvinceTop3Ad(spark, adRealTimeStatDStream)

// 業務功能四:實時統計每天每個廣告在最近1小時的滑動窗口內的點擊趨勢(每分鍾的點擊量)
calculateAdClickCountByWindow(adRealTimeValueDStream)

ssc.start()
ssc.awaitTermination()

實時黑名單

實現實時的動態黑名單機制:將每天對某個廣告點擊超過100次的用戶拉黑。提取出日期(yyyyMMdd)、userid、adid,然后reduceByKey統計這一批數據的結果,並批量插入MySQL。然后過濾出新的黑名單用戶,實現為從MySQL中查找每條數據的用戶是否對某條廣告的點擊超過100次,即成為了新的黑名單用戶,找到后進行distinct操作得出新增黑名單用戶,並更新到MySQL。

// 從 adRealTimeValueDStream 中提取出下面三個值並構建(key, 1L)
val key = datekey + "_" + userid + "_" + adid
// 然后 reduceByKey(_ + _), 得到這batch每天每個用戶對每個廣告的點擊量
dailyUserAdClickCountDStream.foreachRDD{ rdd =>
      rdd.foreachPartition{ items =>
// items 是 Iterator(key, count),提取key的值,構成(date,  userid, adid, clickCount),批量寫入mysql
      ... }}
// 之后filter,每條數據到 mysql 查詢更新后的(date, userid, adid)的count是否大於100,表示當天某用戶對某個廣告是否點擊超過100次,如果是就true(留下)最后得出新黑名單blacklistDStream。去重后直接批量插入mysql
blacklistDStream.transform(_.distinct())

廣告點擊實時統計

每天各省各城市各廣告的點擊流量實時統計。分組,key為日期+省份+城市+廣告id,利用updateStateByKey實現累加。新的統計結果更新到MySQL。

// 執行updateStateByKey算子
// spark streaming特有的一種算子,在spark集群內存中,維護一份key的全局狀態
// 和黑名單一樣,先從string中提取出信息並構建key
val aggregatedDStream = dailyUserAdClickDStream.updateStateByKey[Long]{ (values:Seq[Long], old:Option[Long]) =>
  var clickCount = 0L

  // 如果說,之前是存在這個狀態的,那么就以之前的狀態作為起點,進行值的累加
  if(old.isDefined) {
    clickCount = old.get
  }

  // values代表了,batch rdd中,每個key對應的所有的值
  for(value <- values) {
    clickCount += value
  }

  Some(clickCount)
}
// 然后和黑名單中一樣,批量更新到mysql

統計每天各省top3熱門廣告

利用上一步得到的結果,即key為日期+省份+城市+廣告id,value為累積點擊量,進行統計及分組topn。reduceByKey + windowfunction

統計各廣告最近1小時內的點擊量趨勢:各廣告最近1小時內各分鍾的點擊量

同樣在累積數據的基礎上操作,提取出時間,然后利用固定窗口實現需求。

// 從原始流(未去除黑名單的數據)中提取出timeMinute、adid兩個值進行聚合統計
pairDStream.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(60L), Seconds(10L))
// 下面 items 就是 Iterator(timeMinute_adid, count)
aggrRDD.foreachRDD { rdd =>
      rdd.foreachPartition { items => ...}}
// 從key中提取出date、hour和minute寫入mysql

Flink實現

Flink的思路是通過三個KeyedProcessFunction來實現的,因為他有state(累積各key的值)和timer(定時刪除state)功能。
第一個KeyedProcessFunction是記錄每個userId-adId鍵的量,當達到閾值時對這類信息進行截流,從而實現黑名單的更新和過濾。
第二個是記錄每個province的數據量,即每個省的廣告點擊量
第三個是記錄一個map,里面統計每個省的點擊量,當進行了一定數量的更新后,就輸出一次這個map的前n個kv對(以排好序的string的形式),從而實現topn功能。

// 模塊結構
├── Launcher.scala 啟動類
├── bean
│   └── AdLog.scala 三個case class
├── constant
│   └── Constant.scala 定義了一些定值字符串
├── function 處理函數,下面介紹。
│   ├── AccProvClick.scala
│   ├── BetterGenerateTopK.scala
│   └── FilterBlackListUser.scala
└── schema
    └── AdLogDeserializationSchema.scala 用於反序列化Kafka信息

Launcher類

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// kafka配置
val consumerProps = new Properties()
consumerProps.setProperty(KEY_BOOTSTRAP_SERVERS, args(0))
consumerProps.setProperty(KEY_GROUP_ID, args(1))

// kafka消費者
val consumer = new FlinkKafkaConsumer010(
  args(2),
  new AdLogDeserializationSchema(),
  consumerProps
)

// 設置數據源
val adLogStream = env.addSource(consumer)

// 對點擊某一廣告多於100的用戶進行截流,從而一次性完成黑名單過濾和黑名單更新。
val withSideOutput = adLogStream
  .keyBy(adLog => (adLog.userid, adLog.adid))
  .process(new FilterBlackListUser)

// (可選)新增的黑名單流。此處只輸出到控制台,有需要可以輸出到其他端口。
withSideOutput.getSideOutput(realTimeBlackList)
  .print()
// 在main函數外添加下面代碼才能取得sideoutput
// val realTimeBlackList: OutputTag[String] =
//    new OutputTag[String]("black_list")

// 實時統計廣告點擊量最多的前K個省份。同樣此處只輸出到控制台,有需要可以輸出到其他端口。
withSideOutput
  .keyBy(_.province)
  // 按province進行分組累加的stateful操作
  .process(new AccProvClick) // 這里也可以輸出到數據庫或者kafka等,從而對這些聚合好的數據進行不同需求的分析
  .keyBy(_.dummyKey)
  .process(new BetterGenerateTopK(10))
  .print()

env.execute("TopK_Province")

AdLog類

廣告日志類以及處理過程產生的一些新case class

// 從kafka獲取並實現反序列化后的數據
case class AdLog(userid: Int, adid: Int, province: String, city: String, timestamp: Long)
// 經過FilterBlackListUser處理后得到的數據,如果需要對adid、city都進行分組,也可以在這里加屬性
case class ProvinceWithCount(province: String, count: Int, dummyKey: Int)

Schema類

class AdLogDeserializationSchema extends DeserializationSchema[AdLog]{

  override def deserialize(bytes: Array[Byte]): AdLog = {
    val json = parse(new String(bytes))
    implicit val formats = DefaultFormats
    json.extract[AdLog]
  }

  // 可以根據接收的AdLog來判斷是否需要結束這個數據流。如果不需要這個功能就直接返回false。
  override def isEndOfStream(t: AdLog): Boolean = false

  // 告訴Flink經過反序列化后得到什么類
  override def getProducedType: TypeInformation[AdLog] = TypeInformation.of(AdLog.getClass.asInstanceOf[Class[AdLog]])
}

FilterBlackListUser類

class FilterBlackListUser extends KeyedProcessFunction[(Int, Int), AdLog, ProvinceWithCount] {

  // 存儲當前userId-adId鍵值的廣告點擊量
  var countState: ValueState[Int] = _
  // 標記當前userId-adId鍵值是否第一次進入黑名單的flag
  var firstSent: ValueState[Boolean] = _
  // 記錄當前userId-adId鍵值state的生成時間
  var resetTime: ValueState[Long] = _

  // 初始化key state
  override def open(parameters: Configuration): Unit = {

    val countDescriptor = new ValueStateDescriptor[Int]("count", classOf[Int])
    countState = getRuntimeContext
      .getState[Int](countDescriptor)

    val firstSeenDescriptor = new ValueStateDescriptor[Boolean]("firstSent", classOf[Boolean])
    firstSent = getRuntimeContext
      .getState[Boolean](firstSeenDescriptor)

    val resetTimeDescriptor = new ValueStateDescriptor[Long]("resetTime", classOf[Long])
    resetTime = getRuntimeContext
      .getState[Long](resetTimeDescriptor)

  }

  override def processElement(value: AdLog,
                              ctx: KeyedProcessFunction[(Int, Int), AdLog, ProvinceWithCount]#Context,
                              out: Collector[ProvinceWithCount]): Unit = {
    val curCount = countState.value()
    // 第一次處理登記timer,24:00清除state
    if (curCount == 0) {
      val time = (ctx.timerService().currentProcessingTime() / 86400000 + 1) * 86400000
      resetTime.update(time)
      ctx.timerService().registerProcessingTimeTimer(time)
    }

    // 加入黑名單,並在side output輸出,但只輸出一次
    if (curCount >= 100) {
      // 默認初始為false
      if (!firstSent.value()) {
        firstSent.update(true)
        ctx.output(Launcher.realTimeBlackList, value.userid.toString)
      }
      return
    }
    // 點擊次數+1
    countState.update(curCount + 1)
    out.collect(ProvinceWithCount(value.province, 1,1))
  }

  // 到達預定時間時清除state
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[(Int, Int), AdLog, ProvinceWithCount]#OnTimerContext,
                       out: Collector[ProvinceWithCount]): Unit = {
    if (timestamp == resetTime.value()) {
      firstSent.clear()
      countState.clear()
    }
  }
}

AccProvClick類

代碼形式和上面的類幾乎一樣

class AccProvClick extends KeyedProcessFunction[String, ProvinceWithCount, ProvinceWithCount] {

  // 存儲當前province鍵值的廣告點擊量
  var countState: ValueState[Int] = _
  var resetTime: ValueState[Long] = _

  override def open //和上面類似

  override def processElement(value: ProvinceWithCount,
                              ctx: KeyedProcessFunction[String, ProvinceWithCount, ProvinceWithCount]#Context,
                              out: Collector[ProvinceWithCount]): Unit = {
    // 和上面類似,如果countState值為0,先設置timer
    val curCount = countState.value() + 1
    countState.update(curCount)
    out.collect(ProvinceWithCount(value.province, curCount, 1))
  }

  override def onTimer // 和上面類似
}

BetterGenerateTopK類

class BetterGenerateTopK(n: Int) extends KeyedProcessFunction[Int, ProvinceWithCount, String] {

  // 存儲各省的廣告點擊量
  var prov2clickTable : MapState[String, Int] = _

  var resetTime: ValueState[Long] = _

  // 每積累到100條更新就發送一次排名結果
  var sendFlag : Int = 0

  override def open(parameters: Configuration): Unit = {
    val prov2clickDescriptor = new MapStateDescriptor[String, Int]("statistic", classOf[String], classOf[Int])
    prov2clickTable = getRuntimeContext
      .getMapState[String, Int](prov2clickDescriptor)

    val resetTimeDescriptor = // 上面類似
  }
  override def processElement(value: ProvinceWithCount,
                              ctx: KeyedProcessFunction[Int, ProvinceWithCount, String]#Context,
                              out: Collector[String]): Unit = {

    if (!prov2clickTable.iterator().hasNext) {
      val time = (ctx.timerService().currentProcessingTime() / 86400000 + 1) * 86400000
      resetTime.update(time)
      ctx.timerService().registerProcessingTimeTimer(time)
    }

    prov2clickTable.put(value.province, value.count)

    sendFlag += 1
    if (sendFlag % 100 == 0){
      sendFlag = 0
      val res = new StringBuilder
      prov2clickTable.iterator()
        .asScala
        .toArray
        .sortBy(_.getValue)
        .takeRight(n)
        .foreach(x => res.append(x.getKey + x.getValue))
      out.collect(res.toString())
    }
  }

  override def onTimer // 和上面類似
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM