這里記錄pyspark的執行邏輯圖
代碼版本為 spark 2.2.0
1.執行邏輯
這里簡述一下代碼調用流程
1. 用戶通過spark-submit提交python代碼,spark-submit檢測到此次提交任務類型是python類型則會設置mainClass為PythonRunner
2. PythonRunner 會負責(通過process的方式)執行用戶的python代碼,並且啟動py4j的gatewayServer 把啟動端口通過環境變量通告到python進程中
3. 用戶代碼啟動sparkContext 通過py4j 初始化jvm中的sparkContext
4. jvm中的sparkContext開始執行,將dagScheduler,taskScheduler等初始化,啟動executor等角色
5. 用戶代碼執行rdd的action(pyspark的action都是jvm中collect觸發的)動作,runJob開始真正執行任務,通過對應的Task開始PythonRDD的執行
6. executor執行PythonRDD的compute,compute中創建對應的python進程執行pyspark.daemon,並且建立socket用於跟python進行數據傳輸
7. socket建立完成之后啟動writerThread線程將PythonRDD中從jvm中產生的父RDD開始執行,將數據寫入到socket,寫入完成后socket.flush
8. pyspark.daemon 調用pyspark.work 從socket中讀取要執行的python函數和數據,開始真正的數據處理邏輯
9. 數據處理完成之后將處理結果寫回socket,jvm中通過PythonRDD的read方法讀取,並返回結果
10. 最終executor將PythonRDD的執行結果上報到drive上,返回給用戶
具體執行邏輯圖和框架說明看這個博客整理的內容,其中邏輯圖畫的很明確,這里不再贅述,直接引用他的鏈接
2.計算代碼到執行邏輯的映射
rdd = sc.textFile(....)
#rdd = RDD(_jrdd = HadoopRDD)
#生成的RDD是通過py4j調用jvm中對應的方法產生的rdd,並且用python 代碼中的RDD進行包裝而成
#最初的RDD是數據源的RDD,此次是HadoopRDD,這個hadoopRDD最終在上一小節第7步中被執行
rdd1 = rdd.map(lambda x: x.split(" "))
#rdd1 = PipelinedRDD(_jrdd = PythonRDD(parent=HadoopRDD,PythonFunction(lambda)))
#此處的rdd是python中PipelinedRDD對上一個生成的RDD和用戶提供的方法進行整合的RDD,類型是jvm中的PythonRDD
#PipelinedRDD將父rdd進行傳遞,保證數據源的RDD始終都是新RDD的父RDD
rdd2 = rdd1.flatMap(lambda x: [(i,1) for i in x])
#rdd2 = PipelinedRDD(_jrdd = PythonRDD(parent=HadoopRDD,PythonFunction(func(_,lambda))))
rdd3 = rdd2.reduceByKey(lambda x,y: x + y)
#rdd3 = PipelinedRDD(_jrdd = PythonRDD(parent=HadoopRDD,PythonFunction(mergeCombiners)))
#無論進行多少次PipelinedRDD的轉換,始終都是對應在PythonRDD上,將父rdd傳遞到此,將方法一層一層嵌套傳進來
#保證最重PythonRDD被調用的時候父rdd是數據源rdd,pythonFunction是用戶的所有處理邏輯
rdd4 = rdd3.collect()
#rdd4 = [(word,Int)]
#此處生成的最終的結果,在collect的時候調用jvm中對應的PythonRDD.collectAndServer開始提交任務執行
#任務執行結果被傳回jvm,上報到drive,最重返回給用戶