綜述:
在高層中,每個spark應用由一個運行用戶主函數的driver program和執行各種集群上的parallel operations所組成。spark最主要的概念:RDD彈性分布式數據集,它是一個跨越“可並行操作集群”所有節點的基本分區的集合。RDDs可被多種方式創建:hadoop文件系統(或者其他hadoop支持的文件系統),或者現有的在主程序上的scala集合。用戶也要求spark存一個RDD在內存,允許它被高效的反復使用。最后,RDDs可以自動恢復。
在spark中第二個概念是shared variables(共享變量)可以被用在並行操作。默認情況下,spark在不同節點上並行運行函數的時候,它將函數中的每一個變量復制到每個任務中。有時候,一個變量需要跨越任務去共享,或者在任務和程序之間共享。spark支持兩種方式的共享:廣播變量broadcast variables(可以用來緩存在所有節點內存上的值),累加器accumulators(只有加操作,用於計數和求和)
這個指導顯示spark所支持的語言的特點,如果你有spark交互式界面,可以簡單地跟隨學習。
連接Spark=Linking with Spark:
spark2.2.0支持lambda表達式來寫函數,否則你可以使用在 org.apache.spark.api.java.function 中的類。
注意在spark2.2.0中刪除對java7的支持。
用java去寫一個spark應用,你需要添加對spark的依賴,spark可以通過maven central獲得:
groupId = org.apache.spark artifactId = spark-core_2.11 version = 2.2.0
此外,如果你希望訪問HDFS集群,你需要為你的HDFS版本添加hadoop-client依賴項:
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>
最后,你需要導入一些spark類到你的程序中,增加以下幾行:
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf;
初始化spark=Initializing Spark
spark程序必須要做的第一件事情是創建一個JavaSparkContext對象,告訴spark如何去訪問集群。創建一個sparkContext你首先需要去構造一個SparkConf對象(包含了你的應用的信息)。
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf);
appName參數是你的應用展示在集群UI上的名字,master是一個spark、mesos、yarn集群的URL,或者是一個特殊的本地字符串(運行在本地模式)。在實踐中,當運行在集群的時候,你不希望在程序中硬編碼master,而是通過spark-submit啟動程序並接收它。然而,對於本地測試和單元測試,你可以通過“local”來運行spark進程。
彈性分布式數據集RDDs
spark圍繞着一個稱作RDD(resilient distributed dataset)的概念,它是一個可並行操作的容錯集合。創建RDD有兩種方式:1.在你的driver program中並行化一個已有的集合,2.在外部存儲系統中引用數據集,例如共享文件系統、HDFS、任何提供Hadoop InputFormat的數據源。
並行集合
並行集合 (Parallelized collections) 的創建是通過在一個已有的集合(Scala Seq)上調用 SparkContext 的 parallelize 方法實現的。集合中的元素被復制到一個可並行操作的分布式數據集中。在這里演示了如何在一個包含數字1-5的集合中創建並行集合:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
一旦被創造,這個分布式數據集(distData)就可以被並行操作。例如,我們可以調用distData.reduce((a,b)->a+b)去進行列表元素的求和。以后我們再講分布式上面的操作。
並行集合的一個重要參數是分區數(partitions),表示一個數據集被切分的分數。spark會在集群上的每一個分區運行一個任務。一般給集群上的CPU設置2-4個分區partition。通常spark會根據你的集群自動設置分區數。當然你也可以利用parallelize的第二個參數手動設置(例如:sc.parallelize(data,10)).注意:在一些代碼中會使用術語“slices”切片“”(分區的同義詞)去保持向下兼容性。
外部數據集
spark可以從任何hadoop支持的外部存儲源來創建數據集,包括你的本地文件系統,HDFS,cassandra
,HBase、Amazon S3……。spark支持文本文件,SequenceFiles、任何其他Hadoop imputeformat。
文本文件RDDs可以使用SparkContext的textFile方法創建,在這個方法里傳入文件的URI(本地路徑,HDFS……),並且把它作為一個行集合讀入。舉例調用:
JavaRDD<String> distFile = sc.textFile("data.txt");
一旦創建,就可以通過數據集操作去使用distFile。例如,我們可以將所有行的長度相加:
distFile.map(s->s.length()).reduce((a,b)->a+b)
注意,spark讀文件時:
1.如果使用本地文件系統的路徑,文件必須能夠被worker節點訪問,要么復制文件到所有worker節點,要么使用網絡方式共享文件系統。
2.所有基於文件的方法,包括textFile,支持文件目錄,壓縮文件、通配符。例如:你可以使用
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
3.textFile第二參數可以控制分區數,默認的spark為每一個文件塊創建一個分區(HDFS文件塊大小默認為128MB)。你也可以設置更大數字的分區數,但是不能比文件塊的數目小。
除了文本文件,spark JAVA API還支持其他數據格式:
1.JavaSparkContext.wholeTextFiles讓你讀取一個目錄里多個小的文本文件,並且返回他們的每一個(名字,內容)對。而textFile記錄的是文本文件的每一行。
2.對於 SequenceFiles,可以使用 SparkContext 的 sequenceFile[K, V] 方法創建,K 和 V 分別對應的是 key 和 values 的類型。像 IntWritable 與 Text 一樣,它們必須是 Hadoop 的 Writable 接口的子類。另外,對於幾種通用的 Writables,Spark 允許你指定原聲類型來替代。例如: sequenceFile[Int, String] 將會自動讀取 IntWritables 和 Text。
3.對於其他的 Hadoop InputFormats,你可以使用 JavaSparkContext.hadoopRDD 方法,它可以指定任意的 JobConf,輸入格式(InputFormat),key 類型,values 類型。你可以跟設置 Hadoop job 一樣的方法設置輸入源。你還可以在新的 MapReduce 接口(org.apache.hadoop.mapreduce)基礎上使用JavaSparkContext.newAPIHadoopRDD
4.JavaRDD.saveAsObjectFile 和 JavaSparkContext.objectFile支持保存一個RDD,保存格式是一個簡單的 Java 對象序列化格式。這是一種效率不高的專有格式,如 Avro,它提供了簡單的方法來保存任何一個 RDD。
RDD操作
RDDs 支持 2 種類型的操作:轉換(transformations) 從已經存在的數據集中創建一個新的數據集;動作(actions) 在數據集上進行計算之后返回一個值到驅動程序。例如,map 是一個轉換操作,它將每一個數據集元素傳遞給一個函數並且返回一個新的 RDD。另一方面,reduce 是一個動作,它使用相同的函數來聚合 RDD 的所有元素,並且將最終的結果返回到驅動程序(不過也有一個並行 reduceByKey 能返回一個分布式數據集)。
在 Spark 中,所有的轉換(transformations)都是惰性(lazy)的,它們不會馬上計算它們的結果。相反的,它們僅僅記錄轉換操作是應用到哪些基礎數據集(例如一個文件)上的。轉換僅僅在這個時候計算:當動作(action) 需要一個結果返回給驅動程序的時候。這個設計能夠讓 Spark 運行得更加高效。例如,我們可以實現:通過 map 創建一個新數據集在 reduce 中使用,並且僅僅返回 reduce 的結果給 driver,而不是整個大的映射過的數據集。
默認情況下,每一個轉換過的 RDD 會在每次執行動作(action)的時候重新計算一次。然而,你也可以使用 persist (或 cache)方法持久化(persist)一個 RDD 到內存中。在這個情況下,Spark 會在集群上保存相關的元素,在你下次查詢的時候會變得更快。在這里也同樣支持持久化 RDD 到磁盤,或在多個節點間復制。
基礎:
為了說明RDD基礎知識,思考下面的程序;
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
第一行是定義來自於外部文件的 RDD。這個數據集並沒有加載到內存或做其他的操作:lines 僅僅是一個指向文件的指針。第二行是定義 lineLengths,它是 map 轉換(transformation)的結果。同樣,lineLengths 由於懶惰模式也沒有立即計算。最后,我們執行 reduce,它是一個動作(action)。在這個地方,Spark 把計算分成多個任務(task),並且讓它們運行在多個機器上。每台機器都運行自己的 map 部分和本地 reduce 部分。然后僅僅將結果返回給驅動程序。
如果我們想要再次使用 lineLengths,我們可以添加:
lineLengths.persist(StorageLevel.MEMORY_ONLY());
在 reduce 之前,它會導致 lineLengths 在第一次計算完成之后保存到內存中。
傳遞函數給 Spark
Spark的API很大程度上依賴於驅動程序中的傳遞函數來在集群上運行。在Java中,函數由實現org.apache.spark.api.java.function包中的接口的類來表示 。創建這樣的功能有兩種方法:
- 在您自己的類中實現Function接口,作為匿名內部類或命名的內部類,並將其實例傳遞給Spark。
- 使用lambda表達式 來簡潔地定義一個實現。
雖然本指南的大部分使用lambda語法來簡潔,但是可以輕松地使用long-form中的所有相同的API。例如,我們可以寫代碼如下:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
請注意,Java中的匿名內部類也可以訪問封閉范圍中的變量,只要它們被標記final。Spark將會將這些變量的副本以與其他語言一樣的方式運送到每個工作節點。
理解閉包
在集群中執行代碼時,一個關於 Spark 更難的事情是理解的變量和方法的范圍和生命周期。
修改其范圍之外的變量 RDD 操作可以混淆的常見原因。在下面的例子中,我們將看一下使用的 foreach() 代碼遞增累加計數器,但類似的問題,也可能會出現其他操作上。
示例
考慮一個簡單的 RDD 元素求和,以下行為可能不同,具體取決於是否在同一個 JVM 中執行。
一個常見的例子是當 Spark 運行在本地模式(--master = local[n])時,與部署 Spark 應用到群集(例如,通過 spark-submit 到 YARN):
