pyspark spark 快速入門 懶人版本
安裝
docker 安裝方式
最簡單的是直接docker,有一下幾個比較快速的安裝方式參考:
- https://github.com/actionml/docker-spark
- https://github.com/wongnai/docker-spark-standalone
- https://github.com/epahomov/docker-spark
- https://towardsdatascience.com/a-journey-into-big-data-with-apache-spark-part-1-5dfcc2bccdd2
如果是進一步的簡化,那就選擇安裝spark standalone version, 不依賴hadoop
安裝完成后,就可以啟動 sbin的腳本
./start-all.sh
其web的界面效果(這里有兩個worker):
注意:
由於上述的docker用的是基礎鏡像 openjdk:8-alpine,比較適合scala的環境,pyspark 需要python3 ,所以要在Dockerfile,另外處理python3 的安裝
直接官網下載bin
https://spark.apache.org/downloads.html
運行例子
最簡單直接在安裝的目標主機上,運行,(否則把localhost替換成spark host)
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://localhost:7077 \
examples/src/main/python/pi.py \
1000
觀察CPU,利用了多個核
pyspark
./bin/pyspark
運行pyspark的wordcount (helloworld)
>>> p='/usr/local/spark/README.md'
>>> text_file = sc.textFile(p)
>>> counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
>>> counts
PythonRDD[6] at RDD at PythonRDD.scala:53
>>> counts.collect()
[Stage 0:> (0 + 2) / 2]/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py:60: UserWarning: Please install psutil to have better support with spilling
/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py:60: UserWarning: Please install psutil to have better support with spilling
[('#', 1), ('Apache', 1), ('Spark', 15), ('', 73), ('is', 7), ('unified', 1), ('analytics', 1), ('engine', 2), ('It', 2), ('provides', 1), ('high-level', 1), ('APIs', 1), ('in', 6), ('Scala,', 1), ('Java,', 1), ('an', 4), ('optimized', 1), ('supports', 2), ('computation', 1), ('analysis.', 1), ('set', 2), ('of', 5), ('tools', 1), ('SQL', 2
...
注意點:
- counts 的數據類型是RDD
- counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) 的時候沒有真正跑, count.collect()的時候才真正運行任務。
這是spark特色:需要真正用到變量的時候才運行 - sc 是啟動pyspark 后自動初始化的變量
RDD (resilient distributed dataset )
RDD 是提供一直操作collection里面每個元素的方法,該方法可以在集群的各個node 並行運行。
例子教程:
注意:看起來這個RDD用處像是dataframe (除了跨node 並行計算),他是spark dataframe ,但實際語法與pandas的dataframe是不一樣的。
這就帶來一個問題,我需要重新熟悉學習他的用法。
為了解決這個問題,databricks社區有一個方案 Koalas
用了這個之后,就可以轉化到類普通pandas 的熟悉的dataframe處理
kdf = ks.from_pandas(pdf) #pandas的dataframe -> koalas
kdf = sdf.to_koalas() # spark dataframe ->koalas的dataframe
參考:
- https://www.iteblog.com/archives/2549.html
- https://koalas.readthedocs.io/en/latest/getting_started/10min.html
提交任務
上面最簡單的例子是提交一個python 文件到spark , 但實際我們需要多個files ,並且有依賴,事實上這里比較多坑
- 最簡單的依賴例子1 - 無第三方依賴的case:
[klg@ira-r740 wade-test]$ ls
application requirements.txt t.py
[klg@ira-r740 wade-test]$ tree application
application
└── myfile.py
0 directories, 1 file
t.py
from pyspark.sql import SparkSession
SPARK_MASTER_HOST = 'ira-r740'
SPARK_MASTER_PORT = 7077
def test(x):
from application.myfile import simple_function
return simple_function(x)
if __name__ == "__main__":
spark = SparkSession\
.builder\
.config('spark.master', 'spark://{spark_master_host}:{spark_master_port}' \
.format(spark_master_host=SPARK_MASTER_HOST, spark_master_port=SPARK_MASTER_PORT)) \
.appName("PythonPiwade")\
.getOrCreate()
# test by mapping function over RDD
rdd = spark.sparkContext.parallelize(range(1,100))
result = rdd.map(lambda x: test(x)).collect()
print("result", result)
spark.stop()
先壓縮,后提交
$cd wade-test
$zip -rq application.zip
$./bin/spark-submit --py-files /home/klg/pyspark/wade-test/application.zip /home/klg/pyspark/wade-test/t.py
- 最簡單的依賴例子 - 有第三方依賴的case
t.py
...
if __name__ == "__main__":
...
rdd = spark.sparkContext.parallelize(range(1,100))
result = rdd.map(lambda x: test(x)).collect()
print("result", result)
import bs4 ### 變動在這里
print(bs4.__version__) ### 變動在這里
spark.stop()
[klg@ira-r740 wade-test]$ ls
application application.zip requirements.txt t.py venv venv.zip
先壓縮,后提交
$cd wade-test
$python3 -m venv venv # 建立虛擬環境
$source venv/bin/activate
$pip install -r requirements.txt
$zip -rq venv.zip venv # 打包依賴
$export PYSPARK_PYTHON="venv/bin/python"
$spark-submit \
--name "Sample Spark Application" \
--conf "spark.yarn.appMasterEnv.SPARK_HOME=$SPARK_HOME" \
--conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=$PYSPARK_PYTHON" \
--archives "venv.zip#venv" \
--py-files "application.zip" \
t.py
注意:這里有個命令 export PYSPARK_PYTHON="venv/bin/python" , 指定了PYSPARK_PYTHON 的變量,以便運行的時候指定了python的路徑、依賴的包的路徑。不然會有找不到依賴包
-
最簡單的依賴例子 - 有第三方依賴的case (推薦的解決方案) , 弊端是多worker 需要重復操作。看自己balance, 為快速懶人入門:
- 只在少量worker or甚至1台worker 的情況下,直接在宿主機安裝spark
- 安裝后需要依賴報的,直接在宿主機的 運行pip3 install -r requirements.txt
更多參考:https://github.com/massmutual/sample-pyspark-application
實戰例子
處理csv :
csv.py
import pandas as pd
SPARK_MASTER_HOST = 'ira-r740'
SPARK_MASTER_PORT = 7077
spark = SparkSession\
.builder\
.config('spark.master', 'spark://{spark_master_host}:{spark_master_port}' \
.format(spark_master_host=SPARK_MASTER_HOST, spark_master_port=SPARK_MASTER_PORT)) \
.appName("PythonSmsWade")\
.getOrCreate()
def trs2(x):
try:
if( x and (x['smsContent'] )):
return (str(x['smsContent']).upper(),0)
else:
return ('',-1)
except Exception as e:
return ('',-1)
#normal quick way
df1=(spark.read.format("csv").options(header="true").load("/home/klg/pyspark/py/100k.csv")) #normal
result=spark.sparkContext.parallelize(df1.collect()).map(lambda x: trs2(x)).collect()
print(len(result))
設置環境spark 的path變量后(簡化提交),
$spark-submit csv.py
注意:
官網例子的文件路徑大部分是 hdfs,這里為了簡化懶人,簡化架構,不依賴hive 分布式存儲,file 可以有這么的選項:
- copy 到各個worker 機器上
- 從db 讀取(推薦)
- 放在某一台機器上后,其他worker mount 到這個目錄 (推薦)
自己需要balance的點: 沒有利用到分布式存儲對速度上的優勢,但大部分case 下面我們需要的處理時間在於多核的cpu 計算時間。
從db 讀取數據
import pandas as pd
def test2( x):
# method 1 : we reinit the var here , simple
# method 2 : we use global share var
from pymongo import MongoClient
client = MongoClient()
client = MongoClient('10.10.20.60', 31111)
query = {}
docs = list(client['cash_loan']['user'].find(query).limit(200).skip(x*10))
# do the job
# ...
return docs
SPARK_MASTER_HOST = 'ira-r740'
SPARK_MASTER_PORT = 7077
spark = SparkSession\
.builder\
.config('spark.master', 'spark://{spark_master_host}:{spark_master_port}' \
.format(spark_master_host=SPARK_MASTER_HOST, spark_master_port=SPARK_MASTER_PORT)) \
.appName("Pythonsmswade")\
.getOrCreate()
n=5
result=spark.sparkContext.parallelize(range(1,n)).map(lambda x: test2(x)).collect()
注意:
- return 的might be list of list if test2 return list [limit]*[n] (因為test2方法里面list 化返回值)
- 在test2 里面重新初始MongoClient 變量,以避免全局、共享變量問題
全局變量(共享變量)
To be continue...
那些年踩過的坑
To be continue...