pyspark執行邏輯流程


這里記錄pyspark的執行邏輯圖

代碼版本為 spark 2.2.0

1.執行邏輯

這里簡述一下代碼調用流程
1. 用戶通過spark-submit提交python代碼,spark-submit檢測到此次提交任務類型是python類型則會設置mainClass為PythonRunner

2. PythonRunner 會負責(通過process的方式)執行用戶的python代碼,並且啟動py4j的gatewayServer 把啟動端口通過環境變量通告到python進程中

3. 用戶代碼啟動sparkContext 通過py4j 初始化jvm中的sparkContext

4. jvm中的sparkContext開始執行,將dagScheduler,taskScheduler等初始化,啟動executor等角色

5. 用戶代碼執行rdd的action(pyspark的action都是jvm中collect觸發的)動作,runJob開始真正執行任務,通過對應的Task開始PythonRDD的執行

6. executor執行PythonRDD的compute,compute中創建對應的python進程執行pyspark.daemon,並且建立socket用於跟python進行數據傳輸

7. socket建立完成之后啟動writerThread線程將PythonRDD中從jvm中產生的父RDD開始執行,將數據寫入到socket,寫入完成后socket.flush

8. pyspark.daemon 調用pyspark.work 從socket中讀取要執行的python函數和數據,開始真正的數據處理邏輯

9. 數據處理完成之后將處理結果寫回socket,jvm中通過PythonRDD的read方法讀取,並返回結果

10. 最終executor將PythonRDD的執行結果上報到drive上,返回給用戶

具體執行邏輯圖和框架說明看這個博客整理的內容,其中邏輯圖畫的很明確,這里不再贅述,直接引用他的鏈接

2.計算代碼到執行邏輯的映射

    rdd = sc.textFile(....)        
    #rdd = RDD(_jrdd = HadoopRDD)
    #生成的RDD是通過py4j調用jvm中對應的方法產生的rdd,並且用python 代碼中的RDD進行包裝而成
    #最初的RDD是數據源的RDD,此次是HadoopRDD,這個hadoopRDD最終在上一小節第7步中被執行

    rdd1 = rdd.map(lambda x: x.split(" "))        
    #rdd1 = PipelinedRDD(_jrdd = PythonRDD(parent=HadoopRDD,PythonFunction(lambda)))
    #此處的rdd是python中PipelinedRDD對上一個生成的RDD和用戶提供的方法進行整合的RDD,類型是jvm中的PythonRDD
    #PipelinedRDD將父rdd進行傳遞,保證數據源的RDD始終都是新RDD的父RDD

    rdd2 = rdd1.flatMap(lambda x: [(i,1) for i in x])         
    #rdd2 = PipelinedRDD(_jrdd = PythonRDD(parent=HadoopRDD,PythonFunction(func(_,lambda))))
    rdd3 = rdd2.reduceByKey(lambda x,y: x + y)        
    #rdd3 = PipelinedRDD(_jrdd = PythonRDD(parent=HadoopRDD,PythonFunction(mergeCombiners)))
    #無論進行多少次PipelinedRDD的轉換,始終都是對應在PythonRDD上,將父rdd傳遞到此,將方法一層一層嵌套傳進來
    #保證最重PythonRDD被調用的時候父rdd是數據源rdd,pythonFunction是用戶的所有處理邏輯

    rdd4 = rdd3.collect()        
    #rdd4 = [(word,Int)]
    #此處生成的最終的結果,在collect的時候調用jvm中對應的PythonRDD.collectAndServer開始提交任務執行
    #任務執行結果被傳回jvm,上報到drive,最重返回給用戶

3.代碼鏈接目錄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM