在開發PySpark
程序時通常會需要用到Java的對象,而PySpark
本身也是建立在Java API之上,通過Py4j來創建JavaSparkContext
。
這里有幾點是需要注意的
1. Py4j
只運行在driver
也就是說worker
目前來說引入不了第三方的jar包。因為worker
結點的PySpark是沒有啟動Py4j的通信進程的,相應的jar包自然也加載不了。之前沒有詳細看這部分文檔,系統設計時企圖在worker
結點利用client模式直連Hbase來獲取部分數據,從而避免對整個表的JOIN操作,當然對於python來說這樣的操作只有通過引入jar包來實現(不考慮thrift方式)。但是測試的jar寫好之后,一直不成功,最后只有修改方案,后來才去查了官方文檔。
2. PythonRDD
的原型是 JavaRDD[String]
所有的經過PythonRDD傳遞的數據都通過BASE64編碼
3. PySpark
中的方法和匿名函數是通過cloudpickle
序列化
為何函數需要被序列化,因為做map
或者flatMap
時,此時的函數或者lambda表達式是需要傳遞到各個worder
的,如果函數里有用到閉包,cloudpickle
也能巧妙的序列化。但是,需要傳遞的函數里請不要是用self
關鍵字,因為傳遞過去后,self
的指代關系已經不明確了。
文檔還提到
PythonRDD
的序列化是可定制的了,但是目前沒這個需求,所有沒測試
代碼示例
java 測試代碼, 編譯生成 pyspark-test.jar
package org.valux.py4j; public class Calculate { public int sqAdd(int x){ return x * x + 1; } }
Python 測試代碼,放在文件 driver.py
from pyspark import SparkContext from py4j.java_gateway import java_import sc = SparkContext(appName="Py4jTesting") java_import(sc._jvm, "org.valux.py4j.Calculate") func = sc._jvm.Calculate() print func.sqAdd(5) """ [OUTPUT] > 26 """
""" !!![錯誤用法] 這里是想在每個work上調用自定義的方法, 前面已經提到過PySpark目前是不支持的 """ rdd = sc.parallelize([1, 2, 3]) result = rdd.map(func.sqAdd).collect() """ !!![錯誤用法] 之前還有個錯誤的思路是想在work單獨 import 相應的 jar """ def foo(x): java_import(sc._jvm, "org.valux.py4j.Calculate") func = sc._jvm.Calculate() func.sqAdd(x) rdd = sc.parallelize([1, 2, 3]) rdd.map(foo).collect()
測試時,提交程序需要記得帶上jar包
> bin/spar-submit --driver-class-path pyspark-test.jar driver.py
這里又有一個坑,之前提交為了方便,一直都用的是 --jars 參數
--driver-class-path 附加的 jar 只會在
driver
引入 --jars 附加的jar會在所有worker
引入
幫助文檔里面還提到
--jars Comma-separated list of local jars to include on the driver and executor classpaths.
所有就偷個懶用了 --jars ,結果一直報如下錯誤:
py4j.protocol.Py4JError: Trying to call a package.
測試了好久終於解決了
參考文檔
https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals