[Spark Core] 在 Spark 集群上運行程序



0. 說明

  將 IDEA 下的項目導出為 Jar 包,部署到 Spark 集群上運行。

 

 


 

1. 打包程序

  1.0 前提

  搭建好 Spark 集群,完成代碼的編寫。

 

  1.1 修改代碼

  【添加內容,判斷參數的有效性】

    // 判斷參數的有效性
    if (args == null || args.length == 0) {
      throw new Exception("需要指定文件路徑") ;
    }

 

 

 

  【注釋掉 conf.setMaster("...")】

 

    // 不用寫,在提交代碼的時候通過 spark-submit --master ... 自動生成
    // conf.setMaster("spark://s101:7077")

 

 

 

   【將加載文件部分由固定路徑改為讀取傳入的路徑參數】

    // 1. 加載文件
    val rdd1 = sc.textFile(args(0))

 

  【原代碼參考】

  Spark 實現標簽生成  中 Scala 代碼部分

 

  【修改過的代碼如下】

import java.util
import com.share.util.TagUtil
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * 標簽生成
  */
object TaggenCluster {
  def main(args: Array[String]): Unit = {
    // 判斷參數的有效性
    if (args == null || args.length == 0) {
      throw new Exception("需要指定文件路徑") ;
    }
    // 創建 spark 配置對象
    val conf = new SparkConf()
    conf.setAppName("TaggenScalaApp")

    // 不用寫,在提交代碼的時候通過 spark-submit --master ... 自動生成
    // conf.setMaster("spark://s101:7077")

    // 創建上下文
    val sc = new SparkContext(conf)

    // 1. 加載文件
    val rdd1 = sc.textFile(args(0))

    // 2. 解析每行的json數據成為集合
    val rdd2: RDD[(String, java.util.List[String])] = rdd1.map(line => {
      val arr: Array[String] = line.split("\t")
      // 商家id
      val busid: String = arr(0)
      // json
      val json: String = arr(1)
      val list: java.util.List[String] = TagUtil.extractTag(json)
      Tuple2[String, java.util.List[String]](busid, list)
    })

    // 3. 過濾空集合 (85766086,[干凈衛生, 服務熱情, 價格實惠, 味道贊])
    val rdd3: RDD[(String, util.List[String])] = rdd2.filter((t: Tuple2[String, java.util.List[String]]) => {
      !t._2.isEmpty
    })

    // 4. 將值壓扁  (78477325,味道贊)
    val rdd4: RDD[(String, String)] = rdd3.flatMapValues((list: java.util.List[String]) => {
      // 導入隱式轉換
      import scala.collection.JavaConversions._
      list
    })

    // 5. 濾除數字的tag  (78477325,菜品不錯)
    val rdd5 = rdd4.filter((t: Tuple2[String, String]) => {
      try {
        Integer.parseInt(t._2)
        false
      } catch {
        case _ => true
      }
    })

    // 6. 標1成對  ((70611801,環境優雅),1)
    val rdd6: RDD[Tuple2[Tuple2[String, String], Int]] = rdd5.map((t: Tuple2[String, String]) => {
      Tuple2[Tuple2[String, String], Int](t, 1)
    })

    // 7. 聚合  ((78477325,味道贊),8)
    val rdd7: RDD[Tuple2[Tuple2[String, String], Int]] = rdd6.reduceByKey((a: Int, b: Int) => {
      a + b
    })

    // 8. 重組 (83073343,List((性價比高,8)))
    val rdd8: RDD[Tuple2[String, List[Tuple2[String, Int]]]] = rdd7.map((t: Tuple2[Tuple2[String, String], Int]) => {
      Tuple2[String, List[Tuple2[String, Int]]](t._1._1, Tuple2[String, Int](t._1._2, t._2) :: Nil)
    })

    // 9. reduceByKey  (71039150,List((環境優雅,1), (價格實惠,1), (朋友聚會,1), (團建,1), (體驗好,1)))
    val rdd9: RDD[Tuple2[String, List[Tuple2[String, Int]]]] = rdd8.reduceByKey((a: List[Tuple2[String, Int]], b: List[Tuple2[String, Int]]) => {
      a ::: b
    })

    // 10. 分組內排序  (88496862,List((回頭客,5), (服務熱情,4), (味道贊,4), (分量足,3), (性價比高,2)))
    val rdd10: RDD[Tuple2[String, List[Tuple2[String, Int]]]] = rdd9.mapValues((list: List[Tuple2[String, Int]]) => {
      val list2: List[Tuple2[String, Int]] = list.sortBy((t: Tuple2[String, Int]) => {
        -t._2
      })
      list2.take(5)
    })

    // 11. 商家間排序 (75144086,List((服務熱情,38), (效果贊,30), (無辦卡,22), (環境優雅,22), (性價比高,21)))
    val rdd11: RDD[Tuple2[String, List[Tuple2[String, Int]]]] = rdd10.sortBy((t: Tuple2[String, List[Tuple2[String, Int]]]) => {
      t._2(0)._2
    }, false)

    rdd11.collect().foreach(println)
  }
}

 

  1.2 導出 Jar 包,並添加依賴的第三方類庫

  【打開 Project Structure】

  

 

 

  【添加模塊】

  

 

 

  【移除第三方類庫】

  

 

  【添加第三方類庫 fastjson】 

  

 

  【導入完成】

  

 

   【構建 Jar 包】

      

 

 

  【得到 Jar 包】

  

 

 

  

 


 

 

2. 運行程序

 

  2.0 將 Jar 包傳輸到服務器

  通過 Xftp 將 myspark.jar 傳到服務器,過程略。

 

  2.1 上傳文件到 HDFS 中

hdfs dfs -put temptags.txt /user/centos

 

  2.2 使用 spark-submit 提交應用(Scala)

spark-submit --class com.share.scala.mr.TaggenCluster --master spark://s101:7077 myspark.jar /user/centos/temptags.txt

 

  2.3 使用 spark-submit 提交應用(Java)

 

spark-submit --class com.share.java.mr.TaggenCluster --master spark://s101:7077 myspark.jar /user/centos/temptags.txt

 

 

 

 

 

 


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM