pyspark使用及原理


1、windows環境搭建

(1)將pyspark、py4j,放到python安裝目錄下。

(2)將其他的相關jar包,放到spark jars目錄下。

(3)pycharm配置好python解析器、公司的proxy代理,pip.int放到指定目錄下。

2、linux環境搭建

(1)將pyspark、py4j,放到python安裝目錄下。

(2)將其他的相關jar包,放到spark jars目錄下。

mongo-java-driver-3.12.5.jar

mongo-spark-connector_2.11-2.4.2.jar

(3)在工程里面打包依賴zip,不然會報錯。

(4)單獨將要運行的pyspark文件拷貝出來即可。

(5)修改spark-env.sh配置,指定python版本

export PYSPARK_PYTHON=/usr/bin/python3

 

提交腳本

/boot/clouddragen/spark/bin/spark-submit \
--master spark://xxx:7077,xxx:7077 \
--py-files /boot/clouddragen/temp/crawler-spark.zip \
--executor-memory 10G \
/boot/clouddragen/temp/ReadMongodb.py

 

pysparktest.py

from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("wordcount")
sc = SparkContext.getOrCreate(conf)

list = [1, 2, 3, 4, 5]
rdd = sc.parallelize(list)
print(rdd.collect())

 

3、pyspark運行原理:

轉載博客:https://mp.weixin.qq.com/s/qgfcqKMyTOC-AlQYGmk6VQ

  Spark主要是由Scala語言開發,為了方便和其他系統集成而不引入scala相關依賴,部分實現使用Java語言開發,例如External Shuffle Service等。總體來說,Spark是由JVM語言實現,會運行在JVM中。然而,Spark除了提供Scala/Java開發接口外,還提供了Python、R等語言的開發接口,為了保證Spark核心實現的獨立性,Spark僅在外圍做包裝,實現對不同語言的開發支持,本文主要介紹Python Spark的實現原理,剖析pyspark應用程序是如何運行起來的。

Spark運行時架構

  首先我們先回顧下Spark的基本運行時架構,如下圖所示,其中橙色部分表示為JVM,Spark應用程序運行時主要分為Driver和Executor,Driver負載總體調度及UI展示,Executor負責Task運行,Spark可以部署在多種資源管理系統中,例如Yarn、Mesos等,同時Spark自身也實現了一種簡單的Standalone(獨立部署)資源管理系統,可以不用借助其他資源管理系統即可運行。

 

  用戶的Spark應用程序運行在Driver上(某種程度上說,用戶的程序就是Spark Driver程序),經過Spark調度封裝成一個個Task,再將這些Task信息發給Executor執行,Task信息包括代碼邏輯以及數據信息,Executor不直接運行用戶的代碼。

PySpark運行時架構

  為了不破壞Spark已有的運行時架構,Spark在外圍包裝一層Python API,借助Py4j實現Python和Java的交互,進而實現通過Python編寫Spark應用程序,其運行時架構如下圖所示。

 

  其中白色部分是新增的Python進程,

  在Driver端,通過Py4j實現在Python中調用Java的方法,即將用戶寫的PySpark程序”映射”到JVM中,例如,用戶在PySpark中實例化一個Python的SparkContext對象,最終會在JVM中實例化Scala的SparkContext對象;

  在Executor端,則不需要借助Py4j,因為Executor端運行的Task邏輯是由Driver發過來的,那是序列化后的字節碼,雖然里面可能包含有用戶定義的Python函數或Lambda表達式,Py4j並不能實現在Java里調用Python的方法,為了能在Executor端運行用戶定義的Python函數或Lambda表達式,則需要為每個Task單獨啟一個Python進程,通過socket通信方式將Python函數或Lambda表達式發給Python進程執行。語言層面的交互總體流程如下圖所示,實線表示方法調用,虛線表示結果返回。

  下面分別詳細剖析PySpark的Driver是如何運行起來的以及Executor是如何運行Task的。

Driver端運行原理

  當我們通過spark-submmit提交pyspark程序,首先會上傳python腳本及依賴,並申請Driver資源,當申請到Driver資源后,會通過PythonRunner(其中有main方法)拉起JVM,如下圖所示。

 

PythonRunner入口main函數里主要做兩件事:

  • 開啟Py4j GatewayServer
  • 通過Java Process方式運行用戶上傳的Python腳本,用戶Python腳本起來后,首先會實例化Python版的SparkContext對象,在實例化過程中會做兩件事:
  • 實例化Py4j GatewayClient,連接JVM中的Py4j GatewayServer,后續在Python中調用Java的方法都是借助這個Py4j Gateway
  • 通過Py4j Gateway在JVM中實例化SparkContext對象

  經過上面兩步后,SparkContext對象初始化完畢,Driver已經起來了,開始申請Executor資源,同時開始調度任務。用戶Python腳本中定義的一系列處理邏輯最終遇到action方法后會觸發Job的提交,提交Job時是直接通過Py4j調用Java的PythonRDD.runJob方法完成,映射到JVM中,會轉給sparkContext.runJob方法,Job運行完成后,JVM中會開啟一個本地Socket等待Python進程拉取,對應地,Python進程在調用PythonRDD.runJob后就會通過Socket去拉取結果。

  把前面運行時架構圖中Driver部分單獨拉出來,如下圖所示,通過PythonRunner入口main函數拉起JVM和Python進程,JVM進程對應下圖橙色部分,Python進程對應下圖白色部分。Python進程通過Py4j調用Java方法提交Job,Job運行結果通過本地Socket被拉取到Python進程。還有一點是,對於大數據量,例如廣播變量等,Python進程和JVM進程是通過本地文件系統來交互,以減少進程間的數據傳輸。

 

Executor端運行原理

  為了方便闡述,以Spark On Yarn為例,當Driver申請到Executor資源時,會通過CoarseGrainedExecutorBackend(其中有main方法)拉起JVM,啟動一些必要的服務后等待Driver的Task下發,在還沒有Task下發過來時,Executor端是沒有Python進程的。當收到Driver下發過來的Task后,Executor的內部運行過程如下圖所示。

 

 

  Executor端收到Task后,會通過launchTask運行Task,最后會調用到PythonRDD的compute方法,來處理一個分區的數據,PythonRDD的compute方法的計算流程大致分三步走:

  • 如果不存在pyspark.deamon后台Python進程,那么通過Java Process的方式啟動pyspark.deamon后台進程,注意每個Executor上只會有一個pyspark.deamon后台進程,否則,直接通過Socket連接pyspark.deamon,請求開啟一個pyspark.worker進程運行用戶定義的Python函數或Lambda表達式。pyspark.deamon是一個典型的多進程服務器,來一個Socket請求,fork一個pyspark.worker進程處理,一個Executor上同時運行多少個Task,就會有多少個對應的pyspark.worker進程。
  • 緊接着會單獨開一個線程,給pyspark.worker進程喂數據,pyspark.worker則會調用用戶定義的Python函數或Lambda表達式處理計算。
  • 在一邊喂數據的過程中,另一邊則通過Socket去拉取pyspark.worker的計算結果。

  把前面運行時架構圖中Executor部分單獨拉出來,如下圖所示,橙色部分為JVM進程,白色部分為Python進程,每個Executor上有一個公共的pyspark.deamon進程,負責接收Task請求,並fork pyspark.worker進程單獨處理每個Task,實際數據處理過程中,pyspark.worker進程和JVM Task會較頻繁地進行本地Socket數據通信。

 

 

總結

  總體上來說,PySpark是借助Py4j實現Python調用Java,來驅動Spark應用程序,本質上主要還是JVM runtime,Java到Python的結果返回是通過本地Socket完成。雖然這種架構保證了Spark核心代碼的獨立性,但是在大數據場景下,JVM和Python進程間頻繁的數據通信導致其性能損耗較多,惡劣時還可能會直接卡死,所以建議對於大規模機器學習或者Streaming應用場景還是慎用PySpark,盡量使用原生的Scala/Java編寫應用程序,對於中小規模數據量下的簡單離線任務,可以使用PySpark快速部署提交。

 

4、pyspark讀取數據源

(1)讀取mongodb,生成dframe

from pyspark.sql import SparkSession
from offiineProcessOfSpark import download
import logging

input_uri = "mongodb://ip:27017/S3Libmaster.s3apis"

sc = SparkSession \
    .builder \
    .appName("") \
    .config("spark.mongodb.input.uri", input_uri) \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.2") \
    .getOrCreate()

# sc.sparkContext.setLogLevel('ERROR')

dataframeRDD = sc.read.format('com.mongodb.spark.sql.DefaultSource').load()

dataframeRDD.registerTempTable("s3apis")

scannedFullNameRDD = sc.sql("select fullName from s3apis where fork = 'false' and scanned=1")

sc.stop()

(2)讀取mysql生成dframe

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

sc = SparkSession.builder.appName("DistributeCrawler").master("local[10]") \
    .config('spark.some.config,option0', 'some-value') \
    .getOrCreate()
ctx = SQLContext(sc)
jdbcDf = ctx.read.format("jdbc").options(url="jdbc:mysql://IP:3306/ghtorrent",
                                         driver="com.mysql.jdbc.Driver",
                                         dbtable="(SELECT * FROM projects limit 10000000) projects", user="",
                                         password="").load()

jdbcDf.registerTempTable("projects")

toDownLoadRDD = sc.sql("select id,url from projects where language is not null and forked_from is null and deleted = 0")

# toDownLoadRDD = sc.sql("select id,url from projects")

# toDownLoadRDD.collect()

print(toDownLoadRDD.count())

sc.stop()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM