本人的開發環境:
1.虛擬機centos 6.5
2.jdk 1.8
3.spark2.2.0
4.scala 2.11.8
5.maven 3.5.2
在開發和搭環境時必須注意版本兼容的問題,不然會出現很多莫名其妙的問題
1.啟動master進程
./sbin/start-master.sh
2.啟動worker進程
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://hadoop000:7077
【注意,spark://hadoop000:7077,是在啟動master進程后,通過localhost:8080登陸到spark WebUI上查看的。】
第一第二點是運行環境的前提條件,下面是開發環境。
1.idea結合maven開發spark,下面以NetWorldCount為例子
package com.spark import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Spark Streaming處理Socket數據 * 測試: nc */ object NetworkWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWordCount") /** * 創建StreamingContext需要兩個參數:SparkConf和batch interval */ val ssc = new StreamingContext(sparkConf, Seconds(5)) val lines = ssc.socketTextStream("localhost", 6789) val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) result.print() ssc.start() ssc.awaitTermination() } }
2.生成jar包



3.上傳jar包
4.提交任務前先啟動監聽端口,在終端輸入以下命令
nc -lk 6789
5.提交任務
./spark-submit --master local[2] --class com.spark.NetworkWordCount --name NetworkWordCount /home/hadoop/tmp/spark.jar
運行程序,出現下面的錯誤:
a.local這里出錯。原因簡單來說,local模式下只開啟一條線程,reciver占用一條線程后,沒有資源用來計算處理數據了。
解決辦法:local--->local[2]
b.缺少com.fasterxml.jackson.scala這個方法
解決辦法:
1.查看這個類的版本:view--->maven project--->
--->
.然后在pom.xml增加對應的dependency


<dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-scala_2.11</artifactId> <version>2.6.5</version> </dependency>
重新reimport,再次運行。出現以下錯誤
去maven reposition查找對應的依賴:
在這里,使用1.3.0版本的。
在pom.xml添加以下的 dependency
<dependency> <groupId>net.jpountz.lz4</groupId> <artifactId>lz4</artifactId> <version>1.3.0</version> </dependency>
重新reimport,再次運行。這次程序正常運行。
輸入數據:
接受數據:
至此,windows下,idea結合maven開發spark+調試過程 完整跑了一遍。
下面分析
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
在本地調試中,輸入源除了 fileStream外,必須local[n], n >= 2 。
在spark中,輸入源除了
fileStream ,其他的都繼承自 ReceiverInputDStream ,因此其他都需要至少兩條線程(針對local模式)以上來供程序使用。
def fileStream[ K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory) }
例如本例子中使用的
socketTextStream
def socketTextStream( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[String] = withNamedScope("socket text stream") { socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) }