PySpark調用自定義jar包


在開發PySpark程序時通常會需要用到Java的對象,而PySpark本身也是建立在Java API之上,通過Py4j來創建JavaSparkContext

Cached / shuffled

這里有幾點是需要注意的

1. Py4j只運行在driver

也就是說worker目前來說引入不了第三方的jar包。因為worker結點的PySpark是沒有啟動Py4j的通信進程的,相應的jar包自然也加載不了。之前沒有詳細看這部分文檔,系統設計時企圖在worker結點利用client模式直連Hbase來獲取部分數據,從而避免對整個表的JOIN操作,當然對於python來說這樣的操作只有通過引入jar包來實現(不考慮thrift方式)。但是測試的jar寫好之后,一直不成功,最后只有修改方案,后來才去查了官方文檔。

2. PythonRDD 的原型是 JavaRDD[String]

所有的經過PythonRDD傳遞的數據都通過BASE64編碼

3. PySpark 中的方法和匿名函數是通過cloudpickle序列化

為何函數需要被序列化,因為做map或者flatMap時,此時的函數或者lambda表達式是需要傳遞到各個worder的,如果函數里有用到閉包,cloudpickle也能巧妙的序列化。但是,需要傳遞的函數里請不要是用self關鍵字,因為傳遞過去后,self的指代關系已經不明確了。

文檔還提到PythonRDD的序列化是可定制的了,但是目前沒這個需求,所有沒測試

代碼示例

java 測試代碼, 編譯生成 pyspark-test.jar

package org.valux.py4j;
public class Calculate {
    public int sqAdd(int x){
        return x * x + 1;
    }
}

Python 測試代碼,放在文件 driver.py

from pyspark import SparkContext
from py4j.java_gateway import java_import

sc = SparkContext(appName="Py4jTesting")
java_import(sc._jvm, "org.valux.py4j.Calculate")
func = sc._jvm.Calculate()
print func.sqAdd(5)
"""
[OUTPUT] > 26
"""

 

"""
 !!![錯誤用法]
 這里是想在每個work上調用自定義的方法,
 前面已經提到過PySpark目前是不支持的
"""
rdd = sc.parallelize([1, 2, 3])
result = rdd.map(func.sqAdd).collect()

"""
 !!![錯誤用法]
 之前還有個錯誤的思路是想在work單獨 import 相應的 jar    
"""
def foo(x):
    java_import(sc._jvm, "org.valux.py4j.Calculate")
    func = sc._jvm.Calculate()
    func.sqAdd(x)
rdd = sc.parallelize([1, 2, 3])
rdd.map(foo).collect()

 

測試時,提交程序需要記得帶上jar包
> bin/spar-submit --driver-class-path pyspark-test.jar driver.py

這里又有一個坑,之前提交為了方便,一直都用的是 --jars 參數

--driver-class-path 附加的 jar 只會在 driver引入 --jars 附加的jar會在所有worker引入

幫助文檔里面還提到

--jars Comma-separated list of local jars to include on the driver and executor classpaths.

所有就偷個懶用了 --jars ,結果一直報如下錯誤:

py4j.protocol.Py4JError: Trying to call a package.

測試了好久終於解決了

參考文檔

https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM