Spark安裝與學習


      摘要:Spark是繼Hadoop之后的新一代大數據分布式處理框架,由UC Berkeley的Matei Zaharia主導開發。我只能說是神一樣的人物造就的神器,詳情請猛擊http://www.spark-project.org/

        Created 2012-05-09

        Modified 2012-08-13

1 Scala安裝

       當前,Spark最新版本是0.5,由於我寫這篇文檔時,版本還是0.4,因此本文下面的所有描述基於0.4版本。

不過淘寶的達人已經嘗試了0.5,並寫了相關安裝文檔在此http://rdc.taobao.com/team/jm/archives/tag/spark

~~~~~~~~~~~~~~~以下開始我的安裝文檔~~~~~~~~~~~~~~

        我使用的Spark的版本是0.4,只存在於github上,該版本使用的Scala版本是0.9.1.final。所以先到http://www.scala-lang.org/node/165下載scala-2.9.1.final.tar.gz。解壓后放到本地 /opt 下面,在 /etc/profile 里添加

export SCALA_HOME=/opt/scala-2.9.1.final

export PATH=$SCALA_HOME/bin:$PATH

2 git安裝

由於下載Spark和編譯Spark需要git,因此先安裝git,安裝方法可以到Ubuntu軟件中心直接裝,也可以apt-get裝。裝好后需要到https://github.com 去注冊一個帳號,我注冊的是JerryLead,注冊郵箱和密碼,然后根據網站上的get-start提示生成RSA密碼。

注意:如果本地之前存在rsa_id.pub,authorized_keys等,將其保存或着將原來的密碼生成為dsa形式,這樣git和原來的密碼都不沖突。

3 Spark安裝

首先下載最新的源代碼

git clone git://github.com/mesos/spark.git

得到目錄spark后,進入spark目錄,進入conf子目錄,將 spark-env.sh-template 重命名為spark-env.sh,並添加以下代碼行:

export SCALA_HOME=/opt/scala-2.9.1.final

回到spark目錄,開始編譯,運行

$ sbt/sbt update compile

這條命令會聯網下載很多jar,然后會對spark進行編譯,編譯完成會提示success

[success] Total time: 1228 s, completed May 9, 2012 3:42:11 PM

可以通過運行spark-shell來和spark進行交互。

也可以先運行測試用例./run <class> <params>

./run spark.examples.SparkLR local[2]

在本地啟動兩個線程運行線性回歸。

./run spark.examples.SparkPi local

在本地啟動運行Pi估計器。

更多的例子在examples/src/main/scala里面

3 Spark導出

在使用Spark之前,先將編譯好的classes導出為jar比較好,可以

$ sbt/sbt assembly

將Spark及其依賴包導出為jar,放在

core/target/spark-core-assembly-0.4-SNAPSHOT.jar

可以將該jar添加到CLASSPATH里,開發Spark應用了。

一般在開發Spark應用時需要導入Spark一些類和一些隱式的轉換,需要再程序開頭加入

import spark.SparkContext

import SparkContext._

4 使用Spark交互模式

1. 運行./spark-shell.sh

2. scala> val data = Array(1, 2, 3, 4, 5) //產生data

data: Array[Int] = Array(1, 2, 3, 4, 5)

3. scala> val distData = sc.parallelize(data) //將data處理成RDD

distData: spark.RDD[Int] = spark.ParallelCollection@7a0ec850 (顯示出的類型為RDD)

4. scala> distData.reduce(_+_) //在RDD上進行運算,對data里面元素進行加和

12/05/10 09:36:20 INFO spark.SparkContext: Starting job...

5. 最后運行得到

12/05/10 09:36:20 INFO spark.SparkContext: Job finished in 0.076729174 s

res2: Int = 15

5 使用Spark處理Hadoop Datasets

Spark可以從HDFS/local FS/Amazon S3/Hypertable/HBase等創建分布式數據集。Spark支持text files,SequenceFiles和其他Hadoop InputFormat。

比如從HDFS上讀取文本創建RDD

scala> val distFile = sc.textFile("hdfs://m120:9000/user/LijieXu/Demo/file01.txt")

12/05/10 09:49:01 INFO mapred.FileInputFormat: Total input paths to process : 1

distFile: spark.RDD[String] = spark.MappedRDD@59bf8a16

然后可以統計該文本的字符數,map負責處理文本每一行map(_size)得到每一行的字符數,多行組成一個List,reduce負責將List中的所有元素相加。

scala> distFile.map(_.size).reduce(_+_)

12/05/10 09:50:02 INFO spark.SparkContext: Job finished in 0.139610772 s

res3: Int = 79

textFile可以通過設置第二個參數來指定slice個數(slice與Hadoop里的split/block概念對應,一個task處理一個slice)。Spark默認將Hadoop上一個block對應為一個slice,但可以調大slice的個數,但不能比block的個數小,這就需要知道HDFS上一個文件的block數目,可以通過50070的dfs的jsp來查看。

對於SequenceFile,可以使用SparkContext的sequenceFile[K,V]方法生成RDD,其中K和V肯定要是SequenceFile存放時的類型了,也就是必須是Writable的子類。Spark也允許使用native types去讀取,如sequenceFile[Int, String]。

對於復雜的SequenceFile,可以使用SparkContext.hadoopRDD方法去讀取,該方法傳入JobConf參數,包含InputFormat,key class,value class等,與Hadoop Java客戶端讀取方式一樣。

6 分布式數據集操作

分布式數據集支持兩種類型的操作:transformation和action。transformation的意思是從老數據集中生成新的數據集,action是在數據集上進行計算並將結果返回給driver program。每一個Spark應用包含一個driver program用來執行用戶的main函數,比如,map就是一個transformation,將大數據集划分處理為小數據集,reduce是action,將數據集上內容進行聚合並返回給driver program。有個例外是reduceByKey應該屬於transformation,返回的是分布式數據集。

需要注意的是,Spark的transformation是lazy的,transformation先將操作記錄下來,直到接下來的action需要將處理結果返回給driver program的時候。

另一個特性是caching,如果用戶指定cache一個數據集RDD,那么該數據集中的不同slice會按照partition被存放到相應不同節點的內存中,這樣重用該數據集的時候,效率會高很多,尤其適用於迭代型和交互式的應用。如果cache的RDD丟失,那么重新使用transformation生成。

7 共享變量

與Hadoop的MapReduce不同的是,Spark允許共享變量,但只允許兩種受限的變量:broadcast和accumulators。

Broadcast顧名思義是“廣播”,在每個節點上保持一份read-only的變量。比如,Hadoop的map task需要一部只讀詞典來處理文本時,由於不存在共享變量,每個task都需要加載一部詞典。當然也可以使用DistributedCache來解決。在Spark中,通過broadcast,每個節點存放一部詞典就夠了,這樣從task粒度上升到node粒度,節約的資源可想而知。Spark的broadcast路由算法也考慮到了通信開銷。

通過使用SparkContext.broadcast(v)來實現對變量v的包裝和共享。

scala> val broadcastVar = sc.broadcast(Array(1,2,3))

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Asked to add key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0)

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Estimated size for key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) is 12

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Size estimation for key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) took 0 ms

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: ensureFreeSpace((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d), 12) called with curBytes=12, maxBytes=339585269

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Adding key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0)

12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Number of entries is now 2

broadcastVar: spark.broadcast.Broadcast[Array[Int]] = spark.Broadcast(a5c2a151-185d-4ea4-aad1-9ec642eebc5d)

創建broadcast變量后,可以通過.value來訪問只讀原始變量v。

scala> broadcastVar.value

res4: Array[Int] = Array(1, 2, 3)

另一種共享變量是Accumulators,顧名思義就是可以被“added”的變量,比如MapReduce中的counters就是不斷累加的變量。Spark原生支持Int和Double類型的累加變量。

通過SparkContext.accumulator(v)來創建accumulator類型的變量,然后運行的task可以使用“+=”操作符來進行累加。但是task不能讀取到該變量,只有driver program能夠讀取(通過.value),這也是為了避免使用太多讀寫鎖吧。

創建0的accumulator版本。

scala> val accum = sc.accumulator(0)

accum: spark.Accumulator[Int] = 0

對生成的RDD進行累加,這次不要reduce了。

scala> sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)

12/05/10 11:05:48 INFO spark.SparkContext: Starting job...

scala> accum.value

res7: Int = 20

8 安裝Mesos

Spark-0.4推薦的Mesos版本是1205738,不是最新版的Mesos,我想最新版應該也可以,這里暫且使用1205738。

首先下載Mesos

svn checkout –r 1205738 https://svn.apache.org/repos/asf/incubator/mesos/trunk mesos

得到mesos目錄后,先安裝編譯所需的軟件

apt-get install python2.6 python2.6-dev

很遺憾,雖然Ubuntu 11.04上有python 2.7,但webui(mesos的web界面)需要python 2.6,因此要裝

apt-get install libcppunit-dev (安裝cppunit)

確保g++版本大於4.1

如果缺automake,那么安裝

apt-get install autoconf automake libtool

由於系統是Ubuntu 11.04 (GNU/Linux 2.6.38-8-generic x86_64)-natty,可以直接使用./configure.template.ubuntu-natty-64。但我使用的JDK是Sun的,因此修改./configure.template.ubuntu-natty-64里面--with-java-home為/opt/jdk1.6.0_27。

總體如下:

cp configure.template.ubuntu-natty-64 configure.template.ubuntu-my-natty-64

修改configure.template.ubuntu-my-natty-64得到如下內容

1 #!/bin/sh

2 export PYTHON=python2.7

3

4 $(dirname $0)/configure \

5 --with-python-headers=/usr/include/python2.7 \

6 --with-java-home=/opt/jdk1.6.0_27 \

7 --with-webui \

8 --with-included-zookeeper $@

編譯mesos

root@master:/opt/mesos# ./configure.template.ubuntu-my-natty-64

完了之后

root@master:/opt/mesos# make

9 配置Mesos和Spark

先在slave1、slave2、slave3和master上安裝mesos,我這里安裝在/opt/mesos。

進入conf目錄,修改deploy-env.sh,添加MESOS_HOME

# This works with a newer version of hostname on Ubuntu.

#FULL_IP="hostname --all-ip-addresses"

#export LIBPROCESS_IP=`echo $FULL_IP | sed 's/\([^ ]*\) .*/\1/'`

export MESOS_HOME=/opt/mesos

修改mesos.conf,添加

# mesos-slave with --help.

failover_timeout=1

進入/opt/spark,修改conf/spark-env.sh,添加

# variables to set are:

# - MESOS_HOME, to point to your Mesos installation

# - SCALA_HOME, to point to your Scala installation

# - SPARK_CLASSPATH, to add elements to Spark's classpath

# - SPARK_JAVA_OPTS, to add JVM options

# - SPARK_MEM, to change the amount of memory used per node (this should

# be in the same format as the JVM's -Xmx option, e.g. 300m or 1g).

# - SPARK_LIBRARY_PATH, to add extra search paths for native libraries.

export SCALA_HOME=/opt/scala-2.9.1.final

export MESOS_HOME=/opt/mesos

export PATH=$PATH:/opt/jdk1.6.0_27/bin

export SPARK_MEM=10g (根據自己機器的內存大小設置,指示Spark可以使用的最大內存量)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM