spark RDD pipe 調用外部腳本


 

pipe(command, [envVars])
對於每個分區,都執行一個perl或者shell腳本,返回輸出的RDD

1
2
3
4
5
6
7
8
9
10
11
scala> val rdd = sc.makeRDD(List("wangguo","yangxiu","xiaozhou","kangkang"),3)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at makeRDD at <console>:24

scala> rdd.pipe("/opt/test/spark/pipe.sh").collect
res4: Array[String] = Array(wangcen, wangguohehe, wangcen, yangxiuhehe, wangcen, xiaozhouhehe, kangkanghehe)

scala> val rdd = sc.makeRDD(List("wangguo","yangxiu","xiaozhou","kangkang"),4)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at makeRDD at <console>:24

scala> rdd.pipe("/opt/test/spark/pipe.sh").collect
res5: Array[String] = Array(wangcen, wangguohehe, wangcen, yangxiuhehe, wangcen, xiaozhouhehe, wangcen, kangkanghehe)

使用Spark Pipe來給你的既有分析任務提速

有同學問我,怎么用Spark來調用外部程序,我想到了pipe可以做這個事情。文章封面圖就是PySpark的實現方案,其中就用到了pipe這個機制。

同學的需求和問題如下:

  1. 他有2萬個文件,每個10G,放在HDFS上了,總量200TB的數據需要分析。
  2. 分析程序本身已經寫好了,程序接受一個參數:文件路徑
  3. 如何用spark完成集群整個分析任務?

我靈機一動 想到了Spark Pipe 應該可以完成。查了一下資料,歸納和整理如下:

總的來說就是Spark有一個pipe的編程接口,用的是Unix的標准輸入和輸出,類似於 Unix的 | 管道,例如: ls | grep ^d

第一步:創建RDD

這一個步驟主要是羅列輸入的任務,即,包含哪些文件。

// 此處文件的List可以從另一個HDFS上的文件讀取過來 val data = List("hdfs://xxx/xxx/xxx/1.txt","hdfs://xxx/xxx/xxx/2.txt",...) val dataRDD = sc.makeRDD(data) //sc 是你的 SparkContext

第二步:創建一個Shell腳本啟動分析任務

我們已經有了RDD了,那么接下來寫一個啟動launch.sh腳本來啟動我們的分析程序

#!/bin/sh echo "Running launch.sh shell script..." while read LINE; do echo "啟動分析任務, 待分析文件路徑為: ${LINE}" bash hdfs://xxx/xxx/xx/analysis_program.sh ${LINE} done

第三步:RDD對接到啟動腳本

下面的步驟就是整合步驟了

val scriptPath = "hdfs://xxx/xxx/launch.sh"
val pipeRDD = dataRDD.pipe(scriptPath)
pipeRDD.collect()

 

總結一下,

  1. dataRDD里面包含了我們要分析的文件列表,這個列表會被分發到spark集群,
  2. 然后spark的工作節點會分別啟動一個launch.sh腳本,接受文件列表作為輸入參數,
  3. launch.sh腳本的循環體用這些文件列表啟動具體的分析任務

這樣之后的好處是:

  1. 既有程序analysis_program.sh 不需要任何修改,做到了重用,這是最大的好處
  2. 使用集群來做分析,速度比以前更快了(線性提升)
  3. 提高了機器的利用率(以前可能是一台機器分析)

 

附:如何用ansible 搭建一個standalone的spark集群 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM