摘要:Spark是繼Hadoop之后的新一代大數據分布式處理框架,由UC Berkeley的Matei Zaharia主導開發。我只能說是神一樣的人物造就的神器,詳情請猛擊http://www.spark-project.org/
Created 2012-05-09
Modified 2012-08-13
1 Scala安裝
當前,Spark最新版本是0.5,由於我寫這篇文檔時,版本還是0.4,因此本文下面的所有描述基於0.4版本。
不過淘寶的達人已經嘗試了0.5,並寫了相關安裝文檔在此http://rdc.taobao.com/team/jm/archives/tag/spark。
~~~~~~~~~~~~~~~以下開始我的安裝文檔~~~~~~~~~~~~~~
我使用的Spark的版本是0.4,只存在於github上,該版本使用的Scala版本是0.9.1.final。所以先到http://www.scala-lang.org/node/165下載scala-2.9.1.final.tar.gz。解壓后放到本地 /opt 下面,在 /etc/profile 里添加
export SCALA_HOME=/opt/scala-2.9.1.final
export PATH=$SCALA_HOME/bin:$PATH
2 git安裝
由於下載Spark和編譯Spark需要git,因此先安裝git,安裝方法可以到Ubuntu軟件中心直接裝,也可以apt-get裝。裝好后需要到https://github.com 去注冊一個帳號,我注冊的是JerryLead,注冊郵箱和密碼,然后根據網站上的get-start提示生成RSA密碼。
注意:如果本地之前存在rsa_id.pub,authorized_keys等,將其保存或着將原來的密碼生成為dsa形式,這樣git和原來的密碼都不沖突。
3 Spark安裝
首先下載最新的源代碼
git clone git://github.com/mesos/spark.git |
得到目錄spark后,進入spark目錄,進入conf子目錄,將 spark-env.sh-template 重命名為spark-env.sh,並添加以下代碼行:
export SCALA_HOME=/opt/scala-2.9.1.final |
回到spark目錄,開始編譯,運行
$ sbt/sbt update compile |
這條命令會聯網下載很多jar,然后會對spark進行編譯,編譯完成會提示success
[success] Total time: 1228 s, completed May 9, 2012 3:42:11 PM |
可以通過運行spark-shell來和spark進行交互。
也可以先運行測試用例./run <class> <params>
./run spark.examples.SparkLR local[2] |
./run spark.examples.SparkPi local |
在本地啟動運行Pi估計器。
更多的例子在examples/src/main/scala里面
3 Spark導出
在使用Spark之前,先將編譯好的classes導出為jar比較好,可以
$ sbt/sbt assembly |
將Spark及其依賴包導出為jar,放在
core/target/spark-core-assembly-0.4-SNAPSHOT.jar |
可以將該jar添加到CLASSPATH里,開發Spark應用了。
一般在開發Spark應用時需要導入Spark一些類和一些隱式的轉換,需要再程序開頭加入
import spark.SparkContext import SparkContext._ |
4 使用Spark交互模式
1. 運行./spark-shell.sh 2. scala> val data = Array(1, 2, 3, 4, 5) //產生data data: Array[Int] = Array(1, 2, 3, 4, 5) 3. scala> val distData = sc.parallelize(data) //將data處理成RDD distData: spark.RDD[Int] = spark.ParallelCollection@7a0ec850 (顯示出的類型為RDD) 4. scala> distData.reduce(_+_) //在RDD上進行運算,對data里面元素進行加和 12/05/10 09:36:20 INFO spark.SparkContext: Starting job... 5. 最后運行得到 12/05/10 09:36:20 INFO spark.SparkContext: Job finished in 0.076729174 s res2: Int = 15 |
5 使用Spark處理Hadoop Datasets
Spark可以從HDFS/local FS/Amazon S3/Hypertable/HBase等創建分布式數據集。Spark支持text files,SequenceFiles和其他Hadoop InputFormat。
比如從HDFS上讀取文本創建RDD
scala> val distFile = sc.textFile("hdfs://m120:9000/user/LijieXu/Demo/file01.txt") 12/05/10 09:49:01 INFO mapred.FileInputFormat: Total input paths to process : 1 distFile: spark.RDD[String] = spark.MappedRDD@59bf8a16 |
然后可以統計該文本的字符數,map負責處理文本每一行map(_size)得到每一行的字符數,多行組成一個List,reduce負責將List中的所有元素相加。
scala> distFile.map(_.size).reduce(_+_) 12/05/10 09:50:02 INFO spark.SparkContext: Job finished in 0.139610772 s res3: Int = 79 |
textFile可以通過設置第二個參數來指定slice個數(slice與Hadoop里的split/block概念對應,一個task處理一個slice)。Spark默認將Hadoop上一個block對應為一個slice,但可以調大slice的個數,但不能比block的個數小,這就需要知道HDFS上一個文件的block數目,可以通過50070的dfs的jsp來查看。
對於SequenceFile,可以使用SparkContext的sequenceFile[K,V]方法生成RDD,其中K和V肯定要是SequenceFile存放時的類型了,也就是必須是Writable的子類。Spark也允許使用native types去讀取,如sequenceFile[Int, String]。
對於復雜的SequenceFile,可以使用SparkContext.hadoopRDD方法去讀取,該方法傳入JobConf參數,包含InputFormat,key class,value class等,與Hadoop Java客戶端讀取方式一樣。
6 分布式數據集操作
分布式數據集支持兩種類型的操作:transformation和action。transformation的意思是從老數據集中生成新的數據集,action是在數據集上進行計算並將結果返回給driver program。每一個Spark應用包含一個driver program用來執行用戶的main函數,比如,map就是一個transformation,將大數據集划分處理為小數據集,reduce是action,將數據集上內容進行聚合並返回給driver program。有個例外是reduceByKey應該屬於transformation,返回的是分布式數據集。
需要注意的是,Spark的transformation是lazy的,transformation先將操作記錄下來,直到接下來的action需要將處理結果返回給driver program的時候。
另一個特性是caching,如果用戶指定cache一個數據集RDD,那么該數據集中的不同slice會按照partition被存放到相應不同節點的內存中,這樣重用該數據集的時候,效率會高很多,尤其適用於迭代型和交互式的應用。如果cache的RDD丟失,那么重新使用transformation生成。
7 共享變量
與Hadoop的MapReduce不同的是,Spark允許共享變量,但只允許兩種受限的變量:broadcast和accumulators。
Broadcast顧名思義是“廣播”,在每個節點上保持一份read-only的變量。比如,Hadoop的map task需要一部只讀詞典來處理文本時,由於不存在共享變量,每個task都需要加載一部詞典。當然也可以使用DistributedCache來解決。在Spark中,通過broadcast,每個節點存放一部詞典就夠了,這樣從task粒度上升到node粒度,節約的資源可想而知。Spark的broadcast路由算法也考慮到了通信開銷。
通過使用SparkContext.broadcast(v)來實現對變量v的包裝和共享。
scala> val broadcastVar = sc.broadcast(Array(1,2,3)) 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Asked to add key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Estimated size for key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) is 12 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Size estimation for key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) took 0 ms 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: ensureFreeSpace((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d), 12) called with curBytes=12, maxBytes=339585269 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Adding key ((1,a5c2a151-185d-4ea4-aad1-9ec642eebc5d),0) 12/05/10 10:54:21 INFO spark.BoundedMemoryCache: Number of entries is now 2 broadcastVar: spark.broadcast.Broadcast[Array[Int]] = spark.Broadcast(a5c2a151-185d-4ea4-aad1-9ec642eebc5d) |
創建broadcast變量后,可以通過.value來訪問只讀原始變量v。
scala> broadcastVar.value res4: Array[Int] = Array(1, 2, 3) |
另一種共享變量是Accumulators,顧名思義就是可以被“added”的變量,比如MapReduce中的counters就是不斷累加的變量。Spark原生支持Int和Double類型的累加變量。
通過SparkContext.accumulator(v)來創建accumulator類型的變量,然后運行的task可以使用“+=”操作符來進行累加。但是task不能讀取到該變量,只有driver program能夠讀取(通過.value),這也是為了避免使用太多讀寫鎖吧。
創建0的accumulator版本。
scala> val accum = sc.accumulator(0) accum: spark.Accumulator[Int] = 0 |
對生成的RDD進行累加,這次不要reduce了。
scala> sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x) 12/05/10 11:05:48 INFO spark.SparkContext: Starting job... scala> accum.value res7: Int = 20 |
8 安裝Mesos
Spark-0.4推薦的Mesos版本是1205738,不是最新版的Mesos,我想最新版應該也可以,這里暫且使用1205738。
首先下載Mesos
svn checkout –r 1205738 https://svn.apache.org/repos/asf/incubator/mesos/trunk mesos |
得到mesos目錄后,先安裝編譯所需的軟件
apt-get install python2.6 python2.6-dev 很遺憾,雖然Ubuntu 11.04上有python 2.7,但webui(mesos的web界面)需要python 2.6,因此要裝 apt-get install libcppunit-dev (安裝cppunit) 確保g++版本大於4.1 如果缺automake,那么安裝 apt-get install autoconf automake libtool |
由於系統是Ubuntu 11.04 (GNU/Linux 2.6.38-8-generic x86_64)-natty,可以直接使用./configure.template.ubuntu-natty-64。但我使用的JDK是Sun的,因此修改./configure.template.ubuntu-natty-64里面--with-java-home為/opt/jdk1.6.0_27。
總體如下:
cp configure.template.ubuntu-natty-64 configure.template.ubuntu-my-natty-64 修改configure.template.ubuntu-my-natty-64得到如下內容 1 #!/bin/sh 2 export PYTHON=python2.7 3 4 $(dirname $0)/configure \ 5 --with-python-headers=/usr/include/python2.7 \ 6 --with-java-home=/opt/jdk1.6.0_27 \ 7 --with-webui \ 8 --with-included-zookeeper $@ |
編譯mesos
root@master:/opt/mesos# ./configure.template.ubuntu-my-natty-64 完了之后 root@master:/opt/mesos# make |
9 配置Mesos和Spark
先在slave1、slave2、slave3和master上安裝mesos,我這里安裝在/opt/mesos。
進入conf目錄,修改deploy-env.sh,添加MESOS_HOME
# This works with a newer version of hostname on Ubuntu. #FULL_IP="hostname --all-ip-addresses" #export LIBPROCESS_IP=`echo $FULL_IP | sed 's/\([^ ]*\) .*/\1/'` export MESOS_HOME=/opt/mesos |
修改mesos.conf,添加
# mesos-slave with --help. failover_timeout=1 |
進入/opt/spark,修改conf/spark-env.sh,添加
# variables to set are: # - MESOS_HOME, to point to your Mesos installation # - SCALA_HOME, to point to your Scala installation # - SPARK_CLASSPATH, to add elements to Spark's classpath # - SPARK_JAVA_OPTS, to add JVM options # - SPARK_MEM, to change the amount of memory used per node (this should # be in the same format as the JVM's -Xmx option, e.g. 300m or 1g). # - SPARK_LIBRARY_PATH, to add extra search paths for native libraries. export SCALA_HOME=/opt/scala-2.9.1.final export MESOS_HOME=/opt/mesos export PATH=$PATH:/opt/jdk1.6.0_27/bin export SPARK_MEM=10g (根據自己機器的內存大小設置,指示Spark可以使用的最大內存量) |