0. 說明
Spark 下運行job,使用第三方 Jar 包的 3 種方式。
1. 方式一
將第三方 Jar 包分發到所有的 spark/jars 目錄下
2. 方式二
將第三方 Jar 打散,和我們自己的 Jar 包打到一起
類似的例子可以參考 在 Spark 集群上運行程序 中的打包部分
3. 方式三
在 spark-submit 命令中,通過 --jars 指定使用的第三方 Jar 包
【案例:使用 spark-shell 執行 taggen】
1. 啟動 spark-shell,指定 fastjson 類庫。
定位到 fastjson jar 包
D:\maven_repository\com\alibaba\fastjson\1.2.47\fastjson-1.2.47.jar
2. 啟動spark-shell
spark-shell --master spark://s101:7077 --jars /home/centos/fastjson-1.2.47.jar
3. 定義函數 extractTag
// 定義函數,抽取標簽列表 def extractTag(json: String) = { import com.alibaba.fastjson.JSON var list: scala.List[String] = Nil // 將字符串解析成 json 對象 val obj = JSON.parseObject(json) val arr = obj.getJSONArray("extInfoList") if (arr != null && arr.size > 0) { // 得到數組的第一個 json 對象 val firstObj = arr.getJSONObject(0) val values = firstObj.getJSONArray("values") if (values != null && values.size > 0) { var i = 0 while (i < values.size) { val tag = values.getString(i) list = tag :: list i += 1; } } } list }
4. 加載文件
// 1. 加載文件 val rdd1 = sc.textFile("/user/centos/temptags.txt")
5. 解析每行的 json 數據成為集合
// 2. 解析每行的json數據成為集合 val rdd2 = rdd1.map(line => { val arr: Array[String] = line.split("\t") // 商家id val busid: String = arr(0) // json val json: String = arr(1) val list: scala.List[String] = extractTag(json) (busid, list) })
6. 過濾空集合
// 3. 過濾空集合 (85766086,[干凈衛生, 服務熱情, 價格實惠, 味道贊]) val rdd3 = rdd2.filter(t => { !t._2.isEmpty })
7. 將值壓扁
//4. 將值壓扁 val rdd4 = rdd3.flatMapValues(list=>{ list })
8. 濾除數字的tag
//5. 濾除數字的tag val rdd5 = rdd4.filter(t=>{ try{ // Integer.parseInt(t._2) false } catch { case _ => true } })
9. 標1成對
//6. 標1成對 val rdd6 = rdd5.map(t=>{ (t,1) })
10. 聚合
//7. 聚合 val rdd7 = rdd6.reduceByKey(_+_)
11. 重組
//8. 重組 val rdd8 = rdd7.map(t=>{ (t._1._1,(t._1._2 , t._2)::Nil) })
12. reduceByKey
//9. reduceByKey val rdd9 =rdd8.reduceByKey(_ ::: _)
13. 分組內排序
//10. 分組內排序 val rdd10=rdd9.mapValues(list=>{ list.sortBy(t=>{ -t._2 }).take(5) })
14. 商家間排序
//11. 商家間排序 val rdd11= rdd10.sortBy(t=>{ t._2(0)._2 } ,false)
15. collect
rdd11.collect()
16. 查看 Web UI
http://s101:8080/
17. DAG 視圖