Spark實戰 - 如何進行選擇去重


背景

業務上有一份行車軌跡的數據 carRecord.csv 如下:

id;carNum;orgId;capTime
1;粵A321;0002;20200512 102010
2;雲A321;0001;20200512 102010
3;粵A321;0001;20200512 103010
4;雲A321;0002;20200512 103010
5;粵A321;0003;20200512 114010
6;京A321;0003;20200512 114011

其中各字段含義分別為記錄id,車牌號,抓拍卡口,抓拍時間。現在需要篩選出所有車輛最后出現的一條記錄,得到每輛車最后經過的抓拍點信息,也就是要將其他日期的數據過濾掉,我們可以使用選擇去重。下面分別展示通過 dataframe 和 rdd 如果實現。

DataFrame實現

具體實現:

  1. 導入行車數據;
  2. 首先使用 withColumn() 添加 num 字段,num 字段是由 row_number() + Window() + orderBy() 實現的:開窗函數中進行去重,先對車牌carNum 進行分組,倒序排序,然后取窗口內排在第一位的則為最后的行車記錄,使用 where 做過濾,最后drop掉不再使用的 num 字段;
  3. 通過 explain 打印 dataFrame 的物理執行過程,show() 作為 action算子觸發了以上的系列運算。
val carDF = spark.read.format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .csv(basePath + "/car.csv")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// This import is needed to use the $-notation
import spark.implicits._

val lastPassCar = carDF.withColumn("num",
   row_number().over(
     Window.partitionBy($"carNum")
           .orderBy($"capTime" desc)
   )
).where($"num" === 1).drop($"num")
lastPassCar.explain()
lastPassCar.show()

執行計划如下:

== Physical Plan ==
*(3) Project [id#10, carNum#11, orgId#12, capTime#13]
+- *(3) Filter (isnotnull(num#19) && (num#19 = 1))
   +- Window [row_number() windowspecdefinition(carNum#11, capTime#13 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS num#19], [carNum#11], [capTime#13 DESC NULLS LAST]
      +- *(2) Sort [carNum#11 ASC NULLS FIRST, capTime#13 DESC NULLS LAST], false, 0
         +- Exchange hashpartitioning(carNum#11, 200)
            +- *(1) FileScan csv [id#10,carNum#11,orgId#12,capTime#13]

結果如下:

// 獲得其中每輛車最后經過的卡口等信息
+---+------+-----+---------------+
| id|carNum|orgId|        capTime|
+---+------+-----+---------------+
|  5|粵A321|    3|20200512 114010|
|  6|京A321|    3|20200512 114011|
|  4|雲A321|    2|20200512 103010|
+---+------+-----+---------------+

RDD實現

思路:

  1. 加載源數據並封裝到 CarRecord 樣例類中,生成RDD;
  2. 首先通過 groupBy 對 數據做分組后生成 RDD[(String, Iterable[CarRecord])]對象,隨即使用 map 對每個 key 對應的多組記錄(Iterable[CarRecord])進行reduce操作(maxBy),最后在 maxBy 算子傳入一個字面量函數(也可寫為x=>x.capTime),即提取該carNum下每條記錄中的 capTime 進行比對,然后選出最新時間記錄(maxBy 為高階函數,依賴 reduceLeft 實現);
case class CarRecord(id: Int, carNum: String, orgId: Int, capTime: String)

// 構造 schema RDD
val carRDD: RDD[CarRecord] =
    carDF.rdd.map(x => 
        CarRecord(x.getInt(0), x.getString(1), x.getInt(2), x.getString(3)))
val res = carRDD.groupBy(_.carNum).map{
    x => {
        // x._2 是 iter,取其中 capTime 最大的記錄
        x._2.maxBy { _.capTime }
    }
}
res.toDebugString
res.collect.foreach(x => println(x))

總結

實現選擇去重的兩種常用方法:

  1. 通過開窗函數 row_number+window+orderBy 進行聚合后去重;
  2. 通過 groupby + maxBy 等算子進行聚合后去重。

擴展

Hive 又如何實現選擇去重呢?與上文兩種方法一樣,請自行實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM