Developing a Spark program in Scala (local run mode): counting word occurrences


Preparation:

Increase the memory of the machine node that runs Scala-Eclipse (CloudDeskTop) to 4 GB, because the local-mode Spark program will run on this node; in local mode the whole job runs inside a single JVM and can consume a large amount of memory.

Local run mode (mainly used for debugging)


1. First, copy all of the Spark jars to the hadoop user's home directory

[hadoop@CloudDeskTop spark-2.1.1]$ pwd
/software/spark-2.1.1
[hadoop@CloudDeskTop spark-2.1.1]$ cp -a jars /home/hadoop/bigdata/lib/Spark2.1.1-All
[hadoop@CloudDeskTop spark-2.1.1]$ ls ~/bigdata/lib
Hadoop2.7.3-All  HBase1.2.6-All  RPC  Spark2.1.1-All

2. Create a new Scala project in the Scala 4.4 build of Eclipse

Then create a user library named Spark2.1.1-All in Eclipse, import all of the jars from the ~/bigdata/lib/Spark2.1.1-All directory into that user library, and add it to the current Scala project: Window==>Preferences==>Java==>Build Path==>User Libraries==>New

3. Create test data locally

[hadoop@CloudDeskTop scala]$ pwd
/home/hadoop/test/scala
[hadoop@CloudDeskTop scala]$ ls
information  information02
[hadoop@CloudDeskTop scala]$ cat information
zhnag san shi yi ge hao ren
jin tian shi yi ge hao tian qi 
wo zai zhe li zuo le yi ge ce shi 
yi ge guan yu scala de ce shi 
welcome to mmzs
歡迎 歡迎
[hadoop@CloudDeskTop scala]$ cat information02 
zhnag san shi yi ge hao ren
jin tian shi yi ge hao tian qi 
wo zai zhe li zuo le yi ge ce shi 
yi ge guan yu scala de ce shi 
welcome to mmzs
歡迎 歡迎

4. Create an entry class with a main method

File—>New—>Scala Object—>in the Name field of the dialog enter com.mmzs.bigdata.spark.rdd.local.SparkRDD, where SparkRDD is the class name and com.mmzs.bigdata.spark.rdd.local is the package name

object SparkRDD {
  //define a main method as the entry point
  def main(args: Array[String]): Unit = {
    ......
  }
}

The complete code is as follows:

package com.mmzs.bigdata.spark.rdd.local

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions


object SparkRDD {
  def main(args: Array[String]): Unit = {
    //load the Spark configuration
    val conf:SparkConf = new SparkConf();
    //local test mode
    conf.setMaster("local");//Local Scala Spark RDD
    //set the application name
    conf.setAppName("sparkTestLocal");
    
    //create the Spark context from the configuration
    val sc:SparkContext = new SparkContext(conf);
    
    //read the files into memory with the SparkContext and build an RDD;
    //an input directory is enough, every file in it is used as input
    val lineRdd:RDD[String] = sc.textFile("/home/hadoop/test/scala");
    //split each line on spaces and flatten the word arrays into the outer RDD
    val flatRdd:RDD[String] = lineRdd.flatMap { line => line.split(" "); }
    //turn every word into a (word, 1) tuple so the occurrences can be counted
    val mapRDD:RDD[Tuple2[String, Integer]] = flatRdd.map(word=>(word,1));
    //group by the first tuple element (the word) and sum the counts
    val reduceRDD:RDD[(String, Integer)] = mapRDD.reduceByKey((pre:Integer, next:Integer)=>pre+next);
    //swap key and value so the subsequent sort runs on the count
    val reduceRDD02:RDD[(Integer, String)] = reduceRDD.map(tuple=>(tuple._2,tuple._1));
    //sort by key (the count); the second argument is the number of tasks,
    //a large value may cause an out-of-memory error
    val sortRDD:RDD[(Integer, String)]  = reduceRDD02.sortByKey(false, 1);
    //after sorting, swap key and value back to (word, count)
    val sortRDD02:RDD[(String, Integer)] = sortRDD.map(tuple=>(tuple._2,tuple._1));
    
    //choose an output directory; if it already exists it must be deleted first
    //(File.delete() only removes an empty directory, see the sketch after this code)
    //val f:File=new File("/home/hadoop/test/scala/output");
    //if(f.exists()&&f.isDirectory()) f.delete();
    //save the result to the given path
    reduceRDD.saveAsTextFile("/home/hadoop/test/scala/output/");
    
    //stop the Spark context
    sc.stop();
  }
}
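
A note on the commented-out cleanup above: java.io.File.delete() only removes an empty directory, while saveAsTextFile fails when the output path already exists. Below is a minimal sketch of a recursive delete that could be called at the top of main before saving; the object and method names are assumptions made here for illustration.

import java.io.File

//a minimal sketch (object and method names are illustrative assumptions):
//File.delete() only removes an empty directory, so delete the contents first
object OutputDirUtil {
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) f.listFiles().foreach(deleteRecursively)
    f.delete()
  }
}

//usage inside main, before saveAsTextFile:
//OutputDirUtil.deleteRecursively(new File("/home/hadoop/test/scala/output"))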

5. The run results are as follows (each count is doubled because the two input files have identical content):

[hadoop@CloudDeskTop scala]$ ll
total 8
-rw-rw-r-- 1 hadoop hadoop 156 Feb  7 15:27 information
-rw-rw-r-- 1 hadoop hadoop 156 Feb  7 15:29 information02
[hadoop@CloudDeskTop scala]$ ll
total 12
-rw-rw-r-- 1 hadoop hadoop  156 Feb  7 15:27 information
-rw-rw-r-- 1 hadoop hadoop  156 Feb  7 15:29 information02
drwxrwxr-x 2 hadoop hadoop 4096 Feb  7 15:54 output
[hadoop@CloudDeskTop scala]$ cd output/
[hadoop@CloudDeskTop output]$ ls
part-00000  part-00001  _SUCCESS
[hadoop@CloudDeskTop output]$ cat part-00000
(scala,2)
(zuo,2)
(tian,4)
(shi,8)
(ce,4)
(zai,2)
(歡迎,4)
(wo,2)
(zhnag,2)
(san,2)
(welcome,2)
(yi,8)
(ge,8)
(hao,4)
(qi,2)
(yu,2)
[hadoop@CloudDeskTop output]$ cat part-00001
(guan,2)
(jin,2)
(ren,2)
(de,2)
(le,2)
(to,2)
(zhe,2)
(li,2)
(mmzs,2)

The Eclipse (Scala 4.4) console shows the following:

  1 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
  2 18/02/07 15:54:12 INFO SparkContext: Running Spark version 2.1.1
  3 18/02/07 15:54:12 WARN SparkContext: Support for Java 7 is deprecated as of Spark 2.0.0
  4 18/02/07 15:54:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  5 18/02/07 15:54:14 INFO SecurityManager: Changing view acls to: hadoop
  6 18/02/07 15:54:14 INFO SecurityManager: Changing modify acls to: hadoop
  7 18/02/07 15:54:14 INFO SecurityManager: Changing view acls groups to: 
  8 18/02/07 15:54:14 INFO SecurityManager: Changing modify acls groups to: 
  9 18/02/07 15:54:14 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
 10 18/02/07 15:54:14 INFO Utils: Successfully started service 'sparkDriver' on port 37639.
 11 18/02/07 15:54:14 INFO SparkEnv: Registering MapOutputTracker
 12 18/02/07 15:54:14 INFO SparkEnv: Registering BlockManagerMaster
 13 18/02/07 15:54:14 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
 14 18/02/07 15:54:14 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
 15 18/02/07 15:54:14 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-b7a711cf-321e-43af-95ca-c504982e56e4
 16 18/02/07 15:54:14 INFO MemoryStore: MemoryStore started with capacity 348.0 MB
 17 18/02/07 15:54:15 INFO SparkEnv: Registering OutputCommitCoordinator
 18 18/02/07 15:54:15 INFO Utils: Successfully started service 'SparkUI' on port 4040.
 19 18/02/07 15:54:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.154.134:4040
 20 18/02/07 15:54:15 INFO Executor: Starting executor ID driver on host localhost
 21 18/02/07 15:54:15 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59713.
 22 18/02/07 15:54:15 INFO NettyBlockTransferService: Server created on 192.168.154.134:59713
 23 18/02/07 15:54:15 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
 24 18/02/07 15:54:15 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.154.134, 59713, None)
 25 18/02/07 15:54:15 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.154.134:59713 with 348.0 MB RAM, BlockManagerId(driver, 192.168.154.134, 59713, None)
 26 18/02/07 15:54:15 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.154.134, 59713, None)
 27 18/02/07 15:54:15 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.154.134, 59713, None)
 28 18/02/07 15:54:17 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 193.9 KB, free 347.8 MB)
 29 18/02/07 15:54:17 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.9 KB, free 347.8 MB)
 30 18/02/07 15:54:17 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.154.134:59713 (size: 22.9 KB, free: 348.0 MB)
 31 18/02/07 15:54:17 INFO SparkContext: Created broadcast 0 from textFile at SparkRDD.scala:24
 32 18/02/07 15:54:18 INFO FileInputFormat: Total input paths to process : 2
 33 18/02/07 15:54:18 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
 34 18/02/07 15:54:18 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
 35 18/02/07 15:54:18 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
 36 18/02/07 15:54:18 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
 37 18/02/07 15:54:18 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
 38 18/02/07 15:54:18 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
 39 18/02/07 15:54:18 INFO SparkContext: Starting job: saveAsTextFile at SparkRDD.scala:42
 40 18/02/07 15:54:18 INFO DAGScheduler: Registering RDD 3 (map at SparkRDD.scala:28)
 41 18/02/07 15:54:18 INFO DAGScheduler: Got job 0 (saveAsTextFile at SparkRDD.scala:42) with 2 output partitions
 42 18/02/07 15:54:18 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at SparkRDD.scala:42)
 43 18/02/07 15:54:18 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
 44 18/02/07 15:54:18 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
 45 18/02/07 15:54:18 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at SparkRDD.scala:28), which has no missing parents
 46 18/02/07 15:54:18 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.4 KB, free 347.8 MB)
 47 18/02/07 15:54:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 347.8 MB)
 48 18/02/07 15:54:18 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.154.134:59713 (size: 2.5 KB, free: 348.0 MB)
 49 18/02/07 15:54:18 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
 50 18/02/07 15:54:18 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at SparkRDD.scala:28)
 51 18/02/07 15:54:18 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
 52 18/02/07 15:54:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5984 bytes)
 53 18/02/07 15:54:18 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
 54 18/02/07 15:54:18 INFO HadoopRDD: Input split: file:/home/hadoop/test/scala/information:0+156
 55 18/02/07 15:54:19 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1748 bytes result sent to driver
 56 18/02/07 15:54:19 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 5986 bytes)
 57 18/02/07 15:54:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
 58 18/02/07 15:54:19 INFO HadoopRDD: Input split: file:/home/hadoop/test/scala/information02:0+156
 59 18/02/07 15:54:19 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 527 ms on localhost (executor driver) (1/2)
 60 18/02/07 15:54:19 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1661 bytes result sent to driver
 61 18/02/07 15:54:19 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 137 ms on localhost (executor driver) (2/2)
 62 18/02/07 15:54:19 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
 63 18/02/07 15:54:19 INFO DAGScheduler: ShuffleMapStage 0 (map at SparkRDD.scala:28) finished in 0.649 s
 64 18/02/07 15:54:19 INFO DAGScheduler: looking for newly runnable stages
 65 18/02/07 15:54:19 INFO DAGScheduler: running: Set()
 66 18/02/07 15:54:19 INFO DAGScheduler: waiting: Set(ResultStage 1)
 67 18/02/07 15:54:19 INFO DAGScheduler: failed: Set()
 68 18/02/07 15:54:19 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at saveAsTextFile at SparkRDD.scala:42), which has no missing parents
 69 18/02/07 15:54:19 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 72.4 KB, free 347.7 MB)
 70 18/02/07 15:54:19 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 26.1 KB, free 347.7 MB)
 71 18/02/07 15:54:19 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.154.134:59713 (size: 26.1 KB, free: 347.9 MB)
 72 18/02/07 15:54:19 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:996
 73 18/02/07 15:54:19 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at saveAsTextFile at SparkRDD.scala:42)
 74 18/02/07 15:54:19 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
 75 18/02/07 15:54:19 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 5757 bytes)
 76 18/02/07 15:54:19 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
 77 18/02/07 15:54:19 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
 78 18/02/07 15:54:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 11 ms
 79 18/02/07 15:54:19 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
 80 18/02/07 15:54:19 INFO FileOutputCommitter: Saved output of task 'attempt_20180207155418_0001_m_000000_2' to file:/home/hadoop/test/scala/output/_temporary/0/task_20180207155418_0001_m_000000
 81 18/02/07 15:54:19 INFO SparkHadoopMapRedUtil: attempt_20180207155418_0001_m_000000_2: Committed
 82 18/02/07 15:54:19 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1977 bytes result sent to driver
 83 18/02/07 15:54:19 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 5757 bytes)
 84 18/02/07 15:54:19 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
 85 18/02/07 15:54:19 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 298 ms on localhost (executor driver) (1/2)
 86 18/02/07 15:54:19 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
 87 18/02/07 15:54:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
 88 18/02/07 15:54:19 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
 89 18/02/07 15:54:19 INFO FileOutputCommitter: Saved output of task 'attempt_20180207155418_0001_m_000001_3' to file:/home/hadoop/test/scala/output/_temporary/0/task_20180207155418_0001_m_000001
 90 18/02/07 15:54:19 INFO SparkHadoopMapRedUtil: attempt_20180207155418_0001_m_000001_3: Committed
 91 18/02/07 15:54:19 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1890 bytes result sent to driver
 92 18/02/07 15:54:19 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkRDD.scala:42) finished in 0.437 s
 93 18/02/07 15:54:19 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 143 ms on localhost (executor driver) (2/2)
 94 18/02/07 15:54:19 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
 95 18/02/07 15:54:19 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkRDD.scala:42, took 1.566889 s
 96 18/02/07 15:54:19 INFO SparkUI: Stopped Spark web UI at http://192.168.154.134:4040
 97 18/02/07 15:54:19 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
 98 18/02/07 15:54:19 INFO MemoryStore: MemoryStore cleared
 99 18/02/07 15:54:19 INFO BlockManager: BlockManager stopped
100 18/02/07 15:54:19 INFO BlockManagerMaster: BlockManagerMaster stopped
101 18/02/07 15:54:19 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
102 18/02/07 15:54:19 INFO SparkContext: Successfully stopped SparkContext
103 18/02/07 15:54:19 INFO ShutdownHookManager: Shutdown hook called
104 18/02/07 15:54:19 INFO ShutdownHookManager: Deleting directory /tmp/spark-1ce79d12-6eb7-403c-b9e8-3d836d821c90

6. Notes

  If the master is set to local, the job runs in local mode: Spark starts an in-process cluster simulator to execute the job. A local-mode program is run from Eclipse or directly with the java command; it is not submitted with spark-submit, because that command loads the Spark environment configuration and connects to the Spark cluster, which is unnecessary in local mode. Local mode is only suitable for testing code in the local development environment.
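
As a hedged illustration of local-mode debugging (not part of the original program), the sketch below parameterizes the master URL and prints the word counts straight to the console with collect() instead of writing files; the object name SparkRDDLocalDebug and the default master value local[2] are assumptions made here.

package com.mmzs.bigdata.spark.rdd.local

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkRDDLocalDebug {
  def main(args: Array[String]): Unit = {
    //"local" = one worker thread, "local[2]" = two threads, "local[*]" = one thread per CPU core
    val master = if (args.length > 0) args(0) else "local[2]"
    val conf = new SparkConf().setMaster(master).setAppName("sparkTestLocalDebug")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("/home/hadoop/test/scala")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    //collect() pulls the whole result to the driver, which is fine for tiny test data
    counts.collect().sortBy(-_._2).foreach(println)

    sc.stop()
  }
}

Run it from Eclipse the same way as SparkRDD; the counts appear in the console and no output directory is created.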

 

Running local mode with the scala command:

Running the jar from the command line with Scala still has some rough spots (a jar built from Scala code cannot simply be run with the java command); the main problem is getting the classpath imports coordinated correctly.

