Spark 中的join方式(pySpark)


     spark基礎知識請參考spark官網:http://spark.apache.org/docs/1.2.1/quick-start.html

    無論是mapreduce還是spark ,分布式框架的性能優化方向大致分為:負載均衡、網絡傳輸和磁盤I/O 這三塊。而spark是基於內存的計算框架,因此在編寫應用時需要充分利用其內存計算特征。本篇主要針對

spark應用中的join問題進行討論,關於集群參數的優化會在另一篇文章中提及。

    在傳統的數據庫平台和分布式計算平台,join的性能消耗都是很可觀的,對spark來說如果join的表比較大,那么在shuffle時網絡及磁盤壓力會明顯提升,嚴重時可能會造成excutor失敗導致任務無法進行下去,

對這種join的優化方法主要是采用map和filter來改變join的實現方式,減少shuffle階段的網絡和磁盤I/O。下面以表的數據量大小分兩部分來討論。

   大表:數據量較大的表

   小表:數據量較小的表

一、大表與小表之間的join

   這種join是大部分業務場景的主要join方式,將小表以broadcast的形式分發到每個executor后對大表進行filter操作,以下對每種join進行示例說明(兼容表中ID不唯一的情況)。

  1、leftOuterJoin 

  >>>d1=sc.parallelize([(1,2),(2,3),(2,4),(3,4)])

  >>>d2=sc.parallelize([(1,'a'),(2,'b'),(1,'d'),(5,'2')])

  原生實現方式:

  >>>d1.leftOuterJoin(d2).collect()

  >>>[(1, (2, 'a')), (1, (2, 'd')), (2, (4, 'b')), (2, (3, 'b')), (3, (4, None))]

   map實現方式(小表在右的實現方式,小表在左的情況會稍微復雜些,需要多一些操作操作,實際場景中不多見):

def  doJoin(row):
    result=[]
    if row[1][1] is not None:
        for i in row[1][1]:
            result+=[(row[0],(row[1][0],i))]
  else:
            result+=[row]
  return result

d2_map={}
for i in d2.groupByKey().collect():
    d2_map[i[0]]=i[1]
d2_broadcast=sc.broadcast(d2_map)
d2_dict=d2_broadcast.value
d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).flatMap(doJoin).collect()               

>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b')), (3, (4, None))]

2、join

這里的join指的是innerjoin即只取出匹配到的數據項,只需要在上面的實現方式中加個filter即可

d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).filter(lambda row:row[1][1] is not None).flatMap(doJoin).collect()

>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b'))]

 

二、大表與大表之間的join(Reduce-join)

大表之間的join無法通過緩存數據來達到優化目的,因此需要把優化的重點放在分區效率及key的設計上

1、join的key值盡量使用數值類型,減少分區及shuffle的操作時間,在join時數值類型的key值在匹配時更快

2、將過濾條件放在join之前,使得join的數據量盡量最少

3、在join之前將兩個表按相同分區數進行重新分區

reduce-join:指將兩個表按key值進行分區,相同key的數據會被分在同一個分區,最后使用mapPartition進行join操作。

 4、如果需要減少分區和並行度,請使用coalesce 而非repartition 方法。

* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.

三、其它優化方式

1、同一份數據被多次用到,在讀入時進行緩存,后面直接使用,例如配置表,如果數據量不大則進行broadcast,否則使用cache

2、盡量減少重復計算,同樣的計算邏輯只計算一次

3、幾個優化參數

spark.akka.frameSize 1000                       集群間通信 一幀數據的大小,設置太小可能會導致通信延遲

spark.akka.timeout 100                             通信等待最長時間(秒為單位)
spark.akka.heartbeat.pauses 600                 心跳失敗最大間隔(秒為單位)
spark.serializer org.apache.spark.serializer.KryoSerializer    序列化方式(sprak自己的實現方式)
spark.sql.autoBroadcastJoinThreshold -1           禁止自動broadcast表
spark.shuffle.consolidateFiles true             shuffle 自動合並小文件

 

四、后續優化方向

1、內存優化:對象所占用的內存,訪問對象的消耗以及垃圾回收(garbage collection)所占用的開銷

2、優化數據結構

3、優化RDD存儲

4、並行度

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM