pyspark的排序


一、count

sql = """select video_id,count(video_id) as video_num from video_table group by video_id order by video_num desc"""
rdd = spark.sql(sql).rdd.map(lambda x: x["video_id"])
result = rdd.collect()

二、sortBy和sortByKey

from operator import add
sql = """select video_id from video_table """
rdd = spark.sql(sql).rdd.map(lambda x: (x["video_id"],1))..reduceByKey(add)
rdd1 = rdd.sortBy(lambda x: x[1], ascending=False)
rdd2 = rdd.sortByKey(lambda x: x, ascending=False)
result = rdd1.collect() # rdd2.collect()

1、sortBy如何實現全局排序

sortBy實際上調用sortByKey

def sortBy(self, keyfunc, ascending=True, numPartitions=None):
      return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()

2、sortBy的實現過程:

Stage 0:Sample。創建 RangePartitioner,先對輸入的數據的key做sampling來估算key的分布情況,然后按指定的排序切分出range,盡可能讓每個partition對應的range里的key的數量均勻。計算出來的 rangeBounds 是一個長為 numPartitions - 1 的list,記錄頭 numPartitions - 1 個partition對應的range的上界;最后一個partition的邊界就隱含在“剩余”當中。

 rddSize = self.count() # 統計rdd中包含的元素個數,假設rddSize=10000
 if not rddSize:
    return self  # empty RDD
maxSampleSize = numPartitions * 20.0  # 假設有4個分區,maxSampleSize=80
fraction = min(maxSampleSize / max(rddSize, 1), 1.0) # fraction=8/1000,
samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect() # 采樣 8/1000,根據采樣出的數據來估算key的分布情況。
samples = sorted(samples, key=keyfunc) # 對采樣得到的rdd collect之后得到的列表,調用python的sorted方法,完成從小到大排序,得到排好序的列表。
# 得到numPartition-1=3個邊界列表。
bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
                  for i in range(0, numPartitions - 1)]
# partitionBy根據給定的3個邊界進行分區,分區之后分區間的元素是排好序的。再調用mapPartitions,對每個分區的數據進行排序
def rangePartitioner(k):
    p = bisect.bisect_left(bounds, keyfunc(k))
    if ascending:
        return p
    else:
        return numPartitions - 1 - p

return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)

Stage 1:Shuffle Write。開始shuffle,在map side做shuffle write,根據前面計算出的rangeBounds來重新partition。
通過key值和區間邊界進行比較,如果位於改區間,則分配到該區間對應的分區。
Shuffle write出的數據中,每個partition內的數據雖然尚未排序,但partition之間已經可以保證數據是按照partition index排序的了。
Stage 2:Shuffle Read。然后到reduce side,每個reducer再對拿到的本partition內的數據做排序。這樣完成之后,partition之間的數據在map side就保證有排序,而每個partition內的數據在reduce side也保證有排序,就達到了全局排序的效果。如果在 sortByKey() 后面跟一個 collect() 調用,則它會按照partition index的順序獲取結果數據,最后再把這些數組合並起來,就在本地得到了全局排序后的大數組。

三、調用python方法

from collections import Counter
def category_trans(x):
    """
    統計每個分類下面視頻出現的次數
    :param x:
    :return:
    """
    tag = x[0]
    videos = x[1]
    result = Counter(videos)
    r = sorted(result.items(), key=lambda item: item[1],reverse=True)
    return tag,[item[0] for item in r]
rdd = rdd.map(lambda x:(x["tag"],[x["video_id"]])) # 此時rdd內的數據為[(tag1:[video_1]),(tag1,[video_2]),(tag2,[video_1]),...]
video_rdd = rdd.reduceByKey(lambda x,y: x+y) # [(tag1,[v1,v2,...]),...]
t2v = video_rdd.map(lambda x: category_trans(x))  # [(tag1,[排好序的列表]),...]
result = t2v.collectAsMap()

四、自定義類

將rdd中元素轉換為自定義類的實例

class MySort():
    """
    自定義類的__lt__()方法。python的類中已經自帶了lt,eq,ge,gt...等方法
    """
    def __init__(self,num):
        self.num = num
    def __lt__(self,other):
        return self.num<other.num

    def __repr__(self):
        return str(self.num)

rdd = sc.parallelize([(1, 1),(1, 2), (-1, 1), (-1, -0.5)])
rdd = rdd.map(MySort)
rdd = rdd.sortBy(lambda x: x,ascending=False)
rdd = rdd.foreach(lambda x: print(x))

參考資料

1、(Spark排序的原理? - RednaxelaFX的回答 - 知乎

https://www.zhihu.com/question/34771277/answer/187001059)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM