需求
假設我們有一張各個產品線URL的訪問記錄表,該表僅僅有兩個字段:product、url,我們需要統計各個產品線下訪問次數前10的URL是哪些?
解決方案
(1)模擬訪問記錄數據

模擬數據記錄共有1000條,其中包括10個產品線:product1、product2、…、product10,100個URL:url1、url2、…、url100,為了簡化生成數據的過程,產品線和URL均使用了隨機數。一條記錄為一個字符串,產品線與URL使用空格進行分隔。模擬數據存儲在一個名為“data”的列表中,通過parallelize的方式形成一個RDD:table,再使用inferSchema的方式注冊為一張臨時表“product_url”。
表“product_url”的示例數據如下:

(2)統計各個產品線下各個URL的訪問次數

這個邏輯使用Spark SQL即可以實現,示例數據如下:

可以看出,數據多出了一個字段access,用於表示某產品線下某個URL的訪問次數。
此外,如果我們有一個數據類型為Row的變量row,可以通過row.product、row.url、row.access或者row[0]、row[1]、row[2]訪問product、url、access對應的數據。
(3)“分區排序取值”
我們的統計需求有一個明顯的分界線:產品線,Top N的處理邏輯可以轉變為:
a. 根據分界線做分區,即每一個產品線的記錄進入同一個分區;
b. 每一個分區(產品線)內根據URL的訪問次數(access)排序(降序);
c. 每一個分區(產品線)內取前N條數據即可。
a、b實際就是一個“分區排序”的過程,Spark RDD也為“分區排序”提供了非常方便的API:repartitionAndSortWithinPartitions,但是該函數需要傳入的數據類型要求為(key, value),因此我們需要對(2)中的數據做一下簡單的處理:

其實就是將數據類型Row映射為元組(Row, None),示例數據如下:

此外repartitionAndSortWithinPartitions還需要兩個函數:partitionFunc、keyFunc,這兩個函數都需要接收一個參數,該參數為(key, value)中的key。
partitionFunc用於根據(key, value)中的key如何選取分區,返回值要求為整型,數值即為分區號,即0表示分區0,1表示分區1,…。

這里key的數據類型即為Row。
因為我們模擬了10個產品線,每一個產品線的數據需要被划分到同一個分區內,因此我們也需要10個分區(分區序號為0—9),根據產品線划分分區的規則為:產品線product1的分區為0,產品線product2的分區為1,…,產品線product10的分區為9。其中key[0]為產品線名稱,key[0][7]為產品線名稱中的隨機數,將key[0][7]轉換為整數並減一即可得到對應的分區號。
keyFunc用於根據(key, value)中的key如何排序,“分區內排序”時即根據該函數的返回值進行排序。

其中,key[2]為訪問次數access,我們就是需要在某個分區(產品線)內根據URL的訪問次數做排序。
函數partitionFunc、keyFunc准備好之后,我們可以開始調用repartitionAndSortWithinPartitions:

需要注意的是,numPartitions值為10,該值取決於分區(產品線)的個數;ascending值為False,該值表示分區內排序時使用降序。
“分區排序”之后我們即可以開始“取值”,取值的過程比較簡單:在每個分區內即前N(這里假設為10,即top 10)個值,將這些值“匯總”之后即可得出各個產品線下URL訪問次數的Top 10。
考慮到我們需要“匯總”的需求,因此不能使用foreachPartition,需要通過mapPartitions實現,它需要一個函數:f,函數f的參數為一個“迭代器”,通過這個“迭代器”可以遍歷分區內的所有數據。

從上面的代碼可以看出,我們就是通過“迭代器”iter獲取分區內的前10條數據(如果分區內的數據條數大於或等於10的話)。
“匯總”(collect)結果:

rows中保存着各個產品線下URL訪問次數的Top 10記錄。
(4)結果處理
計算完成之后,我們可以對結果進行一些處理,如:根據產品線、URL根據字典序排序並輸出,代碼如下:

示例數據:

總結
使用Spark解決Top N問題時,只需要經過“划分分區、分區內排序、分區內取值”三個過程即可完成。