pipe(command, [envVars])
對於每個分區,都執行一個perl或者shell腳本,返回輸出的RDD
1 |
|
使用Spark Pipe來給你的既有分析任務提速
專業:計算機。
有同學問我,怎么用Spark來調用外部程序,我想到了pipe可以做這個事情。文章封面圖就是PySpark的實現方案,其中就用到了pipe這個機制。
同學的需求和問題如下:
- 他有2萬個文件,每個10G,放在HDFS上了,總量200TB的數據需要分析。
- 分析程序本身已經寫好了,程序接受一個參數:文件路徑
- 如何用spark完成集群整個分析任務?
我靈機一動 想到了Spark Pipe 應該可以完成。查了一下資料,歸納和整理如下:
總的來說就是Spark有一個pipe的編程接口,用的是Unix的標准輸入和輸出,類似於 Unix的 |
管道,例如: ls | grep ^d
第一步:創建RDD
這一個步驟主要是羅列輸入的任務,即,包含哪些文件。
// 此處文件的List可以從另一個HDFS上的文件讀取過來 val data = List("hdfs://xxx/xxx/xxx/1.txt","hdfs://xxx/xxx/xxx/2.txt",...) val dataRDD = sc.makeRDD(data) //sc 是你的 SparkContext
第二步:創建一個Shell腳本啟動分析任務
我們已經有了RDD了,那么接下來寫一個啟動launch.sh
腳本來啟動我們的分析程序
#!/bin/sh echo "Running launch.sh shell script..." while read LINE; do echo "啟動分析任務, 待分析文件路徑為: ${LINE}" bash hdfs://xxx/xxx/xx/analysis_program.sh ${LINE} done
第三步:RDD對接到啟動腳本
下面的步驟就是整合步驟了
val scriptPath = "hdfs://xxx/xxx/launch.sh"
val pipeRDD = dataRDD.pipe(scriptPath)
pipeRDD.collect()
總結一下,
dataRDD
里面包含了我們要分析的文件列表,這個列表會被分發到spark集群,- 然后spark的工作節點會分別啟動一個
launch.sh
腳本,接受文件列表作為輸入參數, - 在
launch.sh
腳本的循環體用這些文件列表啟動具體的分析任務
這樣之后的好處是:
- 既有程序
analysis_program.sh
不需要任何修改,做到了重用,這是最大的好處 - 使用集群來做分析,速度比以前更快了(線性提升)
- 提高了機器的利用率(以前可能是一台機器分析)
附:如何用ansible 搭建一個standalone的spark集群 https://github.com/lresende/ansible-spark-cluster#deploying-spark-standalone