[Spark Core] Spark 使用第三方 Jar 包的方式



 

0. 說明

  Spark 下運行job,使用第三方 Jar 包的 3 種方式。

 

 


 

1. 方式一

  將第三方 Jar 包分發到所有的 spark/jars 目錄下

  

 


 

2. 方式二

  將第三方 Jar 打散,和我們自己的 Jar 包打到一起

  類似的例子可以參考  在 Spark 集群上運行程序  中的打包部分

 


 

 

3. 方式三

  在 spark-submit 命令中,通過 --jars 指定使用的第三方 Jar 包

 

  

 

  【案例:使用 spark-shell 執行 taggen】

  1. 啟動 spark-shell,指定 fastjson 類庫。
  定位到 fastjson jar 包

D:\maven_repository\com\alibaba\fastjson\1.2.47\fastjson-1.2.47.jar

 

  2. 啟動spark-shell

spark-shell --master spark://s101:7077 --jars /home/centos/fastjson-1.2.47.jar

 

  3. 定義函數 extractTag

 

// 定義函數,抽取標簽列表
def extractTag(json: String) = {
import com.alibaba.fastjson.JSON
var list: scala.List[String] = Nil
// 將字符串解析成 json 對象
val obj = JSON.parseObject(json)
val arr = obj.getJSONArray("extInfoList")
if (arr != null && arr.size > 0) { // 得到數組的第一個 json 對象
val firstObj = arr.getJSONObject(0)
val values = firstObj.getJSONArray("values")
if (values != null && values.size > 0) {
var i = 0
while (i < values.size) {
val tag = values.getString(i)
list = tag :: list
i += 1;
}
}
}
list
}

 

 

  4. 加載文件

// 1. 加載文件
val rdd1 = sc.textFile("/user/centos/temptags.txt")

 

  5. 解析每行的 json 數據成為集合

// 2. 解析每行的json數據成為集合
val rdd2 = rdd1.map(line => {
val arr: Array[String] = line.split("\t")
// 商家id
val busid: String = arr(0)
// json
val json: String = arr(1)
val list: scala.List[String] = extractTag(json)
(busid, list)
})

 

  6. 過濾空集合

// 3. 過濾空集合 (85766086,[干凈衛生, 服務熱情, 價格實惠, 味道贊])
val rdd3 = rdd2.filter(t => {
!t._2.isEmpty
})

 

  7. 將值壓扁

//4. 將值壓扁
val rdd4 = rdd3.flatMapValues(list=>{
list
})

 

  8. 濾除數字的tag

//5. 濾除數字的tag
val rdd5 = rdd4.filter(t=>{
try{
//
Integer.parseInt(t._2)
false
}
catch {
case _ => true
}
})

 

  9. 標1成對

//6. 標1成對
val rdd6 = rdd5.map(t=>{
(t,1)
})

 

  10. 聚合

//7. 聚合
val rdd7 = rdd6.reduceByKey(_+_)

 

  11. 重組

//8. 重組
val rdd8 = rdd7.map(t=>{
(t._1._1,(t._1._2 , t._2)::Nil)
})

 

  12. reduceByKey

//9. reduceByKey
val rdd9 =rdd8.reduceByKey(_ ::: _)

 

  13. 分組內排序

//10. 分組內排序
val rdd10=rdd9.mapValues(list=>{
list.sortBy(t=>{
-t._2
}).take(5)
})

 

  14. 商家間排序

//11. 商家間排序
val rdd11= rdd10.sortBy(t=>{
t._2(0)._2
} ,false)

 

  15. collect

rdd11.collect()

 

  16. 查看 Web UI
  http://s101:8080/

 

  17. DAG 視圖

  

 

 


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM