mac下Spark的安裝與使用


每次接觸一個新的知識之前我都抱有恐懼之心,因為總認為自己沒有接觸到的知識都很高大上,比如上篇介紹到的Hadoop的安裝與使用與本篇要介紹的Spark,其實在自己真正琢磨以后才發現本以為高大上的知識其實也不過如此。

由於Spark是最新火起來的處理大數據的框架,國內教程資源少之甚少,所以本篇文章是本人在看了Spark官網的快速入門教程后總結下來的經驗,由於Spark同Hadoop一樣可以運行在多種模式下,而本人又比較窮只有一台電腦,所以本篇文章為大家介紹如何在mac系統的本地模式下安裝Spark以及安裝后如何用Spark來進行交互式分析。

本文結構:前部分介紹Spark的一點點(詳情介紹請自行google)基礎概念以及安裝過程,后部分通過一個demo讓大家快速學會使用Spark基本Api。

1.Spark的運行模式

在正式安裝Spark之前,先給大家介紹下Spark可以在哪幾種模式下運行。同上篇Hadoop的安裝與使用中介紹的Hadoop可以運行在其3種模式中的任意一種模式之上,Spark也可以運行在多種模式之上,主要有以下4種運行模式:

  • 1.local: 本地單進程模式,用於本地開發測試Spark代碼。
  • 2.standalone:分布式集群模式,Master-Worker架構,Master負責調度,Worker負責具體Task的執行。
  • 3.on yarn/mesos:運行在yarn/mesos等資源管理框架之上,yarn/mesos提供資源管理,spark提供計算調度,並可與其他計算框架(如MapReduce/MPI/Storm)共同運行在同一個集群之上。
  • 4.on cloud(EC2): 運行在AWS的EC2之上

由於博主比較窮,所以下面為大家介紹本地模式下Spark的安裝與使用。

2.Spark的安裝

2.1准備工作

第一步:安裝Java JDK 1.7及以上版本,並配置好環境變量。本電腦安裝的jdk是1.7.0_79版本的。

第二步:安裝Hadoop。本電腦安裝的Hadoop是2.7.3版本的。

疑惑:上篇文章說到可以不學Hadoop直接學習Spark,那為什么還要安裝Hadoop?親,我的意思是不用學習Hadoop的相關知識例如它的API啥的,但是沒說不用先搭建Hadoop的環境呀!
合理解釋:Spark會用到HDFS與YARN,因此請先安裝Hadoop,關於Hadoop的安裝請參考我的上篇博文mac下Hadoop的安裝與使用,在此就不再復述。

第三步:安裝Scala 2.9.3以上版本。這里介紹下Scala在mac下的安裝與環境變量的配置。點擊鏈接進入scala官方網站的下載頁,下載2.11.8版本(第一次操作的時候我下載了最新版2.12.1,后來測試spark-shell命令時發現最新版本的scala與1.7版本的jdk不兼容,所以后來換成了2.11.8版本)的Scala:

點擊圖上下載鏈接會自動將scala下載到Dowmloads目錄下,文件名為:scala-2.12.1.tgz,還有一種下載方法就是直接在命令行使用homebrew命令(作為一個Linux開發人員我建議使用這種方式)進行下載:brew install scala,該命令會自動幫你把scala下載到/usr/local目錄下。使用在官網點擊鏈接下載的方式的話,我們也要將該scala文件加到/usr/local目錄下,你可以直接拷貝過去,當然作為一個Linux開發人員你可以直接使用一條命令完成將該壓縮包進行解壓移動/usr/local/目錄下:sudo tar -zxf ~/downloads/scala-2.12.1.tgz -C /usr/local/,然后使用命令cd /usr/local進入到該目錄下,由於解壓后的文件名為:scala-2.12.1,所以為了之后配置的方便我們使用命令:sudo mv ./scala-2.12.1 ./scala將文件名修改為scala。因為該目錄屬於管理員級別的目錄所以如果當前用戶不是管理員的話應該在命令前面使用sudo關鍵字表示使用管理員權限。

這樣scala的安裝便完成,但是還要配置scala的環境變量,使用命令:sudo vim ./etc/profile打開系統中配置環境變量的文件,在里面添加如下內容:

1
2
export SCALA_HOME=/usr/local/scala
export PATH=$PATH:$SCALA_HOME/bin

 

然后:wq!保存並退出該文件,輸入命令使該文件的內容立刻生效:source /etc/profile,接下來在(根目錄下)命令行輸入:scala並敲擊回車,看到控制台打印如下信息說明我們的scala安裝並成功配置了環境變量:

圖中信息即表明我們使用的scala版本為2.11.8,jdk版本為1.7.0_79。

使用命令行快捷鍵control+c或者:quit退出scala shell環境(網上教程有說使用exit命令可以退出scala shell環境,我試了但貌似不行)。

疑惑:既然Spark提供了Scala、Python、Java三種程序設計語言的API,那么我直接用java不就好了,為什么還要下載Scala呢?是因為等會我們會使用Spark shell連接到Spark引擎進行交互式數據分析,而Spark shell只支持Scala和Python兩種語言。Java不支持交互式的Shell,因此這一功能暫未在Java語言中實現(當然你也可以不使用shell編程,直接在IDE中用java編程語言連接到Spark引擎進行交互式數據分析也是可以的)。所以建議大家還是老老實實在電腦上面下載好scala並配置好環境變量,反正也占不了多大空間啊,而且萬一哪天用到這東西了呢?所以下面我都是采用的scala支持的shell來配置的Spark,之后我也會使用scala運行spark-shell進行交互式數據分析的一個小示例帶大家快速入門。

准備好如上環境后,接下來就可以進行Spark的安裝與相關配置操作了。

2.2安裝Spark並配置

接下來才是正題,進入Apache Spark官方網站進行Spark的下載,看到如下頁面:

第2條你要是選擇的是Hadoop2.7的話,你要保證你之前安裝的Hadoop版本也是2.7版本。選擇第4條的下載鏈接即可(當然你也可以直接用Homebrew命令進行下載),系統會將下好的文件放在Dowmloads文件目錄下,文件名為:spark-2.0.2-bin-hadoop2.7.tgz,同scala的安裝方法一樣,我們使用命令:sudo tar -zxf ~/Dowmloads/spark-2.0.2-bin-hadoop2.7.tgz -C /usr/local/直接將該壓縮包解壓並移動到/usr/local/目錄下,然后我們cd /usr/local進入到/usr/local目錄下,使用命令更改該目錄下的spark文件名:sudo mv ./spark-2.0.2-bin-hadoop2.7 ./spark將文件名改為spark

經過上述步驟從官網下載到Spark的文件,這樣我們便完成了Spark的安裝,但是Spark也是要進行相應的環境變量配置的,所以接下來我們進行Spark環境變量的配置。

使用命令:sudo vim /etc/profile,在文件中加入Spark的環境變量:

1
2
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

 

然后我們進入到Spark目錄的conf配置文件中:cd /usr/local/spark/conf,執行命令:cp spark-env.sh.template spark-env.sh將spark-env.sh.template拷貝一份,然后打開拷貝后的spark-env.sh文件:vim spark-env.sh,在里面加入如下內容:

1
2
3
4
5
export SCALA_HOME=/usr/local/scala

export SPARK_MASTER_IP=localhost

export SPARK_WORKER_MEMORY=4g

 

這樣我們便完成了Spark環境變量的配置,接下來測試測試一下Spark,在根目錄(因為我們配置了spark環境變量,所以可以直接在根目錄)下輸入命令:spark-shell,看到控制台輸出如下信息:

恭喜你,盡情享受Spark吧。

3.安裝過程出現的問題分析

1.運行spark-shell命令時控制台出現:

1
..我忘了是啥報錯了.connection out.中間報錯信息是這個..

 

的錯誤,說明沒有配置SSH,配置SSH請參考我上篇文章中Hadoop安裝的配置過程。

2.運行命令:scala時控制台出現:

的錯誤信息,表示scala沒有成功安裝,或者安裝的scala與jdk不兼容,所以這里我建議你們就按本教程的2.11.8scala版本與1.7jdk版本來操作吧。這些坑我都試過了,所以才能為你們總結經驗。(大哭臉)

3.其他錯誤,有以下原因,你們一定要一一進行檢查:

  • 1.關於JDK:JDK版本不對,所以我建議大家用1.7;或者是JDK版本正確但是沒有成功配置它的環境變量,我配置時更改了兩個文件的環境變量:一個是/etc/profile目錄下的,一個是.bash_profile文件中的,配置環境變量信息如下:

    1
    2
    export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home
    export PATH=$JAVA_HOME/bin:$PATH
  • 2.關於scala:Scala版本不對,所以我建議大家用2.11.8;或者是沒有成功配置scala的環境變量,配置環境變量按照文中介紹的即可。

  • 3.Hadoop版本與Spark版本不兼容:所以大家在Spark官網下載Spark的時候一定要注意下載Spark時選擇的第二條信息的Hadoop版本要與電腦上面已經安裝的Hadoop一致才行。

4.快速入門Spark基礎Api

這里我介紹兩種使用Spark基礎Api的方式,一種是在spark-shell中進行簡單的測試,一種是在開發工具IDEA中進行代碼的編寫來教大家快速學習Spark基礎API。

4.1使用spark-shell完成單詞統計功能

由於spark-shell只支持scala和python兩種語言的編寫,不支持Java,所以我在spark-shell中通過scala的語法來進行簡單測試。

在配置好Spark環境變量之后,我們打開命令行,直接在當前用戶目錄下輸入命令spark-shell進入scala編寫環境(當然前提是你首先使用命令start-all.sh命令開啟了Spark):

我們從 /usr/local/spark/README.md 文件新建一個 RDD,代碼如下(本文出現的 Spark 交互式命令代碼中,第一行為代碼及其解釋,第二行及以后是控制台返回的結果):

1
2
scala> val textFile = sc.textFile("file:///usr/local/spark/README.md")
>textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:24

 

代碼中通過 file:// 前綴或者不加 file:// 前綴表示指定讀取本地文件。如果你這里傳入的路徑寫的是HDFS上的文件路徑,例如hdfs://遠程主機名:Hadoop端口號我/文件名代表你要是讀取的是 HDFS 中的文件,你需要先上傳文件到 HDFS 中(至於如何上傳,后面的demo中我們會進行講解),否則會有org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md的錯誤。這里我們就以讀取本地文件進行講解。

RDDs 支持兩種類型的操作:1.actions: 在數據集上運行計算后返回值。2.transformations: 轉換, 從現有數據集上創建一個新的數據集。

使用上述命令創建好的RDD對象,下面我們就來通過該對象演示 count() 和 first() 操作:

1
2
3
4
5
textFile.count()  // RDD 中的 item 數量,對於文本文件,就是總行數
>res0: Long = 95

textFile.first() // RDD 中的第一個 item,對於文本文件,就是第一行內容
>res1: String = # Apache Spark

 

接着演示 transformation,通過 filter transformation 來返回一個新的 RDD,代碼如下:

1
2
3
4
5
val linesWithSpark = textFile.filter(line => line.contains("Spark"))   // 篩選出包含 Spark 的行
>linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26

linesWithSpark.count() // 統計行數
>res4: Long = 17

 

上述我們完成了RDD的簡單計算,而RDD 的 actions 和 transformations 其實可用在更復雜的計算中,例如通過如下代碼可以找到包含單詞最多的那一行內容共有幾個單詞:

1
2
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
>res1: Int = 22

 

代碼首先將每一行內容 map 為一個整數,這將創建一個新的 RDD,並在這個 RDD 中執行 reduce 操作,找到最大的數。map()、reduce() 中的參數是 Scala 的函數字面量(function literals,也稱為閉包 closures),並且可以使用語言特征或 Scala/Java 的庫。例如,通過使用 Math.max() 函數(需要導入 Java 的 Math 庫),可以使上述代碼更容易理解:

1
2
3
4
import java.lang.Math //先導入Math函數

textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
>res6: Int = 14

 

Hadoop MapReduce 是常見的數據流模式,在 Spark 中同樣可以實現(下面這個例子也就是 WordCount):

1
2
3
4
5
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)   // 實現單詞統計
>wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29

wordCounts.collect() // 輸出單詞統計結果
>res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)...)

 

上述我們通過spark-shell完成單詞的統計簡單對我們spark的基礎Api進行了熟悉,采用的是scala語言,由於我是一個java開發人員,所以接下來就在開發工具IDEA中通過編寫Java代碼來實現HDFS中某個路徑下文件內容中單詞的統計功能。

4.2在IDEA中編寫Java代碼完成HDFS中某個文件中的單詞統計功能

既然要統計HDFS中某個文件中的單詞,那么我們首先要將文件上傳到HDFS上吧!如何上傳?聽我慢慢道來。

使用命令:quit退出scala命令環境,首先在本地電腦的當前用戶目錄下創建一個文件,我這里創建了一個叫hello的txt文件,里面寫上內容hello world hello you hello hello ,內容可以隨便打啦,然后輸入hadoop的命令(關於Hadoop的更多命令請自行google):hadoop fs -put ~/hello /,實現將本機目錄下的hello文件推至遠程主機的根目錄下,然后便可以開始編寫我們的java代碼了。

使用IDEA創建一個Maven項目(便於管理我們的jar包嘛!),在pom.xml中添加上Spark相應jar包坐標:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.codingxiaxw.spark</groupId>
<artifactId>spark-mvn</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>spark-mvn Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>

<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>

</dependencies>


<build>
<finalName>spark-mvn</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

 

然后創建一個WordCount.java文件,代碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class Simple
{
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {

// if (args.length < 1) {
// System.err.println("Usage: JavaWordCount <file>");
// System.exit(1);
// }


//創建一個RDD對象
SparkConf conf=new SparkConf().setAppName("Simple").setMaster("local");

//創建spark上下文對象,是數據的入口
JavaSparkContext spark=new JavaSparkContext(conf);

//獲取數據源
JavaRDD<String> lines = spark.textFile("hdfs://localhost:8020/hello");

/**
* 對於從數據源得到的DStream,用戶可以在其基礎上進行各種操作,
* 對於當前時間窗口內從數據源得到的數據首先進行分割,
* 然后利用Map和ReduceByKey方法進行計算,當然最后還有使用print()方法輸出結果;
*/
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) {
return Arrays.asList(SPACE.split(s)).iterator();
}
});


//使用RDD的map和reduce方法進行計算
JavaPairRDD<String, Integer> ones = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
});


JavaPairRDD<String, Integer> counts = ones.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
//輸出計算結果
System.out.println(tuple._1() + ": " + tuple._2());
}


spark.stop();
}
}

 

各行代碼意思見代碼旁的注釋,上述代碼都是從官方文檔抄的,但是貌似要注釋掉官方文檔的:

1
2
3
4
// if (args.length < 1) {
// System.err.println("Usage: JavaWordCount <file>");
// System.exit(1);
// }

 

然后運行程序,控制台輸出結果如下圖:

成功統計出HDFS上文件內容的單詞個數。到此,我們便簡單熟悉了Spark的相關API,下篇文章我將介紹通過Spark的Streaming Api實現對流式數據的處理為大家介紹Spark中Streming庫的相關Api操作。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM